1 from __future__ import division
13 if '--iocp' in sys.argv:
14 from twisted.internet import iocpreactor
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import stratum, worker_interface, helper
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging, switchprotocol
24 from . import networks, web, work
25 import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node
27 @defer.inlineCallbacks
28 def main(args, net, datadir_path, merged_urls, worker_endpoint):
30 print 'p2pool (version %s)' % (p2pool.__version__,)
33 @defer.inlineCallbacks
35 # connect to bitcoind over bitcoin-p2p
36 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
37 factory = bitcoin_p2p.ClientFactory(net.PARENT)
38 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
39 yield factory.getProtocol() # waits until handshake is successful
42 defer.returnValue(factory)
44 if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
45 factory = yield connect_p2p()
47 # connect to bitcoind over JSON-RPC and do initial getmemorypool
48 url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
49 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
50 bitcoind = jsonrpc.HTTPProxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
51 yield helper.check(bitcoind, net)
52 temp_work = yield helper.getwork(bitcoind)
54 bitcoind_warning_var = variable.Variable(None)
55 @defer.inlineCallbacks
57 errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
58 bitcoind_warning_var.set(errors if errors != '' else None)
60 task.LoopingCall(poll_warnings).start(20*60)
63 print ' Current block hash: %x' % (temp_work['previous_block'],)
64 print ' Current block height: %i' % (temp_work['height'] - 1,)
68 factory = yield connect_p2p()
70 print 'Determining payout address...'
71 if args.pubkey_hash is None:
72 address_path = os.path.join(datadir_path, 'cached_payout_address')
74 if os.path.exists(address_path):
75 with open(address_path, 'rb') as f:
76 address = f.read().strip('\r\n')
77 print ' Loaded cached address: %s...' % (address,)
81 if address is not None:
82 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
83 if not res['isvalid'] or not res['ismine']:
84 print ' Cached address is either invalid or not controlled by local bitcoind!'
88 print ' Getting payout address from bitcoind...'
89 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
91 with open(address_path, 'wb') as f:
94 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
96 my_pubkey_hash = args.pubkey_hash
97 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
100 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
102 known_verified = set()
103 print "Loading shares..."
104 for i, (mode, contents) in enumerate(ss.get_shares()):
106 contents.time_seen = 0
107 shares[contents.hash] = contents
108 if len(shares) % 1000 == 0 and shares:
109 print " %i" % (len(shares),)
110 elif mode == 'verified_hash':
111 known_verified.add(contents)
113 raise AssertionError()
114 print " ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified))
118 print 'Initializing work...'
120 node = p2pool_node.Node(factory, bitcoind, shares.values(), known_verified, net)
123 for share_hash in shares:
124 if share_hash not in node.tracker.items:
125 ss.forget_share(share_hash)
126 for share_hash in known_verified:
127 if share_hash not in node.tracker.verified.items:
128 ss.forget_verified_share(share_hash)
129 del shares, known_verified
130 node.tracker.removed.watch(lambda share: ss.forget_share(share.hash))
131 node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
134 for share in node.tracker.get_chain(node.best_share_var.value, min(node.tracker.get_height(node.best_share_var.value), 2*net.CHAIN_LENGTH)):
136 if share.hash in node.tracker.verified.items:
137 ss.add_verified_hash(share.hash)
138 task.LoopingCall(save_shares).start(60)
144 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
146 @defer.inlineCallbacks
149 ip, port = x.split(':')
150 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
152 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
155 if os.path.exists(os.path.join(datadir_path, 'addrs')):
157 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
158 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
160 print >>sys.stderr, 'error parsing addrs'
161 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
164 if addr not in addrs:
165 addrs[addr] = (0, time.time(), time.time())
169 connect_addrs = set()
170 for addr_df in map(parse, args.p2pool_nodes):
172 connect_addrs.add((yield addr_df))
176 node.p2p_node = p2pool_node.P2PNode(node,
177 port=args.p2pool_port,
178 max_incoming_conns=args.p2pool_conns,
180 connect_addrs=connect_addrs,
181 desired_outgoing_conns=args.p2pool_outgoing_conns,
183 node.p2p_node.start()
186 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
187 f.write(json.dumps(node.p2p_node.addr_store.items()))
188 task.LoopingCall(save_addrs).start(60)
194 @defer.inlineCallbacks
198 is_lan, lan_ip = yield ipdiscover.get_local_ip()
200 pm = yield portmapper.get_port_mapper()
201 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
202 except defer.TimeoutError:
206 log.err(None, 'UPnP error:')
207 yield deferral.sleep(random.expovariate(1/120))
210 # start listening for workers with a JSON-RPC server
212 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
214 wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee)
215 web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var)
216 caching_wb = worker_interface.CachingWorkerBridge(wb)
217 worker_interface.WorkerInterface(caching_wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
218 web_serverfactory = server.Site(web_root)
221 serverfactory = switchprotocol.FirstByteSwitchFactory({'{': stratum.StratumServerFactory(caching_wb)}, web_serverfactory)
222 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], serverfactory, interface=worker_endpoint[0])
224 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
232 print 'Started successfully!'
233 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
234 if args.donation_percentage > 1.1:
235 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
236 elif args.donation_percentage < .9:
237 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
239 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
240 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
244 if hasattr(signal, 'SIGALRM'):
245 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
246 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
248 signal.siginterrupt(signal.SIGALRM, False)
249 task.LoopingCall(signal.alarm, 30).start(1)
251 if args.irc_announce:
252 from twisted.words.protocols import irc
253 class IRCClient(irc.IRCClient):
254 nickname = 'p2pool%02i' % (random.randrange(100),)
255 channel = net.ANNOUNCE_CHANNEL
256 def lineReceived(self, line):
259 irc.IRCClient.lineReceived(self, line)
261 self.in_channel = False
262 irc.IRCClient.signedOn(self)
263 self.factory.resetDelay()
264 self.join(self.channel)
265 @defer.inlineCallbacks
266 def new_share(share):
267 if not self.in_channel:
269 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
270 yield deferral.sleep(random.expovariate(1/60))
271 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
272 if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
273 self.say(self.channel, message)
274 self._remember_message(message)
275 self.watch_id = node.tracker.verified.added.watch(new_share)
276 self.recent_messages = []
def joined(self, channel):
    """Record that this client is now inside a channel.

    The channel argument is not inspected.
    """
    # new_share reads this flag when a verified share arrives
    self.in_channel = True
def left(self, channel):
    """Record that this client is no longer inside a channel.

    The channel argument is not inspected.
    """
    # new_share reads this flag when a verified share arrives
    self.in_channel = False
281 def _remember_message(self, message):
282 self.recent_messages.append(message)
283 while len(self.recent_messages) > 100:
284 self.recent_messages.pop(0)
def privmsg(self, user, channel, message):
    """Remember messages seen in this client's own channel; ignore the rest."""
    if channel != self.channel:
        return
    self._remember_message(message)
288 def connectionLost(self, reason):
289 node.tracker.verified.added.unwatch(self.watch_id)
290 print 'IRC connection lost:', reason.getErrorMessage()
291 class IRCClientFactory(protocol.ReconnectingClientFactory):
293 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
295 @defer.inlineCallbacks
300 yield deferral.sleep(3)
302 height = node.tracker.get_height(node.best_share_var.value)
303 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
305 len(node.tracker.verified.items),
306 len(node.tracker.items),
307 len(node.p2p_node.peers),
308 sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming),
309 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
311 datums, dt = wb.local_rate_monitor.get_datums_in_last()
312 my_att_s = sum(datum['work']/dt for datum in datums)
313 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
314 math.format(int(my_att_s)),
316 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
317 math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???',
321 (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
322 stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
323 real_att_s = p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
325 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
326 shares, stale_orphan_shares, stale_doa_shares,
327 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
328 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
329 node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
331 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
332 math.format(int(real_att_s)),
334 math.format_dt(2**256 / node.bitcoind_work.value['bits'].target / real_att_s),
337 for warning in p2pool_data.get_warnings(node.tracker, node.best_share_var.value, net, bitcoind_warning_var.value, node.bitcoind_work.value):
338 print >>sys.stderr, '#'*40
339 print >>sys.stderr, '>>> Warning: ' + warning
340 print >>sys.stderr, '#'*40
342 if this_str != last_str or time.time() > last_time + 15:
345 last_time = time.time()
351 log.err(None, 'Fatal error:')
# Only networks whose name lacks '_testnet' are offered as --net choices and
# listed in the per-network defaults of the help texts below.
realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# fromfile_prefix_chars='@' lets arguments be read from a file given as @FILE.
parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 1.0)',
    type=float, action='store', default=1.0, dest='donation_percentage')
parser.add_argument('--iocp',
    help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
    action='store_true', default=False, dest='iocp')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
parser.add_argument('--no-bugreport',
    help='disable submitting caught exceptions to the author',
    action='store_true', default=False, dest='no_bugreport')

# Options for the p2pool peer-to-peer side. Port defaults that depend on the
# network are left as None here and resolved after parsing.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# Registered on p2pool_group rather than the bare parser so that --help lists
# it with the other P2P options; parsing and args.upnp are unaffected.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')
p2pool_group.add_argument('--outgoing-conns', metavar='CONNS',
    help='outgoing connections (default: 6)',
    type=int, action='store', default=6, dest='p2pool_outgoing_conns')

# Options for the miner-facing side.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# Options describing how to reach bitcoind.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-rpc-ssl',
    help='connect to JSON-RPC interface using SSL',
    action='store_true', default=False, dest='bitcoind_rpc_ssl')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# Positional arguments: zero or more words collected into a list.
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()
439 defer.setDebugging(True)
# Pick the network definition; --testnet selects the '_testnet' variant.
net_name = args.net_name
if args.testnet:
    net_name += '_testnet'
net = networks.nets[net_name]

# Per-network data directory, created on first run. Without --datadir it
# lives in a 'data' directory next to the launched script.
if args.datadir is None:
    base_dir = os.path.join(os.path.dirname(sys.argv[0]), 'data')
else:
    base_dir = args.datadir
datadir_path = os.path.join(base_dir, net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
# Left-pad with None so that a lone positional is taken as the password and
# no positionals leaves both values as None.
padded_userpass = [None, None] + args.bitcoind_rpc_userpass
args.bitcoind_rpc_username, args.bitcoind_rpc_password = padded_userpass[-2:]
454 if args.bitcoind_rpc_password is None:
455 conf_path = net.PARENT.CONF_FILE_FUNC()
456 if not os.path.exists(conf_path):
457 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
458 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
461 '''rpcpassword=%x\r\n'''
463 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
464 conf = open(conf_path, 'rb').read()
466 for line in conf.splitlines(True):
468 line = line[:line.index('#')]
471 k, v = line.split('=', 1)
472 contents[k.strip()] = v.strip()
473 for conf_name, var_name, var_type in [
474 ('rpcuser', 'bitcoind_rpc_username', str),
475 ('rpcpassword', 'bitcoind_rpc_password', str),
476 ('rpcport', 'bitcoind_rpc_port', int),
477 ('port', 'bitcoind_p2p_port', int),
479 if getattr(args, var_name) is None and conf_name in contents:
480 setattr(args, var_name, var_type(contents[conf_name]))
481 if args.bitcoind_rpc_password is None:
482 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
# Settings still unset (None) at this point fall back to an empty username
# and to the selected network's standard ports.
fallbacks = (
    ('bitcoind_rpc_username', ''),
    ('bitcoind_rpc_port', net.PARENT.RPC_PORT),
    ('bitcoind_p2p_port', net.PARENT.P2P_PORT),
    ('p2pool_port', net.P2P_PORT),
)
for option_name, fallback in fallbacks:
    if getattr(args, option_name) is None:
        setattr(args, option_name, fallback)

if args.p2pool_outgoing_conns > 10:
    parser.error('''--outgoing-conns can't be more than 10''')
499 if args.worker_endpoint is None:
500 worker_endpoint = '', net.WORKER_PORT
501 elif ':' not in args.worker_endpoint:
502 worker_endpoint = '', int(args.worker_endpoint)
504 addr, port = args.worker_endpoint.rsplit(':', 1)
505 worker_endpoint = addr, int(port)
507 if args.address is not None:
509 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
511 parser.error('error parsing address: ' + repr(e))
513 args.pubkey_hash = None
def separate_url(url):
    """Split the credentials out of a merged-mining URL.

    Returns a ``(url_without_credentials, userpass)`` tuple, where
    ``userpass`` is everything before the last "@" of the network location.
    Reports a usage error through ``parser.error`` when the network location
    contains no "@".
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split on the last "@" so credentials that themselves contain "@" survive.
    userpass, bare_netloc = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=bare_netloc))
    return stripped_url, userpass
# Each --merged URL becomes a (url_without_credentials, userpass) pair.
merged_urls = [separate_url(merged_url) for merged_url in args.merged_urls]

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Build the output chain: timestamped lines are teed to the current stderr
# and to the log file. The current sys.stderr must be captured before it is
# replaced below.
logfile = logging.LogFile(args.logfile)
console_pipe = logging.EncodeReplacerPipe(sys.stderr)
tee_pipe = logging.TeePipe([console_pipe, logfile])
pipe = logging.TimestampingPipe(tee_pipe)
sys.stdout = logging.AbortPipe(pipe)
# stderr output (and Twisted's default observer) goes through the same chain
# with a '> ' prefix.
prefixed_stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
sys.stderr = log.DefaultObserver.stderr = prefixed_stderr
530 if hasattr(signal, "SIGUSR1"):
531 def sigusr1(signum, frame):
532 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
534 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
535 signal.signal(signal.SIGUSR1, sigusr1)
536 task.LoopingCall(logfile.reopen).start(5)
538 class ErrorReporter(object):
540 self.last_sent = None
542 def emit(self, eventDict):
543 if not eventDict["isError"]:
546 if self.last_sent is not None and time.time() < self.last_sent + 5:
548 self.last_sent = time.time()
550 if 'failure' in eventDict:
551 text = ((eventDict.get('why') or 'Unhandled Error')
552 + '\n' + eventDict['failure'].getTraceback())
554 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
556 from twisted.web import client
558 url='http://u.forre.st/p2pool_error.cgi',
560 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
562 ).addBoth(lambda x: None)
563 if not args.no_bugreport:
564 log.addObserver(ErrorReporter().emit)
566 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)