stratum support
authorForrest Voight <forrest.voight@gmail.com>
Fri, 28 Dec 2012 09:32:16 +0000 (04:32 -0500)
committerForrest Voight <forrest.voight@gmail.com>
Sat, 29 Dec 2012 22:39:17 +0000 (17:39 -0500)
p2pool/bitcoin/stratum.py [new file with mode: 0644]
p2pool/bitcoin/worker_interface.py
p2pool/main.py
p2pool/util/jsonrpc.py
p2pool/util/switchprotocol.py [new file with mode: 0644]
p2pool/work.py

diff --git a/p2pool/bitcoin/stratum.py b/p2pool/bitcoin/stratum.py
new file mode 100644 (file)
index 0000000..6fcc3e7
--- /dev/null
@@ -0,0 +1,79 @@
+import random
+
+from twisted.internet import protocol, reactor
+
+from p2pool.bitcoin import data as bitcoin_data, getwork
+from p2pool.util import expiring_dict, jsonrpc, pack
+
+
+class StratumRPCMiningProvider(object):
+    def __init__(self, wb, other):
+        self.wb = wb
+        self.other = other
+        
+        self.username = None
+        self.handler_map = expiring_dict.ExpiringDict(300)
+        
+        self.watch_id = self.wb.new_work_event.watch(self._send_work)
+    
+    def rpc_subscribe(self):
+        return [
+            ["mining.notify", "ae6812eb4cd7735a302a8a9dd95cf71f"], # subscription details
+            "", # extranonce1
+            4, # extranonce2_size
+        ]
+    
+    def rpc_authorize(self, username, password):
+        self.username = username
+        
+        reactor.callLater(0, self._send_work)
+    
+    def _send_work(self):
+        if self.username is None: # authorize hasn't been received yet
+            return
+        
+        x, got_response = self.wb.get_work(*self.wb.get_user_details(self.username)[1:])
+        jobid = str(random.randrange(2**128))
+        self.other.svc_mining.rpc_set_difficulty(bitcoin_data.target_to_difficulty(x['share_target'])).addErrback(lambda err: None)
+        self.other.svc_mining.rpc_notify(
+            jobid, # jobid
+            getwork._swap4(pack.IntType(256).pack(x['previous_block'])).encode('hex'), # prevhash
+            x['coinb1'].encode('hex'), # coinb1
+            x['coinb2'].encode('hex'), # coinb2
+            [pack.IntType(256).pack(s).encode('hex') for s in x['merkle_link']['branch']], # merkle_branch
+            getwork._swap4(pack.IntType(32).pack(x['version'])).encode('hex'), # version
+            getwork._swap4(pack.IntType(32).pack(x['bits'].bits)).encode('hex'), # nbits
+            getwork._swap4(pack.IntType(32).pack(x['timestamp'])).encode('hex'), # ntime
+            True, # clean_jobs
+        ).addErrback(lambda err: None)
+        self.handler_map[jobid] = x, got_response
+    
+    def rpc_submit(self, worker_name, job_id, extranonce2, ntime, nonce):
+        x, got_response = self.handler_map[job_id]
+        coinb_nonce = pack.IntType(32).unpack(extranonce2.decode('hex'))
+        new_packed_gentx = x['coinb1'] + pack.IntType(32).pack(coinb_nonce) + x['coinb2']
+        header = dict(
+            version=x['version'],
+            previous_block=x['previous_block'],
+            merkle_root=bitcoin_data.check_merkle_link(bitcoin_data.hash256(new_packed_gentx), x['merkle_link']),
+            timestamp=pack.IntType(32).unpack(getwork._swap4(ntime.decode('hex'))),
+            bits=x['bits'],
+            nonce=pack.IntType(32).unpack(getwork._swap4(nonce.decode('hex'))),
+        )
+        return got_response(header, worker_name, coinb_nonce)
+    
+    def close(self):
+        self.wb.new_work_event.unwatch(self.watch_id)
+
+class StratumProtocol(jsonrpc.LineBasedPeer):
+    def connectionMade(self):
+        self.svc_mining = StratumRPCMiningProvider(self.factory.wb, self.other)
+    
+    def connectionLost(self, reason):
+        self.svc_mining.close()
+
+class StratumServerFactory(protocol.ServerFactory):
+    protocol = StratumProtocol
+    
+    def __init__(self, wb):
+        self.wb = wb
index 2f49a4e..e75fbac 100644 (file)
@@ -8,8 +8,8 @@ import sys
 from twisted.internet import defer
 
 import p2pool
-from p2pool.bitcoin import getwork
-from p2pool.util import expiring_dict, jsonrpc, variable
+from p2pool.bitcoin import data as bitcoin_data, getwork
+from p2pool.util import expiring_dict, jsonrpc, pack, variable
 
 class _Provider(object):
     def __init__(self, parent, long_poll):
@@ -59,13 +59,15 @@ class WorkerInterface(object):
         request.setHeader('X-Long-Polling', '/long-polling')
         request.setHeader('X-Roll-NTime', 'expire=10')
         request.setHeader('X-Is-P2Pool', 'true')
+        if request.getHeader('Host') is not None:
+            request.setHeader('X-Stratum', 'stratum+tcp://' + request.getHeader('Host'))
         
         if data is not None:
             header = getwork.decode_data(data)
             if header['merkle_root'] not in self.merkle_root_to_handler:
                 print >>sys.stderr, '''Couldn't link returned work's merkle root with its handler. This should only happen if this process was recently restarted!'''
                 defer.returnValue(False)
-            defer.returnValue(self.merkle_root_to_handler[header['merkle_root']](header, request.getUser() if request.getUser() is not None else ''))
+            defer.returnValue(self.merkle_root_to_handler[header['merkle_root']](header, request.getUser() if request.getUser() is not None else '', 0))
         
         if p2pool.DEBUG:
             id = random.randrange(1000, 10000)
@@ -91,7 +93,15 @@ class WorkerInterface(object):
         if key in self.work_cache:
             res, orig_timestamp, handler = self.work_cache.pop(key)
         else:
-            res, handler = self.worker_bridge.get_work(*key)
+            x, handler = self.worker_bridge.get_work(*key)
+            res = getwork.BlockAttempt(
+                version=x['version'],
+                previous_block=x['previous_block'],
+                merkle_root=bitcoin_data.check_merkle_link(bitcoin_data.hash256(x['coinb1'] + pack.IntType(32).pack(0) + x['coinb2']), x['merkle_link']),
+                timestamp=x['timestamp'],
+                bits=x['bits'],
+                share_target=x['share_target'],
+            )
             assert res.merkle_root not in self.merkle_root_to_handler
             orig_timestamp = res.timestamp
         
index 3f0bc0c..ee94079 100644 (file)
@@ -19,8 +19,8 @@ from twisted.python import log
 from nattraverso import portmapper, ipdiscover
 
 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
-from bitcoin import worker_interface, helper
-from util import fixargparse, jsonrpc, variable, deferral, math, logging
+from bitcoin import stratum, worker_interface, helper
+from util import fixargparse, jsonrpc, variable, deferral, math, logging, switchprotocol
 from . import networks, web, work
 import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node
 
@@ -214,8 +214,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee)
         web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var)
         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
+        web_serverfactory = server.Site(web_root)
         
-        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
+        
+        serverfactory = switchprotocol.FirstByteSwitchFactory({'{': stratum.StratumServerFactory(wb)}, web_serverfactory)
+        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], serverfactory, interface=worker_endpoint[0])
         
         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
             pass
index c124d03..d810ada 100644 (file)
@@ -4,10 +4,11 @@ import json
 import weakref
 
 from twisted.internet import defer
+from twisted.protocols import basic
 from twisted.python import failure, log
 from twisted.web import client, error
 
-from p2pool.util import deferred_resource, memoize
+from p2pool.util import deferral, deferred_resource, memoize
 
 class Error(Exception):
     def __init__(self, code, message, data=None):
@@ -145,3 +146,19 @@ class HTTPServer(deferred_resource.DeferredResource):
         request.setHeader('Content-Type', 'application/json')
         request.setHeader('Content-Length', len(data))
         request.write(data)
+
+class LineBasedPeer(basic.LineOnlyReceiver):
+    delimiter = '\n'
+    
+    def __init__(self):
+        #basic.LineOnlyReceiver.__init__(self)
+        self._matcher = deferral.GenericDeferrer(max_id=2**30, func=lambda id, method, params: self.sendLine(json.dumps({
+            'jsonrpc': '2.0',
+            'method': method,
+            'params': params,
+            'id': id,
+        })))
+        self.other = Proxy(self._matcher)
+    
+    def lineReceived(self, line):
+        _handle(line, self, response_handler=self._matcher.got_response).addCallback(lambda line2: self.sendLine(line2) if line2 is not None else None)
diff --git a/p2pool/util/switchprotocol.py b/p2pool/util/switchprotocol.py
new file mode 100644 (file)
index 0000000..29d05e6
--- /dev/null
@@ -0,0 +1,29 @@
+from twisted.internet import protocol
+
+class FirstByteSwitchProtocol(protocol.Protocol):
+    p = None
+    def dataReceived(self, data):
+        if self.p is None:
+            if not data: return
+            serverfactory = self.factory.first_byte_to_serverfactory.get(data[0], self.factory.default_serverfactory)
+            self.p = serverfactory.buildProtocol(self.transport.getPeer())
+            self.p.makeConnection(self.transport)
+        self.p.dataReceived(data)
+    def connectionLost(self, reason):
+        if self.p is not None:
+            self.p.connectionLost(reason)
+
+class FirstByteSwitchFactory(protocol.ServerFactory):
+    protocol = FirstByteSwitchProtocol
+    
+    def __init__(self, first_byte_to_serverfactory, default_serverfactory):
+        self.first_byte_to_serverfactory = first_byte_to_serverfactory
+        self.default_serverfactory = default_serverfactory
+    
+    def startFactory(self):
+        for f in list(self.first_byte_to_serverfactory.values()) + [self.default_serverfactory]:
+            f.doStart()
+    
+    def stopFactory(self):
+        for f in list(self.first_byte_to_serverfactory.values()) + [self.default_serverfactory]:
+            f.doStop()
index 5f4f2a4..d93a0c0 100644 (file)
@@ -264,10 +264,12 @@ class WorkerBridge(worker_interface.WorkerBridge):
             len(self.current_work.value['transactions']),
         )
         
-        ba = bitcoin_getwork.BlockAttempt(
+        ba = dict(
             version=min(self.current_work.value['version'], 2),
             previous_block=self.current_work.value['previous_block'],
-            merkle_root=bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_gentx), merkle_link),
+            merkle_link=merkle_link,
+            coinb1=packed_gentx[:-4-4],
+            coinb2=packed_gentx[-4:],
             timestamp=self.current_work.value['time'],
             bits=self.current_work.value['bits'],
             share_target=target,
@@ -275,12 +277,15 @@ class WorkerBridge(worker_interface.WorkerBridge):
         
         received_header_hashes = set()
         
-        def got_response(header, user):
+        def got_response(header, user, last_txout_nonce):
+            new_packed_gentx = packed_gentx[:-4-4] + pack.IntType(32).pack(last_txout_nonce) + packed_gentx[-4:] if last_txout_nonce != 0 else packed_gentx
+            new_gentx = bitcoin_data.tx_type.unpack(new_packed_gentx) if last_txout_nonce != 0 else gentx
+            
             header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
             pow_hash = self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
             try:
                 if pow_hash <= header['bits'].target or p2pool.DEBUG:
-                    helper.submit_block(dict(header=header, txs=[gentx] + other_transactions), False, self.node.factory, self.node.bitcoind, self.node.bitcoind_work, self.node.net)
+                    helper.submit_block(dict(header=header, txs=[new_gentx] + other_transactions), False, self.node.factory, self.node.bitcoind, self.node.bitcoind_work, self.node.net)
                     if pow_hash <= header['bits'].target:
                         print
                         print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.node.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
@@ -289,9 +294,9 @@ class WorkerBridge(worker_interface.WorkerBridge):
                 log.err(None, 'Error while processing potential block:')
             
             user, _, _, _ = self.get_user_details(user)
-            assert header['previous_block'] == ba.previous_block
-            assert header['merkle_root'] == ba.merkle_root
-            assert header['bits'] == ba.bits
+            assert header['previous_block'] == ba['previous_block']
+            assert header['merkle_root'] == bitcoin_data.check_merkle_link(bitcoin_data.hash256(new_packed_gentx), merkle_link)
+            assert header['bits'] == ba['bits']
             
             on_time = self.new_work_event.times == lp_count
             
@@ -302,7 +307,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
                             pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                             bitcoin_data.aux_pow_type.pack(dict(
                                 merkle_tx=dict(
-                                    tx=gentx,
+                                    tx=new_gentx,
                                     block_hash=header_hash,
                                     merkle_link=merkle_link,
                                 ),
@@ -323,7 +328,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
                     log.err(None, 'Error while processing merged mining POW:')
             
             if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
-                share = get_share(header)
+                share = get_share(header, last_txout_nonce)
                 
                 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                     user,