Fix PoW function call
[p2pool.git] / p2pool / node.py
index c0ff82c..a3914d5 100644 (file)
@@ -2,7 +2,7 @@ import random
 import sys
 import time
 
-from twisted.internet import defer, reactor, task
+from twisted.internet import defer, reactor
 from twisted.python import log
 
 from p2pool import data as p2pool_data, p2p
@@ -11,35 +11,39 @@ from p2pool.util import deferral, variable
 
 
 class P2PNode(p2p.Node):
-    def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs):
+    def __init__(self, node, **kwargs):
         self.node = node
         p2p.Node.__init__(self,
             best_share_hash_func=lambda: node.best_share_var.value,
-            port=p2pool_port,
             net=node.net,
-            addr_store=addrs,
-            connect_addrs=connect_addrs,
-            max_incoming_conns=p2pool_conns,
             known_txs_var=node.known_txs_var,
             mining_txs_var=node.mining_txs_var,
-        )
+        **kwargs)
     
     def handle_shares(self, shares, peer):
         if len(shares) > 5:
             print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
         
         new_count = 0
-        for share in shares:
+        all_new_txs = {}
+        for share, new_txs in shares:
+            if new_txs is not None:
+                all_new_txs.update((bitcoin_data.hash256(bitcoin_data.tx_type.pack(new_tx)), new_tx) for new_tx in new_txs)
+            
             if share.hash in self.node.tracker.items:
                 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                 continue
             
             new_count += 1
             
-            #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
+            #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer_addr)
             
             self.node.tracker.add(share)
         
+        new_known_txs = dict(self.node.known_txs_var.value)
+        new_known_txs.update(all_new_txs)
+        self.node.known_txs_var.set(new_known_txs)
+        
         if new_count:
             self.node.set_best_share()
         
@@ -60,7 +64,7 @@ class P2PNode(p2p.Node):
         except:
             log.err(None, 'in handle_share_hashes:')
         else:
-            self.handle_shares(shares, peer)
+            self.handle_shares([(share, []) for share in shares], peer)
     
     def handle_get_shares(self, hashes, parents, stops, peer):
         parents = min(parents, 1000//len(hashes))
@@ -71,15 +75,15 @@ class P2PNode(p2p.Node):
                 if share.hash in stops:
                     break
                 shares.append(share)
-        print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
+        if len(shares) > 0:
+            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
         return shares
     
     def handle_bestblock(self, header, peer):
-        if self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
+        if bitcoin_data.scrypt(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
             raise p2p.PeerMisbehavingError('received block header fails PoW test')
         self.node.handle_header(header)
     
-    @defer.inlineCallbacks
     def broadcast_share(self, share_hash):
         shares = []
         for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
@@ -88,8 +92,8 @@ class P2PNode(p2p.Node):
             self.shared_share_hashes.add(share.hash)
             shares.append(share)
         
-        for peer in list(self.peers.itervalues()):
-            yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
+        for peer in self.peers.itervalues():
+            peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
     
     def start(self):
         p2p.Node.start(self)
@@ -102,7 +106,7 @@ class P2PNode(p2p.Node):
         def download_shares():
             while True:
                 desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
-                peer2, share_hash = random.choice(desired)
+                peer_addr, share_hash = random.choice(desired)
                 
                 if len(self.peers) == 0:
                     yield deferral.sleep(1)
@@ -113,9 +117,14 @@ class P2PNode(p2p.Node):
                 try:
                     shares = yield peer.get_shares(
                         hashes=[share_hash],
-                        parents=500,
-                        stops=[],
+                        parents=random.randrange(500), # randomize parents so that we eventually get past a too large block of shares
+                        stops=list(set(self.node.tracker.heads) | set(
+                            self.node.tracker.get_nth_parent_hash(head, min(max(0, self.node.tracker.get_height_and_last(head)[0] - 1), 10)) for head in self.node.tracker.heads
+                        ))[:100],
                     )
+                except defer.TimeoutError:
+                    print 'Share request timed out!'
+                    continue
                 except:
                     log.err(None, 'in download_shares:')
                     continue
@@ -123,7 +132,7 @@ class P2PNode(p2p.Node):
                 if not shares:
                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                     continue
-                self.handle_shares(shares, peer)
+                self.handle_shares([(share, []) for share in shares], peer)
         
         
         @self.node.best_block_header.changed.watch
@@ -136,6 +145,9 @@ class P2PNode(p2p.Node):
         
         @self.node.tracker.verified.added.watch
         def _(share):
+            if share.timestamp < share.min_header['timestamp']:
+                return
+
             if not (share.pow_hash <= share.header['bits'].target):
                 return
             
@@ -174,36 +186,42 @@ class Node(object):
         self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
         @defer.inlineCallbacks
         def work_poller():
-            while True:
+            while stop_signal.times == 0:
                 flag = self.factory.new_block.get_deferred()
                 try:
                     self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
                 except:
                     log.err()
-                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
+                yield defer.DeferredList([flag, deferral.sleep(5)], fireOnOneCallback=True)
         work_poller()
         
         # PEER WORK
         
         self.best_block_header = variable.Variable(None)
+        self.pow_subsidy = 0
         def handle_header(new_header):
+            self.pow_bits = self.bitcoind_work.value['bits']
+            self.pow_subsidy = self.bitcoind_work.value['subsidy']
+
             # check that header matches current target
-            if not (self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
+            if not (bitcoin_data.scrypt(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
                 return
             bitcoind_best_block = self.bitcoind_work.value['previous_block']
             if (self.best_block_header.value is None
                 or (
                     new_header['previous_block'] == bitcoind_best_block and
-                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
+                    bitcoin_data.scrypt(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
                 ) # new is child of current and previous is current
                 or (
-                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
+                    bitcoin_data.scrypt(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                     self.best_block_header.value['previous_block'] != bitcoind_best_block
                 )): # new is current and previous is not a child of current
                 self.best_block_header.set(new_header)
         self.handle_header = handle_header
         @defer.inlineCallbacks
         def poll_header():
+            if self.factory.conn.value is None:
+                return
             handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
         self.bitcoind_work.changed.watch(lambda _: poll_header())
         yield deferral.retry('Error while requesting best block header:')(poll_header)()
@@ -234,6 +252,10 @@ class Node(object):
         # add p2p transactions from bitcoind to known_txs
         @self.factory.new_tx.watch
         def _(tx):
+            if tx.timestamp > time.time() + 3600:
+                return
+            if tx.timestamp > self.bitcoind_work.value['txn_timestamp']:
+                self.bitcoind_work.value['txn_timestamp'] = tx.timestamp
             new_known_txs = dict(self.known_txs_var.value)
             new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
             self.known_txs_var.set(new_known_txs)
@@ -272,21 +294,32 @@ class Node(object):
                     if tx_hash in self.known_txs_var.value:
                         new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
             self.known_txs_var.set(new_known_txs)
-        task.LoopingCall(forget_old_txs).start(10)
+        t = deferral.RobustLoopingCall(forget_old_txs)
+        t.start(10)
+        stop_signal.watch(t.stop)
         
-        task.LoopingCall(self.clean_tracker).start(5)
+        t = deferral.RobustLoopingCall(self.clean_tracker)
+        t.start(5)
+        stop_signal.watch(t.stop)
     
     def set_best_share(self):
-        best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+        best, desired, decorated_heads, bad_peer_addresses = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
         
         self.best_share_var.set(best)
         self.desired_var.set(desired)
+        if self.p2p_node is not None:
+            for bad_peer_address in bad_peer_addresses:
+                # XXX O(n)
+                for peer in self.p2p_node.peers.itervalues():
+                    if peer.addr == bad_peer_address:
+                        peer.badPeerHappened()
+                        break
     
     def get_current_txouts(self):
         return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)
     
     def clean_tracker(self):
-        best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+        best, desired, decorated_heads, bad_peer_addresses = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
         
         # eat away at heads
         if decorated_heads:
@@ -331,3 +364,5 @@ class Node(object):
                 self.tracker.remove(aftertail)
             #end = time.time()
             #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove))
+        
+        self.set_best_share()