1 from __future__ import division
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface, height_tracker
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 except jsonrpc.Error, e:
32 if e.code == -32601: # Method not found
33 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34 raise deferral.RetrySilentlyException()
36 packed_transactions = [x.decode('hex') for x in work['transactions']]
37 defer.returnValue(dict(
38 version=work['version'],
39 previous_block_hash=int(work['previousblockhash'], 16),
40 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
42 subsidy=work['coinbasevalue'],
44 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
51 print 'p2pool (version %s)' % (p2pool.__version__,)
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
58 @deferral.retry('Error while checking Bitcoin connection:', 1)
59 @defer.inlineCallbacks
61 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
62 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
63 raise deferral.RetrySilentlyException()
64 v = (yield bitcoind.rpc_getinfo())['version']
65 temp_work = yield getwork(bitcoind)
66 if not net.VERSION_CHECK((v//10000, v//100%100, v%100), temp_work):
67 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
68 raise deferral.RetrySilentlyException()
69 defer.returnValue(temp_work)
70 temp_work = yield check()
72 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
75 # connect to bitcoind over bitcoin-p2p
76 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
77 factory = bitcoin_p2p.ClientFactory(net.PARENT)
78 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
79 yield factory.getProtocol() # waits until handshake is successful
83 print 'Determining payout address...'
84 if args.pubkey_hash is None:
85 address_path = os.path.join(datadir_path, 'cached_payout_address')
87 if os.path.exists(address_path):
88 with open(address_path, 'rb') as f:
89 address = f.read().strip('\r\n')
90 print ' Loaded cached address: %s...' % (address,)
94 if address is not None:
95 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
96 if not res['isvalid'] or not res['ismine']:
97 print ' Cached address is either invalid or not controlled by local bitcoind!'
101 print ' Getting payout address from bitcoind...'
102 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
104 with open(address_path, 'wb') as f:
107 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
109 my_pubkey_hash = args.pubkey_hash
110 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
113 my_share_hashes = set()
114 my_doa_share_hashes = set()
116 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
117 shared_share_hashes = set()
118 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
119 known_verified = set()
121 print "Loading shares..."
122 for i, (mode, contents) in enumerate(ss.get_shares()):
124 if contents.hash in tracker.shares:
126 shared_share_hashes.add(contents.hash)
127 contents.time_seen = 0
128 tracker.add(contents)
129 if len(tracker.shares) % 1000 == 0 and tracker.shares:
130 print " %i" % (len(tracker.shares),)
131 elif mode == 'verified_hash':
132 known_verified.add(contents)
134 raise AssertionError()
135 print " ...inserting %i verified shares..." % (len(known_verified),)
136 for h in known_verified:
137 if h not in tracker.shares:
138 ss.forget_verified_share(h)
140 tracker.verified.add(tracker.shares[h])
141 print " ...done loading %i shares!" % (len(tracker.shares),)
143 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
144 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
145 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
147 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
149 pre_current_work = variable.Variable(None)
150 pre_merged_work = variable.Variable({})
151 # information affecting work that should trigger a long-polling update
152 current_work = variable.Variable(None)
153 # information affecting work that should not trigger a long-polling update
154 current_work2 = variable.Variable(None)
156 requested = expiring_dict.ExpiringDict(300)
158 print 'Initializing work...'
159 @defer.inlineCallbacks
160 def set_real_work1():
161 work = yield getwork(bitcoind)
162 current_work2.set(dict(
164 transactions=work['transactions'],
165 merkle_link=work['merkle_link'],
166 subsidy=work['subsidy'],
167 clock_offset=time.time() - work['time'],
168 last_update=time.time(),
169 )) # second set first because everything hooks on the first
170 pre_current_work.set(dict(
171 version=work['version'],
172 previous_block=work['previous_block_hash'],
174 coinbaseflags=work['coinbaseflags'],
176 yield set_real_work1()
178 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
180 def set_real_work2():
181 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
183 t = dict(pre_current_work.value)
184 t['best_share_hash'] = best
185 t['mm_chains'] = pre_merged_work.value
189 for peer2, share_hash in desired:
190 if share_hash not in tracker.tails: # was received in the time tracker.think was running
192 last_request_time, count = requested.get(share_hash, (None, 0))
193 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
195 potential_peers = set()
196 for head in tracker.tails[share_hash]:
197 potential_peers.update(peer_heads.get(head, set()))
198 potential_peers = [peer for peer in potential_peers if peer.connected2]
199 if count == 0 and peer2 is not None and peer2.connected2:
202 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
206 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
210 stops=list(set(tracker.heads) | set(
211 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
214 requested[share_hash] = t, count + 1
215 pre_current_work.changed.watch(lambda _: set_real_work2())
216 pre_merged_work.changed.watch(lambda _: set_real_work2())
222 @defer.inlineCallbacks
223 def set_merged_work(merged_url, merged_userpass):
224 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
226 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
227 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
228 hash=int(auxblock['hash'], 16),
229 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
230 merged_proxy=merged_proxy,
232 yield deferral.sleep(1)
233 for merged_url, merged_userpass in merged_urls:
234 set_merged_work(merged_url, merged_userpass)
236 @pre_merged_work.changed.watch
237 def _(new_merged_work):
238 print 'Got new merged mining work!'
240 # setup p2p logic and join p2pool network
242 class Node(p2p.Node):
243 def handle_shares(self, shares, peer):
245 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
249 if share.hash in tracker.shares:
250 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
255 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
259 if shares and peer is not None:
260 peer_heads.setdefault(shares[0].hash, set()).add(peer)
266 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
268 def handle_share_hashes(self, hashes, peer):
271 for share_hash in hashes:
272 if share_hash in tracker.shares:
274 last_request_time, count = requested.get(share_hash, (None, 0))
275 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
277 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
278 get_hashes.append(share_hash)
279 requested[share_hash] = t, count + 1
281 if hashes and peer is not None:
282 peer_heads.setdefault(hashes[0], set()).add(peer)
284 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
286 def handle_get_shares(self, hashes, parents, stops, peer):
287 parents = min(parents, 1000//len(hashes))
290 for share_hash in hashes:
291 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
292 if share.hash in stops:
295 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
296 peer.sendShares(shares)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block(block, ignore_failure):
    '''Submit a packed block to bitcoind and report unexpected outcomes on stderr.

    block: dict with at least a 'header' entry; it is packed with
        bitcoin_data.block_type and sent hex-encoded through rpc_getmemorypool.
    ignore_failure: when true, a rejection of a block that should have been
        accepted is not reported (an unexpected acceptance always is).

    Returns a Deferred (via inlineCallbacks); no value is passed to defer.returnValue.
    '''
    success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
    # The submission should succeed exactly when the header's proof-of-work
    # hash meets the target encoded in the header's own 'bits' field.
    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
    unexpected_rejection = not success and success_expected and not ignore_failure
    unexpected_acceptance = success and not success_expected
    if unexpected_rejection or unexpected_acceptance:
        # Format the locals computed above; the names used here must exist,
        # otherwise the report itself fails with NameError.
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
306 @tracker.verified.added.watch
308 if share.pow_hash <= share.header['bits'].target:
309 submit_block(share.as_block(tracker), ignore_failure=True)
311 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
313 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
315 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
317 @defer.inlineCallbacks
320 ip, port = x.split(':')
321 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
323 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
326 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
328 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
330 print >>sys.stderr, "error reading addrs"
331 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
334 if addr not in addrs:
335 addrs[addr] = (0, time.time(), time.time())
339 connect_addrs = set()
340 for addr_df in map(parse, args.p2pool_nodes):
342 connect_addrs.add((yield addr_df))
347 best_share_hash_func=lambda: current_work.value['best_share_hash'],
348 port=args.p2pool_port,
351 connect_addrs=connect_addrs,
352 max_incoming_conns=args.p2pool_conns,
356 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
358 # send share when the chain changes to their chain
359 def work_changed(new_work):
360 #print 'Work changed:', new_work
362 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
363 if share.hash in shared_share_hashes:
365 shared_share_hashes.add(share.hash)
368 for peer in p2p_node.peers.itervalues():
369 peer.sendShares([share for share in shares if share.peer is not peer])
371 current_work.changed.watch(work_changed)
374 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
376 if share.hash in tracker.verified.shares:
377 ss.add_verified_hash(share.hash)
378 task.LoopingCall(save_shares).start(60)
384 @defer.inlineCallbacks
388 is_lan, lan_ip = yield ipdiscover.get_local_ip()
390 pm = yield portmapper.get_port_mapper()
391 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
392 except defer.TimeoutError:
396 log.err(None, 'UPnP error:')
397 yield deferral.sleep(random.expovariate(1/120))
400 # start listening for workers with a JSON-RPC server
402 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
404 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
405 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
406 vip_pass = f.read().strip('\r\n')
408 vip_pass = '%016x' % (random.randrange(2**64),)
409 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
411 print ' Worker password:', vip_pass, '(only required for generating graphs)'
415 removed_unstales_var = variable.Variable((0, 0, 0))
416 removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    '''Watcher on tracker.verified.removed: keep counts of our own removed shares.

    When a removed share was made by this instance and tracker.is_child_of()
    holds for it against the current best share, bump the counters that
    get_stale_counts() later adds back to the chain delta:
      removed_unstales_var     -> (shares, stale_info == 253, stale_info == 254)
      removed_doa_unstales_var -> count of our dead-on-arrival shares
    '''
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
        stale_info = share.share_data['stale_info']
        seen, orphan_announces, dead_announces = removed_unstales_var.value
        removed_unstales_var.set((
            seen + 1,
            orphan_announces + (1 if stale_info == 253 else 0),
            dead_announces + (1 if stale_info == 254 else 0),
        ))
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # Must use the *_var Variable defined next to removed_unstales_var;
        # there is no plain "removed_doa_unstales" name in scope.
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)

    total is the number of shares this instance has made; doas is how many of
    our dead-on-arrival shares are not in the chain, and orphans is the rest
    of our shares not in the chain. In-chain figures come from the verified
    tracker's delta at the current best share plus the counters kept for
    shares already removed from that tracker.
    '''
    total = len(my_share_hashes)
    doa_total = len(my_doa_share_hashes)
    chain_delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
    pruned_shares, pruned_orphan_announces, pruned_dead_announces = removed_unstales_var.value

    total_in_chain = chain_delta.my_count + pruned_shares
    doa_in_chain = chain_delta.my_doa_count + removed_doa_unstales_var.value
    recorded = (
        chain_delta.my_orphan_announce_count + pruned_orphan_announces,
        chain_delta.my_dead_announce_count + pruned_dead_announces,
    )

    missing_total = total - total_in_chain
    missing_doa = doa_total - doa_in_chain
    # whatever is missing from the chain and was not dead on arrival counts as orphaned
    return (missing_total - missing_doa, missing_doa), total, recorded
445 pseudoshare_received = variable.Event()
446 local_rate_monitor = math.RateMonitor(10*60)
448 class WorkerBridge(worker_interface.WorkerBridge):
450 worker_interface.WorkerBridge.__init__(self)
451 self.new_work_event = current_work.changed
452 self.recent_shares_ts_work = []
454 def preprocess_request(self, request):
455 user = request.getUser() if request.getUser() is not None else ''
457 desired_pseudoshare_target = None
459 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
461 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
465 desired_share_target = 2**256 - 1
467 user, min_diff_str = user.rsplit('/', 1)
469 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
473 if random.uniform(0, 100) < args.worker_fee:
474 pubkey_hash = my_pubkey_hash
477 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
479 pubkey_hash = my_pubkey_hash
481 return pubkey_hash, desired_share_target, desired_pseudoshare_target
483 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
484 if len(p2p_node.peers) == 0 and net.PERSIST:
485 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
486 if current_work.value['best_share_hash'] is None and net.PERSIST:
487 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
488 if time.time() > current_work2.value['last_update'] + 60:
489 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
491 if current_work.value['mm_chains']:
492 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
493 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
494 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
495 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
499 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
504 share_info, generate_tx = p2pool_data.Share.generate_transaction(
507 previous_share_hash=current_work.value['best_share_hash'],
508 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
509 nonce=random.randrange(2**32),
510 pubkey_hash=pubkey_hash,
511 subsidy=current_work2.value['subsidy'],
512 donation=math.perfect_round(65535*args.donation_percentage/100),
513 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
514 253 if orphans > orphans_recorded_in_chain else
515 254 if doas > doas_recorded_in_chain else
517 )(*get_stale_counts()),
519 block_target=current_work.value['bits'].target,
520 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
521 desired_target=desired_share_target,
525 target = net.PARENT.SANE_MAX_TARGET
526 if desired_pseudoshare_target is None:
527 if len(self.recent_shares_ts_work) == 50:
528 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
529 target = min(target, 2**256//hash_rate)
531 target = min(target, desired_pseudoshare_target)
532 target = max(target, share_info['bits'].target)
533 for aux_work in current_work.value['mm_chains'].itervalues():
534 target = max(target, aux_work['target'])
536 transactions = [generate_tx] + list(current_work2.value['transactions'])
537 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
538 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
540 getwork_time = time.time()
541 merkle_link = current_work2.value['merkle_link']
543 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
544 bitcoin_data.target_to_difficulty(target),
545 bitcoin_data.target_to_difficulty(share_info['bits'].target),
546 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
547 len(current_work2.value['transactions']),
550 ba = bitcoin_getwork.BlockAttempt(
551 version=current_work.value['version'],
552 previous_block=current_work.value['previous_block'],
553 merkle_root=merkle_root,
554 timestamp=current_work2.value['time'],
555 bits=current_work.value['bits'],
559 received_header_hashes = set()
561 def got_response(header, request):
562 assert header['merkle_root'] == merkle_root
564 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
565 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
566 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
569 if pow_hash <= header['bits'].target or p2pool.DEBUG:
570 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
571 if pow_hash <= header['bits'].target:
573 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
575 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
577 log.err(None, 'Error while processing potential block:')
579 for aux_work, index, hashes in mm_later:
581 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
582 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
583 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
584 bitcoin_data.aux_pow_type.pack(dict(
587 block_hash=header_hash,
588 merkle_link=merkle_link,
590 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
591 parent_block_header=header,
596 if result != (pow_hash <= aux_work['target']):
597 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
599 print 'Merged block submittal result: %s' % (result,)
602 log.err(err, 'Error submitting merged block:')
604 log.err(None, 'Error while processing merged mining POW:')
606 if pow_hash <= share_info['bits'].target:
607 min_header = dict(header);del min_header['merkle_root']
608 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
609 share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
611 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
613 p2pool_data.format_hash(share.hash),
614 p2pool_data.format_hash(share.previous_hash),
615 time.time() - getwork_time,
616 ' DEAD ON ARRIVAL' if not on_time else '',
618 my_share_hashes.add(share.hash)
620 my_doa_share_hashes.add(share.hash)
624 tracker.verified.add(share)
628 if pow_hash <= header['bits'].target or p2pool.DEBUG:
629 for peer in p2p_node.peers.itervalues():
630 peer.sendShares([share])
631 shared_share_hashes.add(share.hash)
633 log.err(None, 'Error forwarding block solution:')
635 if pow_hash > target:
636 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
637 print ' Hash: %56x' % (pow_hash,)
638 print ' Target: %56x' % (target,)
639 elif header_hash in received_header_hashes:
640 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
642 received_header_hashes.add(header_hash)
644 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
645 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
646 while len(self.recent_shares_ts_work) > 50:
647 self.recent_shares_ts_work.pop(0)
648 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
652 return ba, got_response
654 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
656 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
657 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
659 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
661 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
668 @defer.inlineCallbacks
671 flag = factory.new_block.get_deferred()
673 yield set_real_work1()
676 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
681 print 'Started successfully!'
685 if hasattr(signal, 'SIGALRM'):
686 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
687 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
689 signal.siginterrupt(signal.SIGALRM, False)
690 task.LoopingCall(signal.alarm, 30).start(1)
692 if args.irc_announce:
693 from twisted.words.protocols import irc
694 class IRCClient(irc.IRCClient):
695 nickname = 'p2pool%02i' % (random.randrange(100),)
696 channel = net.ANNOUNCE_CHANNEL
697 def lineReceived(self, line):
699 irc.IRCClient.lineReceived(self, line)
701 irc.IRCClient.signedOn(self)
702 self.factory.resetDelay()
703 self.join(self.channel)
704 @defer.inlineCallbacks
705 def new_share(share):
706 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
707 yield deferral.sleep(random.expovariate(1/60))
708 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
709 if message not in self.recent_messages:
710 self.say(self.channel, message)
711 self._remember_message(message)
712 self.watch_id = tracker.verified.added.watch(new_share)
713 self.recent_messages = []
def _remember_message(self, message):
    '''Record message in self.recent_messages, keeping only the newest 100 entries.

    The oldest entries are dropped first once the list grows past 100.
    '''
    self.recent_messages.append(message)
    overflow = len(self.recent_messages) - 100
    if overflow > 0:
        # trim from the front so the most recent messages survive
        del self.recent_messages[:overflow]
def privmsg(self, user, channel, message):
    '''Handle a message: remember it only if it was sent to our own channel.

    Messages for any other channel are ignored; user is not used.
    '''
    if channel != self.channel:
        return
    self._remember_message(message)
721 def connectionLost(self, reason):
722 tracker.verified.added.unwatch(self.watch_id)
723 print 'IRC connection lost:', reason.getErrorMessage()
724 class IRCClientFactory(protocol.ReconnectingClientFactory):
726 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
728 @defer.inlineCallbacks
733 yield deferral.sleep(3)
735 if time.time() > current_work2.value['last_update'] + 60:
736 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
738 height = tracker.get_height(current_work.value['best_share_hash'])
739 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
741 len(tracker.verified.shares),
744 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
745 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
747 datums, dt = local_rate_monitor.get_datums_in_last()
748 my_att_s = sum(datum['work']/dt for datum in datums)
749 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
750 math.format(int(my_att_s)),
752 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
753 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
757 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
758 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
759 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
761 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
762 shares, stale_orphan_shares, stale_doa_shares,
763 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
764 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
765 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
767 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
768 math.format(int(real_att_s)),
770 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
773 if this_str != last_str or time.time() > last_time + 15:
776 last_time = time.time()
782 log.err(None, 'Fatal error:')
785 class FixedArgumentParser(argparse.ArgumentParser):
786 def _read_args_from_files(self, arg_strings):
787 # expand arguments referencing files
789 for arg_string in arg_strings:
791 # for regular arguments, just add them back into the list
792 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
793 new_arg_strings.append(arg_string)
795 # replace arguments referencing files with the file content
798 args_file = open(arg_string[1:])
801 for arg_line in args_file.read().splitlines():
802 for arg in self.convert_arg_line_to_args(arg_line):
803 arg_strings.append(arg)
804 arg_strings = self._read_args_from_files(arg_strings)
805 new_arg_strings.extend(arg_strings)
809 err = sys.exc_info()[1]
812 # return the modified argument list
813 return new_arg_strings
def convert_arg_line_to_args(self, arg_line):
    '''Split one line read from an argument file into individual arguments.

    The line is split on whitespace, so a single line may carry several
    arguments; blank or whitespace-only lines yield an empty list.
    '''
    tokens = []
    for piece in arg_line.split():
        if piece.strip():
            tokens.append(piece)
    return tokens
819 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
821 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
822 parser.add_argument('--version', action='version', version=p2pool.__version__)
823 parser.add_argument('--net',
824 help='use specified network (default: bitcoin)',
825 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
826 parser.add_argument('--testnet',
827 help='''use the network's testnet''',
828 action='store_const', const=True, default=False, dest='testnet')
829 parser.add_argument('--debug',
830 help='enable debugging mode',
831 action='store_const', const=True, default=False, dest='debug')
832 parser.add_argument('-a', '--address',
833 help='generate payouts to this address (default: <address requested from bitcoind>)',
834 type=str, action='store', default=None, dest='address')
835 parser.add_argument('--datadir',
836 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
837 type=str, action='store', default=None, dest='datadir')
838 parser.add_argument('--logfile',
839 help='''log to this file (default: data/<NET>/log)''',
840 type=str, action='store', default=None, dest='logfile')
841 parser.add_argument('--merged',
842 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
843 type=str, action='append', default=[], dest='merged_urls')
844 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
845 help='donate this percentage of work to author of p2pool (default: 0.5)',
846 type=float, action='store', default=0.5, dest='donation_percentage')
847 parser.add_argument('--irc-announce',
848 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
849 action='store_true', default=False, dest='irc_announce')
851 p2pool_group = parser.add_argument_group('p2pool interface')
852 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
853 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
854 type=int, action='store', default=None, dest='p2pool_port')
855 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
856 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
857 type=str, action='append', default=[], dest='p2pool_nodes')
858 parser.add_argument('--disable-upnp',
859 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
860 action='store_false', default=True, dest='upnp')
861 p2pool_group.add_argument('--max-conns', metavar='CONNS',
862 help='maximum incoming connections (default: 40)',
863 type=int, action='store', default=40, dest='p2pool_conns')
865 worker_group = parser.add_argument_group('worker interface')
866 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
867 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
868 type=str, action='store', default=None, dest='worker_endpoint')
869 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
870 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
871 type=float, action='store', default=0, dest='worker_fee')
873 bitcoind_group = parser.add_argument_group('bitcoind interface')
874 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
875 help='connect to this address (default: 127.0.0.1)',
876 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
877 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
878 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
879 type=int, action='store', default=None, dest='bitcoind_rpc_port')
880 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
881 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
882 type=int, action='store', default=None, dest='bitcoind_p2p_port')
884 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
885 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
886 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
888 args = parser.parse_args()
893 net_name = args.net_name + ('_testnet' if args.testnet else '')
894 net = networks.nets[net_name]
896 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
897 if not os.path.exists(datadir_path):
898 os.makedirs(datadir_path)
900 if len(args.bitcoind_rpc_userpass) > 2:
901 parser.error('a maximum of two arguments are allowed')
902 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
904 if args.bitcoind_rpc_password is None:
905 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
906 parser.error('This network has no configuration file function. Manually enter your RPC password.')
907 conf_path = net.PARENT.CONF_FILE_FUNC()
908 if not os.path.exists(conf_path):
909 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
910 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
913 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
914 with open(conf_path, 'rb') as f:
915 cp = ConfigParser.RawConfigParser()
916 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
917 for conf_name, var_name, var_type in [
918 ('rpcuser', 'bitcoind_rpc_username', str),
919 ('rpcpassword', 'bitcoind_rpc_password', str),
920 ('rpcport', 'bitcoind_rpc_port', int),
921 ('port', 'bitcoind_p2p_port', int),
923 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
924 setattr(args, var_name, var_type(cp.get('x', conf_name)))
925 if args.bitcoind_rpc_password is None:
926 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
928 if args.bitcoind_rpc_username is None:
929 args.bitcoind_rpc_username = ''
931 if args.bitcoind_rpc_port is None:
932 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
934 if args.bitcoind_p2p_port is None:
935 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
937 if args.p2pool_port is None:
938 args.p2pool_port = net.P2P_PORT
940 if args.worker_endpoint is None:
941 worker_endpoint = '', net.WORKER_PORT
942 elif ':' not in args.worker_endpoint:
943 worker_endpoint = '', int(args.worker_endpoint)
945 addr, port = args.worker_endpoint.rsplit(':', 1)
946 worker_endpoint = addr, int(port)
948 if args.address is not None:
950 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
952 parser.error('error parsing address: ' + repr(e))
954 args.pubkey_hash = None
def separate_url(url):
    """Pull the credentials out of a merged-mining URL.

    Returns a ``(url_without_credentials, userpass)`` tuple, where
    ``userpass`` is the part of the network location before its last "@".
    A URL whose network location has no "@" is reported via ``parser.error``.
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split on the last "@" so any "@" inside the credentials stays with them.
    credentials, host = netloc.rsplit('@', 1)
    bare_url = urlparse.urlunsplit(parts._replace(netloc=host))
    return bare_url, credentials
962 merged_urls = map(separate_url, args.merged_urls)
964 if args.logfile is None:
965 args.logfile = os.path.join(datadir_path, 'log')
967 logfile = logging.LogFile(args.logfile)
968 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
969 sys.stdout = logging.AbortPipe(pipe)
970 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
971 if hasattr(signal, "SIGUSR1"):
972 def sigusr1(signum, frame):
973 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
975 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
976 signal.signal(signal.SIGUSR1, sigusr1)
977 task.LoopingCall(logfile.reopen).start(5)
979 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)