handle work results by using saved closures instead of large tuples
author     Forrest Voight <forrest@forre.st>
           Wed, 8 Feb 2012 23:25:57 +0000 (18:25 -0500)
committer  Forrest Voight <forrest@forre.st>
           Wed, 8 Feb 2012 23:25:57 +0000 (18:25 -0500)
p2pool/bitcoin/worker_interface.py
p2pool/main.py
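Summary of the change: previously the bridge stashed all per-getwork state (share_info, transactions, getwork_time, mm_later, target, merkle_branch) as a six-element tuple in an ExpiringDict keyed by merkle root, and got_response() had to look that tuple up and unpack it again. After this commit, get_work() returns the BlockAttempt together with a got_response closure that already captures that state, and WorkerInterface keeps the closure in merkle_root_to_handler. A minimal sketch of the pattern, using placeholder names and values rather than p2pool's real objects:

    def get_work():
        # Per-work state that used to be packed into a tuple keyed by merkle root.
        # Both values here are placeholders.
        merkle_root = 'deadbeef'
        target = 2**224

        def got_response(header, request):
            # merkle_root and target stay reachable through the closure; no
            # dictionary lookup or tuple unpacking is needed here.
            assert header['merkle_root'] == merkle_root
            return header['pow_hash'] <= target

        return {'merkle_root': merkle_root}, got_response

    work, handler = get_work()
    handler({'merkle_root': work['merkle_root'], 'pow_hash': 2**200}, None)  # -> True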

diff --git a/p2pool/bitcoin/worker_interface.py b/p2pool/bitcoin/worker_interface.py
index 96c609a..6a8df25 100644
@@ -3,12 +3,13 @@ from __future__ import division
 import StringIO
 import json
 import random
+import sys
 
 from twisted.internet import defer
 
 import p2pool
 from p2pool.bitcoin import getwork
-from p2pool.util import jsonrpc, variable
+from p2pool.util import expiring_dict, jsonrpc, variable
 
 class _Page(jsonrpc.Server):
     def __init__(self, parent, long_poll):
@@ -32,9 +33,6 @@ class WorkerBridge(object):
     
     def get_work(self, request):
         raise NotImplementedError()
-    
-    def got_response(self, block_header):
-        print self.got_response, "called with", block_header
 
 class WorkerInterface(object):
     def __init__(self, worker_bridge):
@@ -44,6 +42,8 @@ class WorkerInterface(object):
         
         self.work_cache = {} # request_process_func(request) -> blockattempt
         self.work_cache_times = self.worker_bridge.new_work_event.times
+        
+        self.merkle_root_to_handler = expiring_dict.ExpiringDict(300)
     
     def attach_to(self, res):
         res.putChild('', _Page(self, long_poll=False))
@@ -56,7 +56,11 @@ class WorkerInterface(object):
         request.setHeader('X-Is-P2Pool', 'true')
         
         if data is not None:
-            defer.returnValue(self.worker_bridge.got_response(getwork.decode_data(data), request))
+            header = getwork.decode_data(data)
+            if header['merkle_root'] not in self.merkle_root_to_handler:
+                print >>sys.stderr, '''Couldn't link returned work's merkle root with its handler. This should only happen if this process was recently restarted!'''
+                defer.returnValue(False)
+            defer.returnValue(self.merkle_root_to_handler[header['merkle_root']](header, request))
         
         if p2pool.DEBUG:
             id = random.randrange(1000, 10000)
@@ -82,7 +86,9 @@ class WorkerInterface(object):
         if key in self.work_cache:
             res, orig_timestamp = self.work_cache.pop(key)
         else:
-            res = self.worker_bridge.get_work(*key)
+            res, handler = self.worker_bridge.get_work(*key)
+            assert res.merkle_root not in self.merkle_root_to_handler
+            self.merkle_root_to_handler[res.merkle_root] = handler
             orig_timestamp = res.timestamp
         
         if res.timestamp + 12 < orig_timestamp + 600:
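On the WorkerInterface side, worker_bridge.get_work() now yields a (blockattempt, handler) pair; the handler is remembered in merkle_root_to_handler, an ExpiringDict created with a 300-second lifetime, the same lifetime the old merkle_root_to_transactions dict used in main.py. When a miner POSTs a result, the decoded header's merkle root selects the saved handler; if none is found (for example after a restart, or once the entry has expired), the submission is rejected with False. A condensed sketch of that dispatch path, using a plain dict in place of p2pool's ExpiringDict:

    merkle_root_to_handler = {}  # stand-in for expiring_dict.ExpiringDict(300)

    def remember_work(merkle_root, handler):
        # get_work() path: each merkle root is registered exactly once.
        assert merkle_root not in merkle_root_to_handler
        merkle_root_to_handler[merkle_root] = handler

    def handle_submission(header, request):
        # POST path, with the already-decoded getwork header.
        if header['merkle_root'] not in merkle_root_to_handler:
            # mirrors the "recently restarted" warning above
            return False
        return merkle_root_to_handler[header['merkle_root']](header, request)

The real ExpiringDict drops its entries automatically, so a submission arriving after its handler has expired takes the same False path as one arriving after a restart.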
diff --git a/p2pool/main.py b/p2pool/main.py
index 5560873..048f1af 100644
@@ -450,8 +450,6 @@ def main(args, net, datadir_path, merged_urls):
             def __init__(self):
                 worker_interface.WorkerBridge.__init__(self)
                 self.new_work_event = current_work.changed
-                
-                self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
                 self.recent_shares_ts_work = []
             
             def _get_payout_script_from_username(self, user):
@@ -520,7 +518,9 @@ def main(args, net, datadir_path, merged_urls):
                 
                 transactions = [generate_tx] + list(current_work2.value['transactions'])
                 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
-                self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), mm_later, target, current_work2.value['merkle_branch']
+                
+                getwork_time = time.time()
+                merkle_branch = current_work2.value['merkle_branch']
                 
                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                     bitcoin_data.target_to_difficulty(target),
@@ -529,7 +529,7 @@ def main(args, net, datadir_path, merged_urls):
                     len(current_work2.value['transactions']),
                 )
                 
-                return bitcoin_getwork.BlockAttempt(
+                ba = bitcoin_getwork.BlockAttempt(
                     version=current_work.value['version'],
                     previous_block=current_work.value['previous_block'],
                     merkle_root=merkle_root,
@@ -537,102 +537,100 @@ def main(args, net, datadir_path, merged_urls):
                     bits=current_work.value['bits'],
                     share_target=target,
                 )
-            
-            def got_response(self, header, request):
-                # match up with transactions
-                if header['merkle_root'] not in self.merkle_root_to_transactions:
-                    print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
-                    return False
-                share_info, transactions, getwork_time, mm_later, target, merkle_branch = self.merkle_root_to_transactions[header['merkle_root']]
-                
-                pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
-                on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
-                
-                try:
-                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
-                        if factory.conn.value is not None:
-                            factory.conn.value.send_block(block=dict(header=header, txs=transactions))
-                        else:
-                            print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
-                        if pow_hash <= header['bits'].target:
-                            print
-                            print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
-                            print
-                            recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
-                except:
-                    log.err(None, 'Error while processing potential block:')
                 
-                for aux_work, index, hashes in mm_later:
-                    try:
-                        if pow_hash <= aux_work['target'] or p2pool.DEBUG:
-                            df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
-                                pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
-                                bitcoin_data.aux_pow_type.pack(dict(
-                                    merkle_tx=dict(
-                                        tx=transactions[0],
-                                        block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
-                                        merkle_branch=merkle_branch,
-                                        index=0,
-                                    ),
-                                    merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
-                                    index=index,
-                                    parent_block_header=header,
-                                )).encode('hex'),
-                            )
-                            @df.addCallback
-                            def _(result):
-                                if result != (pow_hash <= aux_work['target']):
-                                    print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
-                                else:
-                                    print 'Merged block submittal result: %s' % (result,)
-                            @df.addErrback
-                            def _(err):
-                                log.err(err, 'Error submitting merged block:')
-                    except:
-                        log.err(None, 'Error while processing merged mining POW:')
-                
-                if pow_hash <= share_info['bits'].target:
-                    share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
-                    print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
-                        request.getUser(),
-                        p2pool_data.format_hash(share.hash),
-                        p2pool_data.format_hash(share.previous_hash),
-                        time.time() - getwork_time,
-                        ' DEAD ON ARRIVAL' if not on_time else '',
-                    )
-                    my_share_hashes.add(share.hash)
-                    if not on_time:
-                        my_doa_share_hashes.add(share.hash)
+                def got_response(header, request):
+                    assert header['merkle_root'] == merkle_root
                     
-                    tracker.add(share)
-                    if not p2pool.DEBUG:
-                        tracker.verified.add(share)
-                    set_real_work2()
+                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
+                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                     
                     try:
                         if pow_hash <= header['bits'].target or p2pool.DEBUG:
-                            for peer in p2p_node.peers.itervalues():
-                                peer.sendShares([share])
-                            shared_share_hashes.add(share.hash)
+                            if factory.conn.value is not None:
+                                factory.conn.value.send_block(block=dict(header=header, txs=transactions))
+                            else:
+                                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
+                            if pow_hash <= header['bits'].target:
+                                print
+                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
+                                print
+                                recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
                     except:
-                        log.err(None, 'Error forwarding block solution:')
-                
-                if pow_hash <= target:
-                    reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
-                    if request.getPassword() == vip_pass:
-                        reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
-                    self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
-                    while len(self.recent_shares_ts_work) > 50:
-                        self.recent_shares_ts_work.pop(0)
-                    recent_shares_ts_work2.append((time.time(), bitcoin_data.target_to_average_attempts(target), not on_time))
-                
-                
-                if pow_hash > target:
-                    print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
-                    print '    Hash:   %56x' % (pow_hash,)
-                    print '    Target: %56x' % (target,)
+                        log.err(None, 'Error while processing potential block:')
+                    
+                    for aux_work, index, hashes in mm_later:
+                        try:
+                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
+                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
+                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
+                                    bitcoin_data.aux_pow_type.pack(dict(
+                                        merkle_tx=dict(
+                                            tx=transactions[0],
+                                            block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
+                                            merkle_branch=merkle_branch,
+                                            index=0,
+                                        ),
+                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
+                                        index=index,
+                                        parent_block_header=header,
+                                    )).encode('hex'),
+                                )
+                                @df.addCallback
+                                def _(result):
+                                    if result != (pow_hash <= aux_work['target']):
+                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
+                                    else:
+                                        print 'Merged block submittal result: %s' % (result,)
+                                @df.addErrback
+                                def _(err):
+                                    log.err(err, 'Error submitting merged block:')
+                        except:
+                            log.err(None, 'Error while processing merged mining POW:')
+                    
+                    if pow_hash <= share_info['bits'].target:
+                        share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
+                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
+                            request.getUser(),
+                            p2pool_data.format_hash(share.hash),
+                            p2pool_data.format_hash(share.previous_hash),
+                            time.time() - getwork_time,
+                            ' DEAD ON ARRIVAL' if not on_time else '',
+                        )
+                        my_share_hashes.add(share.hash)
+                        if not on_time:
+                            my_doa_share_hashes.add(share.hash)
+                        
+                        tracker.add(share)
+                        if not p2pool.DEBUG:
+                            tracker.verified.add(share)
+                        set_real_work2()
+                        
+                        try:
+                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
+                                for peer in p2p_node.peers.itervalues():
+                                    peer.sendShares([share])
+                                shared_share_hashes.add(share.hash)
+                        except:
+                            log.err(None, 'Error forwarding block solution:')
+                    
+                    if pow_hash <= target:
+                        reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
+                        if request.getPassword() == vip_pass:
+                            reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
+                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
+                        while len(self.recent_shares_ts_work) > 50:
+                            self.recent_shares_ts_work.pop(0)
+                        recent_shares_ts_work2.append((time.time(), bitcoin_data.target_to_average_attempts(target), not on_time))
+                    
+                    
+                    if pow_hash > target:
+                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
+                        print '    Hash:   %56x' % (pow_hash,)
+                        print '    Target: %56x' % (target,)
+                    
+                    return on_time
                 
-                return on_time
+                return ba, got_response
         
         web_root = resource.Resource()
         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)