re-added support for including transactions; depends on 'getmemorypool' patch for...
author     Forrest Voight <forrest@forre.st>
           Tue, 30 Aug 2011 15:41:52 +0000 (11:41 -0400)
committer  Forrest Voight <forrest@forre.st>
           Tue, 30 Aug 2011 15:41:52 +0000 (11:41 -0400)
p2pool/main.py
p2pool/util/deferral.py
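
The 'getmemorypool' patch referenced in the commit message is assumed to make
bitcoind return a reply shaped roughly like the sketch below; this is inferred
from the fields that memorypool_thread() in the diff reads ('previous_block',
'transactions', 'fees'), not from an authoritative spec, and the values are
placeholders:

    # Hypothetical example of the patched getmemorypool reply, as consumed by
    # memorypool_thread() in the diff below. Values are placeholders, not real data.
    example_reply = {
        'previous_block': '00' * 32,             # hex hash of the current best block
        'transactions': ['11' * 32, '22' * 32],  # hex hashes of mempool transactions
        'fees': 50000,                           # total fees of those transactions, in 1e-8 BTC units
    }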

diff --git a/p2pool/main.py b/p2pool/main.py
index 3aab645..cf199b9 100644
--- a/p2pool/main.py
+++ b/p2pool/main.py
@@ -372,17 +372,12 @@ def main(args):
                 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
             if time.time() > current_work2.value['last_update'] + 60:
                 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
-            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
-            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
-            extra_txs = []
-            size = 0
-            for tx in pre_extra_txs:
-                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
-                if size + this_size > 500000:
-                    break
-                extra_txs.append(tx)
-                size += this_size
-            # XXX check sigops!
+            
+            extra_txs_hashes, extra_txs_fees = memorypools.get(state['previous_block'], ([], 0))
+            extra_txs = [get_transaction.call_now(tx_hash, None) for tx_hash in extra_txs_hashes]
+            if any(x is None for x in extra_txs):
+                print "haven't gotten all yet", sum(x is None for x in extra_txs), len(extra_txs)
+                extra_txs, extra_txs_fees = [], 0
             # XXX assuming generate_tx is smallish here..
             def get_stale_frac():
                 shares, stale_shares = get_share_counts()
@@ -390,7 +385,7 @@ def main(args):
                     return ""
                 frac = stale_shares/shares
                 return 2*struct.pack('<H', int(65535*frac + .5))
-            subsidy = args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs)
+            subsidy = args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + extra_txs_fees
             generate_tx = p2pool.generate_transaction(
                 tracker=tracker,
                 previous_share_hash=state['best_share_hash'],
@@ -400,10 +395,10 @@ def main(args):
                 block_target=state['target'],
                 net=args.net,
             )
-            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL)
+            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Includes %i transactions with %.6f %s fees' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL, len(extra_txs), extra_txs_fees*1e-8, args.net.BITCOIN_SYMBOL)
             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
-            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
+            transactions = [generate_tx] + list(extra_txs)
             merkle_root = bitcoin.data.merkle_hash(transactions)
             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
             
@@ -493,58 +488,24 @@ def main(args):
         
         # done!
         
-        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
-        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
-        
-        class Tx(object):
-            def __init__(self, tx, seen_at_block):
-                self.hash = bitcoin.data.tx_type.hash256(tx)
-                self.tx = tx
-                self.seen_at_block = seen_at_block
-                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
-                #print
-                #print '%x %r' % (seen_at_block, tx)
-                #for mention in self.mentions:
-                #    print '%x' % mention
-                #print
-                self.parents_all_in_blocks = False
-                self.value_in = 0
-                #print self.tx
-                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
-                self._find_parents_in_blocks()
-            
-            @defer.inlineCallbacks
-            def _find_parents_in_blocks(self):
-                for tx_in in self.tx['tx_ins']:
-                    try:
-                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
-                    except Exception:
-                        return
-                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
-                    #print raw_transaction
-                    if not raw_transaction['parent_blocks']:
-                        return
-                self.parents_all_in_blocks = True
-            
-            def is_good(self):
-                if not self.parents_all_in_blocks:
-                    return False
-                x = self.is_good2()
-                #print 'is_good:', x
-                return x
+        get_transaction = deferral.DeferredCacher(defer.inlineCallbacks(lambda tx_hash: defer.returnValue((yield (yield factory.getProtocol()).get_tx(tx_hash)))), expiring_dict.ExpiringDict(300))
+        memorypools = expiring_dict.ExpiringDict(300)
         
         @defer.inlineCallbacks
-        def new_tx(tx_hash):
-            try:
-                assert isinstance(tx_hash, (int, long))
-                #print 'REQUESTING', tx_hash
-                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
-                #print 'GOT', tx
-                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
-            except:
-                log.err(None, 'Error handling tx:')
-        # disable for now, for testing impact on stales
-        #factory.new_tx.watch(new_tx)
+        def memorypool_thread():
+            while True:
+                try:
+                    res = yield deferral.retry('Error while getting memory pool:', 1)(bitcoind.rpc_getmemorypool)()
+                    tx_hashes = [int(tx_hash_hex, 16) for tx_hash_hex in res['transactions']]
+                    memorypools[int(res['previous_block'], 16)] = tx_hashes, res['fees']
+                    for tx_hash in tx_hashes:
+                        get_transaction.call_now(tx_hash, None)
+                except:
+                    log.err(None, 'Error while getting memory pool:')
+                yield deferral.sleep(random.expovariate(1/10))
+        memorypool_thread()
+        
+        # do new getwork when a block is heard on the p2p interface
         
         def new_block(block_hash):
             work_updated.happened()
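
The net effect of the p2pool/main.py changes above: memorypool_thread() polls
bitcoind's getmemorypool roughly every 10 seconds on average (the sleep is
drawn from an exponential distribution with mean 10), caches the reply's
transaction hashes and total fees keyed by previous_block in an expiring dict,
and prefetches each transaction over the p2p connection via get_transaction.
The getwork path then picks up whatever is available, as in this condensed
sketch (select_extra_txs is a hypothetical helper name, not code from the
patch):

    # Condensed, hypothetical restatement of the selection logic added above.
    def select_extra_txs(memorypools, get_transaction, previous_block):
        tx_hashes, total_fees = memorypools.get(previous_block, ([], 0))
        txs = [get_transaction.call_now(tx_hash, None) for tx_hash in tx_hashes]
        if any(tx is None for tx in txs):
            # not every transaction has been fetched from the p2p peer yet;
            # fall back to an empty extra-transaction set rather than stalling getwork
            return [], 0
        return txs, total_fees

The total fees are added into subsidy, which is what the 'New work for worker!'
log line above derives its payout and fee figures from.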
diff --git a/p2pool/util/deferral.py b/p2pool/util/deferral.py
index d5901b5..fd53f4e 100644
--- a/p2pool/util/deferral.py
+++ b/p2pool/util/deferral.py
@@ -140,9 +140,12 @@ class DeferredCacher(object):
         self.backing[key] = value
         defer.returnValue(value)
     
-    def call_now(self, key):
+    _nothing = object()
+    def call_now(self, key, default=_nothing):
         if key in self.waiting:
-            raise NotNowError()
+            if default is not self._nothing:
+                return default
+            raise NotNowError(key)
         
         if key in self.backing:
             return self.backing[key]
@@ -158,4 +161,6 @@ class DeferredCacher(object):
                 fail.printTraceback()
                 print
             self.func(key).addCallback(cb).addErrback(eb)
-            raise NotNowError()
+            if default is not self._nothing:
+                return default
+            raise NotNowError(key)
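
The p2pool/util/deferral.py change gives DeferredCacher.call_now() an optional
default, which main.py uses as get_transaction.call_now(tx_hash, None). A rough
before/after sketch of the calling convention (it assumes NotNowError is
exposed by the deferral module, and reuses get_transaction/tx_hash from the
main.py diff):

    # Before this patch: the caller had to catch NotNowError itself.
    try:
        tx = get_transaction.call_now(tx_hash)
    except deferral.NotNowError:
        tx = None

    # With this patch: equivalent one-liner; the cacher still kicks off the
    # background fetch when the key is neither cached nor already in flight,
    # so a later call can be answered from the cache.
    tx = get_transaction.call_now(tx_hash, None)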