simplified transaction preforwarding by requiring shares to be accompanied by only...
authorForrest Voight <forrest@forre.st>
Sun, 30 Jun 2013 15:32:59 +0000 (11:32 -0400)
committerForrest Voight <forrest@forre.st>
Wed, 3 Jul 2013 18:31:03 +0000 (14:31 -0400)
p2pool/node.py
p2pool/p2p.py

index 1c98866..d73a0f2 100644 (file)
@@ -25,7 +25,11 @@ class P2PNode(p2p.Node):
             print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
         
         new_count = 0
-        for share in shares:
+        all_new_txs = {}
+        for share, new_txs in shares:
+            if new_txs is not None:
+                all_new_txs.update((bitcoin_data.hash256(bitcoin_data.tx_type.pack(new_tx)), new_tx) for new_tx in new_txs)
+            
             if share.hash in self.node.tracker.items:
                 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                 continue
@@ -36,6 +40,10 @@ class P2PNode(p2p.Node):
             
             self.node.tracker.add(share)
         
+        new_known_txs = dict(self.node.known_txs_var.value)
+        new_known_txs.update(all_new_txs)
+        self.node.known_txs_var.set(new_known_txs)
+        
         if new_count:
             self.node.set_best_share()
         
@@ -56,7 +64,7 @@ class P2PNode(p2p.Node):
         except:
             log.err(None, 'in handle_share_hashes:')
         else:
-            self.handle_shares(shares, peer)
+            self.handle_shares([(share, []) for share in shares], peer)
     
     def handle_get_shares(self, hashes, parents, stops, peer):
         parents = min(parents, 1000//len(hashes))
@@ -124,7 +132,7 @@ class P2PNode(p2p.Node):
                 if not shares:
                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                     continue
-                self.handle_shares(shares, peer)
+                self.handle_shares([(share, []) for share in shares], peer)
         
         
         @self.node.best_block_header.changed.watch
index 189fd2c..d116e40 100644 (file)
@@ -252,11 +252,43 @@ class Protocol(p2protocol.Protocol):
         ('shares', pack.ListType(p2pool_data.share_type)),
     ])
     def handle_shares(self, shares):
-        self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self.addr) for share in shares if share['type'] >= 9], self)
+        result = []
+        for wrappedshare in shares:
+            if wrappedshare['type'] < 9: continue
+            share = p2pool_data.load_share(wrappedshare, self.node.net, self.addr)
+            if wrappedshare['type'] >= 13:
+                txs = []
+                for tx_hash in share.share_info['new_transaction_hashes']:
+                    if tx_hash in self.node.known_txs_var.value:
+                        tx = self.node.known_txs_var.value[tx_hash]
+                    else:
+                        for cache in self.known_txs_cache.itervalues():
+                            if tx_hash in cache:
+                                tx = cache[tx_hash]
+                                print 'Transaction %064x rescued from peer latency cache!' % (tx_hash,)
+                                break
+                        else:
+                            print >>sys.stderr, 'Peer referenced unknown transaction %064x, disconnecting' % (tx_hash,)
+                            self.disconnect()
+                            return
+                    txs.append(tx)
+            else:
+                txs = None
+            
+            result.append((share, txs))
+            
+        self.node.handle_shares(result, self)
     
     def sendShares(self, shares, tracker, known_txs, include_txs_with=[]):
         tx_hashes = set()
         for share in shares:
+            if share.VERSION >= 13:
+                # send full transaction for every new_transaction_hash that peer does not know
+                for tx_hash in share.share_info['new_transaction_hashes']:
+                    assert tx_hash in known_txs, 'tried to broadcast transaction without knowing all its new transactions'
+                    if tx_hash not in self.remote_tx_hashes:
+                        tx_hashes.add(tx_hash)
+                continue
             if share.hash in include_txs_with:
                 x = share.get_other_tx_hashes(tracker)
                 if x is not None: