print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
new_count = 0
- for share in shares:
+ all_new_txs = {}
+ for share, new_txs in shares:
+ if new_txs is not None:
+ all_new_txs.update((bitcoin_data.hash256(bitcoin_data.tx_type.pack(new_tx)), new_tx) for new_tx in new_txs)
+
if share.hash in self.node.tracker.items:
#print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
continue
self.node.tracker.add(share)
+ new_known_txs = dict(self.node.known_txs_var.value)
+ new_known_txs.update(all_new_txs)
+ self.node.known_txs_var.set(new_known_txs)
+
if new_count:
self.node.set_best_share()
except:
log.err(None, 'in handle_share_hashes:')
else:
- self.handle_shares(shares, peer)
+ self.handle_shares([(share, []) for share in shares], peer)
def handle_get_shares(self, hashes, parents, stops, peer):
parents = min(parents, 1000//len(hashes))
if not shares:
yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
continue
- self.handle_shares(shares, peer)
+ self.handle_shares([(share, []) for share in shares], peer)
@self.node.best_block_header.changed.watch
('shares', pack.ListType(p2pool_data.share_type)),
])
def handle_shares(self, shares):
- self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self.addr) for share in shares if share['type'] >= 9], self)
+ result = []
+ for wrappedshare in shares:
+ if wrappedshare['type'] < 9: continue
+ share = p2pool_data.load_share(wrappedshare, self.node.net, self.addr)
+ if wrappedshare['type'] >= 13:
+ txs = []
+ for tx_hash in share.share_info['new_transaction_hashes']:
+ if tx_hash in self.node.known_txs_var.value:
+ tx = self.node.known_txs_var.value[tx_hash]
+ else:
+ for cache in self.known_txs_cache.itervalues():
+ if tx_hash in cache:
+ tx = cache[tx_hash]
+ print 'Transaction %064x rescued from peer latency cache!' % (tx_hash,)
+ break
+ else:
+ print >>sys.stderr, 'Peer referenced unknown transaction %064x, disconnecting' % (tx_hash,)
+ self.disconnect()
+ return
+ txs.append(tx)
+ else:
+ txs = None
+
+ result.append((share, txs))
+
+ self.node.handle_shares(result, self)
def sendShares(self, shares, tracker, known_txs, include_txs_with=[]):
tx_hashes = set()
for share in shares:
+ if share.VERSION >= 13:
+ # send full transaction for every new_transaction_hash that peer does not know
+ for tx_hash in share.share_info['new_transaction_hashes']:
+ assert tx_hash in known_txs, 'tried to broadcast transaction without knowing all its new transactions'
+ if tx_hash not in self.remote_tx_hashes:
+ tx_hashes.add(tx_hash)
+ continue
if share.hash in include_txs_with:
x = share.get_other_tx_hashes(tracker)
if x is not None: