1 from __future__ import division
16 from twisted.internet import iocpreactor
21 print 'Using IOCP reactor!'
22 from twisted.internet import defer, reactor, protocol, task
23 from twisted.web import server
24 from twisted.python import log
25 from nattraverso import portmapper, ipdiscover
27 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
28 from bitcoin import worker_interface, height_tracker
29 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
30 from . import p2p, networks, web
31 import p2pool, p2pool.data as p2pool_data
# Fetch a block template from bitcoind (pre-BIP22 'getmemorypool' RPC) and
# normalize it into the dict the rest of p2pool consumes.  Python 2 / Twisted
# inlineCallbacks coroutine; deferral.retry re-runs it up to 3 times on error.
# NOTE(review): this listing elides physical lines (the embedded numbering
# jumps); only visible lines are reproduced -- e.g. the 'try:' matching the
# except clause below is missing from view.
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    # transactions arrive hex-encoded; decode once and reuse below
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        # 'bits' may be a hex string (byte-reversed on the wire) or already an int
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
# Node entry point: wires up bitcoind (RPC + P2P), determines the payout
# address, loads the on-disk share store, and initializes the work variables
# that the rest of the node watches.
# NOTE(review): this listing elides many physical lines (embedded numbering
# jumps); only visible lines are reproduced, so some suites appear truncated
# (e.g. the enclosing try/ of the fatal-error handler, 'def check():', the
# else-branches around the cached-address logic, the f.write after open).
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
        print 'p2pool (version %s)' % (p2pool.__version__,)
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        # validates both the RPC connection and the bitcoind version/capabilities
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
            if not (yield net.PARENT.RPC_CHECK)(bitcoind):
                print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        
        # payout address: CLI override, else cached file, else ask bitcoind's wallet
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print ' Loaded cached address: %s...' % (address,)
            if address is not None:
                # re-validate the cached address against the local wallet
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print ' Cached address is either invalid or not controlled by local bitcoind!'
                print ' Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            with open(address_path, 'wb') as f:
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
            my_pubkey_hash = args.pubkey_hash
        print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        
        # hashes of shares (and dead-on-arrival shares) produced by this node
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()  # shares already exchanged with peers
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if contents.hash in tracker.shares:
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0  # loaded from disk, not freshly received
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print " %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
                raise AssertionError()
        print " ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
            tracker.verified.add(tracker.shares[h])
        print " ...done loading %i shares!" % (len(tracker.shares),)
        
        # keep the on-disk store in sync with the in-memory tracker
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)  # share_hash -> (last request time, count)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            # pull fresh work from bitcoind and publish it into the two variables
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                coinbaseflags=work['coinbaseflags'],
        yield set_real_work1()
        # NOTE(review): many physical lines are elided in this listing (embedded
        # numbering jumps); only visible lines are reproduced below, so several
        # suites (continue/else branches, call argument lines) appear truncated.
        
        # function returning the height of a block hash relative to the best header chain
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
        def set_real_work2():
            # recompute the best verified share and request missing parent shares
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            
            # NOTE(review): 't' is compared as a timestamp below -- the
            # 't = time.time()' rebind is on an elided line.
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff on re-requesting the same parent share
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    # usually pick a random peer that knows the chain; sometimes the announcer
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        
        # poll each merged-mining daemon for aux work and publish it
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
303 @deferral.retry('Error submitting block: (will retry)', 10, 10)
304 @defer.inlineCallbacks
305 def submit_block(block, ignore_failure):
306 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
307 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
308 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
309 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
        # NOTE(review): elided lines in this listing (function 'def' lines such
        # as 'parse', 'save_shares', the upnp loop header, try/else branches,
        # 'p2p_node = Node(' and 'p2p_node.start()') are missing below.
        
        # when a verified share turns out to meet the block target, submit it
        @tracker.verified.added.watch
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        # resolve 'host[:port]' strings into (ip, port) tuples
        @defer.inlineCallbacks
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        # seed the peer address store from disk plus the network's bootstrap list
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
                # NOTE(review): eval() here trusts the local addrs.txt contents
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
                connect_addrs.add((yield addr_df))
        
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        
        # periodically persist the peer address store
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # walk the last few shares of the new best chain and broadcast the
            # ones peers haven't seen yet
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                shared_share_hashes.add(share.hash)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        # periodically flush the current share chain to the on-disk store
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        # best-effort UPnP port-forwarding loop (failures are only logged)
        @defer.inlineCallbacks
                is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    pm = yield portmapper.get_port_mapper()
                    yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    log.err(None, 'UPnP error:')
                yield deferral.sleep(random.expovariate(1/120))
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        # counts of my shares that were removed from the chain window while
        # still counted as "unstale"; tuple = (total, orphan-announced, dead-announced)
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        # NOTE(review): the 'def _(share):' line for this watcher is elided here.
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                # NOTE(review): BUG -- 'removed_doa_unstales' is never defined;
                # the variable declared above and read in get_stale_counts is
                # 'removed_doa_unstales_var'.  As written this raises NameError
                # whenever one of my DOA shares leaves the chain window.
                removed_doa_unstales.set(removed_doa_unstales.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            # "in chain" = still inside the window plus those removed while unstale
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        # NOTE(review): this section of the listing elides many physical lines
        # (try/except/else branches, several 'def' lines, closing parentheses of
        # multi-line calls); only the visible lines are reproduced below.
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)  # sliding 10-minute window
        
        # bridge between the JSON-RPC worker interface and the share chain
        class WorkerBridge(worker_interface.WorkerBridge):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []  # (timestamp, attempts) of recent pseudoshares
            
            def get_user_details(self, request):
                # username syntax: ADDRESS[+pseudoshare_difficulty][/share_difficulty]
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                
                desired_share_target = 2**256 - 1
                    user, min_diff_str = user.rsplit('/', 1)
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                
                if random.uniform(0, 100) < args.worker_fee:
                    # operator fee: direct this request's payout to the operator address
                    pubkey_hash = my_pubkey_hash
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                        # invalid address in username: fall back to operator address
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def preprocess_request(self, request):
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                # refuse to hand out work when submitted shares couldn't be used
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                # merged-mining commitment embedded in the coinbase scriptSig
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                
                share_info, generate_tx = p2pool_data.Share.generate_transaction(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        # stale_info encodes whether we currently have unannounced
                        # orphans (253) or DOAs (254) relative to the chain's records
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                        )(*get_stale_counts()),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    ref_merkle_link=dict(branch=[], index=0),
                
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    # auto-tune pseudoshare difficulty from recent local hash rate
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                    target = min(target, desired_pseudoshare_target)
                # never easier than the share target or any aux-chain target
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                
                received_header_hashes = set()
                
                def got_response(header, request):
                    # called when a worker submits a solved header for this job
                    user, _, _, _ = self.get_user_details(request)
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                        # full block solution?
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                        log.err(None, 'Error while processing potential block:')
                    
                    # merged-mining (auxpow) solutions
                    for aux_work, index, hashes in mm_later:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                        print 'Merged block submittal result: %s' % (result,)
                                    log.err(err, 'Error submitting merged block:')
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    # valid p2pool share?
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header);del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        my_share_hashes.add(share.hash)
                            my_doa_share_hashes.add(share.hash)
                            tracker.verified.add(share)
                        
                            # broadcast immediately if it also solves a block
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print ' Hash: %56x' % (pow_hash,)
                        print ' Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                
                return ba, got_response
        # NOTE(review): elided lines below include the 'def work_poller():'
        # header, its while/try lines, and the f.write for the ready flag.
        
        # expected payouts for the current chain state (used by the web UI)
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        # signal to wrapper scripts that startup is complete
        with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
        
        # re-poll bitcoind for work on new-block notification or every 15s
        @defer.inlineCallbacks
                flag = factory.new_block.get_deferred()
                    yield set_real_work1()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
        print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        
        # watchdog: dump a stack trace if the reactor stalls for 30 seconds
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        # NOTE(review): elided lines below include the 'def signedOn(self):'
        # header (the statements after lineReceived belong to it) and the
        # IRCClientFactory body (protocol = IRCClient).
        if args.irc_announce:
            from twisted.words.protocols import irc
            # announce found blocks on IRC; nick randomized to avoid collisions
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    irc.IRCClient.lineReceived(self, line)
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # only announce fresh shares that actually solved a block
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            # random delay so multiple nodes don't all announce at once
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # keep a bounded window of recent messages to avoid duplicates
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        # NOTE(review): elided lines below include the 'def status_thread():'
        # header, its while/try lines, the 'print this_str' body of the final
        # if, and the except clause that the trailing log.err belongs to.
        
        # periodic console status line
        @defer.inlineCallbacks
                yield deferral.sleep(3)
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        len(tracker.verified.shares),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    
                    (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                    stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                    # pool rate corrected for the observed stale proportion
                    real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                    
                    this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                        shares, stale_orphan_shares, stale_doa_shares,
                        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                        get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                    this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                        math.format(int(real_att_s)),
                        math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                    
                    # warn when the chain majority votes for a share version we don't support
                    desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
                    majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
                    if majority_desired_version not in [0, 1]:
                        print >>sys.stderr, '#'*40
                        print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
                            majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
                        print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
                        print >>sys.stderr, '#'*40
                    
                    # only reprint when the status changed or every 15 seconds
                    if this_str != last_str or time.time() > last_time + 15:
                        last_time = time.time()
        
        log.err(None, 'Fatal error:')
# argparse subclass whose @file arguments may themselves contain further
# @file references -- stock argparse only expands a single level.
# NOTE(review): elided lines in this listing include 'new_arg_strings = []',
# the else/try/finally around the file read, and the self.error(...) call
# that the 'err = sys.exc_info()[1]' line feeds.
class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        for arg_string in arg_strings:
            
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            
            # replace arguments referencing files with the file content
                args_file = open(arg_string[1:])
                for arg_line in args_file.read().splitlines():
                    for arg in self.convert_arg_line_to_args(arg_line):
                        arg_strings.append(arg)
                # recurse so that @files referenced from within an @file expand too
                arg_strings = self._read_args_from_files(arg_strings)
                new_arg_strings.extend(arg_strings)
            err = sys.exc_info()[1]
        
        # return the modified argument list
        return new_arg_strings
841 def convert_arg_line_to_args(self, arg_line):
842 return [arg for arg in arg_line.split() if arg.strip()]
# all non-testnet networks; used to populate --net choices and port help strings
realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# build the command-line parser; @file arguments are supported via FixedArgumentParser
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')

# options controlling the p2pool peer-to-peer interface
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# NOTE(review): added to `parser`, not `p2pool_group`, so it shows under the
# top-level options in --help; behavior is otherwise identical
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')

# options controlling the miner-facing worker (getwork/stratum-style) interface
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# options controlling how p2pool reaches the local bitcoind
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional: up to two values, username then password (see post-processing below)
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
args = parser.parse_args()

# resolve the chosen network object; testnet variants are stored under
# '<name>_testnet' in networks.nets
net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

# per-network data directory, created on first run
datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

# positional BITCOIND_RPCUSERPASS: 0 args -> read bitcoin.conf, 1 arg -> password
# only (empty username), 2 args -> username then password
if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

if args.bitcoind_rpc_password is None:
    # no password on the command line: fall back to reading bitcoin.conf
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        # bitcoin.conf has no section header, so prepend a dummy '[x]' section
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # conf values only fill in settings not already given on the command line
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

# remaining ports default to the network's well-known values
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

# worker endpoint forms: unset -> all interfaces on default port,
# 'PORT' -> all interfaces on that port, 'ADDR:PORT' -> that interface/port
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
else:
    addr, port = args.worker_endpoint.rsplit(':', 1)
    worker_endpoint = addr, int(port)

# validate the payout address against the selected network, if one was given
if args.address is not None:
    try:
        args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception as e:
        parser.error('error parsing address: ' + repr(e))
else:
    # None here means "request an address from bitcoind later"
    args.pubkey_hash = None
def separate_url(url):
    """Strip the userinfo out of *url*, returning (clean_url, 'user:pass')."""
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    creds, _, host = netloc.rpartition('@')
    return urlparse.urlunsplit(parts._replace(netloc=host)), creds
merged_urls = map(separate_url, args.merged_urls)
990 if args.logfile is None:
991 args.logfile = os.path.join(datadir_path, 'log')
993 logfile = logging.LogFile(args.logfile)
994 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
995 sys.stdout = logging.AbortPipe(pipe)
996 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
997 if hasattr(signal, "SIGUSR1"):
998 def sigusr1(signum, frame):
999 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1001 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1002 signal.signal(signal.SIGUSR1, sigusr1)
1003 task.LoopingCall(logfile.reopen).start(5)
1005 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)