separated best_share_hash and mm_chains from current_work
[p2pool.git] / p2pool / main.py
index 2489ed7..d44db83 100644 (file)
@@ -49,7 +49,7 @@ def getwork(bitcoind):
     ))
 
 class WorkerBridge(worker_interface.WorkerBridge):
-    def __init__(self, lp_signal, my_pubkey_hash, net, donation_percentage, current_work, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, compute_work, shared_share_hashes):
+    def __init__(self, lp_signal, my_pubkey_hash, net, donation_percentage, current_work, merged_work, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes):
         worker_interface.WorkerBridge.__init__(self)
         self.new_work_event = lp_signal
         self.recent_shares_ts_work = []
@@ -58,13 +58,15 @@ class WorkerBridge(worker_interface.WorkerBridge):
         self.net = net
         self.donation_percentage = donation_percentage
         self.current_work = current_work
+        self.merged_work = merged_work
+        self.best_share_var = best_share_var
         self.tracker = tracker
         self.my_share_hashes = my_share_hashes
         self.my_doa_share_hashes = my_doa_share_hashes
         self.worker_fee = worker_fee
         self.p2p_node = p2p_node
         self.submit_block = submit_block
-        self.compute_work = compute_work
+        self.set_best_share = set_best_share
         self.shared_share_hashes = shared_share_hashes
         
         self.pseudoshare_received = variable.Event()
@@ -76,21 +78,21 @@ class WorkerBridge(worker_interface.WorkerBridge):
         
         @tracker.verified.removed.watch
         def _(share):
-            if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.current_work.value['best_share_hash']):
+            if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value):
                 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
                 self.removed_unstales_var.set((
                     self.removed_unstales_var.value[0] + 1,
                     self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
                     self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
                 ))
-            if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.current_work.value['best_share_hash']):
+            if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value):
                 self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
     
     def get_stale_counts(self):
         '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
         my_shares = len(self.my_share_hashes)
         my_doa_shares = len(self.my_doa_share_hashes)
-        delta = self.tracker.verified.get_delta_to_last(self.current_work.value['best_share_hash'])
+        delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value)
         my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0]
         my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value
         orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1]
@@ -137,20 +139,20 @@ class WorkerBridge(worker_interface.WorkerBridge):
     def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
         if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
             raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
-        if self.current_work.value['best_share_hash'] is None and self.net.PERSIST:
+        if self.best_share_var.value is None and self.net.PERSIST:
             raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
         if time.time() > self.current_work.value['last_update'] + 60:
             raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
         
-        if self.current_work.value['mm_chains']:
-            tree, size = bitcoin_data.make_auxpow_tree(self.current_work.value['mm_chains'])
-            mm_hashes = [self.current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
+        if self.merged_work.value:
+            tree, size = bitcoin_data.make_auxpow_tree(self.merged_work.value)
+            mm_hashes = [self.merged_work.value.get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
             mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                 size=size,
                 nonce=0,
             ))
-            mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.current_work.value['mm_chains'].iteritems()]
+            mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.merged_work.value.iteritems()]
         else:
             mm_data = ''
             mm_later = []
@@ -159,7 +161,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
             share_info, generate_tx = p2pool_data.Share.generate_transaction(
                 tracker=self.tracker,
                 share_data=dict(
-                    previous_share_hash=self.current_work.value['best_share_hash'],
+                    previous_share_hash=self.best_share_var.value,
                     coinbase=(mm_data + self.current_work.value['coinbaseflags'])[:100],
                     nonce=random.randrange(2**32),
                     pubkey_hash=pubkey_hash,
@@ -188,7 +190,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
         else:
             target = desired_pseudoshare_target
         target = max(target, share_info['bits'].target)
-        for aux_work in self.current_work.value['mm_chains'].itervalues():
+        for aux_work in self.merged_work.value.itervalues():
             target = max(target, aux_work['target'])
         target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
         
@@ -237,7 +239,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
             assert header['previous_block'] == previous_block
             assert header['bits'] == bits
             
-            on_time = self.current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
+            on_time = self.best_share_var.value == share_info['share_data']['previous_share_hash']
             
             for aux_work, index, hashes in mm_later:
                 try:
@@ -288,7 +290,7 @@ class WorkerBridge(worker_interface.WorkerBridge):
                 self.tracker.add(share)
                 if not p2pool.DEBUG:
                     self.tracker.verified.add(share)
-                self.compute_work()
+                self.set_best_share()
                 
                 try:
                     if pow_hash <= header['bits'].target or p2pool.DEBUG:
@@ -509,13 +511,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         # COMBINE WORK
         
         current_work = variable.Variable(None)
-        
-        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
-        requested = expiring_dict.ExpiringDict(300)
-        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
         def compute_work():
-            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
-            
             t = dict(bitcoind_work.value)
             
             if (best_block_header.value is not None and
@@ -536,9 +532,22 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     last_update=current_work.value['last_update'],
                 )
             
-            t['best_share_hash'] = best
-            t['mm_chains'] = merged_work.value
             current_work.set(t)
+        bitcoind_work.changed.watch(lambda _: compute_work())
+        best_block_header.changed.watch(lambda _: compute_work())
+        compute_work()
+        
+        # BEST SHARE
+        
+        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
+        requested = expiring_dict.ExpiringDict(300)
+        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
+        
+        best_share_var = variable.Variable(None)
+        def set_best_share():
+            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
+            
+            best_share_var.set(best)
             
             t = time.time()
             for peer2, share_hash in desired:
@@ -567,10 +576,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     ))[:100],
                 )
                 requested[share_hash] = t, count + 1
-        bitcoind_work.changed.watch(lambda _: compute_work())
-        merged_work.changed.watch(lambda _: compute_work())
-        best_block_header.changed.watch(lambda _: compute_work())
-        compute_work()
+        bitcoind_work.changed.watch(lambda _: set_best_share())
+        set_best_share()
         
         # LONG POLLING
         
@@ -579,8 +586,10 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         @current_work.transitioned.watch
         def _(before, after):
             # trigger LP if version/previous_block/bits changed or transactions changed from nothing
-            if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
+            if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']) or (not before['transactions'] and after['transactions']):
                 lp_signal.happened()
+        merged_work.changed.watch(lambda _: lp_signal.happened())
+        best_share_var.changed.watch(lambda _: lp_signal.happened())
         
         
         print '    ...success!'
@@ -609,7 +618,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
                 
                 if new_count:
-                    compute_work()
+                    set_best_share()
                 
                 if len(shares) > 5:
                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
@@ -720,7 +729,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 log.err()
         
         p2p_node = Node(
-            best_share_hash_func=lambda: current_work.value['best_share_hash'],
+            best_share_hash_func=lambda: best_share_var.value,
             port=args.p2pool_port,
             net=net,
             addr_store=addrs,
@@ -751,10 +760,10 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 peer.sendShares([share for share in shares if share.peer is not peer])
         
         # send share when the chain changes to their chain
-        current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
+        best_share_var.changed.watch(broadcast_share)
         
         def save_shares():
-            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
+            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                 ss.add_share(share)
                 if share.hash in tracker.verified.shares:
                     ss.add_verified_hash(share.hash)
@@ -784,10 +793,10 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
         
-        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
+        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, current_work.value['bits'].target, current_work.value['subsidy'], net)
         
-        wb = WorkerBridge(lp_signal, my_pubkey_hash, net, args.donation_percentage, current_work, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, compute_work, shared_share_hashes)
-        web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received)
+        wb = WorkerBridge(lp_signal, my_pubkey_hash, net, args.donation_percentage, current_work, merged_work, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes)
+        web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
         
         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
@@ -866,7 +875,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     if time.time() > current_work.value['last_update'] + 60:
                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
                     
-                    height = tracker.get_height(current_work.value['best_share_hash'])
+                    height = tracker.get_height(best_share_var.value)
                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                         height,
                         len(tracker.verified.shares),
@@ -881,13 +890,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                         math.format(int(my_att_s)),
                         math.format_dt(dt),
                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
-                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
+                        math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                     )
                     
                     if height > 2:
                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
-                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
-                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
+                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
+                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                         
                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                             shares, stale_orphan_shares, stale_doa_shares,
@@ -901,7 +910,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                         )
                         
-                        for warning in p2pool_data.get_warnings(tracker, current_work, net):
+                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
                             print >>sys.stderr, '#'*40
                             print >>sys.stderr, '>>> Warning: ' + warning
                             print >>sys.stderr, '#'*40