X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=p2pool%2Fmain.py;h=b03eee7d2a77711fd1ddc394af68545372979388;hb=2664f794;hp=998b42619b42c7069dca306016e230aee08de513;hpb=38bda9cbbf508fe8f8e5701e1d7cc644fad5f6ec;p=p2pool.git diff --git a/p2pool/main.py b/p2pool/main.py index 998b426..b03eee7 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -1,6 +1,7 @@ from __future__ import division import base64 +import gc import json import os import random @@ -13,14 +14,14 @@ import urlparse if '--iocp' in sys.argv: from twisted.internet import iocpreactor iocpreactor.install() -from twisted.internet import defer, reactor, protocol, task +from twisted.internet import defer, reactor, protocol, tcp from twisted.web import server from twisted.python import log from nattraverso import portmapper, ipdiscover import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data -from bitcoin import worker_interface, helper -from util import fixargparse, jsonrpc, variable, deferral, math, logging +from bitcoin import stratum, worker_interface, helper +from util import fixargparse, jsonrpc, variable, deferral, math, logging, switchprotocol from . import networks, web, work import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node @@ -57,7 +58,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors'] bitcoind_warning_var.set(errors if errors != '' else None) yield poll_warnings() - task.LoopingCall(poll_warnings).start(20*60) + deferral.RobustLoopingCall(poll_warnings).start(20*60) print ' ...success!' print ' Current block hash: %x' % (temp_work['previous_block'],) @@ -97,20 +98,15 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT) print - ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net) + print "Loading shares..." shares = {} known_verified = set() - print "Loading shares..." - for i, (mode, contents) in enumerate(ss.get_shares()): - if mode == 'share': - contents.time_seen = 0 - shares[contents.hash] = contents - if len(shares) % 1000 == 0 and shares: - print " %i" % (len(shares),) - elif mode == 'verified_hash': - known_verified.add(contents) - else: - raise AssertionError() + def share_cb(share): + share.time_seen = 0 # XXX + shares[share.hash] = share + if len(shares) % 1000 == 0 and shares: + print " %i" % (len(shares),) + ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net, share_cb, known_verified.add) print " ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified)) print @@ -126,7 +122,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): for share_hash in known_verified: if share_hash not in node.tracker.verified.items: ss.forget_verified_share(share_hash) - del shares, known_verified node.tracker.removed.watch(lambda share: ss.forget_share(share.hash)) node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash)) @@ -135,7 +130,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): ss.add_share(share) if share.hash in node.tracker.verified.items: ss.add_verified_hash(share.hash) - task.LoopingCall(save_shares).start(60) + deferral.RobustLoopingCall(save_shares).start(60) print ' ...success!' print @@ -144,12 +139,12 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): print 'Joining p2pool network using port %i...' % (args.p2pool_port,) @defer.inlineCallbacks - def parse(x): - if ':' in x: - ip, port = x.split(':') - defer.returnValue(((yield reactor.resolve(ip)), int(port))) - else: - defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT)) + def parse(host): + port = net.P2P_PORT + if ':' in host: + host, port_str = host.split(':') + port = int(port_str) + defer.returnValue(((yield reactor.resolve(host)), port)) addrs = {} if os.path.exists(os.path.join(datadir_path, 'addrs')): @@ -185,7 +180,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): def save_addrs(): with open(os.path.join(datadir_path, 'addrs'), 'wb') as f: f.write(json.dumps(node.p2p_node.addr_store.items())) - task.LoopingCall(save_addrs).start(60) + deferral.RobustLoopingCall(save_addrs).start(60) print ' ...success!' print @@ -213,9 +208,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee) web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var) - worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/')) + caching_wb = worker_interface.CachingWorkerBridge(wb) + worker_interface.WorkerInterface(caching_wb).attach_to(web_root, get_handler=lambda request: request.redirect('./static/')) + web_serverfactory = server.Site(web_root) - deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0]) + + serverfactory = switchprotocol.FirstByteSwitchFactory({'{': stratum.StratumServerFactory(caching_wb)}, web_serverfactory) + deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], serverfactory, interface=worker_endpoint[0]) with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f: pass @@ -227,9 +226,9 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): # done! print 'Started successfully!' print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],) - if args.donation_percentage > 0.51: + if args.donation_percentage > 1.1: print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,) - elif args.donation_percentage < 0.49: + elif args.donation_percentage < .9: print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,) else: print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,) @@ -242,7 +241,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack()) )) signal.siginterrupt(signal.SIGALRM, False) - task.LoopingCall(signal.alarm, 30).start(1) + deferral.RobustLoopingCall(signal.alarm, 30).start(1) if args.irc_announce: from twisted.words.protocols import irc @@ -306,11 +305,12 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): datums, dt = wb.local_rate_monitor.get_datums_in_last() my_att_s = sum(datum['work']/dt for datum in datums) + my_shares_per_s = sum(datum['work']/dt/bitcoin_data.target_to_average_attempts(datum['share_target']) for datum in datums) this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % ( math.format(int(my_att_s)), math.format_dt(dt), math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95), - math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???', + math.format_dt(1/my_shares_per_s) if my_shares_per_s else '???', ) if height > 2: @@ -334,6 +334,9 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): print >>sys.stderr, '#'*40 print >>sys.stderr, '>>> Warning: ' + warning print >>sys.stderr, '#'*40 + + if gc.garbage: + print '%i pieces of uncollectable cyclic garbage! Types: %r' % (len(gc.garbage), map(type, gc.garbage)) if this_str != last_str or time.time() > last_time + 15: print this_str @@ -347,6 +350,11 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): log.err(None, 'Fatal error:') def run(): + if not hasattr(tcp.Client, 'abortConnection'): + print "Twisted doesn't have abortConnection! Upgrade to a newer version of Twisted to avoid memory leaks!" + print 'Pausing for 3 seconds...' + time.sleep(3) + realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name) parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@') @@ -373,8 +381,8 @@ def run(): help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)', type=str, action='append', default=[], dest='merged_urls') parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE', - help='donate this percentage of work towards the development of p2pool (default: 0.5)', - type=float, action='store', default=0.5, dest='donation_percentage') + help='donate this percentage of work towards the development of p2pool (default: 1.0)', + type=float, action='store', default=1.0, dest='donation_percentage') parser.add_argument('--iocp', help='use Windows IOCP API in order to avoid errors due to large number of sockets being open', action='store_true', default=False, dest='iocp') @@ -529,7 +537,7 @@ def run(): logfile.reopen() print '...and reopened %r after catching SIGUSR1.' % (args.logfile,) signal.signal(signal.SIGUSR1, sigusr1) - task.LoopingCall(logfile.reopen).start(5) + deferral.RobustLoopingCall(logfile.reopen).start(5) class ErrorReporter(object): def __init__(self):