refactored p2pool node implementation from p2pool.main to p2pool.node. dedicated...
authorForrest Voight <forrest@forre.st>
Fri, 19 Oct 2012 00:56:33 +0000 (20:56 -0400)
committerForrest Voight <forrest@forre.st>
Sun, 21 Oct 2012 01:48:41 +0000 (21:48 -0400)
p2pool/bitcoin/helper.py [new file with mode: 0644]
p2pool/data.py
p2pool/main.py
p2pool/node.py [new file with mode: 0644]
p2pool/p2p.py
p2pool/web.py
p2pool/work.py

diff --git a/p2pool/bitcoin/helper.py b/p2pool/bitcoin/helper.py
new file mode 100644 (file)
index 0000000..3d5810b
--- /dev/null
@@ -0,0 +1,78 @@
+import sys
+import time
+
+from twisted.internet import defer
+
+import p2pool
+from p2pool.bitcoin import data as bitcoin_data
+from p2pool.util import deferral, jsonrpc
+
+@deferral.retry('Error while checking Bitcoin connection:', 1)
+@defer.inlineCallbacks
+def check(bitcoind, net):
+    if not (yield net.PARENT.RPC_CHECK(bitcoind)):
+        print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
+        raise deferral.RetrySilentlyException()
+    if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
+        print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
+        raise deferral.RetrySilentlyException()
+
+@deferral.retry('Error getting work from bitcoind:', 3)
+@defer.inlineCallbacks
+def getwork(bitcoind, use_getblocktemplate=False):
+    def go():
+        if use_getblocktemplate:
+            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
+        else:
+            return bitcoind.rpc_getmemorypool()
+    try:
+        work = yield go()
+    except jsonrpc.Error_for_code(-32601): # Method not found
+        use_getblocktemplate = not use_getblocktemplate
+        try:
+            work = yield go()
+        except jsonrpc.Error_for_code(-32601): # Method not found
+            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
+            raise deferral.RetrySilentlyException()
+    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
+    if 'height' not in work:
+        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
+    elif p2pool.DEBUG:
+        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
+    defer.returnValue(dict(
+        version=work['version'],
+        previous_block=int(work['previousblockhash'], 16),
+        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
+        transaction_hashes=map(bitcoin_data.hash256, packed_transactions),
+        subsidy=work['coinbasevalue'],
+        time=work['time'] if 'time' in work else work['curtime'],
+        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
+        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
+        height=work['height'],
+        last_update=time.time(),
+        use_getblocktemplate=use_getblocktemplate,
+    ))
+
+@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
+def submit_block_p2p(block, factory, net):
+    if factory.conn.value is None:
+        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
+        raise deferral.RetrySilentlyException()
+    factory.conn.value.send_block(block=block)
+
+@deferral.retry('Error submitting block: (will retry)', 10, 10)
+@defer.inlineCallbacks
+def submit_block_rpc(block, ignore_failure, bitcoind, bitcoind_work, net):
+    if bitcoind_work.value['use_getblocktemplate']:
+        result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
+        success = result is None
+    else:
+        result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
+        success = result
+    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
+    if (not success and success_expected and not ignore_failure) or (success and not success_expected):
+        print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
+
+def submit_block(block, ignore_failure, factory, bitcoind, bitcoind_work, net):
+    submit_block_p2p(block, factory, net)
+    submit_block_rpc(block, ignore_failure, bitcoind, bitcoind_work, net)
index 2aa0efc..e24e90b 100644 (file)
@@ -601,7 +601,7 @@ class WeightsSkipList(forest.TrackerSkipList):
         return math.add_dicts(*math.flatten_linked_list(weights_list)), total_weight, total_donation_weight
 
 class OkayTracker(forest.Tracker):
-    def __init__(self, net, my_share_hashes, my_doa_share_hashes):
+    def __init__(self, net):
         forest.Tracker.__init__(self, delta_type=forest.get_attributedelta_type(dict(forest.AttributeDelta.attrs,
             work=lambda share: bitcoin_data.target_to_average_attempts(share.target),
             min_work=lambda share: bitcoin_data.target_to_average_attempts(share.max_target),
@@ -609,10 +609,6 @@ class OkayTracker(forest.Tracker):
         self.net = net
         self.verified = forest.SubsetTracker(delta_type=forest.get_attributedelta_type(dict(forest.AttributeDelta.attrs,
             work=lambda share: bitcoin_data.target_to_average_attempts(share.target),
-            my_count=lambda share: 1 if share.hash in my_share_hashes else 0,
-            my_doa_count=lambda share: 1 if share.hash in my_doa_share_hashes else 0,
-            my_orphan_announce_count=lambda share: 1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 'orphan' else 0,
-            my_dead_announce_count=lambda share: 1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 'doa' else 0,
         )), subset_of=self)
         self.get_cumulative_weights = WeightsSkipList(self)
     
index ff85aac..7eb6d3e 100644 (file)
@@ -19,46 +19,10 @@ from twisted.python import log
 from nattraverso import portmapper, ipdiscover
 
 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
-from bitcoin import worker_interface, height_tracker
+from bitcoin import worker_interface, helper
 from util import fixargparse, jsonrpc, variable, deferral, math, logging
-from . import p2p, networks, web, work
-import p2pool, p2pool.data as p2pool_data
-
-@deferral.retry('Error getting work from bitcoind:', 3)
-@defer.inlineCallbacks
-def getwork(bitcoind, use_getblocktemplate=False):
-    def go():
-        if use_getblocktemplate:
-            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
-        else:
-            return bitcoind.rpc_getmemorypool()
-    try:
-        work = yield go()
-    except jsonrpc.Error_for_code(-32601): # Method not found
-        use_getblocktemplate = not use_getblocktemplate
-        try:
-            work = yield go()
-        except jsonrpc.Error_for_code(-32601): # Method not found
-            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
-            raise deferral.RetrySilentlyException()
-    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
-    if 'height' not in work:
-        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
-    elif p2pool.DEBUG:
-        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
-    defer.returnValue(dict(
-        version=work['version'],
-        previous_block=int(work['previousblockhash'], 16),
-        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
-        transaction_hashes=map(bitcoin_data.hash256, packed_transactions),
-        subsidy=work['coinbasevalue'],
-        time=work['time'] if 'time' in work else work['curtime'],
-        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
-        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
-        height=work['height'],
-        last_update=time.time(),
-        use_getblocktemplate=use_getblocktemplate,
-    ))
+from . import networks, web, work
+import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node
 
 @defer.inlineCallbacks
 def main(args, net, datadir_path, merged_urls, worker_endpoint):
@@ -66,8 +30,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print 'p2pool (version %s)' % (p2pool.__version__,)
         print
         
-        traffic_happened = variable.Event()
-        
         @defer.inlineCallbacks
         def connect_p2p():
             # connect to bitcoind over bitcoin-p2p
@@ -86,17 +48,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
-        @deferral.retry('Error while checking Bitcoin connection:', 1)
-        @defer.inlineCallbacks
-        def check():
-            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
-                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
-                raise deferral.RetrySilentlyException()
-            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
-                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
-                raise deferral.RetrySilentlyException()
-        yield check()
-        temp_work = yield getwork(bitcoind)
+        yield helper.check(bitcoind, net)
+        temp_work = yield helper.getwork(bitcoind)
         
         bitcoind_warning_var = variable.Variable(None)
         @defer.inlineCallbacks
@@ -144,222 +97,49 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
         print
         
-        my_share_hashes = set()
-        my_doa_share_hashes = set()
-        
-        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
-        shared_share_hashes = set()
         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
+        shares = {}
         known_verified = set()
         print "Loading shares..."
         for i, (mode, contents) in enumerate(ss.get_shares()):
             if mode == 'share':
-                if contents.hash in tracker.items:
-                    continue
-                shared_share_hashes.add(contents.hash)
                 contents.time_seen = 0
-                tracker.add(contents)
-                if len(tracker.items) % 1000 == 0 and tracker.items:
-                    print "    %i" % (len(tracker.items),)
+                shares[contents.hash] = contents
+                if len(shares) % 1000 == 0 and shares:
+                    print "    %i" % (len(shares),)
             elif mode == 'verified_hash':
                 known_verified.add(contents)
             else:
                 raise AssertionError()
-        print "    ...inserting %i verified shares..." % (len(known_verified),)
-        for h in known_verified:
-            if h not in tracker.items:
-                ss.forget_verified_share(h)
-                continue
-            tracker.verified.add(tracker.items[h])
-        print "    ...done loading %i shares!" % (len(tracker.items),)
+        print "    ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified))
         print
-        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
-        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
-        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
-        
-        print 'Initializing work...'
-        
         
-        # BITCOIND WORK
         
-        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
-        @defer.inlineCallbacks
-        def work_poller():
-            while True:
-                flag = factory.new_block.get_deferred()
-                try:
-                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
-                except:
-                    log.err()
-                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
-        work_poller()
-        
-        # PEER WORK
-        
-        best_block_header = variable.Variable(None)
-        def handle_header(new_header):
-            # check that header matches current target
-            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
-                return
-            bitcoind_best_block = bitcoind_work.value['previous_block']
-            if (best_block_header.value is None
-                or (
-                    new_header['previous_block'] == bitcoind_best_block and
-                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
-                ) # new is child of current and previous is current
-                or (
-                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
-                    best_block_header.value['previous_block'] != bitcoind_best_block
-                )): # new is current and previous is not a child of current
-                best_block_header.set(new_header)
-        @defer.inlineCallbacks
-        def poll_header():
-            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
-        bitcoind_work.changed.watch(lambda _: poll_header())
-        yield deferral.retry('Error while requesting best block header:')(poll_header)()
+        print 'Initializing work...'
         
-        # BEST SHARE
+        node = p2pool_node.Node(factory, bitcoind, shares.values(), known_verified, net)
+        yield node.start()
         
-        known_txs_var = variable.Variable({}) # hash -> tx
-        mining_txs_var = variable.Variable({}) # hash -> tx
-        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
+        for share_hash in shares:
+            if share_hash not in node.tracker.items:
+                ss.forget_share(share_hash)
+        for share_hash in known_verified:
+            if share_hash not in node.tracker.verified.items:
+                ss.forget_verified_share(share_hash)
+        del shares, known_verified
+        node.tracker.removed.watch(lambda share: ss.forget_share(share.hash))
+        node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
         
-        best_share_var = variable.Variable(None)
-        desired_var = variable.Variable(None)
-        def set_best_share():
-            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
-            
-            best_share_var.set(best)
-            desired_var.set(desired)
-        bitcoind_work.changed.watch(lambda _: set_best_share())
-        set_best_share()
+        def save_shares():
+            for share in node.tracker.get_chain(node.best_share_var.value, min(node.tracker.get_height(node.best_share_var.value), 2*net.CHAIN_LENGTH)):
+                ss.add_share(share)
+                if share.hash in node.tracker.verified.items:
+                    ss.add_verified_hash(share.hash)
+        task.LoopingCall(save_shares).start(60)
         
         print '    ...success!'
         print
         
-        # setup p2p logic and join p2pool network
-        
-        # update mining_txs according to getwork results
-        @bitcoind_work.changed.run_and_watch
-        def _(_=None):
-            new_mining_txs = {}
-            new_known_txs = dict(known_txs_var.value)
-            for tx_hash, tx in zip(bitcoind_work.value['transaction_hashes'], bitcoind_work.value['transactions']):
-                new_mining_txs[tx_hash] = tx
-                new_known_txs[tx_hash] = tx
-            mining_txs_var.set(new_mining_txs)
-            known_txs_var.set(new_known_txs)
-        # add p2p transactions from bitcoind to known_txs
-        @factory.new_tx.watch
-        def _(tx):
-            new_known_txs = dict(known_txs_var.value)
-            new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
-            known_txs_var.set(new_known_txs)
-        # forward transactions seen to bitcoind
-        @known_txs_var.transitioned.watch
-        @defer.inlineCallbacks
-        def _(before, after):
-            yield deferral.sleep(random.expovariate(1/1))
-            for tx_hash in set(after) - set(before):
-                factory.conn.value.send_tx(tx=after[tx_hash])
-        
-        class Node(p2p.Node):
-            def handle_shares(self, shares, peer):
-                if len(shares) > 5:
-                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
-                
-                new_count = 0
-                for share in shares:
-                    if share.hash in tracker.items:
-                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
-                        continue
-                    
-                    new_count += 1
-                    
-                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
-                    
-                    tracker.add(share)
-                
-                if new_count:
-                    set_best_share()
-                
-                if len(shares) > 5:
-                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
-            
-            @defer.inlineCallbacks
-            def handle_share_hashes(self, hashes, peer):
-                new_hashes = [x for x in hashes if x not in tracker.items]
-                if not new_hashes:
-                    return
-                try:
-                    shares = yield peer.get_shares(
-                        hashes=new_hashes,
-                        parents=0,
-                        stops=[],
-                    )
-                except:
-                    log.err(None, 'in handle_share_hashes:')
-                else:
-                    self.handle_shares(shares, peer)
-            
-            def handle_get_shares(self, hashes, parents, stops, peer):
-                parents = min(parents, 1000//len(hashes))
-                stops = set(stops)
-                shares = []
-                for share_hash in hashes:
-                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
-                        if share.hash in stops:
-                            break
-                        shares.append(share)
-                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
-                return shares
-            
-            def handle_bestblock(self, header, peer):
-                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
-                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
-                handle_header(header)
-        
-        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
-        def submit_block_p2p(block):
-            if factory.conn.value is None:
-                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
-                raise deferral.RetrySilentlyException()
-            factory.conn.value.send_block(block=block)
-        
-        @deferral.retry('Error submitting block: (will retry)', 10, 10)
-        @defer.inlineCallbacks
-        def submit_block_rpc(block, ignore_failure):
-            if bitcoind_work.value['use_getblocktemplate']:
-                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
-                success = result is None
-            else:
-                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
-                success = result
-            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
-            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
-                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
-        
-        def submit_block(block, ignore_failure):
-            submit_block_p2p(block)
-            submit_block_rpc(block, ignore_failure)
-        
-        @tracker.verified.added.watch
-        def _(share):
-            if share.pow_hash <= share.header['bits'].target:
-                block = share.as_block(tracker, known_txs_var.value)
-                if block is None:
-                    print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
-                    return
-                submit_block(block, ignore_failure=True)
-                print
-                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
-                print
-                def spread():
-                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
-                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
-                        broadcast_share(share.hash)
-                spread()
-                reactor.callLater(5, spread) # so get_height_rel_highest can update
         
         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
         
@@ -393,91 +173,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             except:
                 log.err()
         
-        p2p_node = Node(
-            best_share_hash_func=lambda: best_share_var.value,
-            port=args.p2pool_port,
-            net=net,
-            addr_store=addrs,
-            connect_addrs=connect_addrs,
-            max_incoming_conns=args.p2pool_conns,
-            traffic_happened=traffic_happened,
-            known_txs_var=known_txs_var,
-            mining_txs_var=mining_txs_var,
-        )
-        p2p_node.start()
-        
-        def forget_old_txs():
-            new_known_txs = {}
-            for peer in p2p_node.peers.itervalues():
-                new_known_txs.update(peer.remembered_txs)
-            new_known_txs.update(mining_txs_var.value)
-            for share in tracker.get_chain(best_share_var.value, min(120, tracker.get_height(best_share_var.value))):
-                for tx_hash in share.new_transaction_hashes:
-                    if tx_hash in known_txs_var.value:
-                        new_known_txs[tx_hash] = known_txs_var.value[tx_hash]
-            known_txs_var.set(new_known_txs)
-        task.LoopingCall(forget_old_txs).start(10)
+        node.p2p_node = p2pool_node.P2PNode(node, args.p2pool_port, args.p2pool_conns, addrs, connect_addrs)
+        node.p2p_node.start()
         
         def save_addrs():
             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
-                f.write(json.dumps(p2p_node.addr_store.items()))
+                f.write(json.dumps(node.p2p_node.addr_store.items()))
         task.LoopingCall(save_addrs).start(60)
         
-        @best_block_header.changed.watch
-        def _(header):
-            for peer in p2p_node.peers.itervalues():
-                peer.send_bestblock(header=header)
-        
-        @defer.inlineCallbacks
-        def broadcast_share(share_hash):
-            shares = []
-            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
-                if share.hash in shared_share_hashes:
-                    break
-                shared_share_hashes.add(share.hash)
-                shares.append(share)
-            
-            for peer in list(p2p_node.peers.itervalues()):
-                yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
-        
-        # send share when the chain changes to their chain
-        best_share_var.changed.watch(broadcast_share)
-        
-        def save_shares():
-            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
-                ss.add_share(share)
-                if share.hash in tracker.verified.items:
-                    ss.add_verified_hash(share.hash)
-        task.LoopingCall(save_shares).start(60)
-        
-        @apply
-        @defer.inlineCallbacks
-        def download_shares():
-            while True:
-                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
-                peer2, share_hash = random.choice(desired)
-                
-                if len(p2p_node.peers) == 0:
-                    yield deferral.sleep(1)
-                    continue
-                peer = random.choice(p2p_node.peers.values())
-                
-                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
-                try:
-                    shares = yield peer.get_shares(
-                        hashes=[share_hash],
-                        parents=500,
-                        stops=[],
-                    )
-                except:
-                    log.err(None, 'in download_shares:')
-                    continue
-                
-                if not shares:
-                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
-                    continue
-                p2p_node.handle_shares(shares, peer)
-        
         print '    ...success!'
         print
         
@@ -502,10 +205,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
         
-        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
-        
-        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share)
-        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened, args.donation_percentage)
+        wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee)
+        web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var)
         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
         
         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
@@ -561,7 +262,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                 self.say(self.channel, message)
                                 self._remember_message(message)
-                    self.watch_id = tracker.verified.added.watch(new_share)
+                    self.watch_id = node.tracker.verified.added.watch(new_share)
                     self.recent_messages = []
                 def joined(self, channel):
                     self.in_channel = True
@@ -575,7 +276,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     if channel == self.channel:
                         self._remember_message(message)
                 def connectionLost(self, reason):
-                    tracker.verified.added.unwatch(self.watch_id)
+                    node.tracker.verified.added.unwatch(self.watch_id)
                     print 'IRC connection lost:', reason.getErrorMessage()
             class IRCClientFactory(protocol.ReconnectingClientFactory):
                 protocol = IRCClient
@@ -588,13 +289,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             while True:
                 yield deferral.sleep(3)
                 try:
-                    height = tracker.get_height(best_share_var.value)
+                    height = node.tracker.get_height(node.best_share_var.value)
                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                         height,
-                        len(tracker.verified.items),
-                        len(tracker.items),
-                        len(p2p_node.peers),
-                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
+                        len(node.tracker.verified.items),
+                        len(node.tracker.items),
+                        len(node.p2p_node.peers),
+                        sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming),
                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                     
                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
@@ -603,27 +304,27 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                         math.format(int(my_att_s)),
                         math.format_dt(dt),
                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
-                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
+                        math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???',
                     )
                     
                     if height > 2:
                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
-                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
-                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
+                        stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
+                        real_att_s = p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                         
                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                             shares, stale_orphan_shares, stale_doa_shares,
                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
-                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
+                            node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                         )
                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                             math.format(int(real_att_s)),
                             100*stale_prop,
-                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
+                            math.format_dt(2**256 / node.bitcoind_work.value['bits'].target / real_att_s),
                         )
                         
-                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
+                        for warning in p2pool_data.get_warnings(node.tracker, node.best_share_var.value, net, bitcoind_warning_var.value, node.bitcoind_work.value):
                             print >>sys.stderr, '#'*40
                             print >>sys.stderr, '>>> Warning: ' + warning
                             print >>sys.stderr, '#'*40
diff --git a/p2pool/node.py b/p2pool/node.py
new file mode 100644 (file)
index 0000000..406787c
--- /dev/null
@@ -0,0 +1,281 @@
+import random
+import sys
+
+from twisted.internet import defer, reactor, task
+from twisted.python import log
+
+from p2pool import data as p2pool_data, p2p
+from p2pool.bitcoin import data as bitcoin_data, helper, height_tracker
+from p2pool.util import deferral, variable
+
+
+class P2PNode(p2p.Node):
+    def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs):
+        self.node = node
+        p2p.Node.__init__(self,
+            best_share_hash_func=lambda: node.best_share_var.value,
+            port=p2pool_port,
+            net=node.net,
+            addr_store=addrs,
+            connect_addrs=connect_addrs,
+            max_incoming_conns=p2pool_conns,
+            known_txs_var=node.known_txs_var,
+            mining_txs_var=node.mining_txs_var,
+        )
+    
+    def handle_shares(self, shares, peer):
+        if len(shares) > 5:
+            print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
+        
+        new_count = 0
+        for share in shares:
+            if share.hash in self.node.tracker.items:
+                #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
+                continue
+            
+            new_count += 1
+            
+            #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
+            
+            self.node.tracker.add(share)
+        
+        if new_count:
+            self.node.set_best_share()
+        
+        if len(shares) > 5:
+            print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(self.node.tracker.items), 2*self.node.net.CHAIN_LENGTH)
+    
+    @defer.inlineCallbacks
+    def handle_share_hashes(self, hashes, peer):
+        new_hashes = [x for x in hashes if x not in self.node.tracker.items]
+        if not new_hashes:
+            return
+        try:
+            shares = yield peer.get_shares(
+                hashes=new_hashes,
+                parents=0,
+                stops=[],
+            )
+        except:
+            log.err(None, 'in handle_share_hashes:')
+        else:
+            self.handle_shares(shares, peer)
+    
+    def handle_get_shares(self, hashes, parents, stops, peer):
+        parents = min(parents, 1000//len(hashes))
+        stops = set(stops)
+        shares = []
+        for share_hash in hashes:
+            for share in self.node.tracker.get_chain(share_hash, min(parents + 1, self.node.tracker.get_height(share_hash))):
+                if share.hash in stops:
+                    break
+                shares.append(share)
+        print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
+        return shares
+    
+    def handle_bestblock(self, header, peer):
+        if self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
+            raise p2p.PeerMisbehavingError('received block header fails PoW test')
+        self.node.handle_header(header)
+    
+    @defer.inlineCallbacks
+    def broadcast_share(self, share_hash):
+        shares = []
+        for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
+            if share.hash in self.shared_share_hashes:
+                break
+            self.shared_share_hashes.add(share.hash)
+            shares.append(share)
+        
+        for peer in list(self.peers.itervalues()):
+            yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
+    
+    def start(self):
+        p2p.Node.start(self)
+        
+        self.shared_share_hashes = set(self.node.tracker.items)
+        self.node.tracker.removed.watch_weakref(self, lambda self, share: self.shared_share_hashes.discard(share.hash))
+        
+        @apply
+        @defer.inlineCallbacks
+        def download_shares():
+            while True:
+                desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
+                peer2, share_hash = random.choice(desired)
+                
+                if len(self.peers) == 0:
+                    yield deferral.sleep(1)
+                    continue
+                peer = random.choice(self.peers.values())
+                
+                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
+                try:
+                    shares = yield peer.get_shares(
+                        hashes=[share_hash],
+                        parents=500,
+                        stops=[],
+                    )
+                except:
+                    log.err(None, 'in download_shares:')
+                    continue
+                
+                if not shares:
+                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
+                    continue
+                self.handle_shares(shares, peer)
+        
+        
+        @self.node.best_block_header.changed.watch
+        def _(header):
+            for peer in self.peers.itervalues():
+                peer.send_bestblock(header=header)
+        
+        # send share when the chain changes to their chain
+        self.node.best_share_var.changed.watch(self.broadcast_share)
+        
+        @self.node.tracker.verified.added.watch
+        def _(share):
+            if not (share.pow_hash <= share.header['bits'].target):
+                return
+            
+            def spread():
+                if (self.node.get_height_rel_highest(share.header['previous_block']) > -5 or
+                    self.node.bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
+                    self.broadcast_share(share.hash)
+            spread()
+            reactor.callLater(5, spread) # so get_height_rel_highest can update
+        
+
+class Node(object):
+    def __init__(self, factory, bitcoind, shares, known_verified_share_hashes, net):
+        self.factory = factory
+        self.bitcoind = bitcoind
+        self.net = net
+        
+        self.tracker = p2pool_data.OkayTracker(self.net)
+        
+        for share in shares:
+            self.tracker.add(share)
+        
+        for share_hash in known_verified_share_hashes:
+            if share_hash in self.tracker.items:
+                self.tracker.verified.add(self.tracker.items[share_hash])
+        
+        self.p2p_node = None # overwritten externally
+    
+    @defer.inlineCallbacks
+    def start(self):
+        stop_signal = variable.Event()
+        self.stop = stop_signal.happened
+        
+        # BITCOIND WORK
+        
+        self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
+        @defer.inlineCallbacks
+        def work_poller():
+            while True:
+                flag = self.factory.new_block.get_deferred()
+                try:
+                    self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
+                except:
+                    log.err()
+                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
+        work_poller()
+        
+        # PEER WORK
+        
+        self.best_block_header = variable.Variable(None)
+        def handle_header(new_header):
+            # check that header matches current target
+            if not (self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
+                return
+            bitcoind_best_block = self.bitcoind_work.value['previous_block']
+            if (self.best_block_header.value is None
+                or (
+                    new_header['previous_block'] == bitcoind_best_block and
+                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
+                ) # new is child of current and previous is current
+                or (
+                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
+                    self.best_block_header.value['previous_block'] != bitcoind_best_block
+                )): # new is current and previous is not a child of current
+                self.best_block_header.set(new_header)
+        self.handle_header = handle_header
+        @defer.inlineCallbacks
+        def poll_header():
+            handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
+        self.bitcoind_work.changed.watch(lambda _: poll_header())
+        yield deferral.retry('Error while requesting best block header:')(poll_header)()
+        
+        # BEST SHARE
+        
+        self.known_txs_var = variable.Variable({}) # hash -> tx
+        self.mining_txs_var = variable.Variable({}) # hash -> tx
+        self.get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(self.bitcoind, self.factory, lambda: self.bitcoind_work.value['previous_block'], self.net)
+        
+        self.best_share_var = variable.Variable(None)
+        self.desired_var = variable.Variable(None)
+        self.bitcoind_work.changed.watch(lambda _: self.set_best_share())
+        self.set_best_share()
+        
+        # setup p2p logic and join p2pool network
+        
+        # update mining_txs according to getwork results
+        @self.bitcoind_work.changed.run_and_watch
+        def _(_=None):
+            new_mining_txs = {}
+            new_known_txs = dict(self.known_txs_var.value)
+            for tx_hash, tx in zip(self.bitcoind_work.value['transaction_hashes'], self.bitcoind_work.value['transactions']):
+                new_mining_txs[tx_hash] = tx
+                new_known_txs[tx_hash] = tx
+            self.mining_txs_var.set(new_mining_txs)
+            self.known_txs_var.set(new_known_txs)
+        # add p2p transactions from bitcoind to known_txs
+        @self.factory.new_tx.watch
+        def _(tx):
+            new_known_txs = dict(self.known_txs_var.value)
+            new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
+            self.known_txs_var.set(new_known_txs)
+        # forward transactions seen to bitcoind
+        @self.known_txs_var.transitioned.watch
+        @defer.inlineCallbacks
+        def _(before, after):
+            yield deferral.sleep(random.expovariate(1/1))
+            for tx_hash in set(after) - set(before):
+                self.factory.conn.value.send_tx(tx=after[tx_hash])
+        
+        @self.tracker.verified.added.watch
+        def _(share):
+            if not (share.pow_hash <= share.header['bits'].target):
+                return
+            
+            block = share.as_block(self.tracker, self.known_txs_var.value)
+            if block is None:
+                print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
+                return
+            helper.submit_block(block, True, self.factory, self.bitcoind, self.bitcoind_work, self.net)
+            print
+            print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
+            print
+        
+        def forget_old_txs():
+            new_known_txs = {}
+            if self.p2p_node is not None:
+                for peer in self.p2p_node.peers.itervalues():
+                    new_known_txs.update(peer.remembered_txs)
+            new_known_txs.update(self.mining_txs_var.value)
+            for share in self.tracker.get_chain(self.best_share_var.value, min(120, self.tracker.get_height(self.best_share_var.value))):
+                for tx_hash in share.new_transaction_hashes:
+                    if tx_hash in self.known_txs_var.value:
+                        new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
+            self.known_txs_var.set(new_known_txs)
+        task.LoopingCall(forget_old_txs).start(10)
+    
+    def set_best_share(self):
+        best, desired = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
+        
+        self.best_share_var.set(best)
+        self.desired_var.set(desired)
+    
+    def get_current_txouts(self):
+        return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)
index 7d3c860..22a731f 100644 (file)
@@ -552,17 +552,17 @@ class SingleClientFactory(protocol.ReconnectingClientFactory):
         self.node.lost_conn(proto, reason)
 
 class Node(object):
-    def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event(), known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})):
+    def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})):
         self.best_share_hash_func = best_share_hash_func
         self.port = port
         self.net = net
         self.addr_store = dict(addr_store)
         self.connect_addrs = connect_addrs
         self.preferred_storage = preferred_storage
-        self.traffic_happened = traffic_happened
         self.known_txs_var = known_txs_var
         self.mining_txs_var = mining_txs_var
         
+        self.traffic_happened = variable.Event()
         self.nonce = random.randrange(2**64)
         self.peers = {}
         self.bans = {} # address -> end_time
index 3eafa1b..ba7e9db 100644 (file)
@@ -44,21 +44,22 @@ def _atomic_write(filename, data):
         os.remove(filename)
         os.rename(filename + '.new', filename)
 
-def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received, best_share_var, bitcoin_warning_var, traffic_happened, donation_percentage):
+def get_web_root(wb, datadir_path, bitcoind_warning_var):
+    node = wb.node
     start_time = time.time()
     
     web_root = resource.Resource()
     
     def get_users():
-        height, last = tracker.get_height_and_last(best_share_var.value)
-        weights, total_weight, donation_weight = tracker.get_cumulative_weights(best_share_var.value, min(height, 720), 65535*2**256)
+        height, last = node.tracker.get_height_and_last(node.best_share_var.value)
+        weights, total_weight, donation_weight = node.tracker.get_cumulative_weights(node.best_share_var.value, min(height, 720), 65535*2**256)
         res = {}
         for script in sorted(weights, key=lambda s: weights[s]):
-            res[bitcoin_data.script2_to_address(script, net.PARENT)] = weights[script]/total_weight
+            res[bitcoin_data.script2_to_address(script, node.net.PARENT)] = weights[script]/total_weight
         return res
     
     def get_current_scaled_txouts(scale, trunc=0):
-        txouts = get_current_txouts()
+        txouts = node.get_current_txouts()
         total = sum(txouts.itervalues())
         results = dict((script, value*scale//total) for script, value in txouts.iteritems())
         if trunc > 0:
@@ -84,15 +85,15 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
         total = int(float(total)*1e8)
         trunc = int(float(trunc)*1e8)
         return json.dumps(dict(
-            (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
+            (bitcoin_data.script2_to_address(script, node.net.PARENT), value/1e8)
             for script, value in get_current_scaled_txouts(total, trunc).iteritems()
-            if bitcoin_data.script2_to_address(script, net.PARENT) is not None
+            if bitcoin_data.script2_to_address(script, node.net.PARENT) is not None
         ))
     
     def get_local_rates():
         miner_hash_rates = {}
         miner_dead_hash_rates = {}
-        datums, dt = local_rate_monitor.get_datums_in_last()
+        datums, dt = wb.local_rate_monitor.get_datums_in_last()
         for datum in datums:
             miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt
             if datum['dead']:
@@ -101,43 +102,43 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
     
     def get_global_stats():
         # averaged over last hour
-        if tracker.get_height(best_share_var.value) < 10:
+        if node.tracker.get_height(node.best_share_var.value) < 10:
             return None
-        lookbehind = min(tracker.get_height(best_share_var.value), 3600//net.SHARE_PERIOD)
+        lookbehind = min(node.tracker.get_height(node.best_share_var.value), 3600//node.net.SHARE_PERIOD)
         
-        nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, lookbehind)
-        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, lookbehind)
+        nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, lookbehind)
+        stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, lookbehind)
         return dict(
             pool_nonstale_hash_rate=nonstale_hash_rate,
             pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
             pool_stale_prop=stale_prop,
-            min_difficulty=bitcoin_data.target_to_difficulty(tracker.items[best_share_var.value].max_target),
+            min_difficulty=bitcoin_data.target_to_difficulty(node.tracker.items[node.best_share_var.value].max_target),
         )
     
     def get_local_stats():
-        if tracker.get_height(best_share_var.value) < 10:
+        if node.tracker.get_height(node.best_share_var.value) < 10:
             return None
-        lookbehind = min(tracker.get_height(best_share_var.value), 3600//net.SHARE_PERIOD)
+        lookbehind = min(node.tracker.get_height(node.best_share_var.value), 3600//node.net.SHARE_PERIOD)
         
-        global_stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, lookbehind)
+        global_stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, lookbehind)
         
-        my_unstale_count = sum(1 for share in tracker.get_chain(best_share_var.value, lookbehind) if share.hash in my_share_hashes)
-        my_orphan_count = sum(1 for share in tracker.get_chain(best_share_var.value, lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 'orphan')
-        my_doa_count = sum(1 for share in tracker.get_chain(best_share_var.value, lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 'doa')
+        my_unstale_count = sum(1 for share in node.tracker.get_chain(node.best_share_var.value, lookbehind) if share.hash in wb.my_share_hashes)
+        my_orphan_count = sum(1 for share in node.tracker.get_chain(node.best_share_var.value, lookbehind) if share.hash in wb.my_share_hashes and share.share_data['stale_info'] == 'orphan')
+        my_doa_count = sum(1 for share in node.tracker.get_chain(node.best_share_var.value, lookbehind) if share.hash in wb.my_share_hashes and share.share_data['stale_info'] == 'doa')
         my_share_count = my_unstale_count + my_orphan_count + my_doa_count
         my_stale_count = my_orphan_count + my_doa_count
         
         my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
         
         my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
-            for share in tracker.get_chain(best_share_var.value, lookbehind - 1)
-            if share.hash in my_share_hashes)
-        actual_time = (tracker.items[best_share_var.value].timestamp -
-            tracker.items[tracker.get_nth_parent_hash(best_share_var.value, lookbehind - 1)].timestamp)
+            for share in node.tracker.get_chain(node.best_share_var.value, lookbehind - 1)
+            if share.hash in wb.my_share_hashes)
+        actual_time = (node.tracker.items[node.best_share_var.value].timestamp -
+            node.tracker.items[node.tracker.get_nth_parent_hash(node.best_share_var.value, lookbehind - 1)].timestamp)
         share_att_s = my_work / actual_time
         
         miner_hash_rates, miner_dead_hash_rates = get_local_rates()
-        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
+        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
         
         return dict(
             my_hash_rates_in_last_hour=dict(
@@ -163,8 +164,8 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
             efficiency_if_miner_perfect=(1 - stale_orphan_shares/shares)/(1 - global_stale_prop) if shares else None, # ignores dead shares because those are miner's fault and indicated by pseudoshare rejection
             efficiency=(1 - (stale_orphan_shares+stale_doa_shares)/shares)/(1 - global_stale_prop) if shares else None,
             peers=dict(
-                incoming=sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
-                outgoing=sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming),
+                incoming=sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming),
+                outgoing=sum(1 for peer in node.p2p_node.peers.itervalues() if not peer.incoming),
             ),
             shares=dict(
                 total=shares,
@@ -172,11 +173,11 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
                 dead=stale_doa_shares,
             ),
             uptime=time.time() - start_time,
-            attempts_to_share=bitcoin_data.target_to_average_attempts(tracker.items[best_share_var.value].max_target),
-            attempts_to_block=bitcoin_data.target_to_average_attempts(bitcoind_work.value['bits'].target),
-            block_value=bitcoind_work.value['subsidy']*1e-8,
-            warnings=p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoin_warning_var.value, bitcoind_work.value),
-            donation_proportion=donation_percentage/100,
+            attempts_to_share=bitcoin_data.target_to_average_attempts(node.tracker.items[node.best_share_var.value].max_target),
+            attempts_to_block=bitcoin_data.target_to_average_attempts(node.bitcoind_work.value['bits'].target),
+            block_value=node.bitcoind_work.value['subsidy']*1e-8,
+            warnings=p2pool_data.get_warnings(node.tracker, node.best_share_var.value, node.net, bitcoind_warning_var.value, node.bitcoind_work.value),
+            donation_proportion=wb.donation_percentage/100,
         )
     
     class WebInterface(deferred_resource.DeferredResource):
@@ -194,18 +195,18 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
             res = yield self.func(*self.args)
             defer.returnValue(json.dumps(res) if self.mime_type == 'application/json' else res)
     
-    web_root.putChild('rate', WebInterface(lambda: p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, 720)/(1-p2pool_data.get_average_stale_prop(tracker, best_share_var.value, 720))))
-    web_root.putChild('difficulty', WebInterface(lambda: bitcoin_data.target_to_difficulty(tracker.items[best_share_var.value].max_target)))
+    web_root.putChild('rate', WebInterface(lambda: p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, 720)/(1-p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, 720))))
+    web_root.putChild('difficulty', WebInterface(lambda: bitcoin_data.target_to_difficulty(node.tracker.items[node.best_share_var.value].max_target)))
     web_root.putChild('users', WebInterface(get_users))
-    web_root.putChild('user_stales', WebInterface(lambda: dict((bitcoin_data.pubkey_hash_to_address(ph, net.PARENT), prop) for ph, prop in
-        p2pool_data.get_user_stale_props(tracker, best_share_var.value, tracker.get_height(best_share_var.value)).iteritems())))
-    web_root.putChild('fee', WebInterface(lambda: worker_fee))
-    web_root.putChild('current_payouts', WebInterface(lambda: dict((bitcoin_data.script2_to_address(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems())))
+    web_root.putChild('user_stales', WebInterface(lambda: dict((bitcoin_data.pubkey_hash_to_address(ph, node.net.PARENT), prop) for ph, prop in
+        p2pool_data.get_user_stale_props(node.tracker, node.best_share_var.value, node.tracker.get_height(node.best_share_var.value)).iteritems())))
+    web_root.putChild('fee', WebInterface(lambda: wb.worker_fee))
+    web_root.putChild('current_payouts', WebInterface(lambda: dict((bitcoin_data.script2_to_address(script, node.net.PARENT), value/1e8) for script, value in node.get_current_txouts().iteritems())))
     web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain'))
     web_root.putChild('global_stats', WebInterface(get_global_stats))
     web_root.putChild('local_stats', WebInterface(get_local_stats))
-    web_root.putChild('peer_addresses', WebInterface(lambda: ['%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port) for peer in p2p_node.peers.itervalues()]))
-    web_root.putChild('peer_txpool_sizes', WebInterface(lambda: dict(('%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port), peer.remembered_txs_size) for peer in p2p_node.peers.itervalues())))
+    web_root.putChild('peer_addresses', WebInterface(lambda: ['%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port) for peer in node.p2p_node.peers.itervalues()]))
+    web_root.putChild('peer_txpool_sizes', WebInterface(lambda: dict(('%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port), peer.remembered_txs_size) for peer in node.p2p_node.peers.itervalues())))
     web_root.putChild('pings', WebInterface(defer.inlineCallbacks(lambda: defer.returnValue(
         dict([(a, (yield b)) for a, b in
             [(
@@ -213,19 +214,19 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
                 defer.inlineCallbacks(lambda peer=peer: defer.returnValue(
                     min([(yield peer.do_ping().addCallback(lambda x: x/0.001).addErrback(lambda fail: None)) for i in xrange(3)])
                 ))()
-            ) for peer in list(p2p_node.peers.itervalues())]
+            ) for peer in list(node.p2p_node.peers.itervalues())]
         ])
     ))))
-    web_root.putChild('peer_versions', WebInterface(lambda: dict(('%s:%i' % peer.addr, peer.other_sub_version) for peer in p2p_node.peers.itervalues())))
-    web_root.putChild('payout_addr', WebInterface(lambda: bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)))
+    web_root.putChild('peer_versions', WebInterface(lambda: dict(('%s:%i' % peer.addr, peer.other_sub_version) for peer in node.p2p_node.peers.itervalues())))
+    web_root.putChild('payout_addr', WebInterface(lambda: bitcoin_data.pubkey_hash_to_address(wb.my_pubkey_hash, node.net.PARENT)))
     web_root.putChild('recent_blocks', WebInterface(lambda: [dict(
         ts=s.timestamp,
         hash='%064x' % s.header_hash,
         number=pack.IntType(24).unpack(s.share_data['coinbase'][1:4]),
         share='%064x' % s.hash,
-    ) for s in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 24*60*60//net.SHARE_PERIOD)) if s.pow_hash <= s.header['bits'].target]))
+    ) for s in node.tracker.get_chain(node.best_share_var.value, min(node.tracker.get_height(node.best_share_var.value), 24*60*60//node.net.SHARE_PERIOD)) if s.pow_hash <= s.header['bits'].target]))
     web_root.putChild('uptime', WebInterface(lambda: time.time() - start_time))
-    web_root.putChild('stale_rates', WebInterface(lambda: p2pool_data.get_stale_counts(tracker, best_share_var.value, 720, rates=True)))
+    web_root.putChild('stale_rates', WebInterface(lambda: p2pool_data.get_stale_counts(node.tracker, node.best_share_var.value, 720, rates=True)))
     
     new_root = resource.Resource()
     web_root.putChild('web', new_root)
@@ -241,31 +242,31 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
         while stat_log and stat_log[0]['time'] < time.time() - 24*60*60:
             stat_log.pop(0)
         
-        lookbehind = 3600//net.SHARE_PERIOD
-        if tracker.get_height(best_share_var.value) < lookbehind:
+        lookbehind = 3600//node.net.SHARE_PERIOD
+        if node.tracker.get_height(node.best_share_var.value) < lookbehind:
             return None
         
-        global_stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, lookbehind)
-        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
+        global_stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, lookbehind)
+        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
         miner_hash_rates, miner_dead_hash_rates = get_local_rates()
         
         stat_log.append(dict(
             time=time.time(),
-            pool_hash_rate=p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, lookbehind)/(1-global_stale_prop),
+            pool_hash_rate=p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, lookbehind)/(1-global_stale_prop),
             pool_stale_prop=global_stale_prop,
             local_hash_rates=miner_hash_rates,
             local_dead_hash_rates=miner_dead_hash_rates,
             shares=shares,
             stale_shares=stale_orphan_shares + stale_doa_shares,
             stale_shares_breakdown=dict(orphan=stale_orphan_shares, doa=stale_doa_shares),
-            current_payout=get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8,
+            current_payout=node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(wb.my_pubkey_hash), 0)*1e-8,
             peers=dict(
-                incoming=sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
-                outgoing=sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming),
+                incoming=sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming),
+                outgoing=sum(1 for peer in node.p2p_node.peers.itervalues() if not peer.incoming),
             ),
-            attempts_to_share=bitcoin_data.target_to_average_attempts(tracker.items[best_share_var.value].max_target),
-            attempts_to_block=bitcoin_data.target_to_average_attempts(bitcoind_work.value['bits'].target),
-            block_value=bitcoind_work.value['subsidy']*1e-8,
+            attempts_to_share=bitcoin_data.target_to_average_attempts(node.tracker.items[node.best_share_var.value].max_target),
+            attempts_to_block=bitcoin_data.target_to_average_attempts(node.bitcoind_work.value['bits'].target),
+            block_value=node.bitcoind_work.value['subsidy']*1e-8,
         ))
         
         with open(os.path.join(datadir_path, 'stats'), 'wb') as f:
@@ -274,15 +275,15 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
     new_root.putChild('log', WebInterface(lambda: stat_log))
     
     def get_share(share_hash_str):
-        if int(share_hash_str, 16) not in tracker.items:
+        if int(share_hash_str, 16) not in node.tracker.items:
             return None
-        share = tracker.items[int(share_hash_str, 16)]
+        share = node.tracker.items[int(share_hash_str, 16)]
         
         return dict(
             parent='%064x' % share.previous_hash,
-            children=['%064x' % x for x in sorted(tracker.reverse.get(share.hash, set()), key=lambda sh: -len(tracker.reverse.get(sh, set())))], # sorted from most children to least children
+            children=['%064x' % x for x in sorted(node.tracker.reverse.get(share.hash, set()), key=lambda sh: -len(node.tracker.reverse.get(sh, set())))], # sorted from most children to least children
             local=dict(
-                verified=share.hash in tracker.verified.items,
+                verified=share.hash in node.tracker.verified.items,
                 time_first_seen=start_time if share.time_seen == 0 else share.time_seen,
                 peer_first_received_from=share.peer.addr if share.peer is not None else None,
             ),
@@ -290,7 +291,7 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
                 timestamp=share.timestamp,
                 target=share.target,
                 max_target=share.max_target,
-                payout_address=bitcoin_data.script2_to_address(share.new_script, net.PARENT),
+                payout_address=bitcoin_data.script2_to_address(share.new_script, node.net.PARENT),
                 donation=share.share_data['donation']/65535,
                 stale_info=share.share_data['stale_info'],
                 nonce=share.share_data['nonce'],
@@ -315,21 +316,21 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
             ),
         )
     new_root.putChild('share', WebInterface(lambda share_hash_str: get_share(share_hash_str)))
-    new_root.putChild('heads', WebInterface(lambda: ['%064x' % x for x in tracker.heads]))
-    new_root.putChild('verified_heads', WebInterface(lambda: ['%064x' % x for x in tracker.verified.heads]))
-    new_root.putChild('tails', WebInterface(lambda: ['%064x' % x for t in tracker.tails for x in tracker.reverse.get(t, set())]))
-    new_root.putChild('verified_tails', WebInterface(lambda: ['%064x' % x for t in tracker.verified.tails for x in tracker.verified.reverse.get(t, set())]))
-    new_root.putChild('best_share_hash', WebInterface(lambda: '%064x' % best_share_var.value))
+    new_root.putChild('heads', WebInterface(lambda: ['%064x' % x for x in node.tracker.heads]))
+    new_root.putChild('verified_heads', WebInterface(lambda: ['%064x' % x for x in node.tracker.verified.heads]))
+    new_root.putChild('tails', WebInterface(lambda: ['%064x' % x for t in node.tracker.tails for x in node.tracker.reverse.get(t, set())]))
+    new_root.putChild('verified_tails', WebInterface(lambda: ['%064x' % x for t in node.tracker.verified.tails for x in node.tracker.verified.reverse.get(t, set())]))
+    new_root.putChild('best_share_hash', WebInterface(lambda: '%064x' % node.best_share_var.value))
     def get_share_data(share_hash_str):
-        if int(share_hash_str, 16) not in tracker.items:
+        if int(share_hash_str, 16) not in node.tracker.items:
             return ''
-        share = tracker.items[int(share_hash_str, 16)]
+        share = node.tracker.items[int(share_hash_str, 16)]
         return p2pool_data.share_type.pack(share.as_share1a())
     new_root.putChild('share_data', WebInterface(lambda share_hash_str: get_share_data(share_hash_str), 'application/octet-stream'))
     new_root.putChild('currency_info', WebInterface(lambda: dict(
-        symbol=net.PARENT.SYMBOL,
-        block_explorer_url_prefix=net.PARENT.BLOCK_EXPLORER_URL_PREFIX,
-        address_explorer_url_prefix=net.PARENT.ADDRESS_EXPLORER_URL_PREFIX,
+        symbol=node.net.PARENT.SYMBOL,
+        block_explorer_url_prefix=node.net.PARENT.BLOCK_EXPLORER_URL_PREFIX,
+        address_explorer_url_prefix=node.net.PARENT.ADDRESS_EXPLORER_URL_PREFIX,
     )))
     new_root.putChild('version', WebInterface(lambda: p2pool.__version__))
     
@@ -392,7 +393,7 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
         'traffic_rate': graph.DataStreamDescription(dataview_descriptions, is_gauge=False, multivalues=True),
     }, hd_obj)
     task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))).start(100)
-    @pseudoshare_received.watch
+    @wb.pseudoshare_received.watch
     def _(work, dead, user):
         t = time.time()
         hd.datastreams['local_hash_rate'].add_datum(t, work)
@@ -402,33 +403,33 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net,
             hd.datastreams['miner_hash_rates'].add_datum(t, {user: work})
             if dead:
                 hd.datastreams['miner_dead_hash_rates'].add_datum(t, {user: work})
-    @share_received.watch
+    @wb.share_received.watch
     def _(work, dead):
         t = time.time()
         hd.datastreams['local_share_hash_rate'].add_datum(t, work)
         if dead:
             hd.datastreams['local_dead_share_hash_rate'].add_datum(t, work)
-    @traffic_happened.watch
+    @node.p2p_node.traffic_happened.watch
     def _(name, bytes):
         hd.datastreams['traffic_rate'].add_datum(time.time(), {name: bytes})
     def add_point():
-        if tracker.get_height(best_share_var.value) < 10:
+        if node.tracker.get_height(node.best_share_var.value) < 10:
             return None
-        lookbehind = min(net.CHAIN_LENGTH, 60*60//net.SHARE_PERIOD, tracker.get_height(best_share_var.value))
+        lookbehind = min(node.net.CHAIN_LENGTH, 60*60//node.net.SHARE_PERIOD, node.tracker.get_height(node.best_share_var.value))
         t = time.time()
         
-        hd.datastreams['pool_rates'].add_datum(t, p2pool_data.get_stale_counts(tracker, best_share_var.value, lookbehind, rates=True))
+        hd.datastreams['pool_rates'].add_datum(t, p2pool_data.get_stale_counts(node.tracker, node.best_share_var.value, lookbehind, rates=True))
         
-        current_txouts = get_current_txouts()
-        hd.datastreams['current_payout'].add_datum(t, current_txouts.get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8)
+        current_txouts = node.get_current_txouts()
+        hd.datastreams['current_payout'].add_datum(t, current_txouts.get(bitcoin_data.pubkey_hash_to_script2(wb.my_pubkey_hash), 0)*1e-8)
         miner_hash_rates, miner_dead_hash_rates = get_local_rates()
-        current_txouts_by_address = dict((bitcoin_data.script2_to_address(script, net.PARENT), amount) for script, amount in current_txouts.iteritems())
+        current_txouts_by_address = dict((bitcoin_data.script2_to_address(script, node.net.PARENT), amount) for script, amount in current_txouts.iteritems())
         hd.datastreams['current_payouts'].add_datum(t, dict((user, current_txouts_by_address[user]*1e-8) for user in miner_hash_rates if user in current_txouts_by_address))
         
-        hd.datastreams['incoming_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming))
-        hd.datastreams['outgoing_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming))
+        hd.datastreams['incoming_peers'].add_datum(t, sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming))
+        hd.datastreams['outgoing_peers'].add_datum(t, sum(1 for peer in node.p2p_node.peers.itervalues() if not peer.incoming))
         
-        vs = p2pool_data.get_desired_version_counts(tracker, best_share_var.value, lookbehind)
+        vs = p2pool_data.get_desired_version_counts(node.tracker, node.best_share_var.value, lookbehind)
         vs_total = sum(vs.itervalues())
         hd.datastreams['desired_versions'].add_datum(t, dict((str(k), v/vs_total) for k, v in vs.iteritems()))
     task.LoopingCall(add_point).start(5)
index b389100..bace060 100644 (file)
@@ -9,29 +9,19 @@ from twisted.internet import defer
 from twisted.python import log
 
 import bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
-from bitcoin import script, worker_interface
-from util import jsonrpc, variable, deferral, math, pack
+from bitcoin import helper, script, worker_interface
+from util import forest, jsonrpc, variable, deferral, math, pack
 import p2pool, p2pool.data as p2pool_data
 
 class WorkerBridge(worker_interface.WorkerBridge):
-    def __init__(self, my_pubkey_hash, net, donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, broadcast_share):
+    def __init__(self, node, my_pubkey_hash, donation_percentage, merged_urls, worker_fee):
         worker_interface.WorkerBridge.__init__(self)
         self.recent_shares_ts_work = []
         
+        self.node = node
         self.my_pubkey_hash = my_pubkey_hash
-        self.net = net
         self.donation_percentage = donation_percentage
-        self.bitcoind_work = bitcoind_work
-        self.best_block_header = best_block_header
-        self.best_share_var = best_share_var
-        self.tracker = tracker
-        self.my_share_hashes = my_share_hashes
-        self.my_doa_share_hashes = my_doa_share_hashes
         self.worker_fee = worker_fee
-        self.p2p_node = p2p_node
-        self.submit_block = submit_block
-        self.set_best_share = set_best_share
-        self.broadcast_share = broadcast_share
         
         self.pseudoshare_received = variable.Event()
         self.share_received = variable.Event()
@@ -40,16 +30,27 @@ class WorkerBridge(worker_interface.WorkerBridge):
         self.removed_unstales_var = variable.Variable((0, 0, 0))
         self.removed_doa_unstales_var = variable.Variable(0)
         
-        @tracker.verified.removed.watch
+        
+        self.my_share_hashes = set()
+        self.my_doa_share_hashes = set()
+        
+        self.tracker_view = forest.TrackerView(self.node.tracker, forest.get_attributedelta_type(dict(forest.AttributeDelta.attrs,
+            my_count=lambda share: 1 if share.hash in self.my_share_hashes else 0,
+            my_doa_count=lambda share: 1 if share.hash in self.my_doa_share_hashes else 0,
+            my_orphan_announce_count=lambda share: 1 if share.hash in self.my_share_hashes and share.share_data['stale_info'] == 'orphan' else 0,
+            my_dead_announce_count=lambda share: 1 if share.hash in self.my_share_hashes and share.share_data['stale_info'] == 'doa' else 0,
+        )))
+        
+        @self.node.tracker.verified.removed.watch
         def _(share):
-            if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value):
+            if share.hash in self.my_share_hashes and self.node.tracker.is_child_of(share.hash, self.node.best_share_var.value):
                 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
                 self.removed_unstales_var.set((
                     self.removed_unstales_var.value[0] + 1,
                     self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
                     self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
                 ))
-            if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value):
+            if share.hash in self.my_doa_share_hashes and self.node.tracker.is_child_of(share.hash, self.node.best_share_var.value):
                 self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
         
         # MERGED WORK
@@ -78,9 +79,9 @@ class WorkerBridge(worker_interface.WorkerBridge):
         
         self.current_work = variable.Variable(None)
         def compute_work():
-            t = self.bitcoind_work.value
-            bb = self.best_block_header.value
-            if bb is not None and bb['previous_block'] == t['previous_block'] and net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(bb)) <= t['bits'].target:
+            t = self.node.bitcoind_work.value
+            bb = self.node.best_block_header.value
+            if bb is not None and bb['previous_block'] == t['previous_block'] and self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(bb)) <= t['bits'].target:
                 print 'Skipping from block %x to block %x!' % (bb['previous_block'],
                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb)))
                 t = dict(
@@ -92,13 +93,13 @@ class WorkerBridge(worker_interface.WorkerBridge):
                     time=bb['timestamp'] + 600, # better way?
                     transactions=[],
                     merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
-                    subsidy=net.PARENT.SUBSIDY_FUNC(self.bitcoind_work.value['height']),
-                    last_update=self.bitcoind_work.value['last_update'],
+                    subsidy=self.node.net.PARENT.SUBSIDY_FUNC(self.node.bitcoind_work.value['height']),
+                    last_update=self.node.bitcoind_work.value['last_update'],
                 )
             
             self.current_work.set(t)
-        self.bitcoind_work.changed.watch(lambda _: compute_work())
-        self.best_block_header.changed.watch(lambda _: compute_work())
+        self.node.bitcoind_work.changed.watch(lambda _: compute_work())
+        self.node.best_block_header.changed.watch(lambda _: compute_work())
         compute_work()
         
         self.new_work_event = variable.Event()
@@ -108,13 +109,13 @@ class WorkerBridge(worker_interface.WorkerBridge):
             if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']) or (not before['transactions'] and after['transactions']):
                 self.new_work_event.happened()
         self.merged_work.changed.watch(lambda _: self.new_work_event.happened())
-        self.best_share_var.changed.watch(lambda _: self.new_work_event.happened())
+        self.node.best_share_var.changed.watch(lambda _: self.new_work_event.happened())
     
     def get_stale_counts(self):
         '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
         my_shares = len(self.my_share_hashes)
         my_doa_shares = len(self.my_doa_share_hashes)
-        delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value)
+        delta = self.tracker_view.get_delta_to_last(self.node.best_share_var.value)
         my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0]
         my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value
         orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1]
@@ -148,7 +149,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
             pubkey_hash = self.my_pubkey_hash
         else:
             try:
-                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
+                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.node.net.PARENT)
             except: # XXX blah
                 pubkey_hash = self.my_pubkey_hash
         
@@ -159,9 +160,9 @@ class WorkerBridge(worker_interface.WorkerBridge):
         return pubkey_hash, desired_share_target, desired_pseudoshare_target
     
     def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
-        if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
+        if len(self.node.p2p_node.peers) == 0 and self.node.net.PERSIST:
             raise jsonrpc.Error_for_code(-12345)(u'p2pool is not connected to any peers')
-        if self.best_share_var.value is None and self.net.PERSIST:
+        if self.node.best_share_var.value is None and self.node.net.PERSIST:
             raise jsonrpc.Error_for_code(-12345)(u'p2pool is downloading shares')
         if time.time() > self.current_work.value['last_update'] + 60:
             raise jsonrpc.Error_for_code(-12345)(u'lost contact with bitcoind')
@@ -183,25 +184,25 @@ class WorkerBridge(worker_interface.WorkerBridge):
         tx_map = dict(zip(tx_hashes, self.current_work.value['transactions']))
         
         share_type = p2pool_data.NewShare
-        if self.best_share_var.value is not None:
-            previous_share = self.tracker.items[self.best_share_var.value]
+        if self.node.best_share_var.value is not None:
+            previous_share = self.node.tracker.items[self.node.best_share_var.value]
             if isinstance(previous_share, p2pool_data.Share):
                 # Share -> NewShare only valid if 85% of hashes in [net.CHAIN_LENGTH*9//10, net.CHAIN_LENGTH] for new version
-                if self.tracker.get_height(previous_share.hash) < self.net.CHAIN_LENGTH:
+                if self.node.tracker.get_height(previous_share.hash) < self.node.net.CHAIN_LENGTH:
                     share_type = p2pool_data.Share
-                elif time.time() < 1351383661 and self.net.NAME == 'bitcoin':
+                elif time.time() < 1351383661 and self.node.net.NAME == 'bitcoin':
                     share_type = p2pool_data.Share
                 else:
-                    counts = p2pool_data.get_desired_version_counts(self.tracker,
-                        self.tracker.get_nth_parent_hash(previous_share.hash, self.net.CHAIN_LENGTH*9//10), self.net.CHAIN_LENGTH//10)
+                    counts = p2pool_data.get_desired_version_counts(self.node.tracker,
+                        self.node.tracker.get_nth_parent_hash(previous_share.hash, self.node.net.CHAIN_LENGTH*9//10), self.node.net.CHAIN_LENGTH//10)
                     if counts.get(p2pool_data.NewShare.VERSION, 0) < sum(counts.itervalues())*95//100:
                         share_type = p2pool_data.Share
         
         if True:
             share_info, gentx, other_transaction_hashes, get_share = share_type.generate_transaction(
-                tracker=self.tracker,
+                tracker=self.node.tracker,
                 share_data=dict(
-                    previous_share_hash=self.best_share_var.value,
+                    previous_share_hash=self.node.best_share_var.value,
                     coinbase=(script.create_push_script([
                         self.current_work.value['height'],
                         ] + ([mm_data] if mm_data else []) + [
@@ -222,7 +223,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
                 desired_target=desired_share_target,
                 ref_merkle_link=dict(branch=[], index=0),
                 desired_other_transaction_hashes=tx_hashes,
-                net=self.net,
+                net=self.node.net,
                 known_txs=tx_map,
             )
         
@@ -241,7 +242,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
         target = max(target, share_info['bits'].target)
         for aux_work, index, hashes in mm_later:
             target = max(target, aux_work['target'])
-        target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
+        target = math.clip(target, self.node.net.PARENT.SANE_TARGET_RANGE)
         
         getwork_time = time.time()
         lp_count = self.new_work_event.times
@@ -250,7 +251,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
         print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
             bitcoin_data.target_to_difficulty(target),
             bitcoin_data.target_to_difficulty(share_info['bits'].target),
-            self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
+            self.current_work.value['subsidy']*1e-8, self.node.net.PARENT.SYMBOL,
             len(self.current_work.value['transactions']),
         )
         
@@ -267,13 +268,13 @@ class WorkerBridge(worker_interface.WorkerBridge):
         
         def got_response(header, request):
             header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
-            pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
+            pow_hash = self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
             try:
                 if pow_hash <= header['bits'].target or p2pool.DEBUG:
-                    self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
+                    helper.submit_block(dict(header=header, txs=transactions), False, self.node.factory, self.node.bitcoind, self.node.bitcoind_work, self.node.net)
                     if pow_hash <= header['bits'].target:
                         print
-                        print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
+                        print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.node.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                         print
             except:
                 log.err(None, 'Error while processing potential block:')
@@ -326,14 +327,14 @@ class WorkerBridge(worker_interface.WorkerBridge):
                 if not on_time:
                     self.my_doa_share_hashes.add(share.hash)
                 
-                self.tracker.add(share)
+                self.node.tracker.add(share)
                 if not p2pool.DEBUG:
-                    self.tracker.verified.add(share)
-                self.set_best_share()
+                    self.node.tracker.verified.add(share)
+                self.node.set_best_share()
                 
                 try:
                     if pow_hash <= header['bits'].target or p2pool.DEBUG:
-                        self.broadcast_share(share.hash)
+                        self.node.p2p_node.broadcast_share(share.hash)
                 except:
                     log.err(None, 'Error forwarding block solution:')