warn user about bitcoin connection slots if p2p connect takes longer than 5 seconds
[p2pool.git] / p2pool / main.py
index 1f09ae3..fdd3d65 100644 (file)
@@ -1,6 +1,7 @@
 from __future__ import division
 
 import base64
+import gc
 import json
 import os
 import random
@@ -13,14 +14,14 @@ import urlparse
 if '--iocp' in sys.argv:
     from twisted.internet import iocpreactor
     iocpreactor.install()
-from twisted.internet import defer, reactor, protocol, task
+from twisted.internet import defer, reactor, protocol, tcp
 from twisted.web import server
 from twisted.python import log
 from nattraverso import portmapper, ipdiscover
 
 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
-from bitcoin import worker_interface, helper
-from util import fixargparse, jsonrpc, variable, deferral, math, logging
+from bitcoin import stratum, worker_interface, helper
+from util import fixargparse, jsonrpc, variable, deferral, math, logging, switchprotocol
 from . import networks, web, work
 import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node
 
@@ -36,7 +37,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
             factory = bitcoin_p2p.ClientFactory(net.PARENT)
             reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
+            def long():
+                print '''    ...taking a while. Common reasons for this include all of bitcoind's connection slots being used...'''
+            long_dc = reactor.callLater(5, long)
             yield factory.getProtocol() # waits until handshake is successful
+            if not long_dc.called: long_dc.cancel()
             print '    ...success!'
             print
             defer.returnValue(factory)
@@ -47,7 +52,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         # connect to bitcoind over JSON-RPC and do initial getmemorypool
         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
-        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
+        bitcoind = jsonrpc.HTTPProxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
         yield helper.check(bitcoind, net)
         temp_work = yield helper.getwork(bitcoind)
         
@@ -57,7 +62,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
             bitcoind_warning_var.set(errors if errors != '' else None)
         yield poll_warnings()
-        task.LoopingCall(poll_warnings).start(20*60)
+        deferral.RobustLoopingCall(poll_warnings).start(20*60)
         
         print '    ...success!'
         print '    Current block hash: %x' % (temp_work['previous_block'],)
@@ -97,20 +102,15 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
         print
         
-        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
+        print "Loading shares..."
         shares = {}
         known_verified = set()
-        print "Loading shares..."
-        for i, (mode, contents) in enumerate(ss.get_shares()):
-            if mode == 'share':
-                contents.time_seen = 0
-                shares[contents.hash] = contents
-                if len(shares) % 1000 == 0 and shares:
-                    print "    %i" % (len(shares),)
-            elif mode == 'verified_hash':
-                known_verified.add(contents)
-            else:
-                raise AssertionError()
+        def share_cb(share):
+            share.time_seen = 0 # XXX
+            shares[share.hash] = share
+            if len(shares) % 1000 == 0 and shares:
+                print "    %i" % (len(shares),)
+        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net, share_cb, known_verified.add)
         print "    ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified))
         print
         
@@ -126,7 +126,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         for share_hash in known_verified:
             if share_hash not in node.tracker.verified.items:
                 ss.forget_verified_share(share_hash)
-        del shares, known_verified
         node.tracker.removed.watch(lambda share: ss.forget_share(share.hash))
         node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
         
@@ -135,7 +134,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 ss.add_share(share)
                 if share.hash in node.tracker.verified.items:
                     ss.add_verified_hash(share.hash)
-        task.LoopingCall(save_shares).start(60)
+        deferral.RobustLoopingCall(save_shares).start(60)
         
         print '    ...success!'
         print
@@ -144,12 +143,12 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
         
         @defer.inlineCallbacks
-        def parse(x):
-            if ':' in x:
-                ip, port = x.split(':')
-                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
-            else:
-                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
+        def parse(host):
+            port = net.P2P_PORT
+            if ':' in host:
+                host, port_str = host.split(':')
+                port = int(port_str)
+            defer.returnValue(((yield reactor.resolve(host)), port))
         
         addrs = {}
         if os.path.exists(os.path.join(datadir_path, 'addrs')):
@@ -185,7 +184,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         def save_addrs():
             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                 f.write(json.dumps(node.p2p_node.addr_store.items()))
-        task.LoopingCall(save_addrs).start(60)
+        deferral.RobustLoopingCall(save_addrs).start(60)
         
         print '    ...success!'
         print
@@ -213,9 +212,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         
         wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee)
         web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var)
-        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
+        caching_wb = worker_interface.CachingWorkerBridge(wb)
+        worker_interface.WorkerInterface(caching_wb).attach_to(web_root, get_handler=lambda request: request.redirect('static/'))
+        web_serverfactory = server.Site(web_root)
         
-        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
+        
+        serverfactory = switchprotocol.FirstByteSwitchFactory({'{': stratum.StratumServerFactory(caching_wb)}, web_serverfactory)
+        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], serverfactory, interface=worker_endpoint[0])
         
         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
             pass
@@ -227,9 +230,9 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         # done!
         print 'Started successfully!'
         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
-        if args.donation_percentage > 0.51:
+        if args.donation_percentage > 1.1:
             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
-        elif args.donation_percentage < 0.49:
+        elif args.donation_percentage < .9:
             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
         else:
             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
@@ -242,7 +245,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
             ))
             signal.siginterrupt(signal.SIGALRM, False)
-            task.LoopingCall(signal.alarm, 30).start(1)
+            deferral.RobustLoopingCall(signal.alarm, 30).start(1)
         
         if args.irc_announce:
             from twisted.words.protocols import irc
@@ -306,11 +309,12 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     
                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
                     my_att_s = sum(datum['work']/dt for datum in datums)
+                    my_shares_per_s = sum(datum['work']/dt/bitcoin_data.target_to_average_attempts(datum['share_target']) for datum in datums)
                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                         math.format(int(my_att_s)),
                         math.format_dt(dt),
                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
-                        math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???',
+                        math.format_dt(1/my_shares_per_s) if my_shares_per_s else '???',
                     )
                     
                     if height > 2:
@@ -334,6 +338,9 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             print >>sys.stderr, '#'*40
                             print >>sys.stderr, '>>> Warning: ' + warning
                             print >>sys.stderr, '#'*40
+                        
+                        if gc.garbage:
+                            print '%i pieces of uncollectable cyclic garbage! Types: %r' % (len(gc.garbage), map(type, gc.garbage))
                     
                     if this_str != last_str or time.time() > last_time + 15:
                         print this_str
@@ -347,6 +354,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         log.err(None, 'Fatal error:')
 
 def run():
+    if not hasattr(tcp.Client, 'abortConnection'):
+        print "Twisted doesn't have abortConnection! Upgrade to a newer version of Twisted to avoid memory leaks!"
+        print 'Pausing for 3 seconds...'
+        time.sleep(3)
+    
     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
     
     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
@@ -373,8 +385,8 @@ def run():
         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
         type=str, action='append', default=[], dest='merged_urls')
     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
-        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
-        type=float, action='store', default=0.5, dest='donation_percentage')
+        help='donate this percentage of work towards the development of p2pool (default: 1.0)',
+        type=float, action='store', default=1.0, dest='donation_percentage')
     parser.add_argument('--iocp',
         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
         action='store_true', default=False, dest='iocp')
@@ -433,6 +445,8 @@ def run():
     if args.debug:
         p2pool.DEBUG = True
         defer.setDebugging(True)
+    else:
+        p2pool.DEBUG = False
     
     net_name = args.net_name + ('_testnet' if args.testnet else '')
     net = networks.nets[net_name]
@@ -527,7 +541,7 @@ def run():
             logfile.reopen()
             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
         signal.signal(signal.SIGUSR1, sigusr1)
-    task.LoopingCall(logfile.reopen).start(5)
+    deferral.RobustLoopingCall(logfile.reopen).start(5)
     
     class ErrorReporter(object):
         def __init__(self):