broadcast shares in serial
[p2pool.git] / p2pool / main.py
index 828051b..4073f7f 100644 (file)
@@ -133,24 +133,24 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print "Loading shares..."
         for i, (mode, contents) in enumerate(ss.get_shares()):
             if mode == 'share':
-                if contents.hash in tracker.shares:
+                if contents.hash in tracker.items:
                     continue
                 shared_share_hashes.add(contents.hash)
                 contents.time_seen = 0
                 tracker.add(contents)
-                if len(tracker.shares) % 1000 == 0 and tracker.shares:
-                    print "    %i" % (len(tracker.shares),)
+                if len(tracker.items) % 1000 == 0 and tracker.items:
+                    print "    %i" % (len(tracker.items),)
             elif mode == 'verified_hash':
                 known_verified.add(contents)
             else:
                 raise AssertionError()
         print "    ...inserting %i verified shares..." % (len(known_verified),)
         for h in known_verified:
-            if h not in tracker.shares:
+            if h not in tracker.items:
                 ss.forget_verified_share(h)
                 continue
-            tracker.verified.add(tracker.shares[h])
-        print "    ...done loading %i shares!" % (len(tracker.shares),)
+            tracker.verified.add(tracker.items[h])
+        print "    ...done loading %i shares!" % (len(tracker.items),)
         print
         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
@@ -252,7 +252,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 
                 new_count = 0
                 for share in shares:
-                    if share.hash in tracker.shares:
+                    if share.hash in tracker.items:
                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                         continue
                     
@@ -269,13 +269,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     set_best_share()
                 
                 if len(shares) > 5:
-                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
+                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
             
             def handle_share_hashes(self, hashes, peer):
                 t = time.time()
                 get_hashes = []
                 for share_hash in hashes:
-                    if share_hash in tracker.shares:
+                    if share_hash in tracker.items:
                         continue
                     last_request_time, count = requested.get(share_hash, (None, 0))
                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
@@ -391,6 +391,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             for peer in p2p_node.peers.itervalues():
                 peer.send_bestblock(header=header)
         
+        @defer.inlineCallbacks
         def broadcast_share(share_hash):
             shares = []
             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
@@ -399,8 +400,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 shared_share_hashes.add(share.hash)
                 shares.append(share)
             
-            for peer in p2p_node.peers.itervalues():
-                peer.sendShares([share for share in shares if share.peer is not peer])
+            for peer in list(p2p_node.peers.itervalues()):
+                yield peer.sendShares([share for share in shares if share.peer is not peer])
         
         # send share when the chain changes to their chain
         best_share_var.changed.watch(broadcast_share)
@@ -408,7 +409,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         def save_shares():
             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                 ss.add_share(share)
-                if share.hash in tracker.verified.shares:
+                if share.hash in tracker.verified.items:
                     ss.add_verified_hash(share.hash)
         task.LoopingCall(save_shares).start(60)
         
@@ -438,7 +439,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
         
-        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var)
+        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
         
@@ -521,8 +522,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     height = tracker.get_height(best_share_var.value)
                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                         height,
-                        len(tracker.verified.shares),
-                        len(tracker.shares),
+                        len(tracker.verified.items),
+                        len(tracker.items),
                         len(p2p_node.peers),
                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
@@ -533,7 +534,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                         math.format(int(my_att_s)),
                         math.format_dt(dt),
                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
-                        math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
+                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                     )
                     
                     if height > 2:
@@ -649,6 +650,7 @@ def run():
     
     if args.debug:
         p2pool.DEBUG = True
+        defer.setDebugging(True)
     
     net_name = args.net_name + ('_testnet' if args.testnet else '')
     net = networks.nets[net_name]