penalization and sending txs ahead of shares
author Forrest Voight <forrest@forre.st>
Fri, 12 Oct 2012 20:01:53 +0000 (16:01 -0400)
committer Forrest Voight <forrest@forre.st>
Mon, 15 Oct 2012 06:15:29 +0000 (02:15 -0400)
p2pool/data.py
p2pool/main.py
p2pool/p2p.py
p2pool/util/variable.py

index 6ab7126..7823e41 100644 (file)
@@ -269,7 +269,19 @@ class Share(object):
             raise ValueError('''gentx doesn't match hash_link''')
         return gentx # only used by as_block
     
-    def as_block(self, tracker):
+    def get_other_tx_hashes(self, tracker):
+        return []
+    
+    def get_other_txs(self, tracker, known_txs):
+        return []
+    
+    def get_other_txs_size(self, tracker, known_txs):
+        return 0
+    
+    def get_new_txs_size(self, known_txs):
+        return 0
+    
+    def as_block(self, tracker, known_txs):
         if self.other_txs is None:
             raise ValueError('share does not contain all txs')
         return dict(header=self.header, txs=[self.check(tracker)] + self.other_txs)
@@ -500,16 +512,16 @@ class NewShare(object):
         
         return gentx # only used by as_block
     
-    def as_block(self, tracker):
-        other_tx_hashes = [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']]
+    def get_other_tx_hashes(self, tracker):
+        return [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']]
+    
+    def as_block(self, tracker, known_txs):
+        other_tx_hashes = self.get_other_tx_hashes(tracker)
         
+        if not all(tx_hash in known_txs for tx_hash in other_tx_hashes):
+            return None # not all txs present
         
-        print [tx_hash in self.peer.remembered_txs for tx_hash in other_tx_hashes]
-        txs = [self.check(tracker)] + [self.peer.remembered_txs[tx_hash] for tx_hash in other_tx_hashes]
-        print
-        print 'SUCCESS'
-        print
-        return dict(header=self.header, txs=txs)
+        return dict(header=self.header, txs=[self.check(tracker)] + [known_txs[tx_hash] for tx_hash in other_tx_hashes])
 
 
 class WeightsSkipList(forest.TrackerSkipList):
@@ -580,7 +592,7 @@ class OkayTracker(forest.Tracker):
             self.verified.add(share)
             return True
     
-    def think(self, block_rel_height_func, previous_block, bits):
+    def think(self, block_rel_height_func, previous_block, bits, known_txs):
         desired = set()
         
         # O(len(self.heads))
@@ -646,6 +658,7 @@ class OkayTracker(forest.Tracker):
             #self.items[h].peer is None,
             self.items[h].pow_hash <= self.items[h].header['bits'].target, # is block solution
             (self.items[h].header['previous_block'], self.items[h].header['bits']) == (previous_block, bits) or self.items[h].peer is None,
+            self.items[h].as_block(self, known_txs) is not None,
             -self.items[h].time_seen,
         ), h) for h in self.verified.tails.get(best_tail, []))
         if p2pool.DEBUG:
@@ -708,6 +721,9 @@ class OkayTracker(forest.Tracker):
                 if p2pool.DEBUG:
                     print 'Stale detected! %x < %x' % (best_share.header['previous_block'], previous_block)
                 best = best_share.previous_hash
+            elif best_share.as_block(self, known_txs) is None:
+                print 'Share with incomplete transactions detected! Jumping from %s to %s!' % (format_hash(best), format_hash(best_share.previous_hash))
+                best = best_share.previous_hash
             
             timestamp_cutoff = min(int(time.time()), best_share.timestamp) - 3600
             target_cutoff = 2**256//(self.net.SHARE_PERIOD*best_tail_score[1] + 1) * 2 if best_tail_score[1] is not None else 2**256-1
index 14e41ff..e01041c 100644 (file)
@@ -226,12 +226,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         # BEST SHARE
         
+        known_txs_var = variable.Variable({}) # hash -> tx
+        mining_txs_var = variable.Variable({}) # hash -> tx
         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
         
         best_share_var = variable.Variable(None)
         desired_var = variable.Variable(None)
         def set_best_share():
-            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
+            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
             
             best_share_var.set(best)
             desired_var.set(desired)
@@ -243,15 +245,17 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         # setup p2p logic and join p2pool network
         
-        known_txs_var = variable.Variable({}) # hash -> tx
-        mining_txs_var = variable.Variable({}) # hash -> tx
         # update mining_txs according to getwork results
-        @bitcoind_work.changed.watch
-        def _(work):
+        @bitcoind_work.changed.run_and_watch
+        def _(_=None):
             new_mining_txs = {}
-            for tx in work['transactions']:
-                new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
+            new_known_txs = dict(known_txs_var.value)
+            for tx in bitcoind_work.value['transactions']:
+                tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))
+                new_mining_txs[tx_hash] = tx
+                new_known_txs[tx_hash] = tx
             mining_txs_var.set(new_mining_txs)
+            known_txs_var.set(new_known_txs)
         # forward transactions seen to bitcoind
         @known_txs_var.transitioned.watch
         def _(before, after):
@@ -341,7 +345,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         @tracker.verified.added.watch
         def _(share):
             if share.pow_hash <= share.header['bits'].target:
-                submit_block(share.as_block(tracker), ignore_failure=True)
+                block = share.as_block(tracker, known_txs_var.value)
+                if block is None:
+                    print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
+                    return
+                submit_block(block, ignore_failure=True)
                 print
                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                 print
@@ -417,7 +425,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 shares.append(share)
             
             for peer in list(p2p_node.peers.itervalues()):
-                yield peer.sendShares([share for share in shares if share.peer is not peer])
+                yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
         
         # send share when the chain changes to their chain
         best_share_var.changed.watch(broadcast_share)
index 4af2391..49a239d 100644 (file)
@@ -15,6 +15,14 @@ from p2pool.util import deferral, p2protocol, pack, variable
 class PeerMisbehavingError(Exception):
     pass
 
+
+def fragment(f, **kwargs):
+    try:
+        return f(**kwargs)
+    except p2protocol.TooLong:
+        fragment(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems()))
+        return fragment(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
+
 class Protocol(p2protocol.Protocol):
     def __init__(self, node, incoming):
         p2protocol.Protocol.__init__(self, node.net.PREFIX, 1000000, node.traffic_happened)
@@ -160,13 +168,13 @@ class Protocol(p2protocol.Protocol):
             added = set(after) - set(before)
             removed = set(before) - set(after)
             if added:
-                self.send_remember_tx(tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes])
+                fragment(self.send_remember_tx, tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes])
             if removed:
                 self.send_forget_tx(tx_hashes=removed)
         watch_id2 = self.node.mining_txs_var.transitioned.watch(update_remote_view_of_my_mining_txs)
         self.connection_lost_event.watch(lambda: self.node.mining_txs_var.transitioned.unwatch(watch_id2))
         
-        self.send_remember_tx(tx_hashes=[], txs=self.node.mining_txs_var.value.values())
+        fragment(self.send_remember_tx, tx_hashes=[], txs=self.node.mining_txs_var.value.values())
     
     message_ping = pack.ComposedType([])
     def handle_ping(self):
@@ -231,17 +239,22 @@ class Protocol(p2protocol.Protocol):
     def handle_shares(self, shares):
         self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self) for share in shares if share['type'] not in [6, 7]], self)
     
-    def sendShares(self, shares):
-        def att(f, **kwargs):
-            try:
-                return f(**kwargs)
-            except p2protocol.TooLong:
-                att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems()))
-                return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
-        if shares:
-            return att(self.send_shares, shares=[share.as_share() for share in shares])
-        else:
+    def sendShares(self, shares, tracker, known_txs, include_txs_with=[]):
+        if not shares:
             return defer.succeed(None)
+        
+        tx_hashes = set()
+        for share in shares:
+            if share.hash in include_txs_with:
+                tx_hashes.update(share.get_other_tx_hashes(tracker))
+        
+        hashes_to_send = [x for x in tx_hashes if x not in self.node.mining_txs_var.value]
+        
+        fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes and x in known_txs]) # last one required?
+        fragment(self.send_shares, shares=[share.as_share() for share in shares])
+        res = self.send_forget_tx(tx_hashes=hashes_to_send)
+        
+        return res
     
     
     message_sharereq = pack.ComposedType([
@@ -475,16 +488,16 @@ class SingleClientFactory(protocol.ReconnectingClientFactory):
         self.node.lost_conn(proto, reason)
 
 class Node(object):
-    def __init__(self, best_share_hash_func, port, net, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event()):
+    def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event(), known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})):
         self.best_share_hash_func = best_share_hash_func
         self.port = port
         self.net = net
-        self.known_txs_var = known_txs_var
-        self.mining_txs_var = mining_txs_var
         self.addr_store = dict(addr_store)
         self.connect_addrs = connect_addrs
         self.preferred_storage = preferred_storage
         self.traffic_happened = traffic_happened
+        self.known_txs_var = known_txs_var
+        self.mining_txs_var = mining_txs_var
         
         self.nonce = random.randrange(2**64)
         self.peers = {}
index f11b94f..f9af6db 100644 (file)
@@ -10,6 +10,9 @@ class Event(object):
         self._once = None
         self.times = 0
     
+    def run_and_watch(self, func):
+        func()
+        return self.watch(func)
     def watch(self, func):
         id = self.id_generator.next()
         self.observers[id] = func