broadcast shares in serial
diff --git a/p2pool/main.py b/p2pool/main.py
index 24cc047..4073f7f 100644
--- a/p2pool/main.py
+++ b/p2pool/main.py
@@ -39,13 +39,15 @@ def getwork(bitcoind):
     packed_transactions = [x.decode('hex') for x in work['transactions']]
     defer.returnValue(dict(
         version=work['version'],
-        previous_block_hash=int(work['previousblockhash'], 16),
+        previous_block=int(work['previousblockhash'], 16),
         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
         subsidy=work['coinbasevalue'],
         time=work['time'],
         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
+        clock_offset=time.time() - work['time'],
+        last_update=time.time(),
     ))
 
 @defer.inlineCallbacks
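
getwork now returns the work dict in exactly the shape the rest of main.py consumes, which is why previous_block_hash is renamed to previous_block and the clock_offset/last_update bookkeeping moves in here; that is what lets the poll_bitcoind wrapper be deleted in the hunk further down. A minimal sketch of how the two new fields can be consumed; the helper names and the 15-second staleness window are illustrative assumptions, not part of this commit:

    import time

    def work_is_fresh(work, max_age=15):
        # last_update is the local wall-clock time at which getwork returned
        return time.time() - work['last_update'] < max_age

    def estimated_bitcoind_time(work):
        # clock_offset = local time minus bitcoind's template time at poll
        # time, so subtracting it projects bitcoind's clock forward to now
        return time.time() - work['clock_offset']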
@@ -61,7 +63,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         @deferral.retry('Error while checking Bitcoin connection:', 1)
         @defer.inlineCallbacks
         def check():
-            if not (yield net.PARENT.RPC_CHECK)(bitcoind):
+            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                 raise deferral.RetrySilentlyException()
             temp_work = yield getwork(bitcoind)
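
The small change above fixes a real bug: inlineCallbacks hands a yielded non-Deferred value straight back to the generator, so (yield net.PARENT.RPC_CHECK)(bitcoind) yielded the bound method itself, called it, and then tested the truth value of the resulting Deferred, which is always true; the connection check could never fail. The fixed form calls first and yields the Deferred, so the RPC's boolean answer is what gets tested:

    # fixed pattern, as in the hunk above
    ok = yield net.PARENT.RPC_CHECK(bitcoind)  # yield the Deferred from the call
    if not ok:
        raise deferral.RetrySilentlyException()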
@@ -79,7 +81,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         task.LoopingCall(poll_height).start(60*60)
         
         print '    ...success!'
-        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
+        print '    Current block hash: %x' % (temp_work['previous_block'],)
         print '    Current block height: %i' % (block_height_var.value,)
         print
         
@@ -131,24 +133,24 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print "Loading shares..."
         for i, (mode, contents) in enumerate(ss.get_shares()):
             if mode == 'share':
-                if contents.hash in tracker.shares:
+                if contents.hash in tracker.items:
                     continue
                 shared_share_hashes.add(contents.hash)
                 contents.time_seen = 0
                 tracker.add(contents)
-                if len(tracker.shares) % 1000 == 0 and tracker.shares:
-                    print "    %i" % (len(tracker.shares),)
+                if len(tracker.items) % 1000 == 0 and tracker.items:
+                    print "    %i" % (len(tracker.items),)
             elif mode == 'verified_hash':
                 known_verified.add(contents)
             else:
                 raise AssertionError()
         print "    ...inserting %i verified shares..." % (len(known_verified),)
         for h in known_verified:
-            if h not in tracker.shares:
+            if h not in tracker.items:
                 ss.forget_verified_share(h)
                 continue
-            tracker.verified.add(tracker.shares[h])
-        print "    ...done loading %i shares!" % (len(tracker.shares),)
+            tracker.verified.add(tracker.items[h])
+        print "    ...done loading %i shares!" % (len(tracker.items),)
         print
         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
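
tracker.shares becomes tracker.items (and tracker.verified.shares becomes tracker.verified.items) throughout this file; the rename presumably follows a generalization of the shared Tracker class to arbitrary items, and every lookup site is updated mechanically in this commit. The assumed shape after the rename, for orientation:

    # illustrative only: the attribute names come from this diff, the
    # dict-like shape is an assumption about the Tracker class
    #   tracker.items          : {share_hash: share}
    #   tracker.verified.items : {share_hash: share}, verified shares only
    if share_hash in tracker.items:
        share = tracker.items[share_hash]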
@@ -159,31 +161,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         # BITCOIND WORK
         
-        bitcoind_work = variable.Variable(None)
-        
-        @defer.inlineCallbacks
-        def poll_bitcoind():
-            work = yield getwork(bitcoind)
-            bitcoind_work.set(dict(
-                version=work['version'],
-                previous_block=work['previous_block_hash'],
-                bits=work['bits'],
-                coinbaseflags=work['coinbaseflags'],
-                time=work['time'],
-                transactions=work['transactions'],
-                merkle_link=work['merkle_link'],
-                subsidy=work['subsidy'],
-                clock_offset=time.time() - work['time'],
-                last_update=time.time(),
-            ))
-        yield poll_bitcoind()
-        
+        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
         @defer.inlineCallbacks
         def work_poller():
             while True:
                 flag = factory.new_block.get_deferred()
                 try:
-                    yield poll_bitcoind()
+                    bitcoind_work.set((yield getwork(bitcoind)))
                 except:
                     log.err()
                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
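
With getwork returning the finished dict, poll_bitcoind was pure plumbing and is gone: the Variable is seeded inline and work_poller sets it directly. Note the ordering inside the loop: the new_block deferred is armed before getwork is called, so a block announcement arriving while the poll is in flight still fires the flag and cuts the 15-second sleep short instead of being missed. For reference, deferral.sleep is p2pool's own helper; a typical implementation of such a helper looks like this (sketch, not the project's actual code):

    from twisted.internet import defer, reactor

    def sleep(seconds):
        # a Deferred that simply fires with None after the given delay
        d = defer.Deferred()
        reactor.callLater(seconds, d.callback, None)
        return d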
@@ -268,7 +252,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 
                 new_count = 0
                 for share in shares:
-                    if share.hash in tracker.shares:
+                    if share.hash in tracker.items:
                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                         continue
                     
@@ -285,13 +269,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     set_best_share()
                 
                 if len(shares) > 5:
-                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
+                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
             
             def handle_share_hashes(self, hashes, peer):
                 t = time.time()
                 get_hashes = []
                 for share_hash in hashes:
-                    if share_hash in tracker.shares:
+                    if share_hash in tracker.items:
                         continue
                     last_request_time, count = requested.get(share_hash, (None, 0))
                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
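
Unchanged logic here, only the renamed container; worth spelling out the throttle while we are at it: a hash is skipped while t sits inside the window (last_request_time - 5, last_request_time + 10 * 1.5**count), so the cooldown before re-requesting the same share grows exponentially with the retry count: 10 s, 15 s, 22.5 s, 33.75 s, ... for count = 0, 1, 2, 3.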
@@ -407,6 +391,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             for peer in p2p_node.peers.itervalues():
                 peer.send_bestblock(header=header)
         
+        @defer.inlineCallbacks
         def broadcast_share(share_hash):
             shares = []
             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
@@ -415,8 +400,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 shared_share_hashes.add(share.hash)
                 shares.append(share)
             
-            for peer in p2p_node.peers.itervalues():
-                peer.sendShares([share for share in shares if share.peer is not peer])
+            for peer in list(p2p_node.peers.itervalues()):
+                yield peer.sendShares([share for share in shares if share.peer is not peer])
         
         # send share when the chain changes to their chain
         best_share_var.changed.watch(broadcast_share)
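
This hunk is the commit's namesake. broadcast_share becomes an inlineCallbacks coroutine and yields each peer.sendShares(...) call, so shares go out to peers serially instead of all sends being queued at once. The list(...) wrapper is load-bearing, not cosmetic: once the loop body can suspend at a yield, peers may connect or disconnect before the iteration finishes, and in Python 2 iterating a live itervalues() view of a dict that changes size raises at the next step. A short demonstration of the hazard being avoided (not from the diff):

    d = {1: 'a', 2: 'b'}
    for v in d.itervalues():
        del d[2]  # RuntimeError: dictionary changed size during iteration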
@@ -424,7 +409,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         def save_shares():
             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                 ss.add_share(share)
-                if share.hash in tracker.verified.shares:
+                if share.hash in tracker.verified.items:
                     ss.add_verified_hash(share.hash)
         task.LoopingCall(save_shares).start(60)
         
@@ -454,7 +439,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
         
-        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var)
+        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
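
WorkerBridge now receives broadcast_share instead of shared_share_hashes, presumably so the worker side can push a freshly solved local share to peers immediately rather than only marking it as shared and waiting for the best-share watcher; the matching change inside p2pool/work.py is not part of this file's diff, so that reading is an inference from the signature alone.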
         
@@ -537,8 +522,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     height = tracker.get_height(best_share_var.value)
                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                         height,
-                        len(tracker.verified.shares),
-                        len(tracker.shares),
+                        len(tracker.verified.items),
+                        len(tracker.items),
                         len(p2p_node.peers),
                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
@@ -549,7 +534,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                         math.format(int(my_att_s)),
                         math.format_dt(dt),
                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
-                        math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
+                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                     )
                     
                     if height > 2:
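
The renamed expression in the last line of that status string is the expected time for this node to find a share: one double-SHA256 attempt succeeds when the hash lands below max_target, i.e. with probability max_target / 2**256, so the mean number of attempts is 2**256 / max_target, and dividing by the local attempt rate my_att_s (hashes per second) gives seconds, which math.format_dt renders human-readably.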
@@ -586,7 +571,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         log.err(None, 'Fatal error:')
 
 def run():
-    realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
+    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
     
     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
     parser.add_argument('--version', action='version', version=p2pool.__version__)
@@ -665,6 +650,7 @@ def run():
     
     if args.debug:
         p2pool.DEBUG = True
+        defer.setDebugging(True)
     
     net_name = args.net_name + ('_testnet' if args.testnet else '')
     net = networks.nets[net_name]
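
defer.setDebugging(True) makes Twisted record the call stack at which every Deferred is created and fired, so that unhandled-error reports point back at the responsible code instead of at the reactor; capturing those tracebacks is expensive, which is why it is gated behind --debug together with p2pool's own DEBUG flag.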