X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=p2pool%2Fnode.py;h=ef60fb1a6bc1762269a139f58f84b9492ac04ed6;hb=2cb4d8381e179f71ea2075cdce948ea83cf0dc55;hp=406787c9fe62aef1d8a72ad7865014a3adc8ae8a;hpb=764a90c3ab7c0fe29a85fd588333d17a1138ac6a;p=p2pool.git diff --git a/p2pool/node.py b/p2pool/node.py index 406787c..ef60fb1 100644 --- a/p2pool/node.py +++ b/p2pool/node.py @@ -1,5 +1,6 @@ import random import sys +import time from twisted.internet import defer, reactor, task from twisted.python import log @@ -10,18 +11,14 @@ from p2pool.util import deferral, variable class P2PNode(p2p.Node): - def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs): + def __init__(self, node, **kwargs): self.node = node p2p.Node.__init__(self, best_share_hash_func=lambda: node.best_share_var.value, - port=p2pool_port, net=node.net, - addr_store=addrs, - connect_addrs=connect_addrs, - max_incoming_conns=p2pool_conns, known_txs_var=node.known_txs_var, mining_txs_var=node.mining_txs_var, - ) + **kwargs) def handle_shares(self, shares, peer): if len(shares) > 5: @@ -173,7 +170,7 @@ class Node(object): self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind))) @defer.inlineCallbacks def work_poller(): - while True: + while stop_signal.times == 0: flag = self.factory.new_block.get_deferred() try: self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate']))) @@ -203,6 +200,8 @@ class Node(object): self.handle_header = handle_header @defer.inlineCallbacks def poll_header(): + if self.factory.conn.value is None: + return handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block']))) self.bitcoind_work.changed.watch(lambda _: poll_header()) yield deferral.retry('Error while requesting best block header:')(poll_header)() @@ -241,6 +240,8 @@ class Node(object): @defer.inlineCallbacks def _(before, after): yield deferral.sleep(random.expovariate(1/1)) + if self.factory.conn.value is None: + return for tx_hash in set(after) - set(before): self.factory.conn.value.send_tx(tx=after[tx_hash]) @@ -269,13 +270,68 @@ class Node(object): if tx_hash in self.known_txs_var.value: new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash] self.known_txs_var.set(new_known_txs) - task.LoopingCall(forget_old_txs).start(10) + t = task.LoopingCall(forget_old_txs) + t.start(10) + stop_signal.watch(t.stop) + + t = task.LoopingCall(self.clean_tracker) + t.start(5) + stop_signal.watch(t.stop) def set_best_share(self): - best, desired = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value) + best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value) self.best_share_var.set(best) self.desired_var.set(desired) def get_current_txouts(self): return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net) + + def clean_tracker(self): + best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value) + + # eat away at heads + if decorated_heads: + for i in xrange(1000): + to_remove = set() + for share_hash, tail in self.tracker.heads.iteritems(): + if share_hash in [head_hash for score, head_hash in decorated_heads[-5:]]: + #print 1 + continue + if self.tracker.items[share_hash].time_seen > time.time() - 300: + #print 2 + continue + if share_hash not in self.tracker.verified.items and max(self.tracker.items[after_tail_hash].time_seen for after_tail_hash in self.tracker.reverse.get(tail)) > time.time() - 120: # XXX stupid + #print 3 + continue + to_remove.add(share_hash) + if not to_remove: + break + for share_hash in to_remove: + if share_hash in self.tracker.verified.items: + self.tracker.verified.remove(share_hash) + self.tracker.remove(share_hash) + #print "_________", to_remove + + # drop tails + for i in xrange(1000): + to_remove = set() + for tail, heads in self.tracker.tails.iteritems(): + if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10: + continue + to_remove.update(self.tracker.reverse.get(tail, set())) + if not to_remove: + break + # if removed from this, it must be removed from verified + #start = time.time() + for aftertail in to_remove: + if self.tracker.items[aftertail].previous_hash not in self.tracker.tails: + print "erk", aftertail, self.tracker.items[aftertail].previous_hash + continue + if aftertail in self.tracker.verified.items: + self.tracker.verified.remove(aftertail) + self.tracker.remove(aftertail) + #end = time.time() + #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove)) + + self.set_best_share()