indentation and imports cleaned up
[p2pool.git] / p2pool / main.py
index 2fa4ad3..f001cdc 100644 (file)
@@ -12,6 +12,7 @@ import struct
 import sys
 import time
 import json
+import signal
 
 from twisted.internet import defer, reactor
 from twisted.web import server, resource
@@ -78,7 +79,7 @@ def main(args):
         my_script = yield get_payout_script(factory)
         if args.pubkey_hash is None:
             if my_script is None:
-                print 'IP transaction denied ... falling back to sending to address.'
+                print '    IP transaction denied ... falling back to sending to address.'
                 my_script = yield get_payout_script2(bitcoind, args.net)
         else:
             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
@@ -86,7 +87,10 @@ def main(args):
         print '    Payout script:', my_script.encode('hex')
         print
         
-        ht = bitcoin.p2p.HeightTracker(factory)
+        print 'Loading cached block headers...'
+        ht = bitcoin.p2p.HeightTracker(factory, args.net.HEADERSTORE_FILENAME)
+        print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
+        print
         
         tracker = p2pool.OkayTracker(args.net)
         chains = expiring_dict.ExpiringDict(300)
@@ -101,14 +105,13 @@ def main(args):
         current_work2 = variable.Variable(None)
         
         work_updated = variable.Event()
-        tracker_updated = variable.Event()
         
         requested = expiring_dict.ExpiringDict(300)
         
         @defer.inlineCallbacks
         def set_real_work1():
             work, height = yield getwork(bitcoind)
-            # XXX call tracker_updated
+            changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
             current_work.set(dict(
                 version=work.version,
                 previous_block=work.previous_block,
@@ -119,6 +122,8 @@ def main(args):
             current_work2.set(dict(
                 clock_offset=time.time() - work.timestamp,
             ))
+            if changed:
+                set_real_work2()
         
         def set_real_work2():
             best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
@@ -129,8 +134,8 @@ def main(args):
             
             t = time.time()
             for peer2, share_hash in desired:
-                #if share_hash not in tracker.tails: # was received in the time tracker.think was running
-                #    continue
+                if share_hash not in tracker.tails: # was received in the time tracker.think was running
+                    continue
                 last_request_time, count = requested.get(share_hash, (None, 0))
                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                     continue
@@ -159,6 +164,7 @@ def main(args):
         yield set_real_work1()
         set_real_work2()
         print '    ...success!'
+        print
         
         start_time = time.time() - current_work2.value['clock_offset']
         
@@ -204,7 +210,7 @@ def main(args):
                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
             
             if some_new:
-                tracker_updated.happened()
+                set_real_work2()
             
             if len(shares) > 5:
                 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
@@ -298,12 +304,13 @@ def main(args):
                     pm = yield portmapper.get_port_mapper()
                     yield pm._upnp.add_port_mapping(lan_ip, args.net.P2P_PORT, args.net.P2P_PORT, 'p2pool', 'TCP')
                 except:
-                    log.err()
+                    if p2pool_init.DEBUG:
+                        log.err(None, "UPnP error:")
                 yield deferral.sleep(random.expovariate(1/120))
         
         if args.upnp:
             upnp_thread()
-         
+        
         # start listening for workers with a JSON-RPC server
         
         print 'Listening for workers on port %i...' % (args.worker_port,)
@@ -371,13 +378,14 @@ def main(args):
                 block = dict(header=header, txs=transactions)
                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
-                    print
-                    print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
-                    print
                     if factory.conn.value is not None:
                         factory.conn.value.send_block(block=block)
                     else:
                         print 'No bitcoind connection! Erp!'
+                    if hash_ <= block['header']['target']:
+                        print
+                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
+                        print
                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                 if hash_ > target:
                     print 'Received invalid share from worker - %x/%x' % (hash_, target)
@@ -408,7 +416,7 @@ def main(args):
             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
             res = {}
             for script in sorted(weights, key=lambda s: weights[s]):
-                res[script.encode('hex')] = weights[script]/total_weight
+                res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
             return json.dumps(res)
         
         class WebInterface(resource.Resource):
@@ -491,6 +499,8 @@ def main(args):
         print 'Started successfully!'
         print
         
+        ht.updated.watch(set_real_work2)
+        
         @defer.inlineCallbacks
         def work1_thread():
             while True:
@@ -499,17 +509,16 @@ def main(args):
                     yield set_real_work1()
                 except:
                     log.err()
-                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
+                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/20))], fireOnOneCallback=True)
         
         @defer.inlineCallbacks
         def work2_thread():
             while True:
-                flag = tracker_updated.get_deferred()
                 try:
                     set_real_work2()
                 except:
                     log.err()
-                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
+                yield deferral.sleep(random.expovariate(1/20))
         
         work1_thread()
         work2_thread()
@@ -517,12 +526,12 @@ def main(args):
         counter = skiplists.CountsSkipList(tracker, run_identifier)
         
         while True:
-            yield deferral.sleep(random.expovariate(1/1))
+            yield deferral.sleep(3)
             try:
                 if current_work.value['best_share_hash'] is not None:
                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
-                    if height > 5:
-                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
+                    if height > 2:
+                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                         matching_in_chain = counter(current_work.value['best_share_hash'], height)
                         shares_in_chain = my_shares & matching_in_chain
@@ -535,7 +544,7 @@ def main(args):
                             len(shares_in_chain) + len(stale_shares),
                             len(stale_shares),
                             len(p2p_node.peers),
-                        )
+                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                         #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
                         #for k, v in weights.iteritems():
                         #    print k.encode('hex'), v/total_weight
@@ -602,6 +611,17 @@ def run():
     
     if args.debug:
         p2pool_init.DEBUG = True
+        class ReopeningFile(object):
+            def __init__(self, *open_args, **open_kwargs):
+                self.open_args, self.open_kwargs = open_args, open_kwargs
+                self.inner_file = open(*self.open_args, **self.open_kwargs)
+            def reopen(self):
+                self.inner_file.close()
+                self.inner_file = open(*self.open_args, **self.open_kwargs)
+            def write(self, data):
+                self.inner_file.write(data)
+            def flush(self):
+                self.inner_file.flush()
         class TeePipe(object):
             def __init__(self, outputs):
                 self.outputs = outputs
@@ -625,7 +645,14 @@ def run():
                 self.buf = lines[-1]
             def flush(self):
                 pass
-        sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
+        logfile = ReopeningFile(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')
+        sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
+        if hasattr(signal, "SIGUSR1"):
+            def sigusr1(signum, frame):
+                print '''Caught SIGUSR1, closing 'debug.log'...'''
+                logfile.reopen()
+                print '''...and reopened 'debug.log' after catching SIGUSR1.'''
+            signal.signal(signal.SIGUSR1, sigusr1)
     
     if args.bitcoind_p2p_port is None:
         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT