X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=p2pool%2Fnode.py;h=2b25fbe55bf157278fcf204f2b620915f0cacc7e;hb=0a3493d6873cfef4fb189d39e64dfbc6e162e2a7;hp=2d1d80734da5160c9fd6b4e7919d2d6cc8e36d44;hpb=50c7e919c05d4a30a18bcff0acf1cb32e9fb3ada;p=p2pool.git diff --git a/p2pool/node.py b/p2pool/node.py index 2d1d807..2b25fbe 100644 --- a/p2pool/node.py +++ b/p2pool/node.py @@ -2,7 +2,7 @@ import random import sys import time -from twisted.internet import defer, reactor, task +from twisted.internet import defer, reactor from twisted.python import log from p2pool import data as p2pool_data, p2p @@ -11,35 +11,39 @@ from p2pool.util import deferral, variable class P2PNode(p2p.Node): - def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs): + def __init__(self, node, **kwargs): self.node = node p2p.Node.__init__(self, best_share_hash_func=lambda: node.best_share_var.value, - port=p2pool_port, net=node.net, - addr_store=addrs, - connect_addrs=connect_addrs, - max_incoming_conns=p2pool_conns, known_txs_var=node.known_txs_var, mining_txs_var=node.mining_txs_var, - ) + **kwargs) def handle_shares(self, shares, peer): if len(shares) > 5: print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None) new_count = 0 - for share in shares: + all_new_txs = {} + for share, new_txs in shares: + if new_txs is not None: + all_new_txs.update((bitcoin_data.hash256(bitcoin_data.tx_type.pack(new_tx)), new_tx) for new_tx in new_txs) + if share.hash in self.node.tracker.items: #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),) continue new_count += 1 - #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None) + #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer_addr) self.node.tracker.add(share) + new_known_txs = dict(self.node.known_txs_var.value) + new_known_txs.update(all_new_txs) + self.node.known_txs_var.set(new_known_txs) + if new_count: self.node.set_best_share() @@ -60,7 +64,7 @@ class P2PNode(p2p.Node): except: log.err(None, 'in handle_share_hashes:') else: - self.handle_shares(shares, peer) + self.handle_shares([(share, []) for share in shares], peer) def handle_get_shares(self, hashes, parents, stops, peer): parents = min(parents, 1000//len(hashes)) @@ -71,7 +75,8 @@ class P2PNode(p2p.Node): if share.hash in stops: break shares.append(share) - print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1]) + if len(shares) > 0: + print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1]) return shares def handle_bestblock(self, header, peer): @@ -79,7 +84,6 @@ class P2PNode(p2p.Node): raise p2p.PeerMisbehavingError('received block header fails PoW test') self.node.handle_header(header) - @defer.inlineCallbacks def broadcast_share(self, share_hash): shares = [] for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))): @@ -88,8 +92,8 @@ class P2PNode(p2p.Node): self.shared_share_hashes.add(share.hash) shares.append(share) - for peer in list(self.peers.itervalues()): - yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash]) + for peer in self.peers.itervalues(): + peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash]) def start(self): p2p.Node.start(self) @@ -102,7 +106,7 @@ class P2PNode(p2p.Node): def download_shares(): while True: desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0) - peer2, share_hash = random.choice(desired) + peer_addr, share_hash = random.choice(desired) if len(self.peers) == 0: yield deferral.sleep(1) @@ -113,9 +117,14 @@ class P2PNode(p2p.Node): try: shares = yield peer.get_shares( hashes=[share_hash], - parents=500, - stops=[], + parents=random.randrange(500), # randomize parents so that we eventually get past a too large block of shares + stops=list(set(self.node.tracker.heads) | set( + self.node.tracker.get_nth_parent_hash(head, min(max(0, self.node.tracker.get_height_and_last(head)[0] - 1), 10)) for head in self.node.tracker.heads + ))[:100], ) + except defer.TimeoutError: + print 'Share request timed out!' + continue except: log.err(None, 'in download_shares:') continue @@ -123,7 +132,7 @@ class P2PNode(p2p.Node): if not shares: yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has continue - self.handle_shares(shares, peer) + self.handle_shares([(share, []) for share in shares], peer) @self.node.best_block_header.changed.watch @@ -174,7 +183,7 @@ class Node(object): self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind))) @defer.inlineCallbacks def work_poller(): - while True: + while stop_signal.times == 0: flag = self.factory.new_block.get_deferred() try: self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate']))) @@ -204,6 +213,8 @@ class Node(object): self.handle_header = handle_header @defer.inlineCallbacks def poll_header(): + if self.factory.conn.value is None: + return handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block']))) self.bitcoind_work.changed.watch(lambda _: poll_header()) yield deferral.retry('Error while requesting best block header:')(poll_header)() @@ -272,9 +283,13 @@ class Node(object): if tx_hash in self.known_txs_var.value: new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash] self.known_txs_var.set(new_known_txs) - task.LoopingCall(forget_old_txs).start(10) + t = deferral.RobustLoopingCall(forget_old_txs) + t.start(10) + stop_signal.watch(t.stop) - task.LoopingCall(self.clean_tracker).start(5) + t = deferral.RobustLoopingCall(self.clean_tracker) + t.start(5) + stop_signal.watch(t.stop) def set_best_share(self): best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value) @@ -317,11 +332,7 @@ class Node(object): for tail, heads in self.tracker.tails.iteritems(): if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10: continue - for aftertail in self.tracker.reverse.get(tail, set()): - if len(self.tracker.reverse[self.tracker.items[aftertail].previous_hash]) > 1: # XXX - print "raw" - continue - to_remove.add(aftertail) + to_remove.update(self.tracker.reverse.get(tail, set())) if not to_remove: break # if removed from this, it must be removed from verified @@ -335,3 +346,5 @@ class Node(object): self.tracker.remove(aftertail) #end = time.time() #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove)) + + self.set_best_share()