Do not print "Sending 0 shares"
[p2pool.git] / p2pool / node.py
index 406787c..1b7b94e 100644 (file)
@@ -1,5 +1,6 @@
 import random
 import sys
+import time
 
 from twisted.internet import defer, reactor, task
 from twisted.python import log
@@ -10,18 +11,14 @@ from p2pool.util import deferral, variable
 
 
 class P2PNode(p2p.Node):
-    def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs):
+    def __init__(self, node, **kwargs):
         self.node = node
         p2p.Node.__init__(self,
             best_share_hash_func=lambda: node.best_share_var.value,
-            port=p2pool_port,
             net=node.net,
-            addr_store=addrs,
-            connect_addrs=connect_addrs,
-            max_incoming_conns=p2pool_conns,
             known_txs_var=node.known_txs_var,
             mining_txs_var=node.mining_txs_var,
-        )
+        **kwargs)
     
     def handle_shares(self, shares, peer):
         if len(shares) > 5:
@@ -70,7 +67,8 @@ class P2PNode(p2p.Node):
                 if share.hash in stops:
                     break
                 shares.append(share)
-        print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
+        if len(shares) > 0:
+            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
         return shares
     
     def handle_bestblock(self, header, peer):
@@ -113,8 +111,13 @@ class P2PNode(p2p.Node):
                     shares = yield peer.get_shares(
                         hashes=[share_hash],
                         parents=500,
-                        stops=[],
+                        stops=list(set(self.node.tracker.heads) | set(
+                            self.node.tracker.get_nth_parent_hash(head, min(max(0, self.node.tracker.get_height_and_last(head)[0] - 1), 10)) for head in self.node.tracker.heads
+                        ))[:100],
                     )
+                except defer.TimeoutError:
+                    print 'Share request timed out!'
+                    continue
                 except:
                     log.err(None, 'in download_shares:')
                     continue
@@ -173,7 +176,7 @@ class Node(object):
         self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
         @defer.inlineCallbacks
         def work_poller():
-            while True:
+            while stop_signal.times == 0:
                 flag = self.factory.new_block.get_deferred()
                 try:
                     self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
@@ -203,6 +206,8 @@ class Node(object):
         self.handle_header = handle_header
         @defer.inlineCallbacks
         def poll_header():
+            if self.factory.conn.value is None:
+                return
             handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
         self.bitcoind_work.changed.watch(lambda _: poll_header())
         yield deferral.retry('Error while requesting best block header:')(poll_header)()
@@ -241,6 +246,8 @@ class Node(object):
         @defer.inlineCallbacks
         def _(before, after):
             yield deferral.sleep(random.expovariate(1/1))
+            if self.factory.conn.value is None:
+                return
             for tx_hash in set(after) - set(before):
                 self.factory.conn.value.send_tx(tx=after[tx_hash])
         
@@ -259,6 +266,8 @@ class Node(object):
             print
         
         def forget_old_txs():
+            print "KNOWN:", sum(bitcoin_data.tx_type.packed_size(tx) for tx in self.known_txs_var.value.itervalues())
+            print "MINING:", sum(bitcoin_data.tx_type.packed_size(tx) for tx in self.mining_txs_var.value.itervalues())
             new_known_txs = {}
             if self.p2p_node is not None:
                 for peer in self.p2p_node.peers.itervalues():
@@ -269,13 +278,68 @@ class Node(object):
                     if tx_hash in self.known_txs_var.value:
                         new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
             self.known_txs_var.set(new_known_txs)
-        task.LoopingCall(forget_old_txs).start(10)
+        t = task.LoopingCall(forget_old_txs)
+        t.start(10)
+        stop_signal.watch(t.stop)
+        
+        t = task.LoopingCall(self.clean_tracker)
+        t.start(5)
+        stop_signal.watch(t.stop)
     
     def set_best_share(self):
-        best, desired = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+        best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
         
         self.best_share_var.set(best)
         self.desired_var.set(desired)
     
     def get_current_txouts(self):
         return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)
+    
+    def clean_tracker(self):
+        best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+        
+        # eat away at heads
+        if decorated_heads:
+            for i in xrange(1000):
+                to_remove = set()
+                for share_hash, tail in self.tracker.heads.iteritems():
+                    if share_hash in [head_hash for score, head_hash in decorated_heads[-5:]]:
+                        #print 1
+                        continue
+                    if self.tracker.items[share_hash].time_seen > time.time() - 300:
+                        #print 2
+                        continue
+                    if share_hash not in self.tracker.verified.items and max(self.tracker.items[after_tail_hash].time_seen for after_tail_hash in self.tracker.reverse.get(tail)) > time.time() - 120: # XXX stupid
+                        #print 3
+                        continue
+                    to_remove.add(share_hash)
+                if not to_remove:
+                    break
+                for share_hash in to_remove:
+                    if share_hash in self.tracker.verified.items:
+                        self.tracker.verified.remove(share_hash)
+                    self.tracker.remove(share_hash)
+                #print "_________", to_remove
+        
+        # drop tails
+        for i in xrange(1000):
+            to_remove = set()
+            for tail, heads in self.tracker.tails.iteritems():
+                if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10:
+                    continue
+                to_remove.update(self.tracker.reverse.get(tail, set()))
+            if not to_remove:
+                break
+            # if removed from this, it must be removed from verified
+            #start = time.time()
+            for aftertail in to_remove:
+                if self.tracker.items[aftertail].previous_hash not in self.tracker.tails:
+                    print "erk", aftertail, self.tracker.items[aftertail].previous_hash
+                    continue
+                if aftertail in self.tracker.verified.items:
+                    self.tracker.verified.remove(aftertail)
+                self.tracker.remove(aftertail)
+            #end = time.time()
+            #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove))
+        
+        self.set_best_share()