# p2pool main module -- import header.
# NOTE(review): this excerpt omits original lines 2-13 and 16; the stdlib
# imports this file clearly uses later (sys, os, base64, json, random, time,
# signal, traceback, urlparse, gc) and the iocpreactor.install() call that
# should follow the '--iocp' check are not visible here.
1 from __future__ import division
# Must run before the default reactor is imported anywhere below.
2 if '--iocp' in sys.argv:
3 from twisted.internet import iocpreactor
4 from twisted.internet import defer, reactor, protocol, task
5 from twisted.web import server
6 from twisted.python import log
7 from nattraverso import portmapper, ipdiscover
8 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
9 from bitcoin import stratum, worker_interface, helper
10 from util import fixargparse, jsonrpc, variable, deferral, math, logging, switchprotocol
11 from . import networks, web, work
12 import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node
# Daemon entry point: brings up bitcoind connectivity, the share chain,
# the p2pool P2P node and the worker/web interfaces (all below).
28 @defer.inlineCallbacks
29 def main(args, net, datadir_path, merged_urls, worker_endpoint):
31 print 'p2pool (version %s)' % (p2pool.__version__,)
# NOTE(review): the 'def connect_p2p():' header that should follow this
# decorator is missing from the excerpt (original line 35).
34 @defer.inlineCallbacks
36 # connect to bitcoind over bitcoin-p2p
37 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
38 factory = bitcoin_p2p.ClientFactory(net.PARENT)
39 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
40 yield factory.getProtocol() # waits until handshake is successful
# inlineCallbacks-style return: the deferred fires with the connected factory.
43 defer.returnValue(factory)
45 if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
46 factory = yield connect_p2p()
48 # connect to bitcoind over JSON-RPC and do initial getmemorypool
49 url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
50 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
# HTTP Basic auth; helper.check verifies the daemon/network, getwork fetches
# an initial block template so we fail fast if bitcoind is not ready.
51 bitcoind = jsonrpc.HTTPProxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
52 yield helper.check(bitcoind, net)
53 temp_work = yield helper.getwork(bitcoind)
# Holds the latest 'errors' string reported by bitcoind (None when clear);
# consumed by the web status page and warning display below.
55 bitcoind_warning_var = variable.Variable(None)
# NOTE(review): the 'def poll_warnings():' header (original line 57) is
# missing from this excerpt.
56 @defer.inlineCallbacks
58 errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
59 bitcoind_warning_var.set(errors if errors != '' else None)
# Re-poll bitcoind's warning string every 20 minutes.
61 task.LoopingCall(poll_warnings).start(20*60)
64 print ' Current block hash: %x' % (temp_work['previous_block'],)
65 print ' Current block height: %i' % (temp_work['height'] - 1,)
69 factory = yield connect_p2p()
71 print 'Determining payout address...'
# Cache the payout pubkey on disk so restarts reuse the same address.
72 pubkey_path = os.path.join(datadir_path, 'cached_payout_pubkey')
74 if os.path.exists(pubkey_path):
75 with open(pubkey_path, 'rb') as f:
76 pubkey = f.read().strip('\r\n')
77 print ' Loaded cached pubkey, payout address: %s...' % (bitcoin_data.pubkey_to_address(pubkey.decode('hex'), net.PARENT),)
# NOTE(review): original lines 78-80 (presumably the else-branch setting
# pubkey = None) are missing from this excerpt.
# Re-validate the cached key against the local wallet; discard it if it is
# invalid or no longer controlled by this bitcoind.
81 if pubkey is not None:
82 res = yield deferral.retry('Error validating cached pubkey:', 5)(lambda: bitcoind.rpc_validatepubkey(pubkey))()
83 if not res['isvalid'] or not res['ismine']:
84 print ' Cached pubkey is either invalid or not controlled by local bitcoind!'
88 print ' Getting payout pubkey from bitcoind...'
89 pubkey = yield deferral.retry('Error getting payout pubkey from bitcoind:', 5)(lambda: bitcoind.rpc_getnewpubkey('p2pool'))()
# NOTE(review): the f.write(...) persisting the new pubkey (original lines
# 92-93) is missing here.
91 with open(pubkey_path, 'wb') as f:
# pubkey is hex text on disk; decode to raw bytes for address derivation.
94 my_pubkey = pubkey.decode('hex')
96 address = bitcoin_data.pubkey_to_address(my_pubkey, net.PARENT)
98 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
99 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
# Load previously-seen shares from disk and keep the on-disk store in sync
# with the in-memory tracker from here on.
102 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
104 known_verified = set()
105 print "Loading shares..."
# NOTE(review): 'shares = {}' (original ~line 103) and the
# "if mode == 'share':" branch head (original line 107) are missing from
# this excerpt; the dangling elif/raise below belong to that dispatch.
106 for i, (mode, contents) in enumerate(ss.get_shares()):
108 contents.time_seen = 0
109 shares[contents.hash] = contents
# Progress indicator every 1000 loaded shares.
110 if len(shares) % 1000 == 0 and shares:
111 print " %i" % (len(shares),)
112 elif mode == 'verified_hash':
113 known_verified.add(contents)
# Unknown record type in the share store -- corrupt or newer format.
115 raise AssertionError()
116 print " ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified))
120 print 'Initializing work...'
122 node = p2pool_node.Node(factory, bitcoind, shares.values(), known_verified, net)
# Drop stored shares the tracker rejected so the store does not grow stale.
125 for share_hash in shares:
126 if share_hash not in node.tracker.items:
127 ss.forget_share(share_hash)
128 for share_hash in known_verified:
129 if share_hash not in node.tracker.verified.items:
130 ss.forget_verified_share(share_hash)
131 del shares, known_verified
# Mirror tracker evictions back to disk.
132 node.tracker.removed.watch(lambda share: ss.forget_share(share.hash))
133 node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
# NOTE(review): the 'def save_shares():' header (original ~lines 134-135)
# is missing; the loop below is its body, persisting up to 2*CHAIN_LENGTH
# shares from the best chain every 60 seconds.
136 for share in node.tracker.get_chain(node.best_share_var.value, min(node.tracker.get_height(node.best_share_var.value), 2*net.CHAIN_LENGTH)):
138 if share.hash in node.tracker.verified.items:
139 ss.add_verified_hash(share.hash)
140 task.LoopingCall(save_shares).start(60)
146 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
# NOTE(review): the 'def parse(x):' header and its "if ':' in x:" branch
# (original lines 149-150, 153) are missing -- parse() resolves
# 'host[:port]' strings to (ip, port), defaulting to net.P2P_PORT.
148 @defer.inlineCallbacks
151 ip, port = x.split(':')
152 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
154 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
# addrs maps (host, port) -> (services?, first_seen, last_seen); persisted
# as JSON in datadir/addrs. TODO confirm tuple field meaning against
# p2pool_node.P2PNode.
157 if os.path.exists(os.path.join(datadir_path, 'addrs')):
159 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
160 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
# NOTE(review): the try/except wrapping this load (and 'addrs = {}') is
# missing from the excerpt.
162 print >>sys.stderr, 'error parsing addrs'
# Seed the address book with the network's bootstrap nodes.
163 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
166 if addr not in addrs:
167 addrs[addr] = (0, time.time(), time.time())
# Explicit -n/--p2pool-node peers are always connected to.
171 connect_addrs = set()
172 for addr_df in map(parse, args.p2pool_nodes):
174 connect_addrs.add((yield addr_df))
178 node.p2p_node = p2pool_node.P2PNode(node,
179 port=args.p2pool_port,
180 max_incoming_conns=args.p2pool_conns,
182 connect_addrs=connect_addrs,
183 desired_outgoing_conns=args.p2pool_outgoing_conns,
185 node.p2p_node.start()
# NOTE(review): the 'def save_addrs():' header (original ~line 187) is
# missing; the write below persists the address book every 60 seconds.
188 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
189 f.write(json.dumps(node.p2p_node.addr_store.items()))
190 task.LoopingCall(save_addrs).start(60)
# NOTE(review): the 'def upnp_thread():' header, its 'while True:' loop and
# 'try:' lines are missing from this excerpt; the body below periodically
# re-establishes a UPnP mapping for the p2pool port.
196 @defer.inlineCallbacks
200 is_lan, lan_ip = yield ipdiscover.get_local_ip()
202 pm = yield portmapper.get_port_mapper()
203 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
# UPnP gateways time out routinely; only unexpected errors are logged.
204 except defer.TimeoutError:
208 log.err(None, 'UPnP error:')
# Randomized ~2-minute mean sleep between remapping attempts.
209 yield deferral.sleep(random.expovariate(1/120))
212 # start listening for workers with a JSON-RPC server
214 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
216 wb = work.WorkerBridge(node, my_pubkey, args.donation_percentage, merged_urls, args.worker_fee)
217 web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var)
# Cache identical getwork requests so many miners don't multiply load.
218 caching_wb = worker_interface.CachingWorkerBridge(wb)
219 worker_interface.WorkerInterface(caching_wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
220 web_serverfactory = server.Site(web_root)
# One port serves both protocols: a first byte of '{' means stratum JSON,
# anything else falls through to the HTTP site.
223 serverfactory = switchprotocol.FirstByteSwitchFactory({'{': stratum.StratumServerFactory(caching_wb)}, web_serverfactory)
224 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], serverfactory, interface=worker_endpoint[0])
# NOTE(review): the inner os.path.join wrapper here is redundant
# (os.path.join of a single argument is a no-op); also the f.write(...)
# body of this with-block is missing from the excerpt.
226 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
234 print 'Started successfully!'
235 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
236 if args.donation_percentage > 1.1:
237 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
238 elif args.donation_percentage < .9:
239 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
241 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
242 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
# Watchdog: arm a 30s SIGALRM every second; if the reactor stalls for 30s
# the alarm fires and dumps the current stack to stderr.
246 if hasattr(signal, 'SIGALRM'):
247 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
248 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
250 signal.siginterrupt(signal.SIGALRM, False)
251 task.LoopingCall(signal.alarm, 30).start(1)
# Optional IRC block-found announcer (enabled with --irc-announce).
253 if args.irc_announce:
254 from twisted.words.protocols import irc
255 class IRCClient(irc.IRCClient):
# Random numeric suffix avoids nick collisions between pool instances.
256 nickname = 'p2pool%02i' % (random.randrange(100),)
257 channel = net.ANNOUNCE_CHANNEL
258 def lineReceived(self, line):
261 irc.IRCClient.lineReceived(self, line)
# NOTE(review): the 'def signedOn(self):' header (original line 262) is
# missing from this excerpt; the lines below are its body.
263 self.in_channel = False
264 irc.IRCClient.signedOn(self)
265 self.factory.resetDelay()
266 self.join(self.channel)
# Announce a share only if it is also a full block (pow below the
# parent-chain target) and is fresh (timestamp within 10 minutes).
267 @defer.inlineCallbacks
268 def new_share(share):
269 if not self.in_channel:
271 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
# Random ~1-minute mean delay de-synchronizes announcements from
# multiple pool nodes.
272 yield deferral.sleep(random.expovariate(1/60))
273 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
# Skip if any recent channel message already mentions this block hash.
274 if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
275 self.say(self.channel, message)
276 self._remember_message(message)
277 self.watch_id = node.tracker.verified.added.watch(new_share)
278 self.recent_messages = []
def joined(self, channel):
    """IRC callback: we are now present in the channel, so announcements
    from new_share() may be sent."""
    self.in_channel = True
def left(self, channel):
    """IRC callback: we departed the channel; suppress announcements
    until joined() fires again."""
    self.in_channel = False
def _remember_message(self, message):
    """Record a channel message, retaining only the 100 most recent
    (used by new_share() to suppress duplicate block announcements)."""
    history = self.recent_messages
    history.append(message)
    # Trim from the front so at most 100 entries remain; the slice is
    # empty (a no-op) whenever the history is already short enough.
    del history[:-100]
def privmsg(self, user, channel, message):
    """Track chatter seen in our announce channel so new_share() can
    avoid repeating a block announcement; other channels are ignored."""
    if channel != self.channel:
        return
    self._remember_message(message)
290 def connectionLost(self, reason):
291 node.tracker.verified.added.unwatch(self.watch_id)
292 print 'IRC connection lost:', reason.getErrorMessage()
# Auto-reconnecting factory for the announcer.
# NOTE(review): the 'protocol = IRCClient' class attribute (original line
# 294) is missing from this excerpt.
293 class IRCClientFactory(protocol.ReconnectingClientFactory):
295 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
# NOTE(review): the 'def status_thread():' header, its 'while True:' loop,
# and several body lines are missing from this excerpt; the code below
# builds and prints a one-shot status summary roughly every 3 seconds.
297 @defer.inlineCallbacks
302 yield deferral.sleep(3)
304 height = node.tracker.get_height(node.best_share_var.value)
305 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
307 len(node.tracker.verified.items),
308 len(node.tracker.items),
309 len(node.p2p_node.peers),
310 sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming),
311 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
# Local hashrate estimated from recent work datums.
313 datums, dt = wb.local_rate_monitor.get_datums_in_last()
314 my_att_s = sum(datum['work']/dt for datum in datums)
315 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
316 math.format(int(my_att_s)),
318 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
319 math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???',
323 (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
# Pool-wide rate corrected for the observed stale-share proportion.
324 stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
325 real_att_s = p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
327 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
328 shares, stale_orphan_shares, stale_doa_shares,
329 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
330 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
331 node.get_current_txouts().get(bitcoin_data.pubkey_to_script2(my_pubkey), 0) * 1e-6, net.PARENT.SYMBOL,
333 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
334 math.format(int(real_att_s)),
336 math.format_dt(2**256 / node.bitcoind_work.value['bits'].target / real_att_s),
# Surface chain/bitcoind warnings prominently on stderr.
339 for warning in p2pool_data.get_warnings(node.tracker, node.best_share_var.value, net, bitcoind_warning_var.value, node.bitcoind_work.value):
340 print >>sys.stderr, '#'*40
341 print >>sys.stderr, '>>> Warning: ' + warning
342 print >>sys.stderr, '#'*40
345 print '%i pieces of uncollectable cyclic garbage! Types: %r' % (len(gc.garbage), map(type, gc.garbage))
# Only reprint when the summary changed or 15 seconds elapsed.
347 if this_str != last_str or time.time() > last_time + 15:
350 last_time = time.time()
# NOTE(review): the surrounding try/except feeding this handler is missing.
356 log.err(None, 'Fatal error:')
# Command-line definition (apparently the interior of run(); its header is
# not visible in this excerpt). realnets excludes *_testnet entries so
# per-network defaults shown in help come from mainnets only.
359 realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
# fromfile_prefix_chars='@' lets users keep options in an @file.
361 parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
362 parser.add_argument('--version', action='version', version=p2pool.__version__)
363 parser.add_argument('--net',
364 help='use specified network (default: novacoin)',
365 action='store', choices=sorted(realnets), default='novacoin', dest='net_name')
366 parser.add_argument('--testnet',
367 help='''use the network's testnet''',
368 action='store_const', const=True, default=False, dest='testnet')
369 parser.add_argument('--debug',
370 help='enable debugging mode',
371 action='store_const', const=True, default=False, dest='debug')
372 parser.add_argument('--datadir',
373 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
374 type=str, action='store', default=None, dest='datadir')
375 parser.add_argument('--logfile',
376 help='''log to this file (default: data/<NET>/log)''',
377 type=str, action='store', default=None, dest='logfile')
378 parser.add_argument('--merged',
379 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
380 type=str, action='append', default=[], dest='merged_urls')
381 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
382 help='donate this percentage of work towards the development of p2pool (default: 0.0)',
383 type=float, action='store', default=0.0, dest='donation_percentage')
# --iocp is consumed from raw sys.argv at import time (see file top); it is
# declared here only so argparse accepts and documents it.
384 parser.add_argument('--iocp',
385 help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
386 action='store_true', default=False, dest='iocp')
387 parser.add_argument('--irc-announce',
388 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
389 action='store_true', default=False, dest='irc_announce')
390 parser.add_argument('--no-bugreport',
391 help='disable submitting caught exceptions to the author',
392 action='store_true', default=False, dest='no_bugreport')
# p2pool P2P interface options.
394 p2pool_group = parser.add_argument_group('p2pool interface')
395 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
396 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
397 type=int, action='store', default=None, dest='p2pool_port')
398 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
399 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
400 type=str, action='append', default=[], dest='p2pool_nodes')
# --disable-upnp is a p2pool-interface option; register it on p2pool_group
# (not the bare parser, as it inconsistently was) so it is listed with the
# other p2pool options in --help. Parsing behavior is unchanged: the flag
# stores False into args.upnp, which defaults to True.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
404 p2pool_group.add_argument('--max-conns', metavar='CONNS',
405 help='maximum incoming connections (default: 40)',
406 type=int, action='store', default=40, dest='p2pool_conns')
407 p2pool_group.add_argument('--outgoing-conns', metavar='CONNS',
408 help='outgoing connections (default: 6)',
409 type=int, action='store', default=6, dest='p2pool_outgoing_conns')
# Miner-facing (stratum/HTTP) interface options.
411 worker_group = parser.add_argument_group('worker interface')
412 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
413 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
414 type=str, action='store', default=None, dest='worker_endpoint')
415 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
416 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
417 type=float, action='store', default=0, dest='worker_fee')
# Upstream bitcoind connection options; port defaults fall back to
# bitcoin.conf and then to per-network constants (resolved after parsing).
419 bitcoind_group = parser.add_argument_group('bitcoind interface')
420 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
421 help='connect to this address (default: 127.0.0.1)',
422 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
423 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
424 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
425 type=int, action='store', default=None, dest='bitcoind_rpc_port')
426 bitcoind_group.add_argument('--bitcoind-rpc-ssl',
427 help='connect to JSON-RPC interface using SSL',
428 action='store_true', default=False, dest='bitcoind_rpc_ssl')
429 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
430 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
431 type=int, action='store', default=None, dest='bitcoind_p2p_port')
# Positional: zero, one, or two tokens -- username [password].
433 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
434 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
435 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
437 args = parser.parse_args()
# NOTE(review): the 'if args.debug:' guard (and likely 'p2pool.DEBUG = True')
# preceding this line is missing from the excerpt.
441 defer.setDebugging(True)
# Testnets are registered under '<name>_testnet' in networks.nets.
445 net_name = args.net_name + ('_testnet' if args.testnet else '')
446 net = networks.nets[net_name]
448 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
449 if not os.path.exists(datadir_path):
450 os.makedirs(datadir_path)
452 if len(args.bitcoind_rpc_userpass) > 2:
453 parser.error('a maximum of two arguments are allowed')
# Left-pad with Nones so a single token becomes the password and the
# username defaults to None (empty later).
454 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No password given: fall back to reading bitcoin.conf.
456 if args.bitcoind_rpc_password is None:
457 conf_path = net.PARENT.CONF_FILE_FUNC()
458 if not os.path.exists(conf_path):
459 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
460 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
463 '''rpcpassword=%x\r\n'''
465 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
466 conf = open(conf_path, 'rb').read()
# NOTE(review): 'contents = {}' (original line 467) and the lines skipping
# comment-only/valueless lines (469, 471-472) are missing from the excerpt.
468 for line in conf.splitlines(True):
470 line = line[:line.index('#')]
473 k, v = line.split('=', 1)
474 contents[k.strip()] = v.strip()
# Fill any still-unset args from the matching bitcoin.conf keys.
475 for conf_name, var_name, var_type in [
476 ('rpcuser', 'bitcoind_rpc_username', str),
477 ('rpcpassword', 'bitcoind_rpc_password', str),
478 ('rpcport', 'bitcoind_rpc_port', int),
479 ('port', 'bitcoind_p2p_port', int),
481 if getattr(args, var_name) is None and conf_name in contents:
482 setattr(args, var_name, var_type(contents[conf_name]))
483 if args.bitcoind_rpc_password is None:
484 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
486 if args.bitcoind_rpc_username is None:
487 args.bitcoind_rpc_username = ''
# Remaining unset ports fall back to per-network constants.
489 if args.bitcoind_rpc_port is None:
490 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
492 if args.bitcoind_p2p_port is None:
493 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
495 if args.p2pool_port is None:
496 args.p2pool_port = net.P2P_PORT
498 if args.p2pool_outgoing_conns > 10:
499 parser.error('''--outgoing-conns can't be more than 10''')
# Normalize -w into an (interface, port) pair; bare PORT binds all
# interfaces.
501 if args.worker_endpoint is None:
502 worker_endpoint = '', net.WORKER_PORT
503 elif ':' not in args.worker_endpoint:
504 worker_endpoint = '', int(args.worker_endpoint)
506 addr, port = args.worker_endpoint.rsplit(':', 1)
507 worker_endpoint = addr, int(port)
def separate_url(url):
    """Split a merged-mining URL into (credential-free URL, 'user:pass').

    Aborts via parser.error() when the URL embeds no credentials.
    """
    split = urlparse.urlsplit(url)
    netloc = split.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # rsplit tolerates an '@' inside the password portion.
    credentials, host_part = netloc.rsplit('@', 1)
    bare_url = urlparse.urlunsplit(split._replace(netloc=host_part))
    return bare_url, credentials
515 merged_urls = map(separate_url, args.merged_urls)
517 if args.logfile is None:
518 args.logfile = os.path.join(datadir_path, 'log')
# Tee all stdout/stderr through a timestamping pipe into both the console
# and the rotating logfile; AbortPipe shuts the process down on pipe errors.
520 logfile = logging.LogFile(args.logfile)
521 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
522 sys.stdout = logging.AbortPipe(pipe)
# stderr lines are prefixed with '> ' to distinguish them in the log.
523 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 reopens the logfile so external rotation (logrotate) works.
524 if hasattr(signal, "SIGUSR1"):
525 def sigusr1(signum, frame):
526 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
# NOTE(review): the logfile.reopen() call (original line 527) is missing
# from this excerpt.
528 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
529 signal.signal(signal.SIGUSR1, sigusr1)
# Also reopen every 5 seconds as a belt-and-braces fallback.
530 task.LoopingCall(logfile.reopen).start(5)
# Twisted log observer that phones home error tracebacks (opt-out with
# --no-bugreport). NOTE(review): several lines are missing from this
# excerpt (the __init__ header, early 'return's, and the client.getPage
# call's surrounding lines).
532 class ErrorReporter(object):
534 self.last_sent = None
536 def emit(self, eventDict):
537 if not eventDict["isError"]:
# Rate-limit: at most one report every 5 seconds.
540 if self.last_sent is not None and time.time() < self.last_sent + 5:
542 self.last_sent = time.time()
544 if 'failure' in eventDict:
545 text = ((eventDict.get('why') or 'Unhandled Error')
546 + '\n' + eventDict['failure'].getTraceback())
548 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
550 from twisted.web import client
552 url='http://u.forre.st/p2pool_error.cgi',
554 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
# Best-effort: ignore any failure of the report upload itself.
556 ).addBoth(lambda x: None)
557 if not args.no_bugreport:
558 log.addObserver(ErrorReporter().emit)
# Schedule main() once the reactor is running (reactor.run() presumably
# follows; not visible in this excerpt).
560 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)