rewrote share requesting more procedurally using P2P rpc-like "sharereq" command
[p2pool.git] / p2pool / main.py
index 76f406f..4f16daa 100644 (file)
@@ -22,7 +22,7 @@ from nattraverso import portmapper, ipdiscover
 
 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
 from bitcoin import worker_interface, height_tracker
-from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
+from util import fixargparse, jsonrpc, variable, deferral, math, logging
 from . import p2p, networks, web, work
 import p2pool, p2pool.data as p2pool_data
 
@@ -220,46 +220,17 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         # BEST SHARE
         
         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
-        requested = expiring_dict.ExpiringDict(300)
-        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
         
         best_share_var = variable.Variable(None)
+        desired_var = variable.Variable(None)
         def set_best_share():
             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
             
             best_share_var.set(best)
-            
-            t = time.time()
-            for peer2, share_hash in desired:
-                if share_hash not in tracker.tails: # was received in the time tracker.think was running
-                    continue
-                last_request_time, count = requested.get(share_hash, (None, 0))
-                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
-                    continue
-                potential_peers = set()
-                for head in tracker.tails[share_hash]:
-                    potential_peers.update(peer_heads.get(head, set()))
-                potential_peers = [peer for peer in potential_peers if peer.connected2]
-                if count == 0 and peer2 is not None and peer2.connected2:
-                    peer = peer2
-                else:
-                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
-                    if peer is None:
-                        continue
-                
-                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
-                peer.send_getshares(
-                    hashes=[share_hash],
-                    parents=2000,
-                    stops=list(set(tracker.heads) | set(
-                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
-                    ))[:100],
-                )
-                requested[share_hash] = t, count + 1
+            desired_var.set(desired)
         bitcoind_work.changed.watch(lambda _: set_best_share())
         set_best_share()
         
-        
         print '    ...success!'
         print
         
@@ -282,32 +253,27 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     
                     tracker.add(share)
                 
-                if shares and peer is not None:
-                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
-                
                 if new_count:
                     set_best_share()
                 
                 if len(shares) > 5:
                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
             
+            @defer.inlineCallbacks
             def handle_share_hashes(self, hashes, peer):
-                t = time.time()
-                get_hashes = []
-                for share_hash in hashes:
-                    if share_hash in tracker.items:
-                        continue
-                    last_request_time, count = requested.get(share_hash, (None, 0))
-                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
-                        continue
-                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
-                    get_hashes.append(share_hash)
-                    requested[share_hash] = t, count + 1
-                
-                if hashes and peer is not None:
-                    peer_heads.setdefault(hashes[0], set()).add(peer)
-                if get_hashes:
-                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
+                new_hashes = [x for x in hashes if x not in tracker.items]
+                if not new_hashes:
+                    return
+                try:
+                    shares = yield peer.get_shares(
+                        hashes=new_hashes,
+                        parents=0,
+                        stops=[],
+                    )
+                except:
+                    log.err(None, 'in handle_share_hashes:')
+                else:
+                    self.handle_shares(shares, peer)
             
             def handle_get_shares(self, hashes, parents, stops, peer):
                 parents = min(parents, 1000//len(hashes))
@@ -436,6 +402,30 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     ss.add_verified_hash(share.hash)
         task.LoopingCall(save_shares).start(60)
         
+        @apply
+        @defer.inlineCallbacks
+        def download_shares():
+            while True:
+                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
+                peer2, share_hash = random.choice(desired)
+                
+                if len(p2p_node.peers) == 0:
+                    yield deferral.sleep(1)
+                    continue
+                peer = random.choice(p2p_node.peers.values())
+                
+                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
+                try:
+                    shares = yield peer.get_shares(
+                        hashes=[share_hash],
+                        parents=500,
+                        stops=[],
+                    )
+                except:
+                    log.err(None, 'in download_shares:')
+                else:
+                    p2p_node.handle_shares(shares, peer)
+        
         print '    ...success!'
         print