rewrote share requesting to work more procedurally, using the P2P RPC-like "sharereq" command
author: Forrest Voight <forrest.voight@gmail.com>
Sun, 19 Aug 2012 23:15:02 +0000 (19:15 -0400)
committer: Forrest Voight <forrest.voight@gmail.com>
Mon, 20 Aug 2012 07:36:01 +0000 (03:36 -0400)
p2pool/main.py
p2pool/p2p.py
p2pool/util/deferral.py

index 76f406f..4f16daa 100644 (file)
@@ -22,7 +22,7 @@ from nattraverso import portmapper, ipdiscover
 
 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
 from bitcoin import worker_interface, height_tracker
-from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
+from util import fixargparse, jsonrpc, variable, deferral, math, logging
 from . import p2p, networks, web, work
 import p2pool, p2pool.data as p2pool_data
 
@@ -220,46 +220,17 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         # BEST SHARE
         
         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
-        requested = expiring_dict.ExpiringDict(300)
-        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
         
         best_share_var = variable.Variable(None)
+        desired_var = variable.Variable(None)
         def set_best_share():
             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
             
             best_share_var.set(best)
-            
-            t = time.time()
-            for peer2, share_hash in desired:
-                if share_hash not in tracker.tails: # was received in the time tracker.think was running
-                    continue
-                last_request_time, count = requested.get(share_hash, (None, 0))
-                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
-                    continue
-                potential_peers = set()
-                for head in tracker.tails[share_hash]:
-                    potential_peers.update(peer_heads.get(head, set()))
-                potential_peers = [peer for peer in potential_peers if peer.connected2]
-                if count == 0 and peer2 is not None and peer2.connected2:
-                    peer = peer2
-                else:
-                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
-                    if peer is None:
-                        continue
-                
-                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
-                peer.send_getshares(
-                    hashes=[share_hash],
-                    parents=2000,
-                    stops=list(set(tracker.heads) | set(
-                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
-                    ))[:100],
-                )
-                requested[share_hash] = t, count + 1
+            desired_var.set(desired)
         bitcoind_work.changed.watch(lambda _: set_best_share())
         set_best_share()
         
-        
         print '    ...success!'
         print
         
@@ -282,32 +253,27 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     
                     tracker.add(share)
                 
-                if shares and peer is not None:
-                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
-                
                 if new_count:
                     set_best_share()
                 
                 if len(shares) > 5:
                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
             
+            @defer.inlineCallbacks
             def handle_share_hashes(self, hashes, peer):
-                t = time.time()
-                get_hashes = []
-                for share_hash in hashes:
-                    if share_hash in tracker.items:
-                        continue
-                    last_request_time, count = requested.get(share_hash, (None, 0))
-                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
-                        continue
-                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
-                    get_hashes.append(share_hash)
-                    requested[share_hash] = t, count + 1
-                
-                if hashes and peer is not None:
-                    peer_heads.setdefault(hashes[0], set()).add(peer)
-                if get_hashes:
-                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
+                new_hashes = [x for x in hashes if x not in tracker.items]
+                if not new_hashes:
+                    return
+                try:
+                    shares = yield peer.get_shares(
+                        hashes=new_hashes,
+                        parents=0,
+                        stops=[],
+                    )
+                except:
+                    log.err(None, 'in handle_share_hashes:')
+                else:
+                    self.handle_shares(shares, peer)
             
             def handle_get_shares(self, hashes, parents, stops, peer):
                 parents = min(parents, 1000//len(hashes))
@@ -436,6 +402,30 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     ss.add_verified_hash(share.hash)
         task.LoopingCall(save_shares).start(60)
         
+        @apply
+        @defer.inlineCallbacks
+        def download_shares():
+            while True:
+                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
+                peer2, share_hash = random.choice(desired)
+                
+                if len(p2p_node.peers) == 0:
+                    yield deferral.sleep(1)
+                    continue
+                peer = random.choice(p2p_node.peers.values())
+                
+                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
+                try:
+                    shares = yield peer.get_shares(
+                        hashes=[share_hash],
+                        parents=500,
+                        stops=[],
+                    )
+                except:
+                    log.err(None, 'in download_shares:')
+                else:
+                    p2p_node.handle_shares(shares, peer)
+        
         print '    ...success!'
         print
         
index 00c1539..c5a3ef6 100644 (file)
@@ -23,8 +23,6 @@ class Protocol(p2protocol.Protocol):
         
         self.other_version = None
         self.connected2 = False
-        
-        self.get_shares = deferral.GenericDeferrer(2**256, lambda id, hashes, parents, stops: self.send_sharereq(id=id, hashes=hashes, parents=parents, stops=stops))
     
     def connectionMade(self):
         p2protocol.Protocol.connectionMade(self)
@@ -53,6 +51,13 @@ class Protocol(p2protocol.Protocol):
         )
         
         self.timeout_delayed = reactor.callLater(10, self._connect_timeout)
+        
+        self.get_shares = deferral.GenericDeferrer(
+            max_id=2**256,
+            func=lambda id, hashes, parents, stops: self.send_sharereq(id=id, hashes=hashes, parents=parents, stops=stops),
+            timeout=15,
+            on_timeout=self.transport.loseConnection,
+        )
     
     def _connect_timeout(self):
         self.timeout_delayed = None
@@ -239,7 +244,7 @@ class Protocol(p2protocol.Protocol):
     ])
     def handle_sharereply(self, id, result, shares):
         if result == 'good':
-            res = shares
+            res = [p2pool_data.load_share(share, self.node.net, self) for share in shares if share['type'] not in [6, 7]]
         else:
             res = failure.Failure("sharereply result: " + result)
         self.get_shares.got_response(id, res)
index dfcd370..dfc9515 100644 (file)
@@ -88,10 +88,11 @@ class GenericDeferrer(object):
     Converts query with identifier/got response interface to deferred interface
     '''
     
-    def __init__(self, max_id, func, timeout=5):
+    def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
         self.max_id = max_id
         self.func = func
         self.timeout = timeout
+        self.on_timeout = on_timeout
         self.map = {}
     
     def __call__(self, *args, **kwargs):
@@ -103,6 +104,7 @@ class GenericDeferrer(object):
         def timeout():
             self.map.pop(id)
             df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
+            self.on_timeout()
         timer = reactor.callLater(self.timeout, timeout)
         self.func(id, *args, **kwargs)
         self.map[id] = df, timer
@@ -117,7 +119,7 @@ class GenericDeferrer(object):
     
     def respond_all(self, resp):
         while self.map:
-            df, timer = self.map.popitem()
+            id, (df, timer) = self.map.popitem()
             timer.cancel()
             df.errback(resp)