X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=p2pool%2Fmain.py;h=94c89c9e2f6be00b2f7755e0db839a25b76e6023;hb=f732111a6e08d7d0649c330d1c703535a8ea80b5;hp=098eb3a47dc8d9ec2d8bfb2461e3794b74cef455;hpb=63e135b882578581c9fa8c09e0249bed822a4f0d;p=p2pool.git

diff --git a/p2pool/main.py b/p2pool/main.py
index 098eb3a..94c89c9 100644
--- a/p2pool/main.py
+++ b/p2pool/main.py
@@ -1,6 +1,7 @@
 from __future__ import division
 
 import base64
+import gc
 import json
 import os
 import random
@@ -13,14 +14,14 @@ import urlparse
 
 if '--iocp' in sys.argv:
     from twisted.internet import iocpreactor
     iocpreactor.install()
-from twisted.internet import defer, reactor, protocol, task
+from twisted.internet import defer, reactor, protocol, tcp
 from twisted.web import server
 from twisted.python import log
 from nattraverso import portmapper, ipdiscover
 
 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
-from bitcoin import worker_interface, helper
-from util import fixargparse, jsonrpc, variable, deferral, math, logging
+from bitcoin import stratum, worker_interface, helper
+from util import fixargparse, jsonrpc, variable, deferral, math, logging, switchprotocol
 from . import networks, web, work
 import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node
@@ -29,14 +30,18 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
     try:
         print 'p2pool (version %s)' % (p2pool.__version__,)
         print
-        
+
         @defer.inlineCallbacks
         def connect_p2p():
             # connect to bitcoind over bitcoin-p2p
             print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
             factory = bitcoin_p2p.ClientFactory(net.PARENT)
             reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
+            def long():
+                print '''    ...taking a while. Common reasons for this include all of bitcoind's connection slots being used...'''
+            long_dc = reactor.callLater(5, long)
             yield factory.getProtocol() # waits until handshake is successful
+            if not long_dc.called: long_dc.cancel()
             print '    ...success!'
             print
             defer.returnValue(factory)
@@ -47,17 +52,16 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             # connect to bitcoind over JSON-RPC and do initial getmemorypool
             url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
             print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
-            bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
+            bitcoind = jsonrpc.HTTPProxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
             yield helper.check(bitcoind, net)
             temp_work = yield helper.getwork(bitcoind)
             
-            bitcoind_warning_var = variable.Variable(None)
+            bitcoind_getinfo_var = variable.Variable(None)
             @defer.inlineCallbacks
             def poll_warnings():
-                errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
-                bitcoind_warning_var.set(errors if errors != '' else None)
+                bitcoind_getinfo_var.set((yield deferral.retry('Error while calling getinfo:')(bitcoind.rpc_getinfo)()))
             yield poll_warnings()
-            task.LoopingCall(poll_warnings).start(20*60)
+            deferral.RobustLoopingCall(poll_warnings).start(20*60)
             
             print '    ...success!'
             print '    Current block hash: %x' % (temp_work['previous_block'],)
@@ -97,20 +101,15 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
         print
         
-        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
+        print "Loading shares..."
         shares = {}
         known_verified = set()
-        print "Loading shares..."
-        for i, (mode, contents) in enumerate(ss.get_shares()):
-            if mode == 'share':
-                contents.time_seen = 0
-                shares[contents.hash] = contents
-                if len(shares) % 1000 == 0 and shares:
-                    print "    %i" % (len(shares),)
-            elif mode == 'verified_hash':
-                known_verified.add(contents)
-            else:
-                raise AssertionError()
+        def share_cb(share):
+            share.time_seen = 0 # XXX
+            shares[share.hash] = share
+            if len(shares) % 1000 == 0 and shares:
+                print "    %i" % (len(shares),)
+        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net, share_cb, known_verified.add)
         print "    ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified))
         print
 
@@ -126,7 +125,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         for share_hash in known_verified:
             if share_hash not in node.tracker.verified.items:
                 ss.forget_verified_share(share_hash)
-        del shares, known_verified
         node.tracker.removed.watch(lambda share: ss.forget_share(share.hash))
         node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
 
@@ -135,7 +133,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 ss.add_share(share)
                 if share.hash in node.tracker.verified.items:
                     ss.add_verified_hash(share.hash)
-        task.LoopingCall(save_shares).start(60)
+        deferral.RobustLoopingCall(save_shares).start(60)
         
         print '    ...success!'
         print
@@ -144,12 +142,12 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
         
         @defer.inlineCallbacks
-        def parse(x):
-            if ':' in x:
-                ip, port = x.split(':')
-                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
-            else:
-                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
+        def parse(host):
+            port = net.P2P_PORT
+            if ':' in host:
+                host, port_str = host.split(':')
+                port = int(port_str)
+            defer.returnValue(((yield reactor.resolve(host)), port))
         
         addrs = {}
         if os.path.exists(os.path.join(datadir_path, 'addrs')):
@@ -179,13 +177,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             addr_store=addrs,
             connect_addrs=connect_addrs,
             desired_outgoing_conns=args.p2pool_outgoing_conns,
+            advertise_ip=args.advertise_ip,
         )
         node.p2p_node.start()
         
         def save_addrs():
             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                 f.write(json.dumps(node.p2p_node.addr_store.items()))
-        task.LoopingCall(save_addrs).start(60)
+        deferral.RobustLoopingCall(save_addrs).start(60)
         
         print '    ...success!'
         print
@@ -212,10 +211,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
         
         wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee)
-        web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var)
-        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
+        web_root = web.get_web_root(wb, datadir_path, bitcoind_getinfo_var)
+        caching_wb = worker_interface.CachingWorkerBridge(wb)
+        worker_interface.WorkerInterface(caching_wb).attach_to(web_root, get_handler=lambda request: request.redirect('static/'))
+        web_serverfactory = server.Site(web_root)
+
 
-        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
+        serverfactory = switchprotocol.FirstByteSwitchFactory({'{': stratum.StratumServerFactory(caching_wb)}, web_serverfactory)
+        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], serverfactory, interface=worker_endpoint[0])
         
         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
             pass
@@ -227,9 +230,9 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         # done!
         print 'Started successfully!'
         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
-        if args.donation_percentage > 0.51:
+        if args.donation_percentage > 1.1:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
-        elif args.donation_percentage < 0.49:
+        elif args.donation_percentage < .9:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
         else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
@@ -242,7 +245,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
             ))
             signal.siginterrupt(signal.SIGALRM, False)
-            task.LoopingCall(signal.alarm, 30).start(1)
+            deferral.RobustLoopingCall(signal.alarm, 30).start(1)
        
         if args.irc_announce:
             from twisted.words.protocols import irc
@@ -286,7 +289,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                     print 'IRC connection lost:', reason.getErrorMessage()
             class IRCClientFactory(protocol.ReconnectingClientFactory):
                 protocol = IRCClient
-            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
+            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory(), bindAddress=(worker_endpoint[0], 0))
        
         @defer.inlineCallbacks
         def status_thread():
@@ -306,11 +309,12 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                    
                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
                     my_att_s = sum(datum['work']/dt for datum in datums)
+                    my_shares_per_s = sum(datum['work']/dt/bitcoin_data.target_to_average_attempts(datum['share_target']) for datum in datums)
                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                         math.format(int(my_att_s)),
                         math.format_dt(dt),
                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
-                        math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???',
+                        math.format_dt(1/my_shares_per_s) if my_shares_per_s else '???',
                     )
                    
                     if height > 2:
@@ -322,7 +326,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             shares, stale_orphan_shares, stale_doa_shares,
                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
-                            node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
+                            node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-6, net.PARENT.SYMBOL,
                         )
                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                             math.format(int(real_att_s)),
@@ -330,10 +334,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                             math.format_dt(2**256 / node.bitcoind_work.value['bits'].target / real_att_s),
                         )
                    
-                    for warning in p2pool_data.get_warnings(node.tracker, node.best_share_var.value, net, bitcoind_warning_var.value, node.bitcoind_work.value):
+                    for warning in p2pool_data.get_warnings(node.tracker, node.best_share_var.value, net, bitcoind_getinfo_var.value, node.bitcoind_work.value):
                         print >>sys.stderr, '#'*40
                         print >>sys.stderr, '>>> Warning: ' + warning
                         print >>sys.stderr, '#'*40
+
+                    if gc.garbage:
+                        print '%i pieces of uncollectable cyclic garbage! Types: %r' % (len(gc.garbage), map(type, gc.garbage))
                    
                     if this_str != last_str or time.time() > last_time + 15:
                         print this_str
@@ -347,13 +354,18 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             log.err(None, 'Fatal error:')
 
 def run():
+    if not hasattr(tcp.Client, 'abortConnection'):
+        print "Twisted doesn't have abortConnection! Upgrade to a newer version of Twisted to avoid memory leaks!"
+        print 'Pausing for 3 seconds...'
+        time.sleep(3)
+
     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
     parser.add_argument('--version', action='version', version=p2pool.__version__)
     parser.add_argument('--net',
-        help='use specified network (default: bitcoin)',
-        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
+        help='use specified network (default: novacoin)',
+        action='store', choices=sorted(realnets), default='novacoin', dest='net_name')
     parser.add_argument('--testnet',
         help='''use the network's testnet''',
         action='store_const', const=True, default=False, dest='testnet')
@@ -373,8 +385,8 @@ def run():
         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
         type=str, action='append', default=[], dest='merged_urls')
     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
-        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
-        type=float, action='store', default=0.5, dest='donation_percentage')
+        help='donate this percentage of work towards the development of p2pool (default: 1.0)',
+        type=float, action='store', default=1.0, dest='donation_percentage')
     parser.add_argument('--iocp',
         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
         action='store_true', default=False, dest='iocp')
@@ -401,6 +413,9 @@ def run():
     p2pool_group.add_argument('--outgoing-conns', metavar='CONNS',
         help='outgoing connections (default: 6)',
         type=int, action='store', default=6, dest='p2pool_outgoing_conns')
+    parser.add_argument('--disable-advertise',
+        help='''don't advertise local IP address as being available for incoming connections. useful for running a dark node, along with multiple -n ADDR's and --outgoing-conns 0''',
+        action='store_false', default=True, dest='advertise_ip')
    
     worker_group = parser.add_argument_group('worker interface')
     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
@@ -529,7 +544,7 @@ def run():
             logfile.reopen()
             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
         signal.signal(signal.SIGUSR1, sigusr1)
-    task.LoopingCall(logfile.reopen).start(5)
+    deferral.RobustLoopingCall(logfile.reopen).start(5)
    
     class ErrorReporter(object):
         def __init__(self):
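The most substantial change in this patch is the worker-port listener: instead of handing the port straight to server.Site(web_root), it is wrapped in switchprotocol.FirstByteSwitchFactory so that Stratum miners and HTTP getwork/web clients share a single port, with connections whose first byte is '{' routed to stratum.StratumServerFactory(caching_wb) and everything else to the web site. Below is a rough, illustrative sketch of that first-byte dispatch idea using only Twisted's standard protocol/factory API; it is not p2pool's actual util/switchprotocol module, and all names in it are invented for the example.

# Sketch of a first-byte protocol switch (assumed names, not p2pool's real code).
from twisted.internet import protocol

class FirstByteSwitchProtocol(protocol.Protocol):
    child = None  # the real protocol, chosen once the first byte arrives

    def dataReceived(self, data):
        if self.child is None:
            if not data:
                return
            # pick a factory by the first byte; fall back to the default one
            chosen = self.factory.first_byte_to_factory.get(data[:1], self.factory.default_factory)
            self.child = chosen.buildProtocol(self.transport.getPeer())
            self.child.makeConnection(self.transport)
        # forward everything, including the byte we just inspected
        self.child.dataReceived(data)

    def connectionLost(self, reason):
        if self.child is not None:
            self.child.connectionLost(reason)

class FirstByteSwitchFactory(protocol.ServerFactory):
    protocol = FirstByteSwitchProtocol

    def __init__(self, first_byte_to_factory, default_factory):
        self.first_byte_to_factory = first_byte_to_factory
        self.default_factory = default_factory

# Usage mirroring the patched listener: Stratum clients open with a JSON object,
# so '{' selects the Stratum factory, while "GET ..." from a browser or getwork
# miner falls through to the HTTP site:
#   reactor.listenTCP(worker_port, FirstByteSwitchFactory({'{': stratum_factory}, web_site))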