From: Forrest Voight Date: Fri, 28 Dec 2012 09:32:16 +0000 (-0500) Subject: stratum support X-Git-Tag: 10.0~1 X-Git-Url: https://git.novaco.in/?a=commitdiff_plain;h=22fcee7f4461ee93defd0b5c334ce6b8c607e477;hp=30e7373221a198d7a3bc5464e99c6328a4eeb061;p=p2pool.git stratum support --- diff --git a/p2pool/bitcoin/stratum.py b/p2pool/bitcoin/stratum.py new file mode 100644 index 0000000..6fcc3e7 --- /dev/null +++ b/p2pool/bitcoin/stratum.py @@ -0,0 +1,79 @@ +import random + +from twisted.internet import protocol, reactor + +from p2pool.bitcoin import data as bitcoin_data, getwork +from p2pool.util import expiring_dict, jsonrpc, pack + + +class StratumRPCMiningProvider(object): + def __init__(self, wb, other): + self.wb = wb + self.other = other + + self.username = None + self.handler_map = expiring_dict.ExpiringDict(300) + + self.watch_id = self.wb.new_work_event.watch(self._send_work) + + def rpc_subscribe(self): + return [ + ["mining.notify", "ae6812eb4cd7735a302a8a9dd95cf71f"], # subscription details + "", # extranonce1 + 4, # extranonce2_size + ] + + def rpc_authorize(self, username, password): + self.username = username + + reactor.callLater(0, self._send_work) + + def _send_work(self): + if self.username is None: # authorize hasn't been received yet + return + + x, got_response = self.wb.get_work(*self.wb.get_user_details(self.username)[1:]) + jobid = str(random.randrange(2**128)) + self.other.svc_mining.rpc_set_difficulty(bitcoin_data.target_to_difficulty(x['share_target'])).addErrback(lambda err: None) + self.other.svc_mining.rpc_notify( + jobid, # jobid + getwork._swap4(pack.IntType(256).pack(x['previous_block'])).encode('hex'), # prevhash + x['coinb1'].encode('hex'), # coinb1 + x['coinb2'].encode('hex'), # coinb2 + [pack.IntType(256).pack(s).encode('hex') for s in x['merkle_link']['branch']], # merkle_branch + getwork._swap4(pack.IntType(32).pack(x['version'])).encode('hex'), # version + getwork._swap4(pack.IntType(32).pack(x['bits'].bits)).encode('hex'), # nbits + getwork._swap4(pack.IntType(32).pack(x['timestamp'])).encode('hex'), # ntime + True, # clean_jobs + ).addErrback(lambda err: None) + self.handler_map[jobid] = x, got_response + + def rpc_submit(self, worker_name, job_id, extranonce2, ntime, nonce): + x, got_response = self.handler_map[job_id] + coinb_nonce = pack.IntType(32).unpack(extranonce2.decode('hex')) + new_packed_gentx = x['coinb1'] + pack.IntType(32).pack(coinb_nonce) + x['coinb2'] + header = dict( + version=x['version'], + previous_block=x['previous_block'], + merkle_root=bitcoin_data.check_merkle_link(bitcoin_data.hash256(new_packed_gentx), x['merkle_link']), + timestamp=pack.IntType(32).unpack(getwork._swap4(ntime.decode('hex'))), + bits=x['bits'], + nonce=pack.IntType(32).unpack(getwork._swap4(nonce.decode('hex'))), + ) + return got_response(header, worker_name, coinb_nonce) + + def close(self): + self.wb.new_work_event.unwatch(self.watch_id) + +class StratumProtocol(jsonrpc.LineBasedPeer): + def connectionMade(self): + self.svc_mining = StratumRPCMiningProvider(self.factory.wb, self.other) + + def connectionLost(self, reason): + self.svc_mining.close() + +class StratumServerFactory(protocol.ServerFactory): + protocol = StratumProtocol + + def __init__(self, wb): + self.wb = wb diff --git a/p2pool/bitcoin/worker_interface.py b/p2pool/bitcoin/worker_interface.py index 2f49a4e..e75fbac 100644 --- a/p2pool/bitcoin/worker_interface.py +++ b/p2pool/bitcoin/worker_interface.py @@ -8,8 +8,8 @@ import sys from twisted.internet import defer import p2pool -from p2pool.bitcoin import getwork -from p2pool.util import expiring_dict, jsonrpc, variable +from p2pool.bitcoin import data as bitcoin_data, getwork +from p2pool.util import expiring_dict, jsonrpc, pack, variable class _Provider(object): def __init__(self, parent, long_poll): @@ -59,13 +59,15 @@ class WorkerInterface(object): request.setHeader('X-Long-Polling', '/long-polling') request.setHeader('X-Roll-NTime', 'expire=10') request.setHeader('X-Is-P2Pool', 'true') + if request.getHeader('Host') is not None: + request.setHeader('X-Stratum', 'stratum+tcp://' + request.getHeader('Host')) if data is not None: header = getwork.decode_data(data) if header['merkle_root'] not in self.merkle_root_to_handler: print >>sys.stderr, '''Couldn't link returned work's merkle root with its handler. This should only happen if this process was recently restarted!''' defer.returnValue(False) - defer.returnValue(self.merkle_root_to_handler[header['merkle_root']](header, request.getUser() if request.getUser() is not None else '')) + defer.returnValue(self.merkle_root_to_handler[header['merkle_root']](header, request.getUser() if request.getUser() is not None else '', 0)) if p2pool.DEBUG: id = random.randrange(1000, 10000) @@ -91,7 +93,15 @@ class WorkerInterface(object): if key in self.work_cache: res, orig_timestamp, handler = self.work_cache.pop(key) else: - res, handler = self.worker_bridge.get_work(*key) + x, handler = self.worker_bridge.get_work(*key) + res = getwork.BlockAttempt( + version=x['version'], + previous_block=x['previous_block'], + merkle_root=bitcoin_data.check_merkle_link(bitcoin_data.hash256(x['coinb1'] + pack.IntType(32).pack(0) + x['coinb2']), x['merkle_link']), + timestamp=x['timestamp'], + bits=x['bits'], + share_target=x['share_target'], + ) assert res.merkle_root not in self.merkle_root_to_handler orig_timestamp = res.timestamp diff --git a/p2pool/main.py b/p2pool/main.py index 3f0bc0c..ee94079 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -19,8 +19,8 @@ from twisted.python import log from nattraverso import portmapper, ipdiscover import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data -from bitcoin import worker_interface, helper -from util import fixargparse, jsonrpc, variable, deferral, math, logging +from bitcoin import stratum, worker_interface, helper +from util import fixargparse, jsonrpc, variable, deferral, math, logging, switchprotocol from . import networks, web, work import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node @@ -214,8 +214,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee) web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var) worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/')) + web_serverfactory = server.Site(web_root) - deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0]) + + serverfactory = switchprotocol.FirstByteSwitchFactory({'{': stratum.StratumServerFactory(wb)}, web_serverfactory) + deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], serverfactory, interface=worker_endpoint[0]) with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f: pass diff --git a/p2pool/util/jsonrpc.py b/p2pool/util/jsonrpc.py index c124d03..d810ada 100644 --- a/p2pool/util/jsonrpc.py +++ b/p2pool/util/jsonrpc.py @@ -4,10 +4,11 @@ import json import weakref from twisted.internet import defer +from twisted.protocols import basic from twisted.python import failure, log from twisted.web import client, error -from p2pool.util import deferred_resource, memoize +from p2pool.util import deferral, deferred_resource, memoize class Error(Exception): def __init__(self, code, message, data=None): @@ -145,3 +146,19 @@ class HTTPServer(deferred_resource.DeferredResource): request.setHeader('Content-Type', 'application/json') request.setHeader('Content-Length', len(data)) request.write(data) + +class LineBasedPeer(basic.LineOnlyReceiver): + delimiter = '\n' + + def __init__(self): + #basic.LineOnlyReceiver.__init__(self) + self._matcher = deferral.GenericDeferrer(max_id=2**30, func=lambda id, method, params: self.sendLine(json.dumps({ + 'jsonrpc': '2.0', + 'method': method, + 'params': params, + 'id': id, + }))) + self.other = Proxy(self._matcher) + + def lineReceived(self, line): + _handle(line, self, response_handler=self._matcher.got_response).addCallback(lambda line2: self.sendLine(line2) if line2 is not None else None) diff --git a/p2pool/util/switchprotocol.py b/p2pool/util/switchprotocol.py new file mode 100644 index 0000000..29d05e6 --- /dev/null +++ b/p2pool/util/switchprotocol.py @@ -0,0 +1,29 @@ +from twisted.internet import protocol + +class FirstByteSwitchProtocol(protocol.Protocol): + p = None + def dataReceived(self, data): + if self.p is None: + if not data: return + serverfactory = self.factory.first_byte_to_serverfactory.get(data[0], self.factory.default_serverfactory) + self.p = serverfactory.buildProtocol(self.transport.getPeer()) + self.p.makeConnection(self.transport) + self.p.dataReceived(data) + def connectionLost(self, reason): + if self.p is not None: + self.p.connectionLost(reason) + +class FirstByteSwitchFactory(protocol.ServerFactory): + protocol = FirstByteSwitchProtocol + + def __init__(self, first_byte_to_serverfactory, default_serverfactory): + self.first_byte_to_serverfactory = first_byte_to_serverfactory + self.default_serverfactory = default_serverfactory + + def startFactory(self): + for f in list(self.first_byte_to_serverfactory.values()) + [self.default_serverfactory]: + f.doStart() + + def stopFactory(self): + for f in list(self.first_byte_to_serverfactory.values()) + [self.default_serverfactory]: + f.doStop() diff --git a/p2pool/work.py b/p2pool/work.py index 5f4f2a4..d93a0c0 100644 --- a/p2pool/work.py +++ b/p2pool/work.py @@ -264,10 +264,12 @@ class WorkerBridge(worker_interface.WorkerBridge): len(self.current_work.value['transactions']), ) - ba = bitcoin_getwork.BlockAttempt( + ba = dict( version=min(self.current_work.value['version'], 2), previous_block=self.current_work.value['previous_block'], - merkle_root=bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_gentx), merkle_link), + merkle_link=merkle_link, + coinb1=packed_gentx[:-4-4], + coinb2=packed_gentx[-4:], timestamp=self.current_work.value['time'], bits=self.current_work.value['bits'], share_target=target, @@ -275,12 +277,15 @@ class WorkerBridge(worker_interface.WorkerBridge): received_header_hashes = set() - def got_response(header, user): + def got_response(header, user, last_txout_nonce): + new_packed_gentx = packed_gentx[:-4-4] + pack.IntType(32).pack(last_txout_nonce) + packed_gentx[-4:] if last_txout_nonce != 0 else packed_gentx + new_gentx = bitcoin_data.tx_type.unpack(new_packed_gentx) if last_txout_nonce != 0 else gentx + header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)) pow_hash = self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) try: if pow_hash <= header['bits'].target or p2pool.DEBUG: - helper.submit_block(dict(header=header, txs=[gentx] + other_transactions), False, self.node.factory, self.node.bitcoind, self.node.bitcoind_work, self.node.net) + helper.submit_block(dict(header=header, txs=[new_gentx] + other_transactions), False, self.node.factory, self.node.bitcoind, self.node.bitcoind_work, self.node.net) if pow_hash <= header['bits'].target: print print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.node.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash) @@ -289,9 +294,9 @@ class WorkerBridge(worker_interface.WorkerBridge): log.err(None, 'Error while processing potential block:') user, _, _, _ = self.get_user_details(user) - assert header['previous_block'] == ba.previous_block - assert header['merkle_root'] == ba.merkle_root - assert header['bits'] == ba.bits + assert header['previous_block'] == ba['previous_block'] + assert header['merkle_root'] == bitcoin_data.check_merkle_link(bitcoin_data.hash256(new_packed_gentx), merkle_link) + assert header['bits'] == ba['bits'] on_time = self.new_work_event.times == lp_count @@ -302,7 +307,7 @@ class WorkerBridge(worker_interface.WorkerBridge): pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'), bitcoin_data.aux_pow_type.pack(dict( merkle_tx=dict( - tx=gentx, + tx=new_gentx, block_hash=header_hash, merkle_link=merkle_link, ), @@ -323,7 +328,7 @@ class WorkerBridge(worker_interface.WorkerBridge): log.err(None, 'Error while processing merged mining POW:') if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes: - share = get_share(header) + share = get_share(header, last_txout_nonce) print 'GOT SHARE! %s %s prev %s age %.2fs%s' % ( user,