indentation and imports cleaned up
[p2pool.git] / p2pool / main.py
index 1e943ee..f001cdc 100644 (file)
@@ -11,14 +11,17 @@ import sqlite3
 import struct
 import sys
 import time
+import json
+import signal
 
 from twisted.internet import defer, reactor
-from twisted.web import server
+from twisted.web import server, resource
 from twisted.python import log
+from nattraverso import portmapper, ipdiscover
 
 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
-from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
-from . import p2p, worker_interface
+from util import db, expiring_dict, jsonrpc, variable, deferral, math
+from . import p2p, worker_interface, skiplists
 import p2pool.data as p2pool
 import p2pool as p2pool_init
 
@@ -40,9 +43,9 @@ def getwork(bitcoind):
 def get_payout_script(factory):
     res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
     if res['reply'] == 'success':
-        my_script = res['script']
+        defer.returnValue(res['script'])
     elif res['reply'] == 'denied':
-        my_script = None
+        defer.returnValue(None)
     else:
         raise ValueError('Unexpected reply: %r' % (res,))
 
@@ -54,6 +57,9 @@ def get_payout_script2(bitcoind, net):
 @defer.inlineCallbacks
 def main(args):
     try:
+        if args.charts:
+            from . import draw
+        
         print 'p2pool (version %s)' % (p2pool_init.__version__,)
         print
         
@@ -73,7 +79,7 @@ def main(args):
         my_script = yield get_payout_script(factory)
         if args.pubkey_hash is None:
             if my_script is None:
-                print 'IP transaction denied ... falling back to sending to address.'
+                print '    IP transaction denied ... falling back to sending to address.'
                 my_script = yield get_payout_script2(bitcoind, args.net)
         else:
             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
@@ -81,7 +87,10 @@ def main(args):
         print '    Payout script:', my_script.encode('hex')
         print
         
-        ht = bitcoin.p2p.HeightTracker(factory)
+        print 'Loading cached block headers...'
+        ht = bitcoin.p2p.HeightTracker(factory, args.net.HEADERSTORE_FILENAME)
+        print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
+        print
         
         tracker = p2pool.OkayTracker(args.net)
         chains = expiring_dict.ExpiringDict(300)
@@ -96,14 +105,13 @@ def main(args):
         current_work2 = variable.Variable(None)
         
         work_updated = variable.Event()
-        tracker_updated = variable.Event()
         
         requested = expiring_dict.ExpiringDict(300)
         
         @defer.inlineCallbacks
         def set_real_work1():
             work, height = yield getwork(bitcoind)
-            # XXX call tracker_updated
+            changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
             current_work.set(dict(
                 version=work.version,
                 previous_block=work.previous_block,
@@ -114,18 +122,22 @@ def main(args):
             current_work2.set(dict(
                 clock_offset=time.time() - work.timestamp,
             ))
+            if changed:
+                set_real_work2()
         
-        @defer.inlineCallbacks
         def set_real_work2():
-            best, desired = yield tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
+            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
             
             t = dict(current_work.value)
             t['best_share_hash'] = best
             current_work.set(t)
             
+            t = time.time()
             for peer2, share_hash in desired:
+                if share_hash not in tracker.tails: # was received in the time tracker.think was running
+                    continue
                 last_request_time, count = requested.get(share_hash, (None, 0))
-                if last_request_time is not None and last_request_time - 5 < time.time() < last_request_time + 10 * 1.5**count:
+                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                     continue
                 potential_peers = set()
                 for head in tracker.tails[share_hash]:
@@ -146,12 +158,13 @@ def main(args):
                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                     ))[:100],
                 )
-                requested[share_hash] = time.time(), count + 1
+                requested[share_hash] = t, count + 1
         
         print 'Initializing work...'
         yield set_real_work1()
-        yield set_real_work2()
+        set_real_work2()
         print '    ...success!'
+        print
         
         start_time = time.time() - current_work2.value['clock_offset']
         
@@ -161,6 +174,8 @@ def main(args):
             for peer in p2p_node.peers.itervalues():
                 if peer is ignore_peer:
                     continue
+                #if p2pool_init.DEBUG:
+                #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
                 peer.send_shares([share])
             share.flag_shared()
         
@@ -195,19 +210,23 @@ def main(args):
                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
             
             if some_new:
-                tracker_updated.happened()
+                set_real_work2()
             
             if len(shares) > 5:
                 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
         
         def p2p_share_hashes(share_hashes, peer):
+            t = time.time()
             get_hashes = []
             for share_hash in share_hashes:
                 if share_hash in tracker.shares:
-                    pass # print 'Got share hash, already have, ignoring. Hash: %s' % (p2pool.format_hash(share_hash),)
-                else:
-                    print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
-                    get_hashes.append(share_hash)
+                    continue
+                last_request_time, count = requested.get(share_hash, (None, 0))
+                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
+                    continue
+                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
+                get_hashes.append(share_hash)
+                requested[share_hash] = t, count + 1
             
             if share_hashes and peer is not None:
                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
@@ -234,14 +253,20 @@ def main(args):
             else:
                 return x, args.net.P2P_PORT
         
-        nodes = [
+        nodes = set([
             ('72.14.191.28', args.net.P2P_PORT),
             ('62.204.197.159', args.net.P2P_PORT),
-        ]
-        try:
-            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
-        except:
-            log.err(None, 'Error resolving bootstrap node IP:')
+            ('142.58.248.28', args.net.P2P_PORT),
+            ('94.23.34.145', args.net.P2P_PORT),
+        ])
+        for host in [
+            'p2pool.forre.st',
+            'dabuttonfactory.com',
+        ]:
+            try:
+                nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
+            except:
+                log.err(None, 'Error resolving bootstrap node IP:')
         
         p2p_node = p2p.Node(
             current_work=current_work,
@@ -249,7 +274,7 @@ def main(args):
             net=args.net,
             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
             mode=0 if args.low_bandwidth else 1,
-            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
+            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
         )
         p2p_node.handle_shares = p2p_shares
         p2p_node.handle_share_hashes = p2p_share_hashes
@@ -269,6 +294,23 @@ def main(args):
         print '    ...success!'
         print
         
+        @defer.inlineCallbacks
+        def upnp_thread():
+            while True:
+                try:
+                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
+                    if not is_lan:
+                        continue
+                    pm = yield portmapper.get_port_mapper()
+                    yield pm._upnp.add_port_mapping(lan_ip, args.net.P2P_PORT, args.net.P2P_PORT, 'p2pool', 'TCP')
+                except:
+                    if p2pool_init.DEBUG:
+                        log.err(None, "UPnP error:")
+                yield deferral.sleep(random.expovariate(1/120))
+        
+        if args.upnp:
+            upnp_thread()
+        
         # start listening for workers with a JSON-RPC server
         
         print 'Listening for workers on port %i...' % (args.worker_port,)
@@ -278,7 +320,9 @@ def main(args):
         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
         run_identifier = struct.pack('<Q', random.randrange(2**64))
         
-        def compute(state, all_targets):
+        def compute(state, payout_script):
+            if payout_script is None:
+                payout_script = my_script
             if state['best_share_hash'] is None and args.net.PERSIST:
                 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
             pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
@@ -296,7 +340,7 @@ def main(args):
             generate_tx = p2pool.generate_transaction(
                 tracker=tracker,
                 previous_share_hash=state['best_share_hash'],
-                new_script=my_script,
+                new_script=payout_script,
                 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                 block_target=state['target'],
@@ -316,8 +360,6 @@ def main(args):
                     print 'Toff', timestamp2 - timestamp
                     timestamp = timestamp2
             target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
-            if not all_targets:
-                target2 = min(2**256//2**32 - 1, target2)
             times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
             return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
@@ -336,13 +378,14 @@ def main(args):
                 block = dict(header=header, txs=transactions)
                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
-                    print
-                    print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
-                    print
                     if factory.conn.value is not None:
                         factory.conn.value.send_block(block=block)
                     else:
                         print 'No bitcoind connection! Erp!'
+                    if hash_ <= block['header']['target']:
+                        print
+                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
+                        print
                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                 if hash_ > target:
                     print 'Received invalid share from worker - %x/%x' % (hash_, target)
@@ -359,22 +402,37 @@ def main(args):
                 log.err(None, 'Error processing data received from worker:')
                 return False
         
+        web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
+        
         def get_rate():
             if current_work.value['best_share_hash'] is not None:
                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
-                return att_s
+                return json.dumps(att_s)
+            return json.dumps(None)
         
         def get_users():
             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
             res = {}
             for script in sorted(weights, key=lambda s: weights[s]):
-                res[script.encode('hex')] = weights[script]/total_weight
-            return res
-
+                res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
+            return json.dumps(res)
+        
+        class WebInterface(resource.Resource):
+            def __init__(self, func, mime_type):
+                self.func, self.mime_type = func, mime_type
+            
+            def render_GET(self, request):
+                request.setHeader('Content-Type', self.mime_type)
+                return self.func()
+        
+        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
+        web_root.putChild('users', WebInterface(get_users, 'application/json'))
+        if args.charts:
+            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
         
-        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate, get_users)))
+        reactor.listenTCP(args.worker_port, server.Site(web_root))
         
         print '    ...success!'
         print
@@ -441,6 +499,8 @@ def main(args):
         print 'Started successfully!'
         print
         
+        ht.updated.watch(set_real_work2)
+        
         @defer.inlineCallbacks
         def work1_thread():
             while True:
@@ -449,40 +509,42 @@ def main(args):
                     yield set_real_work1()
                 except:
                     log.err()
-                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
+                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/20))], fireOnOneCallback=True)
         
         @defer.inlineCallbacks
         def work2_thread():
             while True:
-                flag = tracker_updated.get_deferred()
                 try:
-                    yield set_real_work2()
+                    set_real_work2()
                 except:
                     log.err()
-                yield defer.DeferredList([flag, deferral.sleep(random.expovariate(1/1))], fireOnOneCallback=True)
+                yield deferral.sleep(random.expovariate(1/20))
         
         work1_thread()
         work2_thread()
         
-        counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
+        counter = skiplists.CountsSkipList(tracker, run_identifier)
         
         while True:
-            yield deferral.sleep(random.expovariate(1/1))
+            yield deferral.sleep(3)
             try:
                 if current_work.value['best_share_hash'] is not None:
                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
-                    if height > 5:
-                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
+                    if height > 2:
+                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
-                        count = counter(current_work.value['best_share_hash'], height, 2**100)
-                        print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale)' % (
+                        matching_in_chain = counter(current_work.value['best_share_hash'], height)
+                        shares_in_chain = my_shares & matching_in_chain
+                        stale_shares = my_shares - matching_in_chain
+                        print 'Pool: %sH/s in %i shares Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
                             math.format(att_s),
                             height,
                             weights.get(my_script, 0)/total_weight*100,
                             math.format(weights.get(my_script, 0)/total_weight*att_s),
-                            len(my_shares),
-                            len(my_shares) - count,
-                        )
+                            len(shares_in_chain) + len(stale_shares),
+                            len(stale_shares),
+                            len(p2p_node.peers),
+                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                         #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
                         #for k, v in weights.iteritems():
                         #    print k.encode('hex'), v/total_weight
@@ -504,6 +566,9 @@ def run():
     parser.add_argument('-a', '--address',
         help='generate to this address (defaults to requesting one from bitcoind)',
         type=str, action='store', default=None, dest='address')
+    parser.add_argument('--charts',
+        help='generate charts on the web interface (requires PIL and pygame)',
+        action='store_const', const=True, default=False, dest='charts')
     
     p2pool_group = parser.add_argument_group('p2pool interface')
     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
@@ -515,6 +580,9 @@ def run():
     parser.add_argument('-l', '--low-bandwidth',
         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
         action='store_true', default=False, dest='low_bandwidth')
+    parser.add_argument('--disable-upnp',
+        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
+        action='store_false', default=True, dest='upnp')
     
     worker_group = parser.add_argument_group('worker interface')
     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
@@ -543,6 +611,17 @@ def run():
     
     if args.debug:
         p2pool_init.DEBUG = True
+        class ReopeningFile(object):
+            def __init__(self, *open_args, **open_kwargs):
+                self.open_args, self.open_kwargs = open_args, open_kwargs
+                self.inner_file = open(*self.open_args, **self.open_kwargs)
+            def reopen(self):
+                self.inner_file.close()
+                self.inner_file = open(*self.open_args, **self.open_kwargs)
+            def write(self, data):
+                self.inner_file.write(data)
+            def flush(self):
+                self.inner_file.flush()
         class TeePipe(object):
             def __init__(self, outputs):
                 self.outputs = outputs
@@ -566,7 +645,14 @@ def run():
                 self.buf = lines[-1]
             def flush(self):
                 pass
-        sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, open(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')]))
+        logfile = ReopeningFile(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')
+        sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
+        if hasattr(signal, "SIGUSR1"):
+            def sigusr1(signum, frame):
+                print '''Caught SIGUSR1, closing 'debug.log'...'''
+                logfile.reopen()
+                print '''...and reopened 'debug.log' after catching SIGUSR1.'''
+            signal.signal(signal.SIGUSR1, sigusr1)
     
     if args.bitcoind_p2p_port is None:
         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT