1 from __future__ import division
16 from twisted.internet import iocpreactor
21 print 'Using IOCP reactor!'
22 from twisted.internet import defer, reactor, protocol, task
23 from twisted.web import server
24 from twisted.python import log
25 from nattraverso import portmapper, ipdiscover
27 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
28 from bitcoin import worker_interface, height_tracker
29 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
30 from . import p2p, networks, web
31 import p2pool, p2pool.data as p2pool_data
33 @deferral.retry('Error getting work from bitcoind:', 3)
34 @defer.inlineCallbacks
35 def getwork(bitcoind):
37 work = yield bitcoind.rpc_getmemorypool()
38 except jsonrpc.Error, e:
39 if e.code == -32601: # Method not found
40 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
41 raise deferral.RetrySilentlyException()
43 packed_transactions = [x.decode('hex') for x in work['transactions']]
44 defer.returnValue(dict(
45 version=work['version'],
46 previous_block_hash=int(work['previousblockhash'], 16),
47 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
48 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
49 subsidy=work['coinbasevalue'],
51 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
52 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
55 @defer.inlineCallbacks
56 def main(args, net, datadir_path, merged_urls, worker_endpoint):
58 print 'p2pool (version %s)' % (p2pool.__version__,)
61 # connect to bitcoind over JSON-RPC and do initial getmemorypool
62 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
63 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
64 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
65 @deferral.retry('Error while checking Bitcoin connection:', 1)
66 @defer.inlineCallbacks
68 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
69 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
70 raise deferral.RetrySilentlyException()
71 temp_work = yield getwork(bitcoind)
72 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
73 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
74 raise deferral.RetrySilentlyException()
75 defer.returnValue(temp_work)
76 temp_work = yield check()
78 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
81 # connect to bitcoind over bitcoin-p2p
82 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
83 factory = bitcoin_p2p.ClientFactory(net.PARENT)
84 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
85 yield factory.getProtocol() # waits until handshake is successful
89 print 'Determining payout address...'
90 if args.pubkey_hash is None:
91 address_path = os.path.join(datadir_path, 'cached_payout_address')
93 if os.path.exists(address_path):
94 with open(address_path, 'rb') as f:
95 address = f.read().strip('\r\n')
96 print ' Loaded cached address: %s...' % (address,)
100 if address is not None:
101 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
102 if not res['isvalid'] or not res['ismine']:
103 print ' Cached address is either invalid or not controlled by local bitcoind!'
107 print ' Getting payout address from bitcoind...'
108 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
110 with open(address_path, 'wb') as f:
113 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
115 my_pubkey_hash = args.pubkey_hash
116 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
119 my_share_hashes = set()
120 my_doa_share_hashes = set()
122 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
123 shared_share_hashes = set()
124 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
125 known_verified = set()
126 print "Loading shares..."
127 for i, (mode, contents) in enumerate(ss.get_shares()):
129 if contents.hash in tracker.shares:
131 shared_share_hashes.add(contents.hash)
132 contents.time_seen = 0
133 tracker.add(contents)
134 if len(tracker.shares) % 1000 == 0 and tracker.shares:
135 print " %i" % (len(tracker.shares),)
136 elif mode == 'verified_hash':
137 known_verified.add(contents)
139 raise AssertionError()
140 print " ...inserting %i verified shares..." % (len(known_verified),)
141 for h in known_verified:
142 if h not in tracker.shares:
143 ss.forget_verified_share(h)
145 tracker.verified.add(tracker.shares[h])
146 print " ...done loading %i shares!" % (len(tracker.shares),)
148 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
149 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
150 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
152 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
154 pre_current_work = variable.Variable(None)
155 pre_merged_work = variable.Variable({})
156 # information affecting work that should trigger a long-polling update
157 current_work = variable.Variable(None)
158 # information affecting work that should not trigger a long-polling update
159 current_work2 = variable.Variable(None)
161 requested = expiring_dict.ExpiringDict(300)
163 print 'Initializing work...'
164 @defer.inlineCallbacks
165 def set_real_work1():
166 work = yield getwork(bitcoind)
167 current_work2.set(dict(
169 transactions=work['transactions'],
170 merkle_link=work['merkle_link'],
171 subsidy=work['subsidy'],
172 clock_offset=time.time() - work['time'],
173 last_update=time.time(),
174 )) # second set first because everything hooks on the first
175 pre_current_work.set(dict(
176 version=work['version'],
177 previous_block=work['previous_block_hash'],
179 coinbaseflags=work['coinbaseflags'],
181 yield set_real_work1()
183 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
185 def set_real_work2():
186 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
188 t = dict(pre_current_work.value)
189 t['best_share_hash'] = best
190 t['mm_chains'] = pre_merged_work.value
194 for peer2, share_hash in desired:
195 if share_hash not in tracker.tails: # was received in the time tracker.think was running
197 last_request_time, count = requested.get(share_hash, (None, 0))
198 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
200 potential_peers = set()
201 for head in tracker.tails[share_hash]:
202 potential_peers.update(peer_heads.get(head, set()))
203 potential_peers = [peer for peer in potential_peers if peer.connected2]
204 if count == 0 and peer2 is not None and peer2.connected2:
207 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
211 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
215 stops=list(set(tracker.heads) | set(
216 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
219 requested[share_hash] = t, count + 1
220 pre_current_work.changed.watch(lambda _: set_real_work2())
221 pre_merged_work.changed.watch(lambda _: set_real_work2())
227 @defer.inlineCallbacks
228 def set_merged_work(merged_url, merged_userpass):
229 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
231 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
232 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
233 hash=int(auxblock['hash'], 16),
234 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
235 merged_proxy=merged_proxy,
237 yield deferral.sleep(1)
238 for merged_url, merged_userpass in merged_urls:
239 set_merged_work(merged_url, merged_userpass)
241 @pre_merged_work.changed.watch
242 def _(new_merged_work):
243 print 'Got new merged mining work!'
245 # setup p2p logic and join p2pool network
247 class Node(p2p.Node):
248 def handle_shares(self, shares, peer):
250 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
254 if share.hash in tracker.shares:
255 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
260 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
264 if shares and peer is not None:
265 peer_heads.setdefault(shares[0].hash, set()).add(peer)
271 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
273 def handle_share_hashes(self, hashes, peer):
276 for share_hash in hashes:
277 if share_hash in tracker.shares:
279 last_request_time, count = requested.get(share_hash, (None, 0))
280 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
282 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
283 get_hashes.append(share_hash)
284 requested[share_hash] = t, count + 1
286 if hashes and peer is not None:
287 peer_heads.setdefault(hashes[0], set()).add(peer)
289 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
291 def handle_get_shares(self, hashes, parents, stops, peer):
292 parents = min(parents, 1000//len(hashes))
295 for share_hash in hashes:
296 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
297 if share.hash in stops:
300 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
303 @deferral.retry('Error submitting block: (will retry)', 10, 10)
304 @defer.inlineCallbacks
305 def submit_block(block, ignore_failure):
306 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
307 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
308 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
309 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
311 @tracker.verified.added.watch
313 if share.pow_hash <= share.header['bits'].target:
314 submit_block(share.as_block(tracker), ignore_failure=True)
316 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
318 if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
319 broadcast_share(share.hash)
321 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
323 @defer.inlineCallbacks
326 ip, port = x.split(':')
327 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
329 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
332 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
334 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
336 print >>sys.stderr, "error reading addrs"
337 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
340 if addr not in addrs:
341 addrs[addr] = (0, time.time(), time.time())
345 connect_addrs = set()
346 for addr_df in map(parse, args.p2pool_nodes):
348 connect_addrs.add((yield addr_df))
353 best_share_hash_func=lambda: current_work.value['best_share_hash'],
354 port=args.p2pool_port,
357 connect_addrs=connect_addrs,
358 max_incoming_conns=args.p2pool_conns,
362 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
364 def broadcast_share(share_hash):
366 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
367 if share.hash in shared_share_hashes:
369 shared_share_hashes.add(share.hash)
372 for peer in p2p_node.peers.itervalues():
373 peer.sendShares([share for share in shares if share.peer is not peer])
375 # send share when the chain changes to their chain
376 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
379 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
381 if share.hash in tracker.verified.shares:
382 ss.add_verified_hash(share.hash)
383 task.LoopingCall(save_shares).start(60)
389 @defer.inlineCallbacks
393 is_lan, lan_ip = yield ipdiscover.get_local_ip()
395 pm = yield portmapper.get_port_mapper()
396 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
397 except defer.TimeoutError:
401 log.err(None, 'UPnP error:')
402 yield deferral.sleep(random.expovariate(1/120))
405 # start listening for workers with a JSON-RPC server
407 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
411 removed_unstales_var = variable.Variable((0, 0, 0))
412 removed_doa_unstales_var = variable.Variable(0)
413 @tracker.verified.removed.watch
415 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
416 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
417 removed_unstales_var.set((
418 removed_unstales_var.value[0] + 1,
419 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
420 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
422 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
423 removed_doa_unstales.set(removed_doa_unstales.value + 1)
425 def get_stale_counts():
426 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
427 my_shares = len(my_share_hashes)
428 my_doa_shares = len(my_doa_share_hashes)
429 delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
430 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
431 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
432 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
433 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
435 my_shares_not_in_chain = my_shares - my_shares_in_chain
436 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
438 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
441 pseudoshare_received = variable.Event()
442 share_received = variable.Event()
443 local_rate_monitor = math.RateMonitor(10*60)
445 class WorkerBridge(worker_interface.WorkerBridge):
447 worker_interface.WorkerBridge.__init__(self)
448 self.new_work_event = current_work.changed
449 self.recent_shares_ts_work = []
451 def get_user_details(self, request):
452 user = request.getUser() if request.getUser() is not None else ''
454 desired_pseudoshare_target = None
456 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
458 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
462 desired_share_target = 2**256 - 1
464 user, min_diff_str = user.rsplit('/', 1)
466 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
470 if random.uniform(0, 100) < args.worker_fee:
471 pubkey_hash = my_pubkey_hash
474 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
476 pubkey_hash = my_pubkey_hash
478 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
480 def preprocess_request(self, request):
481 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
482 return pubkey_hash, desired_share_target, desired_pseudoshare_target
484 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
485 if len(p2p_node.peers) == 0 and net.PERSIST:
486 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
487 if current_work.value['best_share_hash'] is None and net.PERSIST:
488 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
489 if time.time() > current_work2.value['last_update'] + 60:
490 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
492 if current_work.value['mm_chains']:
493 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
494 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
495 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
496 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
500 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
506 share_info, generate_tx = p2pool_data.Share.generate_transaction(
509 previous_share_hash=current_work.value['best_share_hash'],
510 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
511 nonce=random.randrange(2**32),
512 pubkey_hash=pubkey_hash,
513 subsidy=current_work2.value['subsidy'],
514 donation=math.perfect_round(65535*args.donation_percentage/100),
515 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
516 253 if orphans > orphans_recorded_in_chain else
517 254 if doas > doas_recorded_in_chain else
519 )(*get_stale_counts()),
522 block_target=current_work.value['bits'].target,
523 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
524 desired_target=desired_share_target,
525 ref_merkle_link=dict(branch=[], index=0),
529 target = net.PARENT.SANE_MAX_TARGET
530 if desired_pseudoshare_target is None:
531 if len(self.recent_shares_ts_work) == 50:
532 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
533 target = min(target, 2**256//hash_rate)
535 target = min(target, desired_pseudoshare_target)
536 target = max(target, share_info['bits'].target)
537 for aux_work in current_work.value['mm_chains'].itervalues():
538 target = max(target, aux_work['target'])
540 transactions = [generate_tx] + list(current_work2.value['transactions'])
541 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
542 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
544 getwork_time = time.time()
545 merkle_link = current_work2.value['merkle_link']
547 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
548 bitcoin_data.target_to_difficulty(target),
549 bitcoin_data.target_to_difficulty(share_info['bits'].target),
550 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
551 len(current_work2.value['transactions']),
554 ba = bitcoin_getwork.BlockAttempt(
555 version=current_work.value['version'],
556 previous_block=current_work.value['previous_block'],
557 merkle_root=merkle_root,
558 timestamp=current_work2.value['time'],
559 bits=current_work.value['bits'],
563 received_header_hashes = set()
565 def got_response(header, request):
566 user, _, _, _ = self.get_user_details(request)
567 assert header['merkle_root'] == merkle_root
569 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
570 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
571 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
574 if pow_hash <= header['bits'].target or p2pool.DEBUG:
575 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
576 if pow_hash <= header['bits'].target:
578 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
581 log.err(None, 'Error while processing potential block:')
583 for aux_work, index, hashes in mm_later:
585 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
586 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
587 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
588 bitcoin_data.aux_pow_type.pack(dict(
591 block_hash=header_hash,
592 merkle_link=merkle_link,
594 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
595 parent_block_header=header,
600 if result != (pow_hash <= aux_work['target']):
601 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
603 print 'Merged block submittal result: %s' % (result,)
606 log.err(err, 'Error submitting merged block:')
608 log.err(None, 'Error while processing merged mining POW:')
610 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
611 min_header = dict(header);del min_header['merkle_root']
612 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
613 share = p2pool_data.Share(net, None, dict(
614 min_header=min_header, share_info=share_info, hash_link=hash_link,
615 ref_merkle_link=dict(branch=[], index=0),
616 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
618 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
620 p2pool_data.format_hash(share.hash),
621 p2pool_data.format_hash(share.previous_hash),
622 time.time() - getwork_time,
623 ' DEAD ON ARRIVAL' if not on_time else '',
625 my_share_hashes.add(share.hash)
627 my_doa_share_hashes.add(share.hash)
631 tracker.verified.add(share)
635 if pow_hash <= header['bits'].target or p2pool.DEBUG:
636 for peer in p2p_node.peers.itervalues():
637 peer.sendShares([share])
638 shared_share_hashes.add(share.hash)
640 log.err(None, 'Error forwarding block solution:')
642 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
644 if pow_hash > target:
645 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
646 print ' Hash: %56x' % (pow_hash,)
647 print ' Target: %56x' % (target,)
648 elif header_hash in received_header_hashes:
649 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
651 received_header_hashes.add(header_hash)
653 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
654 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
655 while len(self.recent_shares_ts_work) > 50:
656 self.recent_shares_ts_work.pop(0)
657 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
661 return ba, got_response
663 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
665 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
666 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
668 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
670 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
677 @defer.inlineCallbacks
680 flag = factory.new_block.get_deferred()
682 yield set_real_work1()
685 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
690 print 'Started successfully!'
691 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
692 if args.donation_percentage > 0.51:
693 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
694 elif args.donation_percentage < 0.49:
695 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
697 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
698 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
702 if hasattr(signal, 'SIGALRM'):
703 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
704 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
706 signal.siginterrupt(signal.SIGALRM, False)
707 task.LoopingCall(signal.alarm, 30).start(1)
709 if args.irc_announce:
710 from twisted.words.protocols import irc
711 class IRCClient(irc.IRCClient):
712 nickname = 'p2pool%02i' % (random.randrange(100),)
713 channel = net.ANNOUNCE_CHANNEL
714 def lineReceived(self, line):
717 irc.IRCClient.lineReceived(self, line)
719 irc.IRCClient.signedOn(self)
720 self.factory.resetDelay()
721 self.join(self.channel)
722 @defer.inlineCallbacks
723 def new_share(share):
724 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
725 yield deferral.sleep(random.expovariate(1/60))
726 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
727 if message not in self.recent_messages:
728 self.say(self.channel, message)
729 self._remember_message(message)
730 self.watch_id = tracker.verified.added.watch(new_share)
731 self.recent_messages = []
732 def _remember_message(self, message):
733 self.recent_messages.append(message)
734 while len(self.recent_messages) > 100:
735 self.recent_messages.pop(0)
736 def privmsg(self, user, channel, message):
737 if channel == self.channel:
738 self._remember_message(message)
739 def connectionLost(self, reason):
740 tracker.verified.added.unwatch(self.watch_id)
741 print 'IRC connection lost:', reason.getErrorMessage()
742 class IRCClientFactory(protocol.ReconnectingClientFactory):
744 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
746 @defer.inlineCallbacks
751 yield deferral.sleep(3)
753 if time.time() > current_work2.value['last_update'] + 60:
754 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
756 height = tracker.get_height(current_work.value['best_share_hash'])
757 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
759 len(tracker.verified.shares),
762 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
763 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
765 datums, dt = local_rate_monitor.get_datums_in_last()
766 my_att_s = sum(datum['work']/dt for datum in datums)
767 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
768 math.format(int(my_att_s)),
770 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
771 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
775 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
776 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
777 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
779 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
780 shares, stale_orphan_shares, stale_doa_shares,
781 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
782 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
783 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
785 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
786 math.format(int(real_att_s)),
788 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
791 desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
792 majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
793 if majority_desired_version not in [0, 1]:
794 print >>sys.stderr, '#'*40
795 print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
796 majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
797 print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
798 print >>sys.stderr, '#'*40
800 if this_str != last_str or time.time() > last_time + 15:
803 last_time = time.time()
809 log.err(None, 'Fatal error:')
812 class FixedArgumentParser(argparse.ArgumentParser):
813 def _read_args_from_files(self, arg_strings):
814 # expand arguments referencing files
816 for arg_string in arg_strings:
818 # for regular arguments, just add them back into the list
819 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
820 new_arg_strings.append(arg_string)
822 # replace arguments referencing files with the file content
825 args_file = open(arg_string[1:])
828 for arg_line in args_file.read().splitlines():
829 for arg in self.convert_arg_line_to_args(arg_line):
830 arg_strings.append(arg)
831 arg_strings = self._read_args_from_files(arg_strings)
832 new_arg_strings.extend(arg_strings)
836 err = sys.exc_info()[1]
839 # return the modified argument list
840 return new_arg_strings
842 def convert_arg_line_to_args(self, arg_line):
843 return [arg for arg in arg_line.split() if arg.strip()]
846 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
848 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
849 parser.add_argument('--version', action='version', version=p2pool.__version__)
850 parser.add_argument('--net',
851 help='use specified network (default: bitcoin)',
852 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
853 parser.add_argument('--testnet',
854 help='''use the network's testnet''',
855 action='store_const', const=True, default=False, dest='testnet')
856 parser.add_argument('--debug',
857 help='enable debugging mode',
858 action='store_const', const=True, default=False, dest='debug')
859 parser.add_argument('-a', '--address',
860 help='generate payouts to this address (default: <address requested from bitcoind>)',
861 type=str, action='store', default=None, dest='address')
862 parser.add_argument('--datadir',
863 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
864 type=str, action='store', default=None, dest='datadir')
865 parser.add_argument('--logfile',
866 help='''log to this file (default: data/<NET>/log)''',
867 type=str, action='store', default=None, dest='logfile')
868 parser.add_argument('--merged',
869 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
870 type=str, action='append', default=[], dest='merged_urls')
871 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
872 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
873 type=float, action='store', default=0.5, dest='donation_percentage')
874 parser.add_argument('--irc-announce',
875 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
876 action='store_true', default=False, dest='irc_announce')
878 p2pool_group = parser.add_argument_group('p2pool interface')
879 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
880 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
881 type=int, action='store', default=None, dest='p2pool_port')
882 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
883 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
884 type=str, action='append', default=[], dest='p2pool_nodes')
885 parser.add_argument('--disable-upnp',
886 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
887 action='store_false', default=True, dest='upnp')
888 p2pool_group.add_argument('--max-conns', metavar='CONNS',
889 help='maximum incoming connections (default: 40)',
890 type=int, action='store', default=40, dest='p2pool_conns')
892 worker_group = parser.add_argument_group('worker interface')
893 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
894 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
895 type=str, action='store', default=None, dest='worker_endpoint')
896 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
897 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
898 type=float, action='store', default=0, dest='worker_fee')
900 bitcoind_group = parser.add_argument_group('bitcoind interface')
901 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
902 help='connect to this address (default: 127.0.0.1)',
903 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
904 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
905 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
906 type=int, action='store', default=None, dest='bitcoind_rpc_port')
907 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
908 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
909 type=int, action='store', default=None, dest='bitcoind_p2p_port')
911 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
912 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
913 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
915 args = parser.parse_args()
920 net_name = args.net_name + ('_testnet' if args.testnet else '')
921 net = networks.nets[net_name]
923 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
924 if not os.path.exists(datadir_path):
925 os.makedirs(datadir_path)
927 if len(args.bitcoind_rpc_userpass) > 2:
928 parser.error('a maximum of two arguments are allowed')
929 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
931 if args.bitcoind_rpc_password is None:
932 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
933 parser.error('This network has no configuration file function. Manually enter your RPC password.')
934 conf_path = net.PARENT.CONF_FILE_FUNC()
935 if not os.path.exists(conf_path):
936 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
937 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
940 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
941 with open(conf_path, 'rb') as f:
942 cp = ConfigParser.RawConfigParser()
943 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
944 for conf_name, var_name, var_type in [
945 ('rpcuser', 'bitcoind_rpc_username', str),
946 ('rpcpassword', 'bitcoind_rpc_password', str),
947 ('rpcport', 'bitcoind_rpc_port', int),
948 ('port', 'bitcoind_p2p_port', int),
950 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
951 setattr(args, var_name, var_type(cp.get('x', conf_name)))
952 if args.bitcoind_rpc_password is None:
953 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
955 if args.bitcoind_rpc_username is None:
956 args.bitcoind_rpc_username = ''
958 if args.bitcoind_rpc_port is None:
959 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
961 if args.bitcoind_p2p_port is None:
962 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
964 if args.p2pool_port is None:
965 args.p2pool_port = net.P2P_PORT
967 if args.worker_endpoint is None:
968 worker_endpoint = '', net.WORKER_PORT
969 elif ':' not in args.worker_endpoint:
970 worker_endpoint = '', int(args.worker_endpoint)
972 addr, port = args.worker_endpoint.rsplit(':', 1)
973 worker_endpoint = addr, int(port)
975 if args.address is not None:
977 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
979 parser.error('error parsing address: ' + repr(e))
981 args.pubkey_hash = None
983 def separate_url(url):
984 s = urlparse.urlsplit(url)
985 if '@' not in s.netloc:
986 parser.error('merged url netloc must contain an "@"')
987 userpass, new_netloc = s.netloc.rsplit('@', 1)
988 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
989 merged_urls = map(separate_url, args.merged_urls)
991 if args.logfile is None:
992 args.logfile = os.path.join(datadir_path, 'log')
994 logfile = logging.LogFile(args.logfile)
995 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
996 sys.stdout = logging.AbortPipe(pipe)
997 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
998 if hasattr(signal, "SIGUSR1"):
999 def sigusr1(signum, frame):
1000 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1002 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1003 signal.signal(signal.SIGUSR1, sigusr1)
1004 task.LoopingCall(logfile.reopen).start(5)
1006 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)