1 from __future__ import division
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface, height_tracker
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 except jsonrpc.Error, e:
32 if e.code == -32601: # Method not found
33 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34 raise deferral.RetrySilentlyException()
36 packed_transactions = [x.decode('hex') for x in work['transactions']]
37 defer.returnValue(dict(
38 version=work['version'],
39 previous_block_hash=int(work['previousblockhash'], 16),
40 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
42 subsidy=work['coinbasevalue'],
44 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
51 print 'p2pool (version %s)' % (p2pool.__version__,)
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
58 @deferral.retry('Error while checking Bitcoin connection:', 1)
59 @defer.inlineCallbacks
61 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
62 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
63 raise deferral.RetrySilentlyException()
64 temp_work = yield getwork(bitcoind)
65 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
66 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
67 raise deferral.RetrySilentlyException()
68 defer.returnValue(temp_work)
69 temp_work = yield check()
71 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
74 # connect to bitcoind over bitcoin-p2p
75 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
76 factory = bitcoin_p2p.ClientFactory(net.PARENT)
77 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
78 yield factory.getProtocol() # waits until handshake is successful
82 print 'Determining payout address...'
83 if args.pubkey_hash is None:
84 address_path = os.path.join(datadir_path, 'cached_payout_address')
86 if os.path.exists(address_path):
87 with open(address_path, 'rb') as f:
88 address = f.read().strip('\r\n')
89 print ' Loaded cached address: %s...' % (address,)
93 if address is not None:
94 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
95 if not res['isvalid'] or not res['ismine']:
96 print ' Cached address is either invalid or not controlled by local bitcoind!'
100 print ' Getting payout address from bitcoind...'
101 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
103 with open(address_path, 'wb') as f:
106 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
108 my_pubkey_hash = args.pubkey_hash
109 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
112 my_share_hashes = set()
113 my_doa_share_hashes = set()
115 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
116 shared_share_hashes = set()
117 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
118 known_verified = set()
120 print "Loading shares..."
121 for i, (mode, contents) in enumerate(ss.get_shares()):
123 if contents.hash in tracker.shares:
125 shared_share_hashes.add(contents.hash)
126 contents.time_seen = 0
127 tracker.add(contents)
128 if len(tracker.shares) % 1000 == 0 and tracker.shares:
129 print " %i" % (len(tracker.shares),)
130 elif mode == 'verified_hash':
131 known_verified.add(contents)
133 raise AssertionError()
134 print " ...inserting %i verified shares..." % (len(known_verified),)
135 for h in known_verified:
136 if h not in tracker.shares:
137 ss.forget_verified_share(h)
139 tracker.verified.add(tracker.shares[h])
140 print " ...done loading %i shares!" % (len(tracker.shares),)
142 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
143 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
144 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
146 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
148 pre_current_work = variable.Variable(None)
149 pre_merged_work = variable.Variable({})
150 # information affecting work that should trigger a long-polling update
151 current_work = variable.Variable(None)
152 # information affecting work that should not trigger a long-polling update
153 current_work2 = variable.Variable(None)
155 requested = expiring_dict.ExpiringDict(300)
157 print 'Initializing work...'
158 @defer.inlineCallbacks
159 def set_real_work1():
160 work = yield getwork(bitcoind)
161 current_work2.set(dict(
163 transactions=work['transactions'],
164 merkle_link=work['merkle_link'],
165 subsidy=work['subsidy'],
166 clock_offset=time.time() - work['time'],
167 last_update=time.time(),
168 )) # second set first because everything hooks on the first
169 pre_current_work.set(dict(
170 version=work['version'],
171 previous_block=work['previous_block_hash'],
173 coinbaseflags=work['coinbaseflags'],
175 yield set_real_work1()
177 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
179 def set_real_work2():
180 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
182 t = dict(pre_current_work.value)
183 t['best_share_hash'] = best
184 t['mm_chains'] = pre_merged_work.value
188 for peer2, share_hash in desired:
189 if share_hash not in tracker.tails: # was received in the time tracker.think was running
191 last_request_time, count = requested.get(share_hash, (None, 0))
192 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
194 potential_peers = set()
195 for head in tracker.tails[share_hash]:
196 potential_peers.update(peer_heads.get(head, set()))
197 potential_peers = [peer for peer in potential_peers if peer.connected2]
198 if count == 0 and peer2 is not None and peer2.connected2:
201 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
205 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
209 stops=list(set(tracker.heads) | set(
210 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
213 requested[share_hash] = t, count + 1
214 pre_current_work.changed.watch(lambda _: set_real_work2())
215 pre_merged_work.changed.watch(lambda _: set_real_work2())
221 @defer.inlineCallbacks
222 def set_merged_work(merged_url, merged_userpass):
223 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
225 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
226 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
227 hash=int(auxblock['hash'], 16),
228 target='p2pool' if auxblock['target'] == 'p2pool' else pack.IntType(256).unpack(auxblock['target'].decode('hex')),
229 merged_proxy=merged_proxy,
231 yield deferral.sleep(1)
232 for merged_url, merged_userpass in merged_urls:
233 set_merged_work(merged_url, merged_userpass)
235 @pre_merged_work.changed.watch
236 def _(new_merged_work):
237 print 'Got new merged mining work!'
239 # setup p2p logic and join p2pool network
241 class Node(p2p.Node):
242 def handle_shares(self, shares, peer):
244 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
248 if share.hash in tracker.shares:
249 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
254 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
258 if shares and peer is not None:
259 peer_heads.setdefault(shares[0].hash, set()).add(peer)
265 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
267 def handle_share_hashes(self, hashes, peer):
270 for share_hash in hashes:
271 if share_hash in tracker.shares:
273 last_request_time, count = requested.get(share_hash, (None, 0))
274 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
276 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
277 get_hashes.append(share_hash)
278 requested[share_hash] = t, count + 1
280 if hashes and peer is not None:
281 peer_heads.setdefault(hashes[0], set()).add(peer)
283 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
285 def handle_get_shares(self, hashes, parents, stops, peer):
286 parents = min(parents, 1000//len(hashes))
289 for share_hash in hashes:
290 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
291 if share.hash in stops:
294 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
297 @deferral.retry('Error submitting block: (will retry)', 10, 10)
298 @defer.inlineCallbacks
299 def submit_block(block, ignore_failure):
300 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
301 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
302 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
303 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
305 @tracker.verified.added.watch
307 if share.pow_hash <= share.header['bits'].target:
308 submit_block(share.as_block(tracker), ignore_failure=True)
310 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
312 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
314 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
316 @defer.inlineCallbacks
319 ip, port = x.split(':')
320 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
322 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
325 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
327 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
329 print >>sys.stderr, "error reading addrs"
330 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
333 if addr not in addrs:
334 addrs[addr] = (0, time.time(), time.time())
338 connect_addrs = set()
339 for addr_df in map(parse, args.p2pool_nodes):
341 connect_addrs.add((yield addr_df))
346 best_share_hash_func=lambda: current_work.value['best_share_hash'],
347 port=args.p2pool_port,
350 connect_addrs=connect_addrs,
351 max_incoming_conns=args.p2pool_conns,
355 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
357 # send share when the chain changes to their chain
358 def work_changed(new_work):
359 #print 'Work changed:', new_work
361 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
362 if share.hash in shared_share_hashes:
364 shared_share_hashes.add(share.hash)
367 for peer in p2p_node.peers.itervalues():
368 peer.sendShares([share for share in shares if share.peer is not peer])
370 current_work.changed.watch(work_changed)
373 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
375 if share.hash in tracker.verified.shares:
376 ss.add_verified_hash(share.hash)
377 task.LoopingCall(save_shares).start(60)
383 @defer.inlineCallbacks
387 is_lan, lan_ip = yield ipdiscover.get_local_ip()
389 pm = yield portmapper.get_port_mapper()
390 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
391 except defer.TimeoutError:
395 log.err(None, 'UPnP error:')
396 yield deferral.sleep(random.expovariate(1/120))
399 # start listening for workers with a JSON-RPC server
401 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
405 removed_unstales_var = variable.Variable((0, 0, 0))
406 removed_doa_unstales_var = variable.Variable(0)
407 @tracker.verified.removed.watch
409 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
410 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
411 removed_unstales_var.set((
412 removed_unstales_var.value[0] + 1,
413 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
414 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
416 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
417 removed_doa_unstales.set(removed_doa_unstales.value + 1)
419 def get_stale_counts():
420 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
421 my_shares = len(my_share_hashes)
422 my_doa_shares = len(my_doa_share_hashes)
423 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
424 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
425 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
426 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
427 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
429 my_shares_not_in_chain = my_shares - my_shares_in_chain
430 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
432 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
435 pseudoshare_received = variable.Event()
436 share_received = variable.Event()
437 local_rate_monitor = math.RateMonitor(10*60)
439 class WorkerBridge(worker_interface.WorkerBridge):
441 worker_interface.WorkerBridge.__init__(self)
442 self.new_work_event = current_work.changed
443 self.recent_shares_ts_work = []
445 def preprocess_request(self, request):
446 user = request.getUser() if request.getUser() is not None else ''
448 desired_pseudoshare_target = None
450 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
452 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
456 desired_share_target = 2**256 - 1
458 user, min_diff_str = user.rsplit('/', 1)
460 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
464 if random.uniform(0, 100) < args.worker_fee:
465 pubkey_hash = my_pubkey_hash
468 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
470 pubkey_hash = my_pubkey_hash
472 return pubkey_hash, desired_share_target, desired_pseudoshare_target
474 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
475 if len(p2p_node.peers) == 0 and net.PERSIST:
476 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
477 if current_work.value['best_share_hash'] is None and net.PERSIST:
478 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
479 if time.time() > current_work2.value['last_update'] + 60:
480 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
482 if current_work.value['mm_chains']:
483 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
484 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
485 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
486 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
490 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
496 share_info, generate_tx = p2pool_data.Share.generate_transaction(
499 previous_share_hash=current_work.value['best_share_hash'],
500 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
501 nonce=random.randrange(2**32),
502 pubkey_hash=pubkey_hash,
503 subsidy=current_work2.value['subsidy'],
504 donation=math.perfect_round(65535*args.donation_percentage/100),
505 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
506 253 if orphans > orphans_recorded_in_chain else
507 254 if doas > doas_recorded_in_chain else
509 )(*get_stale_counts()),
512 block_target=current_work.value['bits'].target,
513 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
514 desired_target=desired_share_target,
515 ref_merkle_link=dict(branch=[], index=0),
519 mm_later = [(dict(aux_work, target=aux_work['target'] if aux_work['target'] != 'p2pool' else share_info['bits'].target), index, hashes) for aux_work, index, hashes in mm_later]
521 target = net.PARENT.SANE_MAX_TARGET
522 if desired_pseudoshare_target is None:
523 if len(self.recent_shares_ts_work) == 50:
524 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
525 target = min(target, 2**256//hash_rate)
527 target = min(target, desired_pseudoshare_target)
528 target = max(target, share_info['bits'].target)
529 for aux_work, index, hashes in mm_later:
530 target = max(target, aux_work['target'])
532 transactions = [generate_tx] + list(current_work2.value['transactions'])
533 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
534 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
536 getwork_time = time.time()
537 merkle_link = current_work2.value['merkle_link']
539 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
540 bitcoin_data.target_to_difficulty(target),
541 bitcoin_data.target_to_difficulty(share_info['bits'].target),
542 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
543 len(current_work2.value['transactions']),
546 ba = bitcoin_getwork.BlockAttempt(
547 version=current_work.value['version'],
548 previous_block=current_work.value['previous_block'],
549 merkle_root=merkle_root,
550 timestamp=current_work2.value['time'],
551 bits=current_work.value['bits'],
555 received_header_hashes = set()
557 def got_response(header, request):
558 assert header['merkle_root'] == merkle_root
560 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
561 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
562 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
565 if pow_hash <= header['bits'].target or p2pool.DEBUG:
566 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
567 if pow_hash <= header['bits'].target:
569 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
571 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
573 log.err(None, 'Error while processing potential block:')
575 for aux_work, index, hashes in mm_later:
577 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
578 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
579 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
580 bitcoin_data.aux_pow_type.pack(dict(
583 block_hash=header_hash,
584 merkle_link=merkle_link,
586 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
587 parent_block_header=header,
592 if result != (pow_hash <= aux_work['target']):
593 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
595 print 'Merged block submittal result: %s' % (result,)
598 log.err(err, 'Error submitting merged block:')
600 log.err(None, 'Error while processing merged mining POW:')
602 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
603 min_header = dict(header);del min_header['merkle_root']
604 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
605 share = p2pool_data.Share(net, None, dict(
606 min_header=min_header, share_info=share_info, hash_link=hash_link,
607 ref_merkle_link=dict(branch=[], index=0),
608 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
610 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
612 p2pool_data.format_hash(share.hash),
613 p2pool_data.format_hash(share.previous_hash),
614 time.time() - getwork_time,
615 ' DEAD ON ARRIVAL' if not on_time else '',
617 my_share_hashes.add(share.hash)
619 my_doa_share_hashes.add(share.hash)
623 tracker.verified.add(share)
627 if pow_hash <= header['bits'].target or p2pool.DEBUG:
628 for peer in p2p_node.peers.itervalues():
629 peer.sendShares([share])
630 shared_share_hashes.add(share.hash)
632 log.err(None, 'Error forwarding block solution:')
634 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
636 if pow_hash > target:
637 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
638 print ' Hash: %56x' % (pow_hash,)
639 print ' Target: %56x' % (target,)
640 elif header_hash in received_header_hashes:
641 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
643 received_header_hashes.add(header_hash)
645 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser())
646 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
647 while len(self.recent_shares_ts_work) > 50:
648 self.recent_shares_ts_work.pop(0)
649 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
653 return ba, got_response
655 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
657 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received, share_received)
658 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
660 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
662 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
669 @defer.inlineCallbacks
672 flag = factory.new_block.get_deferred()
674 yield set_real_work1()
677 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
682 print 'Started successfully!'
683 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
684 if args.donation_percentage > 0.51:
685 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
686 elif args.donation_percentage < 0.49:
687 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
689 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
690 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
694 if hasattr(signal, 'SIGALRM'):
695 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
696 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
698 signal.siginterrupt(signal.SIGALRM, False)
699 task.LoopingCall(signal.alarm, 30).start(1)
701 if args.irc_announce:
702 from twisted.words.protocols import irc
703 class IRCClient(irc.IRCClient):
704 nickname = 'p2pool%02i' % (random.randrange(100),)
705 channel = net.ANNOUNCE_CHANNEL
706 def lineReceived(self, line):
709 irc.IRCClient.lineReceived(self, line)
711 irc.IRCClient.signedOn(self)
712 self.factory.resetDelay()
713 self.join(self.channel)
714 @defer.inlineCallbacks
715 def new_share(share):
716 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
717 yield deferral.sleep(random.expovariate(1/60))
718 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
719 if message not in self.recent_messages:
720 self.say(self.channel, message)
721 self._remember_message(message)
722 self.watch_id = tracker.verified.added.watch(new_share)
723 self.recent_messages = []
724 def _remember_message(self, message):
725 self.recent_messages.append(message)
726 while len(self.recent_messages) > 100:
727 self.recent_messages.pop(0)
728 def privmsg(self, user, channel, message):
729 if channel == self.channel:
730 self._remember_message(message)
731 def connectionLost(self, reason):
732 tracker.verified.added.unwatch(self.watch_id)
733 print 'IRC connection lost:', reason.getErrorMessage()
734 class IRCClientFactory(protocol.ReconnectingClientFactory):
736 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
738 @defer.inlineCallbacks
743 yield deferral.sleep(3)
745 if time.time() > current_work2.value['last_update'] + 60:
746 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
748 height = tracker.get_height(current_work.value['best_share_hash'])
749 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
751 len(tracker.verified.shares),
754 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
755 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
757 datums, dt = local_rate_monitor.get_datums_in_last()
758 my_att_s = sum(datum['work']/dt for datum in datums)
759 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
760 math.format(int(my_att_s)),
762 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
763 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
767 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
768 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
769 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
771 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
772 shares, stale_orphan_shares, stale_doa_shares,
773 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
774 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
775 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
777 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
778 math.format(int(real_att_s)),
780 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
783 desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
784 majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
785 if majority_desired_version not in [0, 1]:
786 print >>sys.stderr, '#'*40
787 print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
788 majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
789 print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
790 print >>sys.stderr, '#'*40
792 if this_str != last_str or time.time() > last_time + 15:
795 last_time = time.time()
801 log.err(None, 'Fatal error:')
804 class FixedArgumentParser(argparse.ArgumentParser):
805 def _read_args_from_files(self, arg_strings):
806 # expand arguments referencing files
808 for arg_string in arg_strings:
810 # for regular arguments, just add them back into the list
811 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
812 new_arg_strings.append(arg_string)
814 # replace arguments referencing files with the file content
817 args_file = open(arg_string[1:])
820 for arg_line in args_file.read().splitlines():
821 for arg in self.convert_arg_line_to_args(arg_line):
822 arg_strings.append(arg)
823 arg_strings = self._read_args_from_files(arg_strings)
824 new_arg_strings.extend(arg_strings)
828 err = sys.exc_info()[1]
831 # return the modified argument list
832 return new_arg_strings
834 def convert_arg_line_to_args(self, arg_line):
835 return [arg for arg in arg_line.split() if arg.strip()]
838 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
840 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
841 parser.add_argument('--version', action='version', version=p2pool.__version__)
842 parser.add_argument('--net',
843 help='use specified network (default: bitcoin)',
844 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
845 parser.add_argument('--testnet',
846 help='''use the network's testnet''',
847 action='store_const', const=True, default=False, dest='testnet')
848 parser.add_argument('--debug',
849 help='enable debugging mode',
850 action='store_const', const=True, default=False, dest='debug')
851 parser.add_argument('-a', '--address',
852 help='generate payouts to this address (default: <address requested from bitcoind>)',
853 type=str, action='store', default=None, dest='address')
854 parser.add_argument('--datadir',
855 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
856 type=str, action='store', default=None, dest='datadir')
857 parser.add_argument('--logfile',
858 help='''log to this file (default: data/<NET>/log)''',
859 type=str, action='store', default=None, dest='logfile')
860 parser.add_argument('--merged',
861 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
862 type=str, action='append', default=[], dest='merged_urls')
863 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
864 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
865 type=float, action='store', default=0.5, dest='donation_percentage')
866 parser.add_argument('--irc-announce',
867 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
868 action='store_true', default=False, dest='irc_announce')
870 p2pool_group = parser.add_argument_group('p2pool interface')
871 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
872 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
873 type=int, action='store', default=None, dest='p2pool_port')
874 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
875 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
876 type=str, action='append', default=[], dest='p2pool_nodes')
877 parser.add_argument('--disable-upnp',
878 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
879 action='store_false', default=True, dest='upnp')
880 p2pool_group.add_argument('--max-conns', metavar='CONNS',
881 help='maximum incoming connections (default: 40)',
882 type=int, action='store', default=40, dest='p2pool_conns')
884 worker_group = parser.add_argument_group('worker interface')
885 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
886 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
887 type=str, action='store', default=None, dest='worker_endpoint')
888 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
889 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
890 type=float, action='store', default=0, dest='worker_fee')
892 bitcoind_group = parser.add_argument_group('bitcoind interface')
893 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
894 help='connect to this address (default: 127.0.0.1)',
895 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
896 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
897 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
898 type=int, action='store', default=None, dest='bitcoind_rpc_port')
899 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
900 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
901 type=int, action='store', default=None, dest='bitcoind_p2p_port')
903 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
904 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
905 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
907 args = parser.parse_args()
912 net_name = args.net_name + ('_testnet' if args.testnet else '')
913 net = networks.nets[net_name]
915 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
916 if not os.path.exists(datadir_path):
917 os.makedirs(datadir_path)
919 if len(args.bitcoind_rpc_userpass) > 2:
920 parser.error('a maximum of two arguments are allowed')
921 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
923 if args.bitcoind_rpc_password is None:
924 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
925 parser.error('This network has no configuration file function. Manually enter your RPC password.')
926 conf_path = net.PARENT.CONF_FILE_FUNC()
927 if not os.path.exists(conf_path):
928 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
929 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
932 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
933 with open(conf_path, 'rb') as f:
934 cp = ConfigParser.RawConfigParser()
935 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
936 for conf_name, var_name, var_type in [
937 ('rpcuser', 'bitcoind_rpc_username', str),
938 ('rpcpassword', 'bitcoind_rpc_password', str),
939 ('rpcport', 'bitcoind_rpc_port', int),
940 ('port', 'bitcoind_p2p_port', int),
942 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
943 setattr(args, var_name, var_type(cp.get('x', conf_name)))
944 if args.bitcoind_rpc_password is None:
945 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
947 if args.bitcoind_rpc_username is None:
948 args.bitcoind_rpc_username = ''
950 if args.bitcoind_rpc_port is None:
951 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
953 if args.bitcoind_p2p_port is None:
954 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
956 if args.p2pool_port is None:
957 args.p2pool_port = net.P2P_PORT
959 if args.worker_endpoint is None:
960 worker_endpoint = '', net.WORKER_PORT
961 elif ':' not in args.worker_endpoint:
962 worker_endpoint = '', int(args.worker_endpoint)
964 addr, port = args.worker_endpoint.rsplit(':', 1)
965 worker_endpoint = addr, int(port)
967 if args.address is not None:
969 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
971 parser.error('error parsing address: ' + repr(e))
973 args.pubkey_hash = None
975 def separate_url(url):
976 s = urlparse.urlsplit(url)
977 if '@' not in s.netloc:
978 parser.error('merged url netloc must contain an "@"')
979 userpass, new_netloc = s.netloc.rsplit('@', 1)
980 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
981 merged_urls = map(separate_url, args.merged_urls)
983 if args.logfile is None:
984 args.logfile = os.path.join(datadir_path, 'log')
986 logfile = logging.LogFile(args.logfile)
987 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
988 sys.stdout = logging.AbortPipe(pipe)
989 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
990 if hasattr(signal, "SIGUSR1"):
991 def sigusr1(signum, frame):
992 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
994 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
995 signal.signal(signal.SIGUSR1, sigusr1)
996 task.LoopingCall(logfile.reopen).start(5)
998 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)