From: Forrest Voight Date: Mon, 15 Oct 2012 06:35:01 +0000 (-0400) Subject: Merge remote-tracking branch 'origin/master' into tx_preforwarding X-Git-Tag: 8.0^2~2 X-Git-Url: https://git.novaco.in/?a=commitdiff_plain;h=414e981a3859c911f459027fa7869bcd8666469a;hp=0ca9aa7494df862a242b479d50d48e84fafb0de7;p=p2pool.git Merge remote-tracking branch 'origin/master' into tx_preforwarding --- diff --git a/p2pool/bitcoin/p2p.py b/p2pool/bitcoin/p2p.py index 4d04d54..fd403f1 100644 --- a/p2pool/bitcoin/p2p.py +++ b/p2pool/bitcoin/p2p.py @@ -53,7 +53,6 @@ class Protocol(p2protocol.Protocol): def handle_verack(self): self.get_block = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)])) self.get_block_header = deferral.ReplyMatcher(lambda hash: self.send_getheaders(version=1, have=[], last=hash)) - self.get_tx = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='tx', hash=hash)])) if hasattr(self.factory, 'resetDelay'): self.factory.resetDelay() @@ -72,7 +71,7 @@ class Protocol(p2protocol.Protocol): def handle_inv(self, invs): for inv in invs: if inv['type'] == 'tx': - self.factory.new_tx.happened(inv['hash']) + self.send_getdata(requests=[inv]) elif inv['type'] == 'block': self.factory.new_block.happened(inv['hash']) else: @@ -110,7 +109,7 @@ class Protocol(p2protocol.Protocol): ('tx', bitcoin_data.tx_type), ]) def handle_tx(self, tx): - self.get_tx.got_response(bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)), tx) + self.factory.new_tx.happened(tx) message_block = pack.ComposedType([ ('block', bitcoin_data.block_type), diff --git a/p2pool/data.py b/p2pool/data.py index bba0323..6315b11 100644 --- a/p2pool/data.py +++ b/p2pool/data.py @@ -33,10 +33,6 @@ def check_hash_link(hash_link, data, const_ending=''): # shares -# type: -# 2: share1a -# 3: share1b - share_type = pack.ComposedType([ ('type', pack.VarIntType()), ('contents', pack.VarStrType()), @@ -51,6 +47,8 @@ def load_share(share, net, peer): elif share['type'] == 5: share1b = Share.share1b_type.unpack(share['contents']) return Share(net, peer, merkle_link=bitcoin_data.calculate_merkle_link([0] + [bitcoin_data.hash256(bitcoin_data.tx_type.pack(x)) for x in share1b['other_txs']], 0), **share1b) + elif share['type'] == NewShare.VERSION: + return NewShare(net, peer, NewShare.share_type.unpack(share['contents'])) else: raise ValueError('unknown share type: %r' % (share['type'],)) @@ -113,7 +111,7 @@ class Share(object): gentx_before_refhash = pack.VarStrType().pack(DONATION_SCRIPT) + pack.IntType(64).pack(0) + pack.VarStrType().pack('\x20' + pack.IntType(256).pack(0))[:2] @classmethod - def generate_transaction(cls, tracker, share_data, block_target, desired_timestamp, desired_target, ref_merkle_link, desired_other_transaction_hashes, net): + def generate_transaction(cls, tracker, share_data, block_target, desired_timestamp, desired_target, ref_merkle_link, desired_other_transaction_hashes, net, known_txs=None): previous_share = tracker.items[share_data['previous_share_hash']] if share_data['previous_share_hash'] is not None else None height, last = tracker.get_height_and_last(share_data['previous_share_hash']) @@ -189,7 +187,7 @@ class Share(object): share_info=share_info, ))), ref_merkle_link)) - __slots__ = 'net peer common min_header share_info hash_link merkle_link other_txs hash share_data max_target target timestamp previous_hash new_script desired_version gentx_hash header pow_hash header_hash time_seen'.split(' ') + __slots__ = 'net peer common min_header share_info hash_link merkle_link other_txs hash share_data max_target target timestamp previous_hash new_script desired_version gentx_hash header pow_hash header_hash new_transaction_hashes time_seen'.split(' ') def __init__(self, net, peer, common, merkle_link, other_txs): self.net = net @@ -239,6 +237,8 @@ class Share(object): if other_txs is None and self.pow_hash <= self.header['bits'].target: raise ValueError('other_txs not provided when a block solution') + self.new_transaction_hashes = [] + # XXX eww self.time_seen = time.time() @@ -258,6 +258,11 @@ class Share(object): return self.as_share1b() def check(self, tracker): + if self.share_data['previous_share_hash'] is not None: + previous_share = tracker.items[self.share_data['previous_share_hash']] + if isinstance(previous_share, NewShare): + from p2pool import p2p + raise p2p.PeerMisbehavingError('''Share can't follow NewShare''') share_info, gentx, other_transaction_hashes, get_share = self.generate_transaction(tracker, self.share_info['share_data'], self.header['bits'].target, self.share_info['timestamp'], self.share_info['bits'].target, self.common['ref_merkle_link'], [], self.net) # ok because desired_other_transaction_hashes is only used in get_share if share_info != self.share_info: raise ValueError('share_info invalid') @@ -265,11 +270,298 @@ class Share(object): raise ValueError('''gentx doesn't match hash_link''') return gentx # only used by as_block - def as_block(self, tracker): + def get_other_tx_hashes(self, tracker): + return [] + + def get_other_txs(self, tracker, known_txs): + return [] + + def get_other_txs_size(self, tracker, known_txs): + return 0 + + def get_new_txs_size(self, known_txs): + return 0 + + def as_block(self, tracker, known_txs): if self.other_txs is None: raise ValueError('share does not contain all txs') return dict(header=self.header, txs=[self.check(tracker)] + self.other_txs) +class NewShare(object): + VERSION = 8 + + other_txs = None + + small_block_header_type = pack.ComposedType([ + ('version', pack.VarIntType()), + ('previous_block', pack.PossiblyNoneType(0, pack.IntType(256))), + ('timestamp', pack.IntType(32)), + ('bits', bitcoin_data.FloatingIntegerType()), + ('nonce', pack.IntType(32)), + ]) + + share_info_type = pack.ComposedType([ + ('share_data', pack.ComposedType([ + ('previous_share_hash', pack.PossiblyNoneType(0, pack.IntType(256))), + ('coinbase', pack.VarStrType()), + ('nonce', pack.IntType(32)), + ('pubkey_hash', pack.IntType(160)), + ('subsidy', pack.IntType(64)), + ('donation', pack.IntType(16)), + ('stale_info', pack.EnumType(pack.IntType(8), dict((k, {0: None, 253: 'orphan', 254: 'doa'}.get(k, 'unk%i' % (k,))) for k in xrange(256)))), + ('desired_version', pack.VarIntType()), + ])), + ('new_transaction_hashes', pack.ListType(pack.IntType(256))), + ('transaction_hash_refs', pack.ListType(pack.ComposedType([ # compressed by referencing previous shares' hashes + ('share_count', pack.VarIntType()), + ('tx_count', pack.VarIntType()), + ]))), + ('far_share_hash', pack.PossiblyNoneType(0, pack.IntType(256))), + ('max_bits', bitcoin_data.FloatingIntegerType()), + ('bits', bitcoin_data.FloatingIntegerType()), + ('timestamp', pack.IntType(32)), + ]) + + share_type = pack.ComposedType([ + ('min_header', small_block_header_type), + ('share_info', share_info_type), + ('ref_merkle_link', pack.ComposedType([ + ('branch', pack.ListType(pack.IntType(256))), + ('index', pack.IntType(0)), + ])), + ('hash_link', hash_link_type), + ('merkle_link', pack.ComposedType([ + ('branch', pack.ListType(pack.IntType(256))), + ('index', pack.IntType(0)), # it will always be 0 + ])), + ]) + + ref_type = pack.ComposedType([ + ('identifier', pack.FixedStrType(64//8)), + ('share_info', share_info_type), + ]) + + gentx_before_refhash = pack.VarStrType().pack(DONATION_SCRIPT) + pack.IntType(64).pack(0) + pack.VarStrType().pack('\x20' + pack.IntType(256).pack(0))[:2] + + @classmethod + def generate_transaction(cls, tracker, share_data, block_target, desired_timestamp, desired_target, ref_merkle_link, desired_other_transaction_hashes, net, known_txs=None): + previous_share = tracker.items[share_data['previous_share_hash']] if share_data['previous_share_hash'] is not None else None + + height, last = tracker.get_height_and_last(share_data['previous_share_hash']) + assert height >= net.REAL_CHAIN_LENGTH or last is None + if height < net.TARGET_LOOKBEHIND: + pre_target3 = net.MAX_TARGET + else: + attempts_per_second = get_pool_attempts_per_second(tracker, share_data['previous_share_hash'], net.TARGET_LOOKBEHIND, min_work=True, integer=True) + pre_target = 2**256//(net.SHARE_PERIOD*attempts_per_second) - 1 if attempts_per_second else 2**256-1 + pre_target2 = math.clip(pre_target, (previous_share.max_target*9//10, previous_share.max_target*11//10)) + pre_target3 = math.clip(pre_target2, (0, net.MAX_TARGET)) + max_bits = bitcoin_data.FloatingInteger.from_target_upper_bound(pre_target3) + bits = bitcoin_data.FloatingInteger.from_target_upper_bound(math.clip(desired_target, (pre_target3//10, pre_target3))) + + weights, total_weight, donation_weight = tracker.get_cumulative_weights(share_data['previous_share_hash'], + min(height, net.REAL_CHAIN_LENGTH), + 65535*net.SPREAD*bitcoin_data.target_to_average_attempts(block_target), + ) + assert total_weight == sum(weights.itervalues()) + donation_weight, (total_weight, sum(weights.itervalues()) + donation_weight) + + amounts = dict((script, share_data['subsidy']*(199*weight)//(200*total_weight)) for script, weight in weights.iteritems()) # 99.5% goes according to weights prior to this share + this_script = bitcoin_data.pubkey_hash_to_script2(share_data['pubkey_hash']) + amounts[this_script] = amounts.get(this_script, 0) + share_data['subsidy']//200 # 0.5% goes to block finder + amounts[DONATION_SCRIPT] = amounts.get(DONATION_SCRIPT, 0) + share_data['subsidy'] - sum(amounts.itervalues()) # all that's left over is the donation weight and some extra satoshis due to rounding + + if sum(amounts.itervalues()) != share_data['subsidy'] or any(x < 0 for x in amounts.itervalues()): + raise ValueError() + + dests = sorted(amounts.iterkeys(), key=lambda script: (script == DONATION_SCRIPT, amounts[script], script))[-4000:] # block length limit, unlikely to ever be hit + + new_transaction_hashes = [] + new_transaction_size = 0 + transaction_hash_refs = [] + other_transaction_hashes = [] + + for tx_hash in desired_other_transaction_hashes: + for i, share in enumerate(tracker.get_chain(share_data['previous_share_hash'], min(height, 100))): + if tx_hash in share.new_transaction_hashes: + this = dict(share_count=i+1, tx_count=share.new_transaction_hashes.index(tx_hash)) + break + else: + if known_txs is not None: + this_size = len(bitcoin_data.tx_type.pack(known_txs[tx_hash])) + if new_transaction_size + this_size > 50000: # only allow 50 kB of new txns/share + break + new_transaction_size += this_size + new_transaction_hashes.append(tx_hash) + this = dict(share_count=0, tx_count=len(new_transaction_hashes)-1) + transaction_hash_refs.append(this) + other_transaction_hashes.append(tx_hash) + + share_info = dict( + share_data=share_data, + far_share_hash=None if last is None and height < 99 else tracker.get_nth_parent_hash(share_data['previous_share_hash'], 99), + max_bits=max_bits, + bits=bits, + timestamp=math.clip(desired_timestamp, ( + (previous_share.timestamp + net.SHARE_PERIOD) - (net.SHARE_PERIOD - 1), # = previous_share.timestamp + 1 + (previous_share.timestamp + net.SHARE_PERIOD) + (net.SHARE_PERIOD - 1), + )) if previous_share is not None else desired_timestamp, + new_transaction_hashes=new_transaction_hashes, + transaction_hash_refs=transaction_hash_refs, + ) + + gentx = dict( + version=1, + tx_ins=[dict( + previous_output=None, + sequence=None, + script=share_data['coinbase'], + )], + tx_outs=[dict(value=amounts[script], script=script) for script in dests if amounts[script] or script == DONATION_SCRIPT] + [dict( + value=0, + script='\x20' + cls.get_ref_hash(net, share_info, ref_merkle_link), + )], + lock_time=0, + ) + + def get_share(header, transactions): + min_header=dict(header);del min_header['merkle_root'] + return cls(net, None, dict( + min_header=min_header, + share_info=share_info, + ref_merkle_link=dict(branch=[], index=0), + hash_link=prefix_to_hash_link(bitcoin_data.tx_type.pack(gentx)[:-32-4], cls.gentx_before_refhash), + merkle_link=bitcoin_data.calculate_merkle_link([None] + other_transaction_hashes, 0), + )) + + return share_info, gentx, other_transaction_hashes, get_share + + @classmethod + def get_ref_hash(cls, net, share_info, ref_merkle_link): + return pack.IntType(256).pack(bitcoin_data.check_merkle_link(bitcoin_data.hash256(cls.ref_type.pack(dict( + identifier=net.IDENTIFIER, + share_info=share_info, + ))), ref_merkle_link)) + + __slots__ = 'net peer contents min_header share_info hash_link merkle_link hash share_data max_target target timestamp previous_hash new_script desired_version gentx_hash header pow_hash header_hash new_transaction_hashes time_seen'.split(' ') + + def __init__(self, net, peer, contents): + self.net = net + self.peer = peer + self.contents = contents + + self.min_header = contents['min_header'] + self.share_info = contents['share_info'] + self.hash_link = contents['hash_link'] + self.merkle_link = contents['merkle_link'] + + if not (2 <= len(self.share_info['share_data']['coinbase']) <= 100): + raise ValueError('''bad coinbase size! %i bytes''' % (len(self.share_info['share_data']['coinbase']),)) + + if len(self.merkle_link['branch']) > 16: + raise ValueError('merkle branch too long!') + + assert not self.hash_link['extra_data'], repr(self.hash_link['extra_data']) + + self.share_data = self.share_info['share_data'] + self.max_target = self.share_info['max_bits'].target + self.target = self.share_info['bits'].target + self.timestamp = self.share_info['timestamp'] + self.previous_hash = self.share_data['previous_share_hash'] + self.new_script = bitcoin_data.pubkey_hash_to_script2(self.share_data['pubkey_hash']) + self.desired_version = self.share_data['desired_version'] + + for x in self.share_info['transaction_hash_refs']: + assert x['share_count'] < 110 + for i, x in enumerate(self.share_info['new_transaction_hashes']): + assert dict(share_count=0, tx_count=i) in self.share_info['transaction_hash_refs'] + + self.gentx_hash = check_hash_link( + self.hash_link, + self.get_ref_hash(net, self.share_info, contents['ref_merkle_link']) + pack.IntType(32).pack(0), + self.gentx_before_refhash, + ) + merkle_root = bitcoin_data.check_merkle_link(self.gentx_hash, self.merkle_link) + self.header = dict(self.min_header, merkle_root=merkle_root) + self.pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(self.header)) + self.hash = self.header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.header)) + + if self.target > net.MAX_TARGET: + from p2pool import p2p + raise p2p.PeerMisbehavingError('share target invalid') + + if self.pow_hash > self.target: + from p2pool import p2p + raise p2p.PeerMisbehavingError('share PoW invalid') + + self.new_transaction_hashes = self.share_info['new_transaction_hashes'] + + # XXX eww + self.time_seen = time.time() + + def __repr__(self): + return 'Share' + repr((self.net, self.peer, self.contents)) + + def as_share(self): + return dict(type=self.VERSION, contents=self.share_type.pack(self.contents)) + + def check(self, tracker): + if self.share_data['previous_share_hash'] is not None: + previous_share = tracker.items[self.share_data['previous_share_hash']] + if isinstance(previous_share, Share): + if tracker.get_height(previous_share.hash) < self.net.CHAIN_LENGTH: + from p2pool import p2p + raise p2p.PeerMisbehavingError('NewShare without enough history') + else: + # Share -> NewShare only valid if 85% of hashes in [self.net.CHAIN_LENGTH*9//10, self.net.CHAIN_LENGTH] for new version + counts = get_desired_version_counts(tracker, + tracker.get_nth_parent_hash(previous_share.hash, self.net.CHAIN_LENGTH*9//10), self.net.CHAIN_LENGTH//10) + if counts.get(self.VERSION, 0) < sum(counts.itervalues())*85//100: + from p2pool import p2p + raise p2p.PeerMisbehavingError('NewShare without enough hash power upgraded') + + other_tx_hashes = [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']] + + share_info, gentx, other_tx_hashes2, get_share = self.generate_transaction(tracker, self.share_info['share_data'], self.header['bits'].target, self.share_info['timestamp'], self.share_info['bits'].target, self.contents['ref_merkle_link'], other_tx_hashes, self.net) + assert other_tx_hashes2 == other_tx_hashes + if share_info != self.share_info: + raise ValueError('share_info invalid') + if bitcoin_data.hash256(bitcoin_data.tx_type.pack(gentx)) != self.gentx_hash: + raise ValueError('''gentx doesn't match hash_link''') + + if bitcoin_data.calculate_merkle_link([None] + other_tx_hashes, 0) != self.merkle_link: + raise ValueError('merkle_link and other_tx_hashes do not match') + + return gentx # only used by as_block + + def get_other_tx_hashes(self, tracker): + return [tracker.items[tracker.get_nth_parent_hash(self.hash, x['share_count'])].share_info['new_transaction_hashes'][x['tx_count']] for x in self.share_info['transaction_hash_refs']] + + def get_other_txs(self, tracker, known_txs): + other_tx_hashes = self.get_other_tx_hashes(tracker) + + if not all(tx_hash in known_txs for tx_hash in other_tx_hashes): + return None # not all txs present + + return [known_txs[tx_hash] for tx_hash in other_tx_hashes] + + def get_other_txs_size(self, tracker, known_txs): + other_txs = self.get_other_txs(tracker, known_txs) + if other_txs is None: + return None # not all txs present + size = sum(len(bitcoin_data.tx_type.pack(tx)) for tx in other_txs) + + def get_new_txs_size(self, known_txs): + if not all(tx_hash in known_txs for tx_hash in self.share_info['new_transaction_hashes']): + return None # not all txs present + return sum(len(bitcoin_data.tx_type.pack(known_txs[tx_hash])) for tx_hash in self.share_info['new_transaction_hashes']) + + def as_block(self, tracker, known_txs): + other_txs = self.get_other_txs(tracker, known_txs) + if other_txs is None: + return None # not all txs present + return dict(header=self.header, txs=[self.check(tracker)] + other_txs) + + class WeightsSkipList(forest.TrackerSkipList): # share_count, weights, total_weight @@ -338,7 +630,7 @@ class OkayTracker(forest.Tracker): self.verified.add(share) return True - def think(self, block_rel_height_func, previous_block, bits): + def think(self, block_rel_height_func, previous_block, bits, known_txs): desired = set() # O(len(self.heads)) @@ -404,6 +696,9 @@ class OkayTracker(forest.Tracker): #self.items[h].peer is None, self.items[h].pow_hash <= self.items[h].header['bits'].target, # is block solution (self.items[h].header['previous_block'], self.items[h].header['bits']) == (previous_block, bits) or self.items[h].peer is None, + self.items[h].get_other_txs(self, known_txs) is not None, + self.items[h].get_other_txs_size(self, known_txs) < 1000000, + self.items[h].get_new_txs_size(known_txs) < 50000, -self.items[h].time_seen, ), h) for h in self.verified.tails.get(best_tail, [])) if p2pool.DEBUG: @@ -466,6 +761,15 @@ class OkayTracker(forest.Tracker): if p2pool.DEBUG: print 'Stale detected! %x < %x' % (best_share.header['previous_block'], previous_block) best = best_share.previous_hash + elif best_share.get_other_txs(self, known_txs) is None: + print 'Share with incomplete transactions detected! Jumping from %s to %s!' % (format_hash(best), format_hash(best_share.previous_hash)) + best = best_share.previous_hash + elif best_share.get_other_txs_size(self, known_txs) > 1000000: + print >>sys.stderr, 'Share with too many transactions detected! Jumping from %s to %s!' % (format_hash(best), format_hash(best_share.previous_hash)) + best = best_share.previous_hash + elif best_share.get_new_txs_size(known_txs) > 50000: + print >>sys.stderr, 'Share with too many new transactions detected! Jumping from %s to %s!' % (format_hash(best), format_hash(best_share.previous_hash)) + best = best_share.previous_hash timestamp_cutoff = min(int(time.time()), best_share.timestamp) - 3600 target_cutoff = 2**256//(self.net.SHARE_PERIOD*best_tail_score[1] + 1) * 2 if best_tail_score[1] is not None else 2**256-1 @@ -549,9 +853,9 @@ def get_warnings(tracker, best_share, net, bitcoind_warning, bitcoind_work_value res = [] desired_version_counts = get_desired_version_counts(tracker, best_share, - min(60*60//net.SHARE_PERIOD, tracker.get_height(best_share))) + min(net.CHAIN_LENGTH, 60*60//net.SHARE_PERIOD, tracker.get_height(best_share))) majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k]) - if majority_desired_version > 5 and desired_version_counts[majority_desired_version] > sum(desired_version_counts.itervalues())/2: + if majority_desired_version > NewShare.VERSION and desired_version_counts[majority_desired_version] > sum(desired_version_counts.itervalues())/2: res.append('A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)\n' 'An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.' % ( majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))) diff --git a/p2pool/main.py b/p2pool/main.py index 784c9d5..8ff9e0f 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -50,6 +50,7 @@ def getwork(bitcoind, use_getblocktemplate=False): version=work['version'], previous_block=int(work['previousblockhash'], 16), transactions=map(bitcoin_data.tx_type.unpack, packed_transactions), + transaction_hashes=map(bitcoin_data.hash256, packed_transactions), subsidy=work['coinbasevalue'], time=work['time'] if 'time' in work else work['curtime'], bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']), @@ -226,12 +227,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): # BEST SHARE + known_txs_var = variable.Variable({}) # hash -> tx + mining_txs_var = variable.Variable({}) # hash -> tx get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net) best_share_var = variable.Variable(None) desired_var = variable.Variable(None) def set_best_share(): - best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits']) + best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value) best_share_var.set(best) desired_var.set(desired) @@ -243,6 +246,30 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): # setup p2p logic and join p2pool network + # update mining_txs according to getwork results + @bitcoind_work.changed.run_and_watch + def _(_=None): + new_mining_txs = {} + new_known_txs = dict(known_txs_var.value) + for tx_hash, tx in zip(bitcoind_work.value['transaction_hashes'], bitcoind_work.value['transactions']): + new_mining_txs[tx_hash] = tx + new_known_txs[tx_hash] = tx + mining_txs_var.set(new_mining_txs) + known_txs_var.set(new_known_txs) + # add p2p transactions from bitcoind to known_txs + @factory.new_tx.watch + def _(tx): + new_known_txs = dict(known_txs_var.value) + new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx + known_txs_var.set(new_known_txs) + # forward transactions seen to bitcoind + @known_txs_var.transitioned.watch + @defer.inlineCallbacks + def _(before, after): + yield deferral.sleep(random.expovariate(1/1)) + for tx_hash in set(after) - set(before): + factory.conn.value.send_tx(tx=after[tx_hash]) + class Node(p2p.Node): def handle_shares(self, shares, peer): if len(shares) > 5: @@ -326,7 +353,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): @tracker.verified.added.watch def _(share): if share.pow_hash <= share.header['bits'].target: - submit_block(share.as_block(tracker), ignore_failure=True) + block = share.as_block(tracker, known_txs_var.value) + if block is None: + print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) + return + submit_block(block, ignore_failure=True) print print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) print @@ -377,9 +408,23 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): connect_addrs=connect_addrs, max_incoming_conns=args.p2pool_conns, traffic_happened=traffic_happened, + known_txs_var=known_txs_var, + mining_txs_var=mining_txs_var, ) p2p_node.start() + def forget_old_txs(): + new_known_txs = {} + for peer in p2p_node.peers.itervalues(): + new_known_txs.update(peer.remembered_txs) + new_known_txs.update(mining_txs_var.value) + for share in tracker.get_chain(best_share_var.value, min(120, tracker.get_height(best_share_var.value))): + for tx_hash in share.new_transaction_hashes: + if tx_hash in known_txs_var.value: + new_known_txs[tx_hash] = known_txs_var.value[tx_hash] + known_txs_var.set(new_known_txs) + task.LoopingCall(forget_old_txs).start(10) + def save_addrs(): with open(os.path.join(datadir_path, 'addrs'), 'wb') as f: f.write(json.dumps(p2p_node.addr_store.items())) @@ -400,7 +445,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): shares.append(share) for peer in list(p2p_node.peers.itervalues()): - yield peer.sendShares([share for share in shares if share.peer is not peer]) + yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash]) # send share when the chain changes to their chain best_share_var.changed.watch(broadcast_share) diff --git a/p2pool/p2p.py b/p2pool/p2p.py index fd73a89..083ad13 100644 --- a/p2pool/p2p.py +++ b/p2pool/p2p.py @@ -15,7 +15,17 @@ from p2pool.util import deferral, p2protocol, pack, variable class PeerMisbehavingError(Exception): pass + +def fragment(f, **kwargs): + try: + return f(**kwargs) + except p2protocol.TooLong: + att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems())) + return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems())) + class Protocol(p2protocol.Protocol): + max_remembered_txs_size = 2500000 + def __init__(self, node, incoming): p2protocol.Protocol.__init__(self, node.net.PREFIX, 1000000, node.traffic_happened) self.node = node @@ -29,10 +39,12 @@ class Protocol(p2protocol.Protocol): self.factory.proto_made_connection(self) + self.connection_lost_event = variable.Event() + self.addr = self.transport.getPeer().host, self.transport.getPeer().port self.send_version( - version=4, + version=8, services=0, addr_to=dict( services=0, @@ -58,6 +70,13 @@ class Protocol(p2protocol.Protocol): timeout=15, on_timeout=self.transport.loseConnection, ) + + self.remote_tx_hashes = set() # view of peer's known_txs # not actually initially empty, but sending txs instead of tx hashes won't hurt + self.remote_remembered_txs_size = 0 + + self.remembered_txs = {} # view of peer's mining_txs + self.remembered_txs_size = 0 + self.known_txs_cache = {} def _connect_timeout(self): self.timeout_delayed = None @@ -137,6 +156,43 @@ class Protocol(p2protocol.Protocol): if best_share_hash is not None: self.node.handle_share_hashes([best_share_hash], self) + + if self.other_version < 8: + return + + def update_remote_view_of_my_known_txs(before, after): + added = set(after) - set(before) + removed = set(before) - set(after) + if added: + self.send_have_tx(tx_hashes=list(added)) + if removed: + self.send_losing_tx(tx_hashes=list(removed)) + + # cache forgotten txs here for a little while so latency of "losing_tx" packets doesn't cause problems + key = max(self.known_txs_cache) + 1 if self.known_txs_cache else 0 + self.known_txs_cache[key] = dict((h, before[h]) for h in removed) + reactor.callLater(20, self.known_txs_cache.pop, key) + watch_id = self.node.known_txs_var.transitioned.watch(update_remote_view_of_my_known_txs) + self.connection_lost_event.watch(lambda: self.node.known_txs_var.transitioned.unwatch(watch_id)) + + self.send_have_tx(tx_hashes=self.node.known_txs_var.value.keys()) + + def update_remote_view_of_my_mining_txs(before, after): + added = set(after) - set(before) + removed = set(before) - set(after) + if added: + self.remote_remembered_txs_size += sum(len(bitcoin_data.tx_type.pack(after[x])) for x in added) + assert self.remote_remembered_txs_size <= self.max_remembered_txs_size + fragment(self.send_remember_tx, tx_hashes=[x for x in added if x in self.remote_tx_hashes], txs=[after[x] for x in added if x not in self.remote_tx_hashes]) + if removed: + self.send_forget_tx(tx_hashes=removed) + self.remote_remembered_txs_size -= sum(len(bitcoin_data.tx_type.pack(before[x])) for x in removed) + watch_id2 = self.node.mining_txs_var.transitioned.watch(update_remote_view_of_my_mining_txs) + self.connection_lost_event.watch(lambda: self.node.mining_txs_var.transitioned.unwatch(watch_id2)) + + self.remote_remembered_txs_size += sum(len(bitcoin_data.tx_type.pack(x)) for x in self.node.mining_txs_var.value.values()) + assert self.remote_remembered_txs_size <= self.max_remembered_txs_size + fragment(self.send_remember_tx, tx_hashes=[], txs=self.node.mining_txs_var.value.values()) message_ping = pack.ComposedType([]) def handle_ping(self): @@ -201,17 +257,33 @@ class Protocol(p2protocol.Protocol): def handle_shares(self, shares): self.node.handle_shares([p2pool_data.load_share(share, self.node.net, self) for share in shares if share['type'] not in [6, 7]], self) - def sendShares(self, shares): - def att(f, **kwargs): - try: - return f(**kwargs) - except p2protocol.TooLong: - att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems())) - return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems())) - if shares: - return att(self.send_shares, shares=[share.as_share() for share in shares]) - else: + def sendShares(self, shares, tracker, known_txs, include_txs_with=[]): + if not shares: return defer.succeed(None) + + if self.other_version >= 8: + tx_hashes = set() + for share in shares: + if share.hash in include_txs_with: + tx_hashes.update(share.get_other_tx_hashes(tracker)) + + hashes_to_send = [x for x in tx_hashes if x not in self.node.mining_txs_var.value and x in known_txs] + + new_remote_remembered_txs_size = self.remote_remembered_txs_size + sum(len(bitcoin_data.tx_type.pack(known_txs[x])) for x in hashes_to_send) + if new_remote_remembered_txs_size > self.max_remembered_txs_size: + raise ValueError('shares have too many txs') + self.remote_remembered_txs_size = new_remote_remembered_txs_size + + fragment(self.send_remember_tx, tx_hashes=[x for x in hashes_to_send if x in self.remote_tx_hashes], txs=[known_txs[x] for x in hashes_to_send if x not in self.remote_tx_hashes]) + + res = fragment(self.send_shares, shares=[share.as_share() for share in shares]) + + if self.other_version >= 8: + res = self.send_forget_tx(tx_hashes=hashes_to_send) + + self.remote_remembered_txs_size -= sum(len(bitcoin_data.tx_type.pack(known_txs[x])) for x in hashes_to_send) + + return res message_sharereq = pack.ComposedType([ @@ -239,13 +311,85 @@ class Protocol(p2protocol.Protocol): res = failure.Failure("sharereply result: " + result) self.get_shares.got_response(id, res) + message_bestblock = pack.ComposedType([ ('header', bitcoin_data.block_header_type), ]) def handle_bestblock(self, header): self.node.handle_bestblock(header, self) + + message_have_tx = pack.ComposedType([ + ('tx_hashes', pack.ListType(pack.IntType(256))), + ]) + def handle_have_tx(self, tx_hashes): + assert self.remote_tx_hashes.isdisjoint(tx_hashes) + self.remote_tx_hashes.update(tx_hashes) + message_losing_tx = pack.ComposedType([ + ('tx_hashes', pack.ListType(pack.IntType(256))), + ]) + def handle_losing_tx(self, tx_hashes): + assert self.remote_tx_hashes.issuperset(tx_hashes) + self.remote_tx_hashes.difference_update(tx_hashes) + + + message_remember_tx = pack.ComposedType([ + ('tx_hashes', pack.ListType(pack.IntType(256))), + ('txs', pack.ListType(bitcoin_data.tx_type)), + ]) + def handle_remember_tx(self, tx_hashes, txs): + for tx_hash in tx_hashes: + if tx_hash in self.remembered_txs: + print >>sys.stderr, 'Peer referenced transaction twice, disconnecting' + self.transport.loseConnection() + return + + if tx_hash in self.node.known_txs_var.value: + tx = self.node.known_txs_var.value[tx_hash] + else: + for cache in self.known_txs_cache.itervalues(): + if tx_hash in cache: + tx = cache[tx_hash] + print 'Transaction rescued from peer latency cache!' + break + else: + print >>sys.stderr, 'Peer referenced unknown transaction, disconnecting' + self.transport.loseConnection() + return + + self.remembered_txs[tx_hash] = tx + self.remembered_txs_size += len(bitcoin_data.tx_type.pack(tx)) + new_known_txs = dict(self.node.known_txs_var.value) + warned = False + for tx in txs: + tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) + if tx_hash in self.remembered_txs: + print >>sys.stderr, 'Peer referenced transaction twice, disconnecting' + self.transport.loseConnection() + return + + if tx_hash in self.node.known_txs_var.value and not warned: + print 'Peer sent entire transaction that was already received' + warned = True + + self.remembered_txs[tx_hash] = tx + self.remembered_txs_size += len(bitcoin_data.tx_type.pack(tx)) + new_known_txs[tx_hash] = tx + self.node.known_txs_var.set(new_known_txs) + if self.remembered_txs_size >= self.max_remembered_txs_size: + raise PeerMisbehavingError('too much transaction data stored') + message_forget_tx = pack.ComposedType([ + ('tx_hashes', pack.ListType(pack.IntType(256))), + ]) + def handle_forget_tx(self, tx_hashes): + for tx_hash in tx_hashes: + self.remembered_txs_size -= len(bitcoin_data.tx_type.pack(self.remembered_txs[tx_hash])) + assert self.remembered_txs_size >= 0 + del self.remembered_txs[tx_hash] + + def connectionLost(self, reason): + self.connection_lost_event.happened() if self.timeout_delayed is not None: self.timeout_delayed.cancel() if self.connected2: @@ -407,7 +551,7 @@ class SingleClientFactory(protocol.ReconnectingClientFactory): self.node.lost_conn(proto, reason) class Node(object): - def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event()): + def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event(), known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})): self.best_share_hash_func = best_share_hash_func self.port = port self.net = net @@ -415,6 +559,8 @@ class Node(object): self.connect_addrs = connect_addrs self.preferred_storage = preferred_storage self.traffic_happened = traffic_happened + self.known_txs_var = known_txs_var + self.mining_txs_var = mining_txs_var self.nonce = random.randrange(2**64) self.peers = {} diff --git a/p2pool/test/test_p2p.py b/p2pool/test/test_p2p.py index a771e8c..7707ce7 100644 --- a/p2pool/test/test_p2p.py +++ b/p2pool/test/test_p2p.py @@ -1,9 +1,11 @@ import random -from twisted.internet import defer +from twisted.internet import defer, endpoints, protocol, reactor from twisted.trial import unittest from p2pool import networks, p2p +from p2pool.bitcoin import data as bitcoin_data +from p2pool.util import deferral class Test(unittest.TestCase): @@ -29,3 +31,56 @@ class Test(unittest.TestCase): yield df finally: yield n.stop() + + @defer.inlineCallbacks + def test_tx_limit(self): + class MyNode(p2p.Node): + def __init__(self, df): + p2p.Node.__init__(self, lambda: None, 29333, networks.nets['litecoin_testnet'], {}, set([('127.0.0.1', 19338)]), 0, 0, 0, 0) + + self.df = df + self.sent_time = 0 + + @defer.inlineCallbacks + def got_conn(self, conn): + p2p.Node.got_conn(self, conn) + huge_tx = dict( + version=0, + tx_ins=[], + tx_outs=[dict( + value=0, + script='x'*900000, + )], + lock_time=0, + ) + new_mining_txs = dict(self.mining_txs_var.value) + new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(huge_tx))] = huge_tx + self.mining_txs_var.set(new_mining_txs) + + yield deferral.sleep(1) + + huge_tx = dict( + version=0, + tx_ins=[], + tx_outs=[dict( + value=0, + script='x'*900000, + )], + lock_time=1, + ) + new_mining_txs = dict(self.mining_txs_var.value) + new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(huge_tx))] = huge_tx + self.mining_txs_var.set(new_mining_txs) + self.sent_time = reactor.seconds() + + + def lost_conn(self, conn, reason): + self.df.callback(None) + + df = defer.Deferred() + n = MyNode(df) + n.start() + yield df + if not (n.sent_time <= reactor.seconds() <= n.sent_time + 1): + raise ValueError('node did not disconnect within 1 seconds of receiving too much tx data') + yield n.stop() diff --git a/p2pool/util/variable.py b/p2pool/util/variable.py index f11b94f..f9af6db 100644 --- a/p2pool/util/variable.py +++ b/p2pool/util/variable.py @@ -10,6 +10,9 @@ class Event(object): self._once = None self.times = 0 + def run_and_watch(self, func): + func() + return self.watch(func) def watch(self, func): id = self.id_generator.next() self.observers[id] = func diff --git a/p2pool/work.py b/p2pool/work.py index 470a3ee..2be5900 100644 --- a/p2pool/work.py +++ b/p2pool/work.py @@ -183,8 +183,21 @@ class WorkerBridge(worker_interface.WorkerBridge): tx_hashes = [bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) for tx in self.current_work.value['transactions']] tx_map = dict(zip(tx_hashes, self.current_work.value['transactions'])) + share_type = p2pool_data.NewShare + if self.best_share_var.value is not None: + previous_share = self.tracker.items[self.best_share_var.value] + if isinstance(previous_share, p2pool_data.Share): + # Share -> NewShare only valid if 85% of hashes in [net.CHAIN_LENGTH*9//10, net.CHAIN_LENGTH] for new version + if self.tracker.get_height(previous_share.hash) < self.net.CHAIN_LENGTH: + share_type = p2pool_data.Share + else: + counts = p2pool_data.get_desired_version_counts(self.tracker, + self.tracker.get_nth_parent_hash(previous_share.hash, self.net.CHAIN_LENGTH*9//10), self.net.CHAIN_LENGTH//10) + if counts.get(p2pool_data.NewShare.VERSION, 0) < sum(counts.itervalues())*95//100: + share_type = p2pool_data.Share + if True: - share_info, gentx, other_transaction_hashes, get_share = p2pool_data.Share.generate_transaction( + share_info, gentx, other_transaction_hashes, get_share = share_type.generate_transaction( tracker=self.tracker, share_data=dict( previous_share_hash=self.best_share_var.value, @@ -201,7 +214,7 @@ class WorkerBridge(worker_interface.WorkerBridge): 'doa' if doas > doas_recorded_in_chain else None )(*self.get_stale_counts()), - desired_version=5, + desired_version=p2pool_data.NewShare.VERSION, ), block_target=self.current_work.value['bits'].target, desired_timestamp=int(time.time() + 0.5), @@ -209,6 +222,7 @@ class WorkerBridge(worker_interface.WorkerBridge): ref_merkle_link=dict(branch=[], index=0), desired_other_transaction_hashes=tx_hashes, net=self.net, + known_txs=tx_map, ) transactions = [gentx] + [tx_map[tx_hash] for tx_hash in other_transaction_hashes]