refactored work computation
author     Forrest Voight <forrest@forre.st>
           Sat, 9 Jun 2012 18:19:26 +0000 (14:19 -0400)
committer  Forrest Voight <forrest@forre.st>
           Sun, 10 Jun 2012 07:03:35 +0000 (03:03 -0400)
p2pool/main.py
p2pool/util/variable.py
p2pool/web.py

diff --git a/p2pool/main.py b/p2pool/main.py
index 1106645..73ae9c4 100644
@@ -146,45 +146,76 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
         
-        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
+        print 'Initializing work...'
         
-        pre_current_work = variable.Variable(None)
-        pre_merged_work = variable.Variable({})
-        # information affecting work that should trigger a long-polling update
-        current_work = variable.Variable(None)
-        # information affecting work that should not trigger a long-polling update
-        current_work2 = variable.Variable(None)
         
-        requested = expiring_dict.ExpiringDict(300)
+        # BITCOIND WORK
+        
+        bitcoind_work = variable.Variable(None)
         
-        print 'Initializing work...'
         @defer.inlineCallbacks
-        def set_real_work1():
+        def poll_bitcoind():
             work = yield getwork(bitcoind)
-            current_work2.set(dict(
+            bitcoind_work.set(dict(
+                version=work['version'],
+                previous_block=work['previous_block_hash'],
+                bits=work['bits'],
+                coinbaseflags=work['coinbaseflags'],
                 time=work['time'],
                 transactions=work['transactions'],
                 merkle_link=work['merkle_link'],
                 subsidy=work['subsidy'],
                 clock_offset=time.time() - work['time'],
                 last_update=time.time(),
-            )) # second set first because everything hooks on the first
-            pre_current_work.set(dict(
-                version=work['version'],
-                previous_block=work['previous_block_hash'],
-                bits=work['bits'],
-                coinbaseflags=work['coinbaseflags'],
             ))
-        yield set_real_work1()
+        yield poll_bitcoind()
         
-        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
+        @defer.inlineCallbacks
+        def work_poller():
+            while True:
+                flag = factory.new_block.get_deferred()
+                try:
+                    yield poll_bitcoind()
+                except:
+                    log.err()
+                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
+        work_poller()
+        
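
The old set_real_work1/set_real_work2 split is gone: poll_bitcoind writes everything bitcoind reports into the single bitcoind_work Variable, and the loop beside it races a new-block notification against a 15-second timeout, so work refreshes immediately on a block push and at worst every 15 seconds. A standalone sketch of that race pattern, assuming a hypothetical poll() callable and an event object with p2pool's get_deferred() interface, with reactor.callLater standing in for deferral.sleep:

    from twisted.internet import defer, reactor
    from twisted.python import log

    def sleep(t):
        # like p2pool's deferral.sleep: a Deferred that fires after t seconds
        d = defer.Deferred()
        reactor.callLater(t, d.callback, None)
        return d

    @defer.inlineCallbacks
    def poller(new_block_event, poll, period=15):
        while True:
            flag = new_block_event.get_deferred()  # hypothetical event, like factory.new_block
            try:
                yield poll()
            except:
                log.err()
            # wake on whichever comes first: a pushed block or the timeout
            yield defer.DeferredList([flag, sleep(period)], fireOnOneCallback=True)
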
+        # MERGED WORK
+        
+        merged_work = variable.Variable({})
         
-        def set_real_work2():
-            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
+        @defer.inlineCallbacks
+        def set_merged_work(merged_url, merged_userpass):
+            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
+            while True:
+                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
+                merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
+                    hash=int(auxblock['hash'], 16),
+                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
+                    merged_proxy=merged_proxy,
+                )}))
+                yield deferral.sleep(1)
+        for merged_url, merged_userpass in merged_urls:
+            set_merged_work(merged_url, merged_userpass)
+        
+        @merged_work.changed.watch
+        def _(new_merged_work):
+            print 'Got new merged mining work!'
+        
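
Each merged-mining URL gets its own polling loop, and every getauxblock result is folded into the one merged_work Variable keyed by chainid, so a single value carries the latest aux work for every chain. A toy sketch of that dict-merge update, with made-up chain ids and fields, assuming p2pool.util.variable is importable:

    from p2pool.util import variable

    merged_work = variable.Variable({})

    def update_aux(chainid, aux):
        # replaces only this chain's entry; other chains' entries survive
        merged_work.set(dict(merged_work.value, **{chainid: aux}))

    update_aux(1, dict(hash=0xdead, target=2**224))
    update_aux(7, dict(hash=0xbeef, target=2**220))
    print sorted(merged_work.value)  # -> [1, 7]
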
+        # COMBINE WORK
+        
+        current_work = variable.Variable(None)
+        
+        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
+        requested = expiring_dict.ExpiringDict(300)
+        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
+        def compute_work():
+            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
             
-            t = dict(pre_current_work.value)
+            t = dict(bitcoind_work.value)
             t['best_share_hash'] = best
-            t['mm_chains'] = pre_merged_work.value
+            t['mm_chains'] = merged_work.value
             current_work.set(t)
             
             t = time.time()
@@ -214,30 +245,22 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     ))[:100],
                 )
                 requested[share_hash] = t, count + 1
-        pre_current_work.changed.watch(lambda _: set_real_work2())
-        pre_merged_work.changed.watch(lambda _: set_real_work2())
-        set_real_work2()
-        print '    ...success!'
-        print
+        bitcoind_work.changed.watch(lambda _: compute_work())
+        merged_work.changed.watch(lambda _: compute_work())
+        compute_work()
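
compute_work is the join point: it reruns whenever either input changes and publishes one combined current_work. The derived-Variable wiring in isolation, with toy values:

    from p2pool.util import variable

    bitcoind_work = variable.Variable(dict(bits=0x1d00ffff))
    merged_work = variable.Variable({})
    current_work = variable.Variable(None)

    def compute_work():
        t = dict(bitcoind_work.value)
        t['mm_chains'] = merged_work.value
        current_work.set(t)

    bitcoind_work.changed.watch(lambda _: compute_work())
    merged_work.changed.watch(lambda _: compute_work())
    compute_work()  # prime current_work before any change event fires
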
         
+        # LONG POLLING
         
-        @defer.inlineCallbacks
-        def set_merged_work(merged_url, merged_userpass):
-            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
-            while True:
-                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
-                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
-                    hash=int(auxblock['hash'], 16),
-                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
-                    merged_proxy=merged_proxy,
-                )}))
-                yield deferral.sleep(1)
-        for merged_url, merged_userpass in merged_urls:
-            set_merged_work(merged_url, merged_userpass)
+        lp_signal = variable.Event()
+        
+        @current_work.transitioned.watch
+        def _(before, after):
+            if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']):
+                lp_signal.happened()
         
-        @pre_merged_work.changed.watch
-        def _(new_merged_work):
-            print 'Got new merged mining work!'
+        
+        print '    ...success!'
+        print
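
lp_signal decouples miner long polling from current_work updates: the new transitioned event (added to Variable below) hands its watcher both the old and new values, so the signal fires only when a header-affecting field (version, previous_block, bits) actually differs, and a mere transaction or share refresh no longer flushes every connected miner. In isolation, with a made-up txs field:

    import sys
    from p2pool.util import variable

    work = variable.Variable(dict(version=1, previous_block=0, bits=0x1d00ffff, txs=[]))
    lp_signal = variable.Event()
    lp_signal.watch(lambda: sys.stdout.write('pushing new work to miners\n'))

    @work.transitioned.watch
    def _(before, after):
        if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']):
            lp_signal.happened()

    work.set(dict(work.value, txs=[1, 2]))        # no long poll: header unchanged
    work.set(dict(work.value, previous_block=5))  # fires lp_signal
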
         
         # setup p2p logic and join p2pool network
         
@@ -262,7 +285,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
                 
                 if new_count:
-                    set_real_work2()
+                    compute_work()
                 
                 if len(shares) > 5:
                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
@@ -383,7 +406,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         task.LoopingCall(save_addrs).start(60)
         
         best_block = variable.Variable(None)
-        @pre_current_work.changed.watch
+        @bitcoind_work.changed.watch
         def _(work):
             best_block.set(work['previous_block'])
         @best_block.changed.watch
@@ -477,7 +500,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         class WorkerBridge(worker_interface.WorkerBridge):
             def __init__(self):
                 worker_interface.WorkerBridge.__init__(self)
-                self.new_work_event = current_work.changed
+                self.new_work_event = lp_signal
                 self.recent_shares_ts_work = []
             
             def get_user_details(self, request):
@@ -518,7 +541,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                 if current_work.value['best_share_hash'] is None and net.PERSIST:
                     raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
-                if time.time() > current_work2.value['last_update'] + 60:
+                if time.time() > current_work.value['last_update'] + 60:
                     raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                 
                 if current_work.value['mm_chains']:
@@ -542,7 +565,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                             nonce=random.randrange(2**32),
                             pubkey_hash=pubkey_hash,
-                            subsidy=current_work2.value['subsidy'],
+                            subsidy=current_work.value['subsidy'],
                             donation=math.perfect_round(65535*args.donation_percentage/100),
                             stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                 'orphan' if orphans > orphans_recorded_in_chain else
@@ -552,7 +575,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             desired_version=1,
                         ),
                         block_target=current_work.value['bits'].target,
-                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
+                        desired_timestamp=int(time.time() - current_work.value['clock_offset']),
                         desired_target=desired_share_target,
                         ref_merkle_link=dict(branch=[], index=0),
                         net=net,
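
clock_offset, captured at poll time as the local clock minus bitcoind's template time, is what makes desired_timestamp track bitcoind's clock rather than the local one: time.time() - clock_offset is the template time plus whatever has elapsed since the poll. Worked through with made-up numbers:

    poll_local = 1339266000.0   # local clock when getwork returned (made up)
    template_time = 1339265900  # bitcoind's 'time' field (made up)
    clock_offset = poll_local - template_time  # 100.0 s of skew

    now = poll_local + 20       # a share is built 20 s after the poll
    desired_timestamp = int(now - clock_offset)
    print desired_timestamp     # -> 1339265920 == template_time + 20
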
@@ -571,18 +594,18 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     target = max(target, aux_work['target'])
                 target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
                 
-                transactions = [generate_tx] + list(current_work2.value['transactions'])
+                transactions = [generate_tx] + list(current_work.value['transactions'])
                 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
-                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
+                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work.value['merkle_link'])
                 
                 getwork_time = time.time()
-                merkle_link = current_work2.value['merkle_link']
+                merkle_link = current_work.value['merkle_link']
                 
                 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                     bitcoin_data.target_to_difficulty(target),
                     bitcoin_data.target_to_difficulty(share_info['bits'].target),
-                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
-                    len(current_work2.value['transactions']),
+                    current_work.value['subsidy']*1e-8, net.PARENT.SYMBOL,
+                    len(current_work.value['transactions']),
                 )
                 
                 bits = current_work.value['bits']
@@ -591,7 +614,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     version=current_work.value['version'],
                     previous_block=current_work.value['previous_block'],
                     merkle_root=merkle_root,
-                    timestamp=current_work2.value['time'],
+                    timestamp=current_work.value['time'],
                     bits=current_work.value['bits'],
                     share_target=target,
                 )
@@ -667,7 +690,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                         tracker.add(share)
                         if not p2pool.DEBUG:
                             tracker.verified.add(share)
-                        set_real_work2()
+                        compute_work()
                         
                         try:
                             if pow_hash <= header['bits'].target or p2pool.DEBUG:
@@ -698,9 +721,9 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 
                 return ba, got_response
         
-        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
+        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
         
-        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
+        web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
         worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
         
         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
@@ -712,18 +735,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print
         
         
-        @defer.inlineCallbacks
-        def work_poller():
-            while True:
-                flag = factory.new_block.get_deferred()
-                try:
-                    yield set_real_work1()
-                except:
-                    log.err()
-                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
-        work_poller()
-        
-        
         # done!
         print 'Started successfully!'
         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
@@ -788,8 +799,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             while True:
                 yield deferral.sleep(3)
                 try:
-                    if time.time() > current_work2.value['last_update'] + 60:
-                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
+                    if time.time() > current_work.value['last_update'] + 60:
+                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
                     
                     height = tracker.get_height(current_work.value['best_share_hash'])
                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
diff --git a/p2pool/util/variable.py b/p2pool/util/variable.py
index b0c0de8..8155e2c 100644
@@ -55,13 +55,16 @@ class Variable(object):
     def __init__(self, value):
         self.value = value
         self.changed = Event()
+        self.transitioned = Event()
     
     def set(self, value):
         if value == self.value:
             return
         
+        oldvalue = self.value
         self.value = value
         self.changed.happened(value)
+        self.transitioned.happened(oldvalue, value)
     
     def get_not_none(self):
         if self.value is not None:
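
The whole Variable change is above: set() snapshots the old value and fires a second event with the (old, new) pair, while changed and its one-argument watchers behave exactly as before. Brief usage, assuming the class as patched:

    from p2pool.util.variable import Variable

    v = Variable(0)

    def on_changed(new):
        print 'changed ->', new               # new value only
    def on_transitioned(old, new):
        print 'transitioned', old, '->', new  # old value too

    v.changed.watch(on_changed)
    v.transitioned.watch(on_transitioned)
    v.set(1)  # fires both events
    v.set(1)  # fires neither: set() returns early on an equal value
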
diff --git a/p2pool/web.py b/p2pool/web.py
index 0885b32..9dfb926 100644
@@ -43,7 +43,7 @@ def _atomic_write(filename, data):
         os.remove(filename)
         os.rename(filename + '.new', filename)
 
-def get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received):
+def get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received):
     start_time = time.time()
     
     web_root = resource.Resource()
@@ -171,7 +171,7 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
                 dead=stale_doa_shares,
             ),
             uptime=time.time() - start_time,
-            block_value=current_work2.value['subsidy']*1e-8,
+            block_value=current_work.value['subsidy']*1e-8,
             warnings=p2pool_data.get_warnings(tracker, current_work, net),
         )
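
block_value here is the subsidy, which bitcoind reports in satoshis, scaled to whole coins:

    subsidy = 5000000000  # e.g. the 50 BTC reward of mid-2012, in satoshis
    print subsidy*1e-8    # -> 50.0
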
     
@@ -242,7 +242,7 @@ def get_web_root(tracker, current_work, current_work2, get_current_txouts, datad
             ),
             attempts_to_share=bitcoin_data.target_to_average_attempts(tracker.shares[current_work.value['best_share_hash']].max_target),
             attempts_to_block=bitcoin_data.target_to_average_attempts(current_work.value['bits'].target),
-            block_value=current_work2.value['subsidy']*1e-8,
+            block_value=current_work.value['subsidy']*1e-8,
         ))
         
         with open(os.path.join(datadir_path, 'stats'), 'wb') as f: