From 307acb00bf92bd3db82877a784d6bffe1cc1b25c Mon Sep 17 00:00:00 2001 From: Forrest Voight Date: Tue, 30 Aug 2011 11:41:52 -0400 Subject: [PATCH] re-added support for including transactions; depends on 'getmemorypool' patch for bitcoind --- p2pool/main.py | 89 +++++++++++++--------------------------------- p2pool/util/deferral.py | 11 ++++-- 2 files changed, 33 insertions(+), 67 deletions(-) diff --git a/p2pool/main.py b/p2pool/main.py index 3aab645..cf199b9 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -372,17 +372,12 @@ def main(args): raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers') if time.time() > current_work2.value['last_update'] + 60: raise jsonrpc.Error(-12345, u'lost contact with bitcoind') - pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()] - pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit - extra_txs = [] - size = 0 - for tx in pre_extra_txs: - this_size = len(bitcoin.data.tx_type.pack(tx.tx)) - if size + this_size > 500000: - break - extra_txs.append(tx) - size += this_size - # XXX check sigops! + + extra_txs_hashes, extra_txs_fees = memorypools.get(state['previous_block'], ([], 0)) + extra_txs = [get_transaction.call_now(tx_hash, None) for tx_hash in extra_txs_hashes] + if any(x is None for x in extra_txs): + print "haven't gotten all yet", sum(x is None for x in extra_txs), len(extra_txs) + extra_txs, extra_txs_fees = [], 0 # XXX assuming generate_tx is smallish here.. def get_stale_frac(): shares, stale_shares = get_share_counts() @@ -390,7 +385,7 @@ def main(args): return "" frac = stale_shares/shares return 2*struct.pack(' tx - get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100)) - - class Tx(object): - def __init__(self, tx, seen_at_block): - self.hash = bitcoin.data.tx_type.hash256(tx) - self.tx = tx - self.seen_at_block = seen_at_block - self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']]) - #print - #print '%x %r' % (seen_at_block, tx) - #for mention in self.mentions: - # print '%x' % mention - #print - self.parents_all_in_blocks = False - self.value_in = 0 - #print self.tx - self.value_out = sum(txout['value'] for txout in self.tx['tx_outs']) - self._find_parents_in_blocks() - - @defer.inlineCallbacks - def _find_parents_in_blocks(self): - for tx_in in self.tx['tx_ins']: - try: - raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash']) - except Exception: - return - self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value'] - #print raw_transaction - if not raw_transaction['parent_blocks']: - return - self.parents_all_in_blocks = True - - def is_good(self): - if not self.parents_all_in_blocks: - return False - x = self.is_good2() - #print 'is_good:', x - return x + get_transaction = deferral.DeferredCacher(defer.inlineCallbacks(lambda tx_hash: defer.returnValue((yield (yield factory.getProtocol()).get_tx(tx_hash)))), expiring_dict.ExpiringDict(300)) + memorypools = expiring_dict.ExpiringDict(300) @defer.inlineCallbacks - def new_tx(tx_hash): - try: - assert isinstance(tx_hash, (int, long)) - #print 'REQUESTING', tx_hash - tx = yield (yield factory.getProtocol()).get_tx(tx_hash) - #print 'GOT', tx - tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block']) - except: - log.err(None, 'Error handling tx:') - # disable for now, for testing impact on stales - #factory.new_tx.watch(new_tx) + def memorypool_thread(): + while True: + try: + res = yield deferral.retry('Error while getting memory pool:', 1)(bitcoind.rpc_getmemorypool)() + tx_hashes = [int(tx_hash_hex, 16) for tx_hash_hex in res['transactions']] + memorypools[int(res['previous_block'], 16)] = tx_hashes, res['fees'] + for tx_hash in tx_hashes: + get_transaction.call_now(tx_hash, None) + except: + log.err(None, 'Error while getting memory pool:') + yield deferral.sleep(random.expovariate(1/10)) + memorypool_thread() + + # do new getwork when a block is heard on the p2p interface def new_block(block_hash): work_updated.happened() diff --git a/p2pool/util/deferral.py b/p2pool/util/deferral.py index d5901b5..fd53f4e 100644 --- a/p2pool/util/deferral.py +++ b/p2pool/util/deferral.py @@ -140,9 +140,12 @@ class DeferredCacher(object): self.backing[key] = value defer.returnValue(value) - def call_now(self, key): + _nothing = object() + def call_now(self, key, default=_nothing): if key in self.waiting: - raise NotNowError() + if default is not self._nothing: + return default + raise NotNowError(key) if key in self.backing: return self.backing[key] @@ -158,4 +161,6 @@ class DeferredCacher(object): fail.printTraceback() print self.func(key).addCallback(cb).addErrback(eb) - raise NotNowError() + if default is not self._nothing: + return default + raise NotNowError(key) -- 1.7.1