1 from __future__ import division
16 from twisted.internet import iocpreactor
21 print 'Using IOCP reactor!'
22 from twisted.internet import defer, reactor, protocol, task
23 from twisted.web import server
24 from twisted.python import log
25 from nattraverso import portmapper, ipdiscover
27 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
28 from bitcoin import worker_interface, height_tracker
29 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
30 from . import p2p, networks, web
31 import p2pool, p2pool.data as p2pool_data
# getwork(bitcoind): ask bitcoind for a block template through its
# getmemorypool RPC and hand back (via defer.returnValue) a dict holding the
# fields the rest of this file reads: version, previous_block_hash,
# transactions, merkle_link, subsidy, bits and coinbaseflags.
# The whole call is wrapped in deferral.retry with the message
# 'Error getting work from bitcoind:'.
# NOTE(review): this excerpt does not show every original line (for example
# the `try:` that pairs with the `except` below, and the end of the dict);
# set_real_work1 also reads work['time'], so a time entry is presumably set on
# a line not visible here - confirm against the full file.
33 @deferral.retry('Error getting work from bitcoind:', 3)
34 @defer.inlineCallbacks
35 def getwork(bitcoind):
37 work = yield bitcoind.rpc_getmemorypool()
38 except jsonrpc.Error, e:
# JSON-RPC error -32601 means the daemon has no getmemorypool method at all;
# tell the user to upgrade and let deferral.retry try again quietly.
39 if e.code == -32601: # Method not found
40 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
41 raise deferral.RetrySilentlyException()
# transactions arrive hex-encoded; keep the raw bytes so they can be both
# unpacked and hashed below
43 packed_transactions = [x.decode('hex') for x in work['transactions']]
44 defer.returnValue(dict(
45 version=work['version'],
# previousblockhash is a hex string; it is stored as an integer
46 previous_block_hash=int(work['previousblockhash'], 16),
47 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
# merkle link for position 0, computed with a placeholder 0 in front of the
# transaction hashes (get_work later checks the generate tx hash against it)
48 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
49 subsidy=work['coinbasevalue'],
# 'bits' may be a hex string (decoded, byte-reversed, then unpacked) or
# already a number (wrapped in FloatingInteger)
51 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
# prefer 'coinbaseflags'; otherwise concatenate the hex-decoded values of
# 'coinbaseaux'; otherwise use an empty string
52 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
55 @defer.inlineCallbacks
56 def main(args, net, datadir_path, merged_urls, worker_endpoint):
58 print 'p2pool (version %s)' % (p2pool.__version__,)
61 # connect to bitcoind over JSON-RPC and do initial getmemorypool
62 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
63 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
64 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
65 @deferral.retry('Error while checking Bitcoin connection:', 1)
66 @defer.inlineCallbacks
68 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
69 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
70 raise deferral.RetrySilentlyException()
71 temp_work = yield getwork(bitcoind)
72 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
73 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
74 raise deferral.RetrySilentlyException()
75 defer.returnValue(temp_work)
76 temp_work = yield check()
78 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
81 # connect to bitcoind over bitcoin-p2p
82 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
83 factory = bitcoin_p2p.ClientFactory(net.PARENT)
84 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
85 yield factory.getProtocol() # waits until handshake is successful
89 print 'Determining payout address...'
90 if args.pubkey_hash is None:
91 address_path = os.path.join(datadir_path, 'cached_payout_address')
93 if os.path.exists(address_path):
94 with open(address_path, 'rb') as f:
95 address = f.read().strip('\r\n')
96 print ' Loaded cached address: %s...' % (address,)
100 if address is not None:
101 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
102 if not res['isvalid'] or not res['ismine']:
103 print ' Cached address is either invalid or not controlled by local bitcoind!'
107 print ' Getting payout address from bitcoind...'
108 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
110 with open(address_path, 'wb') as f:
113 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
115 my_pubkey_hash = args.pubkey_hash
116 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
119 my_share_hashes = set()
120 my_doa_share_hashes = set()
122 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
123 shared_share_hashes = set()
124 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
125 known_verified = set()
126 print "Loading shares..."
127 for i, (mode, contents) in enumerate(ss.get_shares()):
129 if contents.hash in tracker.shares:
131 shared_share_hashes.add(contents.hash)
132 contents.time_seen = 0
133 tracker.add(contents)
134 if len(tracker.shares) % 1000 == 0 and tracker.shares:
135 print " %i" % (len(tracker.shares),)
136 elif mode == 'verified_hash':
137 known_verified.add(contents)
139 raise AssertionError()
140 print " ...inserting %i verified shares..." % (len(known_verified),)
141 for h in known_verified:
142 if h not in tracker.shares:
143 ss.forget_verified_share(h)
145 tracker.verified.add(tracker.shares[h])
146 print " ...done loading %i shares!" % (len(tracker.shares),)
148 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
149 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
150 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
152 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
154 pre_current_work = variable.Variable(None)
155 pre_merged_work = variable.Variable({})
156 # information affecting work that should trigger a long-polling update
157 current_work = variable.Variable(None)
158 # information affecting work that should not trigger a long-polling update
159 current_work2 = variable.Variable(None)
161 requested = expiring_dict.ExpiringDict(300)
163 print 'Initializing work...'
# set_real_work1(): fetch fresh work from bitcoind with getwork() and publish
# it into the two work variables:
#   current_work2    - transactions, merkle_link, subsidy, clock_offset and
#                      last_update
#   pre_current_work - version, previous_block, coinbaseflags
# NOTE(review): some dict entries and the closing brackets of the second
# set() are not visible in this excerpt (set_real_work2 reads
# pre_current_work.value['bits'], so it is presumably set here too) - confirm
# against the full file.
164 @defer.inlineCallbacks
165 def set_real_work1():
166 work = yield getwork(bitcoind)
167 current_work2.set(dict(
169 transactions=work['transactions'],
170 merkle_link=work['merkle_link'],
171 subsidy=work['subsidy'],
# local clock minus the time reported by bitcoind; get_work subtracts this
# from time.time() to derive desired_timestamp
172 clock_offset=time.time() - work['time'],
# moment of this refresh; get_work and the status loop compare it against
# time.time() to report lost contact with bitcoind after 60 seconds
173 last_update=time.time(),
174 )) # second set first because everything hooks on the first
175 pre_current_work.set(dict(
176 version=work['version'],
177 previous_block=work['previous_block_hash'],
179 coinbaseflags=work['coinbaseflags'],
181 yield set_real_work1()
183 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
185 def set_real_work2():
186 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
188 t = dict(pre_current_work.value)
189 t['best_share_hash'] = best
190 t['mm_chains'] = pre_merged_work.value
194 for peer2, share_hash in desired:
195 if share_hash not in tracker.tails: # was received in the time tracker.think was running
197 last_request_time, count = requested.get(share_hash, (None, 0))
198 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
200 potential_peers = set()
201 for head in tracker.tails[share_hash]:
202 potential_peers.update(peer_heads.get(head, set()))
203 potential_peers = [peer for peer in potential_peers if peer.connected2]
204 if count == 0 and peer2 is not None and peer2.connected2:
207 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
211 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
215 stops=list(set(tracker.heads) | set(
216 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
219 requested[share_hash] = t, count + 1
220 pre_current_work.changed.watch(lambda _: set_real_work2())
221 pre_merged_work.changed.watch(lambda _: set_real_work2())
227 @defer.inlineCallbacks
228 def set_merged_work(merged_url, merged_userpass):
229 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
231 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
232 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
233 hash=int(auxblock['hash'], 16),
234 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
235 merged_proxy=merged_proxy,
237 yield deferral.sleep(1)
238 for merged_url, merged_userpass in merged_urls:
239 set_merged_work(merged_url, merged_userpass)
241 @pre_merged_work.changed.watch
242 def _(new_merged_work):
243 print 'Got new merged mining work!'
245 # setup p2p logic and join p2pool network
247 class Node(p2p.Node):
248 def handle_shares(self, shares, peer):
250 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
254 if share.hash in tracker.shares:
255 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
260 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
264 if shares and peer is not None:
265 peer_heads.setdefault(shares[0].hash, set()).add(peer)
271 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
273 def handle_share_hashes(self, hashes, peer):
276 for share_hash in hashes:
277 if share_hash in tracker.shares:
279 last_request_time, count = requested.get(share_hash, (None, 0))
280 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
282 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
283 get_hashes.append(share_hash)
284 requested[share_hash] = t, count + 1
286 if hashes and peer is not None:
287 peer_heads.setdefault(hashes[0], set()).add(peer)
289 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
# handle_get_shares(hashes, parents, stops, peer): for each hash in `hashes`,
# walk the tracker's chain starting at that share, following up to `parents`
# ancestors and paying attention to `stops`, then report how many shares are
# being sent to the peer.
# NOTE(review): several original lines of this method are not visible in this
# excerpt (what happens when a stop hash is hit, how `shares` is built and
# sent) - confirm against the full file.
291 def handle_get_shares(self, hashes, parents, stops, peer):
# cap the ancestor depth so that len(hashes) * parents stays at or below 1000
# NOTE(review): 1000//len(hashes) raises ZeroDivisionError when `hashes` is
# empty; confirm that an empty list can never reach this handler.
292 parents = min(parents, 1000//len(hashes))
295 for share_hash in hashes:
# never ask get_chain for more shares than get_height reports for this hash
296 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
297 if share.hash in stops:
300 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block(block, ignore_failure):
    '''Submit a block to bitcoind and complain if the outcome is surprising.

    block: dict with a 'header' entry (itself holding 'bits'); the whole dict
        is packed with bitcoin_data.block_type and sent hex-encoded through
        the getmemorypool RPC.
    ignore_failure: when true, a rejection of a block that was expected to be
        accepted is not reported.

    The expected outcome is derived locally: the block should be accepted
    exactly when the proof-of-work hash of its header is at or below the
    target encoded in the header's bits. A message is written to stderr when
    the block was rejected although it should have been accepted (unless
    ignore_failure), or accepted although it should have been rejected.
    '''
    packed_block = bitcoin_data.block_type.pack(block)
    success = yield bitcoind.rpc_getmemorypool(packed_block.encode('hex'))

    header = block['header']
    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
    success_expected = pow_hash <= header['bits'].target

    unexpected_failure = not success and success_expected and not ignore_failure
    unexpected_success = success and not success_expected
    if unexpected_failure or unexpected_success:
        # report the values actually computed above; the names previously
        # used here were never defined and raised NameError instead of logging
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
311 @tracker.verified.added.watch
313 if share.pow_hash <= share.header['bits'].target:
314 submit_block(share.as_block(tracker), ignore_failure=True)
316 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
319 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
321 @defer.inlineCallbacks
324 ip, port = x.split(':')
325 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
327 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
330 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
332 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
334 print >>sys.stderr, "error reading addrs"
335 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
338 if addr not in addrs:
339 addrs[addr] = (0, time.time(), time.time())
343 connect_addrs = set()
344 for addr_df in map(parse, args.p2pool_nodes):
346 connect_addrs.add((yield addr_df))
351 best_share_hash_func=lambda: current_work.value['best_share_hash'],
352 port=args.p2pool_port,
355 connect_addrs=connect_addrs,
356 max_incoming_conns=args.p2pool_conns,
360 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
362 # send share when the chain changes to their chain
# work_changed(new_work): callback registered on current_work.changed.
# Looks at the newest shares of the chain ending at
# new_work['best_share_hash'] (at most 5, and never more than the chain
# height), records them in shared_share_hashes, and sends shares to every
# connected peer.
# NOTE(review): the lines that build `shares` and that decide what happens
# when a share is already in shared_share_hashes are not visible in this
# excerpt - confirm against the full file.
363 def work_changed(new_work):
364 #print 'Work changed:', new_work
366 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
367 if share.hash in shared_share_hashes:
369 shared_share_hashes.add(share.hash)
372 for peer in p2p_node.peers.itervalues():
# do not echo a share back to the peer object recorded on it as share.peer
373 peer.sendShares([share for share in shares if share.peer is not peer])
375 current_work.changed.watch(work_changed)
378 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
380 if share.hash in tracker.verified.shares:
381 ss.add_verified_hash(share.hash)
382 task.LoopingCall(save_shares).start(60)
388 @defer.inlineCallbacks
392 is_lan, lan_ip = yield ipdiscover.get_local_ip()
394 pm = yield portmapper.get_port_mapper()
395 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
396 except defer.TimeoutError:
400 log.err(None, 'UPnP error:')
401 yield deferral.sleep(random.expovariate(1/120))
404 # start listening for workers with a JSON-RPC server
406 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
410 removed_unstales_var = variable.Variable((0, 0, 0))
411 removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    '''Keep our stale statistics intact when a verified share is pruned.

    get_stale_counts() adds these counters to the per-chain delta, so every
    share of ours that leaves the verified tracker while still being an
    ancestor of the current best share has to be remembered here.

    share: the share that was just removed from tracker.verified.

    removed_unstales_var holds (count, orphan announces, dead announces);
    stale_info 253 is counted as an orphan announce and 254 as a dead
    announce. removed_doa_unstales_var counts our dead-on-arrival shares.
    '''
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        stale_info = share.share_data['stale_info']
        assert stale_info in [0, 253, 254] # we made these shares in this instance
        count, orphan_announces, dead_announces = removed_unstales_var.value
        removed_unstales_var.set((
            count + 1,
            orphan_announces + (1 if stale_info == 253 else 0),
            dead_announces + (1 if stale_info == 254 else 0),
        ))
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # the counter variable is removed_doa_unstales_var; the shorter name
        # used here before was never defined and raised NameError
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Summarise how the shares produced by this instance have fared.

    Returns a 3-tuple:
      (orphans, doas) - our shares that are not accounted for in the current
          best chain, split into those not marked dead on arrival and those
          that were
      total - number of shares we produced (len(my_share_hashes))
      (orphans_recorded_in_chain, doas_recorded_in_chain) - orphan and dead
          announce counts taken from the chain delta plus the counters kept
          for shares already pruned from the tracker
    '''
    total_shares = len(my_share_hashes)
    total_doa_shares = len(my_doa_share_hashes)
    delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
    pruned_count, pruned_orphan_announces, pruned_dead_announces = removed_unstales_var.value

    # shares (and dead-on-arrival shares) of ours that the chain does not hold
    shares_missing = total_shares - (delta.my_count + pruned_count)
    doas_missing = total_doa_shares - (delta.my_doa_count + removed_doa_unstales_var.value)

    recorded_in_chain = (
        delta.my_orphan_announce_count + pruned_orphan_announces,
        delta.my_dead_announce_count + pruned_dead_announces,
    )
    return (shares_missing - doas_missing, doas_missing), total_shares, recorded_in_chain
440 pseudoshare_received = variable.Event()
441 share_received = variable.Event()
442 local_rate_monitor = math.RateMonitor(10*60)
444 class WorkerBridge(worker_interface.WorkerBridge):
446 worker_interface.WorkerBridge.__init__(self)
447 self.new_work_event = current_work.changed
448 self.recent_shares_ts_work = []
450 def preprocess_request(self, request):
451 user = request.getUser() if request.getUser() is not None else ''
453 desired_pseudoshare_target = None
455 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
457 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
461 desired_share_target = 2**256 - 1
463 user, min_diff_str = user.rsplit('/', 1)
465 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
469 if random.uniform(0, 100) < args.worker_fee:
470 pubkey_hash = my_pubkey_hash
473 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
475 pubkey_hash = my_pubkey_hash
477 return pubkey_hash, desired_share_target, desired_pseudoshare_target
479 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
480 if len(p2p_node.peers) == 0 and net.PERSIST:
481 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
482 if current_work.value['best_share_hash'] is None and net.PERSIST:
483 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
484 if time.time() > current_work2.value['last_update'] + 60:
485 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
487 if current_work.value['mm_chains']:
488 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
489 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
490 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
491 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
495 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
501 share_info, generate_tx = p2pool_data.Share.generate_transaction(
504 previous_share_hash=current_work.value['best_share_hash'],
505 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
506 nonce=random.randrange(2**32),
507 pubkey_hash=pubkey_hash,
508 subsidy=current_work2.value['subsidy'],
509 donation=math.perfect_round(65535*args.donation_percentage/100),
510 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
511 253 if orphans > orphans_recorded_in_chain else
512 254 if doas > doas_recorded_in_chain else
514 )(*get_stale_counts()),
517 block_target=current_work.value['bits'].target,
518 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
519 desired_target=desired_share_target,
520 ref_merkle_link=dict(branch=[], index=0),
524 target = net.PARENT.SANE_MAX_TARGET
525 if desired_pseudoshare_target is None:
526 if len(self.recent_shares_ts_work) == 50:
527 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
528 target = min(target, 2**256//hash_rate)
530 target = min(target, desired_pseudoshare_target)
531 target = max(target, share_info['bits'].target)
532 for aux_work in current_work.value['mm_chains'].itervalues():
533 target = max(target, aux_work['target'])
535 transactions = [generate_tx] + list(current_work2.value['transactions'])
536 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
537 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
539 getwork_time = time.time()
540 merkle_link = current_work2.value['merkle_link']
542 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
543 bitcoin_data.target_to_difficulty(target),
544 bitcoin_data.target_to_difficulty(share_info['bits'].target),
545 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
546 len(current_work2.value['transactions']),
549 ba = bitcoin_getwork.BlockAttempt(
550 version=current_work.value['version'],
551 previous_block=current_work.value['previous_block'],
552 merkle_root=merkle_root,
553 timestamp=current_work2.value['time'],
554 bits=current_work.value['bits'],
558 received_header_hashes = set()
560 def got_response(header, request):
561 assert header['merkle_root'] == merkle_root
563 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
564 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
565 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
568 if pow_hash <= header['bits'].target or p2pool.DEBUG:
569 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
570 if pow_hash <= header['bits'].target:
572 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
575 log.err(None, 'Error while processing potential block:')
577 for aux_work, index, hashes in mm_later:
579 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
580 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
581 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
582 bitcoin_data.aux_pow_type.pack(dict(
585 block_hash=header_hash,
586 merkle_link=merkle_link,
588 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
589 parent_block_header=header,
594 if result != (pow_hash <= aux_work['target']):
595 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
597 print 'Merged block submittal result: %s' % (result,)
600 log.err(err, 'Error submitting merged block:')
602 log.err(None, 'Error while processing merged mining POW:')
604 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
605 min_header = dict(header);del min_header['merkle_root']
606 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
607 share = p2pool_data.Share(net, None, dict(
608 min_header=min_header, share_info=share_info, hash_link=hash_link,
609 ref_merkle_link=dict(branch=[], index=0),
610 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
612 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
614 p2pool_data.format_hash(share.hash),
615 p2pool_data.format_hash(share.previous_hash),
616 time.time() - getwork_time,
617 ' DEAD ON ARRIVAL' if not on_time else '',
619 my_share_hashes.add(share.hash)
621 my_doa_share_hashes.add(share.hash)
625 tracker.verified.add(share)
629 if pow_hash <= header['bits'].target or p2pool.DEBUG:
630 for peer in p2p_node.peers.itervalues():
631 peer.sendShares([share])
632 shared_share_hashes.add(share.hash)
634 log.err(None, 'Error forwarding block solution:')
636 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
638 if pow_hash > target:
639 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
640 print ' Hash: %56x' % (pow_hash,)
641 print ' Target: %56x' % (target,)
642 elif header_hash in received_header_hashes:
643 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
645 received_header_hashes.add(header_hash)
647 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser())
648 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
649 while len(self.recent_shares_ts_work) > 50:
650 self.recent_shares_ts_work.pop(0)
651 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
655 return ba, got_response
657 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
659 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
660 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
662 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
664 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
671 @defer.inlineCallbacks
674 flag = factory.new_block.get_deferred()
676 yield set_real_work1()
679 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
684 print 'Started successfully!'
685 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
686 if args.donation_percentage > 0.51:
687 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
688 elif args.donation_percentage < 0.49:
689 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
691 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
692 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
696 if hasattr(signal, 'SIGALRM'):
697 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
698 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
700 signal.siginterrupt(signal.SIGALRM, False)
701 task.LoopingCall(signal.alarm, 30).start(1)
703 if args.irc_announce:
704 from twisted.words.protocols import irc
705 class IRCClient(irc.IRCClient):
706 nickname = 'p2pool%02i' % (random.randrange(100),)
707 channel = net.ANNOUNCE_CHANNEL
708 def lineReceived(self, line):
711 irc.IRCClient.lineReceived(self, line)
713 irc.IRCClient.signedOn(self)
714 self.factory.resetDelay()
715 self.join(self.channel)
@defer.inlineCallbacks
def new_share(share):
    '''Announce a freshly found block on the IRC channel.

    Only shares whose proof-of-work hash meets the block target and whose
    timestamp lies within ten minutes of the local clock are announced. The
    announcement is delayed by a random, exponentially distributed time
    (mean 60 seconds) and is dropped if the identical message is already in
    self.recent_messages by then.
    '''
    is_block = share.pow_hash <= share.header['bits'].target
    if not is_block or not (abs(share.timestamp - time.time()) < 10*60):
        return

    yield deferral.sleep(random.expovariate(1/60))

    network_name = net.NAME.upper()
    finder_address = bitcoin_data.script2_to_address(share.new_script, net.PARENT)
    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (network_name, finder_address, net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
    if message in self.recent_messages:
        return
    self.say(self.channel, message)
    self._remember_message(message)
724 self.watch_id = tracker.verified.added.watch(new_share)
725 self.recent_messages = []
def _remember_message(self, message):
    '''Add message to self.recent_messages, keeping only the newest 100.'''
    history = self.recent_messages
    history.append(message)
    # drop the oldest entries in place once the history exceeds 100 items
    overflow = len(history) - 100
    if overflow > 0:
        del history[:overflow]
def privmsg(self, user, channel, message):
    '''Remember messages that arrive on our announce channel.

    Messages addressed to any other channel are ignored. The remembered
    messages are what new_share checks before announcing a block.
    '''
    if channel != self.channel:
        return
    self._remember_message(message)
733 def connectionLost(self, reason):
734 tracker.verified.added.unwatch(self.watch_id)
735 print 'IRC connection lost:', reason.getErrorMessage()
736 class IRCClientFactory(protocol.ReconnectingClientFactory):
738 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
740 @defer.inlineCallbacks
745 yield deferral.sleep(3)
747 if time.time() > current_work2.value['last_update'] + 60:
748 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
750 height = tracker.get_height(current_work.value['best_share_hash'])
751 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
753 len(tracker.verified.shares),
756 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
757 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
759 datums, dt = local_rate_monitor.get_datums_in_last()
760 my_att_s = sum(datum['work']/dt for datum in datums)
761 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
762 math.format(int(my_att_s)),
764 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
765 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
769 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
770 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
771 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
773 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
774 shares, stale_orphan_shares, stale_doa_shares,
775 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
776 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
777 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
779 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
780 math.format(int(real_att_s)),
782 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
785 desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
786 majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
787 if majority_desired_version not in [0, 1]:
788 print >>sys.stderr, '#'*40
789 print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
790 majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
791 print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
792 print >>sys.stderr, '#'*40
794 if this_str != last_str or time.time() > last_time + 15:
797 last_time = time.time()
803 log.err(None, 'Fatal error:')
806 class FixedArgumentParser(argparse.ArgumentParser):
# _read_args_from_files(arg_strings): return a new argument list in which
# every argument whose first character is in self.fromfile_prefix_chars is
# replaced by the arguments read from the file it names. Each line of such a
# file is split with self.convert_arg_line_to_args, and the result is
# expanded recursively, so a file may reference further files.
# NOTE(review): the lines that initialise new_arg_strings and the
# try/except/finally structure around the file handling are not visible in
# this excerpt - confirm against the full file.
807 def _read_args_from_files(self, arg_strings):
808 # expand arguments referencing files
810 for arg_string in arg_strings:
812 # for regular arguments, just add them back into the list
813 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
814 new_arg_strings.append(arg_string)
816 # replace arguments referencing files with the file content
# the file name is the argument without its one-character prefix
819 args_file = open(arg_string[1:])
822 for arg_line in args_file.read().splitlines():
823 for arg in self.convert_arg_line_to_args(arg_line):
824 arg_strings.append(arg)
# recurse so that arguments read from the file are themselves expanded
825 arg_strings = self._read_args_from_files(arg_strings)
826 new_arg_strings.extend(arg_strings)
830 err = sys.exc_info()[1]
833 # return the modified argument list
834 return new_arg_strings
def convert_arg_line_to_args(self, arg_line):
    '''Split one line of an arguments file into separate arguments.

    The line is split on runs of whitespace, so a single line may carry
    several arguments; blank or whitespace-only lines yield an empty list.
    '''
    # split() without a separator never produces empty or blank items
    return arg_line.split()
# Networks that can be named with --net: every entry of networks.nets whose
# name does not contain '_testnet'.
realnets = dict(
    (candidate_name, candidate_net)
    for candidate_name, candidate_net in networks.nets.iteritems()
    if '_testnet' not in candidate_name
)

# fromfile_prefix_chars='@' lets an argument of the form @FILE pull further
# arguments from FILE.
parser = FixedArgumentParser(
    description='p2pool (version %s)' % (p2pool.__version__,),
    fromfile_prefix_chars='@')

# General options.  'store' is argparse's default action, and 'store_true'
# is the built-in spelling of store_const/const=True/default=False.
parser.add_argument(
    '--version',
    action='version', version=p2pool.__version__)
parser.add_argument(
    '--net',
    dest='net_name', choices=sorted(realnets), default='bitcoin',
    help='use specified network (default: bitcoin)')
parser.add_argument(
    '--testnet',
    dest='testnet', action='store_true',
    help='''use the network's testnet''')
parser.add_argument(
    '--debug',
    dest='debug', action='store_true',
    help='enable debugging mode')
parser.add_argument(
    '-a', '--address',
    dest='address', type=str, default=None,
    help='generate payouts to this address (default: <address requested from bitcoind>)')
parser.add_argument(
    '--datadir',
    dest='datadir', type=str, default=None,
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)')
parser.add_argument(
    '--logfile',
    dest='logfile', type=str, default=None,
    help='''log to this file (default: data/<NET>/log)''')
parser.add_argument(
    '--merged',
    dest='merged_urls', type=str, action='append', default=[],
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)')
parser.add_argument(
    '--give-author', metavar='DONATION_PERCENTAGE',
    dest='donation_percentage', type=float, default=0.5,
    help='donate this percentage of work towards the development of p2pool (default: 0.5)')
parser.add_argument(
    '--irc-announce',
    dest='irc_announce', action='store_true',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool')
# Options for p2pool's own peer-to-peer interface; they are listed together
# under the 'p2pool interface' heading in --help.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# Registered on the group rather than on the bare parser, so that --help
# shows it with the other P2P options it belongs to.  Only the help layout
# changes; the flag, its dest ('upnp') and its default (True) are the same.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')
# Options for the interface that miners connect to.
worker_group = parser.add_argument_group('worker interface')

# Per-network default worker ports, quoted in the --worker-port help text.
worker_port_defaults = ', '.join(
    '%s:%i' % (net_label, net_def.WORKER_PORT)
    for net_label, net_def in sorted(realnets.items()))

worker_group.add_argument(
    '-w', '--worker-port', metavar='PORT or ADDR:PORT',
    dest='worker_endpoint', type=str, default=None,
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % worker_port_defaults)
worker_group.add_argument(
    '-f', '--fee', metavar='FEE_PERCENTAGE',
    dest='worker_fee', type=float, default=0,
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''')
# Options describing how to reach bitcoind.
bitcoind_group = parser.add_argument_group('bitcoind interface')

# Per-network default ports, quoted in the help texts below.
rpc_port_defaults = ', '.join(
    '%s:%i' % (net_label, net_def.PARENT.RPC_PORT)
    for net_label, net_def in sorted(realnets.items()))
p2p_port_defaults = ', '.join(
    '%s:%i' % (net_label, net_def.PARENT.P2P_PORT)
    for net_label, net_def in sorted(realnets.items()))

bitcoind_group.add_argument(
    '--bitcoind-address', metavar='BITCOIND_ADDRESS',
    dest='bitcoind_address', type=str, default='127.0.0.1',
    help='connect to this address (default: 127.0.0.1)')
bitcoind_group.add_argument(
    '--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    dest='bitcoind_rpc_port', type=int, default=None,
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % rpc_port_defaults)
bitcoind_group.add_argument(
    '--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    dest='bitcoind_p2p_port', type=int, default=None,
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % p2p_port_defaults)

# Positional arguments: zero or more trailing words, collected into a list.
bitcoind_group.add_argument(
    metavar='BITCOIND_RPCUSERPASS',
    dest='bitcoind_rpc_userpass', type=str, nargs='*', default=[],
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)')

args = parser.parse_args()
# Post-process the parsed command line: pick the network, create the data
# directory, and fill in every bitcoind/p2pool setting that was not given.
# NOTE(review): this listing has lost its indentation and some short lines
# (the embedded numbering skips e.g. 932-933, 943, 965, 970, 972, 974), so
# block nesting and a few statements are not visible here.

# --testnet selects the entry of networks.nets named '<net>_testnet'.
914 net_name = args.net_name + ('_testnet' if args.testnet else '')
915 net = networks.nets[net_name]
# Data lives in <datadir>/<net_name>; without --datadir, <datadir> is 'data'
# next to the launched script (dirname of sys.argv[0]).
917 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
918 if not os.path.exists(datadir_path):
919 os.makedirs(datadir_path)
# The positional arguments are [], [password] or [username, password].
# Left-padding with two Nones and keeping the last two items maps all three
# cases onto (username, password), with None for whatever was omitted.
921 if len(args.bitcoind_rpc_userpass) > 2:
922 parser.error('a maximum of two arguments are allowed')
923 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No password on the command line: read settings from the parent network's
# configuration file, whose path comes from net.PARENT.CONF_FILE_FUNC().
925 if args.bitcoind_rpc_password is None:
926 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
927 parser.error('This network has no configuration file function. Manually enter your RPC password.')
928 conf_path = net.PARENT.CONF_FILE_FUNC()
929 if not os.path.exists(conf_path):
# NOTE(review): the embedded numbering jumps from 931 to 934, so part of this
# multi-line message is missing from the listing.
# NOTE(review): the suggested password is drawn from `random`, which is not
# an OS entropy source; consider random.SystemRandom().
930 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
931 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
934 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
935 with open(conf_path, 'rb') as f:
936 cp = ConfigParser.RawConfigParser()
# A dummy '[x]' section header is prepended so the options can be looked up
# under section 'x' (presumably the file itself has no section header).
937 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
# Each entry is (option name in the conf file, attribute on args, converter).
# An attribute is filled from the file only if it is still None, so values
# given on the command line take precedence.
938 for conf_name, var_name, var_type in [
939 ('rpcuser', 'bitcoind_rpc_username', str),
940 ('rpcpassword', 'bitcoind_rpc_password', str),
941 ('rpcport', 'bitcoind_rpc_port', int),
942 ('port', 'bitcoind_p2p_port', int),
944 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
945 setattr(args, var_name, var_type(cp.get('x', conf_name)))
946 if args.bitcoind_rpc_password is None:
947 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
# Anything still unset falls back to an empty username and to the ports
# defined by the selected network.
949 if args.bitcoind_rpc_username is None:
950 args.bitcoind_rpc_username = ''
952 if args.bitcoind_rpc_port is None:
953 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
955 if args.bitcoind_p2p_port is None:
956 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
958 if args.p2pool_port is None:
959 args.p2pool_port = net.P2P_PORT
# worker_endpoint becomes an (address, port) tuple.  The address is '' when
# --worker-port is absent or holds only a port; otherwise the value is split
# at its last ':'.
# NOTE(review): embedded line 965 (between the next two branches) is missing.
961 if args.worker_endpoint is None:
962 worker_endpoint = '', net.WORKER_PORT
963 elif ':' not in args.worker_endpoint:
964 worker_endpoint = '', int(args.worker_endpoint)
966 addr, port = args.worker_endpoint.rsplit(':', 1)
967 worker_endpoint = addr, int(port)
# Convert --address to a pubkey hash for the parent network; args.pubkey_hash
# is None when no address was given.
# NOTE(review): embedded lines 970, 972 and 974 are missing; the handler that
# binds `e` for the error message below is therefore not visible.
969 if args.address is not None:
971 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
973 parser.error('error parsing address: ' + repr(e))
975 args.pubkey_hash = None
def separate_url(url):
    """Split the credentials out of a merged-mining URL.

    Returns a ``(url_without_credentials, userpass)`` tuple, where
    ``userpass`` is everything before the last '@' of the URL's netloc.
    A netloc without an '@' is reported through ``parser.error``.
    """
    split_result = urlparse.urlsplit(url)
    netloc = split_result.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split at the last '@' so that an '@' inside the credentials is kept.
    pieces = netloc.rsplit('@', 1)
    userpass, new_netloc = pieces
    bare_url = urlparse.urlunsplit(split_result._replace(netloc=new_netloc))
    return bare_url, userpass
# Finish start-up: split the merged-mining URLs, redirect output to the log
# file, and schedule main() on the reactor.
# NOTE(review): this listing has lost its indentation and some short lines
# (the embedded numbering skips 995), so nesting is not visible here.

# One (url_without_credentials, userpass) pair per --merged option.
983 merged_urls = map(separate_url, args.merged_urls)
# Default log location is '<datadir_path>/log'.
985 if args.logfile is None:
986 args.logfile = os.path.join(datadir_path, 'log')
# `logging` is the project's util.logging (see the imports at the top of the
# file), not the standard-library module.
988 logfile = logging.LogFile(args.logfile)
# Judging by the helper names, `pipe` timestamps what is written to it and
# tees it to both the original stderr and the log file -- TODO confirm
# against util.logging.
989 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
# stdout and stderr (including the stderr used by twisted's DefaultObserver)
# are both replaced by wrappers around `pipe`; stderr output additionally
# passes through a PrefixPipe created with '> '.
990 sys.stdout = logging.AbortPipe(pipe)
991 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 is not defined on every platform, hence the hasattr() guard.
992 if hasattr(signal, "SIGUSR1"):
993 def sigusr1(signum, frame):
994 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
# NOTE(review): embedded line 995 is missing here; given the two messages it
# presumably reopens the log file -- confirm against the full file.
996 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
997 signal.signal(signal.SIGUSR1, sigusr1)
# Independently of the signal, logfile.reopen is called every 5 seconds.
998 task.LoopingCall(logfile.reopen).start(5)
# main() runs once the reactor is running.
1000 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)