From 627f0f5baba95134a943970107afa0c6f95a9dbf Mon Sep 17 00:00:00 2001 From: Forrest Voight Date: Fri, 12 Oct 2012 16:01:53 -0400 Subject: [PATCH 1/1] penalization and sending txs ahead of shares --- p2pool/data.py | 36 ++++++++++++++++++++++++++---------- p2pool/main.py | 26 +++++++++++++++++--------- p2pool/p2p.py | 43 ++++++++++++++++++++++++++++--------------- p2pool/util/variable.py | 3 +++ 4 files changed, 74 insertions(+), 34 deletions(-) diff --git a/p2pool/data.py b/p2pool/data.py index 6ab7126..7823e41 100644 --- a/p2pool/data.py +++ b/p2pool/data.py @@ -269,7 +269,19 @@ class Share(object): raise ValueError('''gentx doesn't match hash_link''') return gentx # only used by as_block - def as_block(self, tracker): + def get_other_tx_hashes(self, tracker): + return [] + + def get_other_txs(self, tracker, known_txs): + return [] + + def get_other_txs_size(self, tracker, known_txs): + return 0 + + def get_new_txs_size(self, known_txs): + return 0 + + def as_block(self, tracker, known_txs): if self.other_txs is None: raise ValueError('share does not contain all txs') return dict(header=self.header, txs=[self.check(tracker)] + self.other_txs) @@ -500,16 +512,16 @@ class NewShare(object): return gentx # only used by as_block - def as_block(self, tracker): - other_tx_hashes = [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']] + def get_other_tx_hashes(self, tracker): + return [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']] + + def as_block(self, tracker, known_txs): + other_tx_hashes = self.get_other_tx_hashes(tracker) + if not all(tx_hash in known_txs for tx_hash in other_tx_hashes): + return None # not all txs present - print [tx_hash in self.peer.remembered_txs for tx_hash in other_tx_hashes] - txs = [self.check(tracker)] + [self.peer.remembered_txs[tx_hash] for tx_hash in other_tx_hashes] - print - print 'SUCCESS' - print - return dict(header=self.header, txs=txs) + return dict(header=self.header, txs=[self.check(tracker)] + [known_txs[tx_hash] for tx_hash in other_tx_hashes]) class WeightsSkipList(forest.TrackerSkipList): @@ -580,7 +592,7 @@ class OkayTracker(forest.Tracker): self.verified.add(share) return True - def think(self, block_rel_height_func, previous_block, bits): + def think(self, block_rel_height_func, previous_block, bits, known_txs): desired = set() # O(len(self.heads)) @@ -646,6 +658,7 @@ class OkayTracker(forest.Tracker): #self.items[h].peer is None, self.items[h].pow_hash <= self.items[h].header['bits'].target, # is block solution (self.items[h].header['previous_block'], self.items[h].header['bits']) == (previous_block, bits) or self.items[h].peer is None, + self.items[h].as_block(self, known_txs) is not None, -self.items[h].time_seen, ), h) for h in self.verified.tails.get(best_tail, [])) if p2pool.DEBUG: @@ -708,6 +721,9 @@ class OkayTracker(forest.Tracker): if p2pool.DEBUG: print 'Stale detected! %x < %x' % (best_share.header['previous_block'], previous_block) best = best_share.previous_hash + elif best_share.as_block(self, known_txs) is None: + print 'Share with incomplete transactions detected! Jumping from %s to %s!' % (format_hash(best), format_hash(best_share.previous_hash)) + best = best_share.previous_hash timestamp_cutoff = min(int(time.time()), best_share.timestamp) - 3600 target_cutoff = 2**256//(self.net.SHARE_PERIOD*best_tail_score[1] + 1) * 2 if best_tail_score[1] is not None else 2**256-1 diff --git a/p2pool/main.py b/p2pool/main.py index 14e41ff..e01041c 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -226,12 +226,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): # BEST SHARE + known_txs_var = variable.Variable({}) # hash -> tx + mining_txs_var = variable.Variable({}) # hash -> tx get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net) best_share_var = variable.Variable(None) desired_var = variable.Variable(None) def set_best_share(): - best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits']) + best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value) best_share_var.set(best) desired_var.set(desired) @@ -243,15 +245,17 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): # setup p2p logic and join p2pool network - known_txs_var = variable.Variable({}) # hash -> tx - mining_txs_var = variable.Variable({}) # hash -> tx # update mining_txs according to getwork results - @bitcoind_work.changed.watch - def _(work): + @bitcoind_work.changed.run_and_watch + def _(_=None): new_mining_txs = {} - for tx in work['transactions']: - new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx + new_known_txs = dict(known_txs_var.value) + for tx in bitcoind_work.value['transactions']: + tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) + new_mining_txs[tx_hash] = tx + new_known_txs[tx_hash] = tx mining_txs_var.set(new_mining_txs) + known_txs_var.set(new_known_txs) # forward transactions seen to bitcoind @known_txs_var.transitioned.watch def _(before, after): @@ -341,7 +345,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): @tracker.verified.added.watch def _(share): if share.pow_hash <= share.header['bits'].target: - submit_block(share.as_block(tracker), ignore_failure=True) + block = share.as_block(tracker, known_txs_var.value) + if block is None: + print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) + return + submit_block(block, ignore_failure=True) print print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) print @@ -417,7 +425,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): shares.append(share) for peer in list(p2p_node.peers.itervalues()): - yield peer.sendShares([share for share in shares if share.peer is not peer]) + yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash]) # send share when the chain changes to their chain best_share_var.changed.watch(broadcast_share) diff --git a/p2pool/p2p.py b/p2pool/p2p.py index 4af2391..49a239d 100644 --- a/p2pool/p2p.py +++ b/p2pool/p2p.py @@ -15,6 +15,14 @@ from p2pool.util import deferral, p2protocol, pack, variable class PeerMisbehavingError(Exception): pass + +def fragment(f, **kwargs): + try: + return f(**kwargs) + except p2protocol.TooLong: + att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems())) + return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems())) + class Protocol(p2protocol.Protocol): def __init__(self, node, incoming): p2protocol.Protocol.__init__(self, node.net.PREFIX, 1000000, node.traffic_happened) @@ -160,13 +168,13 @@ class Protocol(p2protocol.Protocol): added = set(after) - set(before) removed = set(before) - set(after) if added: - self.send_remember_tx(tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes]) + fragment(self.send_remember_tx, tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes]) if removed: self.send_forget_tx(tx_hashes=removed) watch_id2 = self.node.mining_txs_var.transitioned.watch(update_remote_view_of_my_mining_txs) self.connection_lost_event.watch(lambda: self.node.mining_txs_var.transitioned.unwatch(watch_id2)) - self.send_remember_tx(tx_hashes=[], txs=self.node.mining_txs_var.value.values()) + fragment(self.send_remember_tx, tx_hashes=[], txs=self.node.mining_txs_var.value.values()) message_ping = pack.ComposedType([]) def handle_ping(self): @@ -231,17 +239,22 @@ class Protocol(p2protocol.Protocol): def handle_shares(self, shares): self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self) for share in shares if share['type'] not in [6, 7]], self) - def sendShares(self, shares): - def att(f, **kwargs): - try: - return f(**kwargs) - except p2protocol.TooLong: - att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems())) - return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems())) - if shares: - return att(self.send_shares, shares=[share.as_share() for share in shares]) - else: + def sendShares(self, shares, tracker, known_txs, include_txs_with=[]): + if not shares: return defer.succeed(None) + + tx_hashes = set() + for share in shares: + if share.hash in include_txs_with: + tx_hashes.update(share.get_other_tx_hashes(tracker)) + + hashes_to_send = [x for x in tx_hashes if x not in self.node.mining_txs_var.value] + + fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes and x in known_txs]) # last one required? + fragment(self.send_shares, shares=[share.as_share() for share in shares]) + res = self.send_forget_tx(tx_hashes=hashes_to_send) + + return res message_sharereq = pack.ComposedType([ @@ -475,16 +488,16 @@ class SingleClientFactory(protocol.ReconnectingClientFactory): self.node.lost_conn(proto, reason) class Node(object): - def __init__(self, best_share_hash_func, port, net, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({}), addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event()): + def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event(), known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})): self.best_share_hash_func = best_share_hash_func self.port = port self.net = net - self.known_txs_var = known_txs_var - self.mining_txs_var = mining_txs_var self.addr_store = dict(addr_store) self.connect_addrs = connect_addrs self.preferred_storage = preferred_storage self.traffic_happened = traffic_happened + self.known_txs_var = known_txs_var + self.mining_txs_var = mining_txs_var self.nonce = random.randrange(2**64) self.peers = {} diff --git a/p2pool/util/variable.py b/p2pool/util/variable.py index f11b94f..f9af6db 100644 --- a/p2pool/util/variable.py +++ b/p2pool/util/variable.py @@ -10,6 +10,9 @@ class Event(object): self._once = None self.times = 0 + def run_and_watch(self, func): + func() + return self.watch(func) def watch(self, func): id = self.id_generator.next() self.observers[id] = func -- 1.7.1