1 from __future__ import division
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface, height_tracker
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 except jsonrpc.Error, e:
32 if e.code == -32601: # Method not found
33 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34 raise deferral.RetrySilentlyException()
36 packed_transactions = [x.decode('hex') for x in work['transactions']]
37 defer.returnValue(dict(
38 version=work['version'],
39 previous_block_hash=int(work['previousblockhash'], 16),
40 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41 merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
42 subsidy=work['coinbasevalue'],
44 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
51 print 'p2pool (version %s)' % (p2pool.__version__,)
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
58 @deferral.retry('Error while checking Bitcoin connection:', 1)
59 @defer.inlineCallbacks
61 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
62 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
63 raise deferral.RetrySilentlyException()
64 v = (yield bitcoind.rpc_getinfo())['version']
65 temp_work = yield getwork(bitcoind)
66 major, minor, patch = v//10000, v//100%100, v%100
67 if not (major >= 7 or (major == 6 and patch >= 3) or (major == 5 and minor >= 4) or '/P2SH/' in temp_work['coinbaseflags']):
68 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
69 raise deferral.RetrySilentlyException()
70 defer.returnValue(temp_work)
71 temp_work = yield check()
73 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
76 # connect to bitcoind over bitcoin-p2p
77 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
78 factory = bitcoin_p2p.ClientFactory(net.PARENT)
79 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
80 yield factory.getProtocol() # waits until handshake is successful
84 print 'Determining payout address...'
85 if args.pubkey_hash is None:
86 address_path = os.path.join(datadir_path, 'cached_payout_address')
88 if os.path.exists(address_path):
89 with open(address_path, 'rb') as f:
90 address = f.read().strip('\r\n')
91 print ' Loaded cached address: %s...' % (address,)
95 if address is not None:
96 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
97 if not res['isvalid'] or not res['ismine']:
98 print ' Cached address is either invalid or not controlled by local bitcoind!'
102 print ' Getting payout address from bitcoind...'
103 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
105 with open(address_path, 'wb') as f:
108 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
110 my_pubkey_hash = args.pubkey_hash
111 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
114 my_share_hashes = set()
115 my_doa_share_hashes = set()
117 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
118 shared_share_hashes = set()
119 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
120 known_verified = set()
122 print "Loading shares..."
123 for i, (mode, contents) in enumerate(ss.get_shares()):
125 if contents.hash in tracker.shares:
127 shared_share_hashes.add(contents.hash)
128 contents.time_seen = 0
129 tracker.add(contents)
130 if len(tracker.shares) % 1000 == 0 and tracker.shares:
131 print " %i" % (len(tracker.shares),)
132 elif mode == 'verified_hash':
133 known_verified.add(contents)
135 raise AssertionError()
136 print " ...inserting %i verified shares..." % (len(known_verified),)
137 for h in known_verified:
138 if h not in tracker.shares:
139 ss.forget_verified_share(h)
141 tracker.verified.add(tracker.shares[h])
142 print " ...done loading %i shares!" % (len(tracker.shares),)
144 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
145 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
146 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
148 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
150 pre_current_work = variable.Variable(None)
151 pre_merged_work = variable.Variable({})
152 # information affecting work that should trigger a long-polling update
153 current_work = variable.Variable(None)
154 # information affecting work that should not trigger a long-polling update
155 current_work2 = variable.Variable(None)
157 requested = expiring_dict.ExpiringDict(300)
159 print 'Initializing work...'
160 @defer.inlineCallbacks
161 def set_real_work1():
162 work = yield getwork(bitcoind)
163 current_work2.set(dict(
165 transactions=work['transactions'],
166 merkle_branch=work['merkle_branch'],
167 subsidy=work['subsidy'],
168 clock_offset=time.time() - work['time'],
169 last_update=time.time(),
170 )) # second set first because everything hooks on the first
171 pre_current_work.set(dict(
172 version=work['version'],
173 previous_block=work['previous_block_hash'],
175 coinbaseflags=work['coinbaseflags'],
177 yield set_real_work1()
179 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
181 def set_real_work2():
182 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
184 t = dict(pre_current_work.value)
185 t['best_share_hash'] = best
186 t['mm_chains'] = pre_merged_work.value
190 for peer2, share_hash in desired:
191 if share_hash not in tracker.tails: # was received in the time tracker.think was running
193 last_request_time, count = requested.get(share_hash, (None, 0))
194 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
196 potential_peers = set()
197 for head in tracker.tails[share_hash]:
198 potential_peers.update(peer_heads.get(head, set()))
199 potential_peers = [peer for peer in potential_peers if peer.connected2]
200 if count == 0 and peer2 is not None and peer2.connected2:
203 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
207 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
211 stops=list(set(tracker.heads) | set(
212 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
215 requested[share_hash] = t, count + 1
216 pre_current_work.changed.watch(lambda _: set_real_work2())
217 pre_merged_work.changed.watch(lambda _: set_real_work2())
223 @defer.inlineCallbacks
224 def set_merged_work(merged_url, merged_userpass):
225 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
227 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
228 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
229 hash=int(auxblock['hash'], 16),
230 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
231 merged_proxy=merged_proxy,
233 yield deferral.sleep(1)
234 for merged_url, merged_userpass in merged_urls:
235 set_merged_work(merged_url, merged_userpass)
237 @pre_merged_work.changed.watch
238 def _(new_merged_work):
239 print 'Got new merged mining work!'
241 # setup p2p logic and join p2pool network
243 class Node(p2p.Node):
244 def handle_shares(self, shares, peer):
246 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
250 if share.hash in tracker.shares:
251 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
256 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
260 if shares and peer is not None:
261 peer_heads.setdefault(shares[0].hash, set()).add(peer)
267 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
269 def handle_share_hashes(self, hashes, peer):
272 for share_hash in hashes:
273 if share_hash in tracker.shares:
275 last_request_time, count = requested.get(share_hash, (None, 0))
276 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
278 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
279 get_hashes.append(share_hash)
280 requested[share_hash] = t, count + 1
282 if hashes and peer is not None:
283 peer_heads.setdefault(hashes[0], set()).add(peer)
285 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
287 def handle_get_shares(self, hashes, parents, stops, peer):
288 parents = min(parents, 1000//len(hashes))
291 for share_hash in hashes:
292 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
293 if share.hash in stops:
296 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
297 peer.sendShares(shares)
299 @deferral.retry('Error submitting block: (will retry)', 10, 10)
300 @defer.inlineCallbacks
301 def submit_block(block, ignore_failure):
302 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
303 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
304 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
305 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
307 @tracker.verified.added.watch
309 if share.pow_hash <= share.header['bits'].target:
310 submit_block(share.as_block(tracker), ignore_failure=True)
312 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
314 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
316 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
318 @defer.inlineCallbacks
321 ip, port = x.split(':')
322 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
324 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
327 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
329 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
331 print >>sys.stderr, "error reading addrs"
332 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
335 if addr not in addrs:
336 addrs[addr] = (0, time.time(), time.time())
340 connect_addrs = set()
341 for addr_df in map(parse, args.p2pool_nodes):
343 connect_addrs.add((yield addr_df))
348 best_share_hash_func=lambda: current_work.value['best_share_hash'],
349 port=args.p2pool_port,
352 connect_addrs=connect_addrs,
353 max_incoming_conns=args.p2pool_conns,
357 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
359 # send share when the chain changes to their chain
360 def work_changed(new_work):
361 #print 'Work changed:', new_work
363 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
364 if share.hash in shared_share_hashes:
366 shared_share_hashes.add(share.hash)
369 for peer in p2p_node.peers.itervalues():
370 peer.sendShares([share for share in shares if share.peer is not peer])
372 current_work.changed.watch(work_changed)
375 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
377 if share.hash in tracker.verified.shares:
378 ss.add_verified_hash(share.hash)
379 task.LoopingCall(save_shares).start(60)
385 @defer.inlineCallbacks
389 is_lan, lan_ip = yield ipdiscover.get_local_ip()
391 pm = yield portmapper.get_port_mapper()
392 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
393 except defer.TimeoutError:
397 log.err(None, 'UPnP error:')
398 yield deferral.sleep(random.expovariate(1/120))
401 # start listening for workers with a JSON-RPC server
403 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
405 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
406 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
407 vip_pass = f.read().strip('\r\n')
409 vip_pass = '%016x' % (random.randrange(2**64),)
410 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
412 print ' Worker password:', vip_pass, '(only required for generating graphs)'
416 removed_unstales_var = variable.Variable((0, 0, 0))
417 removed_doa_unstales_var = variable.Variable(0)
418 @tracker.verified.removed.watch
420 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
421 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
422 removed_unstales_var.set((
423 removed_unstales_var.value[0] + 1,
424 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
425 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
427 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
428 removed_doa_unstales.set(removed_doa_unstales.value + 1)
430 def get_stale_counts():
431 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
432 my_shares = len(my_share_hashes)
433 my_doa_shares = len(my_doa_share_hashes)
434 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
435 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
436 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
437 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
438 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
440 my_shares_not_in_chain = my_shares - my_shares_in_chain
441 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
443 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
446 pseudoshare_received = variable.Event()
447 local_rate_monitor = math.RateMonitor(10*60)
449 class WorkerBridge(worker_interface.WorkerBridge):
451 worker_interface.WorkerBridge.__init__(self)
452 self.new_work_event = current_work.changed
453 self.recent_shares_ts_work = []
455 def preprocess_request(self, request):
456 user = request.getUser() if request.getUser() is not None else ''
458 desired_pseudoshare_target = None
460 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
462 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
466 desired_share_target = 2**256 - 1
468 user, min_diff_str = user.rsplit('/', 1)
470 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
474 if random.uniform(0, 100) < args.worker_fee:
475 pubkey_hash = my_pubkey_hash
478 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
480 pubkey_hash = my_pubkey_hash
482 return pubkey_hash, desired_share_target, desired_pseudoshare_target
484 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
485 if len(p2p_node.peers) == 0 and net.PERSIST:
486 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
487 if current_work.value['best_share_hash'] is None and net.PERSIST:
488 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
489 if time.time() > current_work2.value['last_update'] + 60:
490 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
492 if current_work.value['mm_chains']:
493 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
494 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
495 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
496 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
500 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
505 share_info, generate_tx = p2pool_data.Share.generate_transaction(
508 previous_share_hash=current_work.value['best_share_hash'],
509 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
510 nonce=random.randrange(2**32),
511 pubkey_hash=pubkey_hash,
512 subsidy=current_work2.value['subsidy'],
513 donation=math.perfect_round(65535*args.donation_percentage/100),
514 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
515 253 if orphans > orphans_recorded_in_chain else
516 254 if doas > doas_recorded_in_chain else
518 )(*get_stale_counts()),
520 block_target=current_work.value['bits'].target,
521 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
522 desired_target=desired_share_target,
526 target = net.PARENT.SANE_MAX_TARGET
527 if desired_pseudoshare_target is None:
528 if len(self.recent_shares_ts_work) == 50:
529 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
530 target = min(target, 2**256//hash_rate)
532 target = min(target, desired_pseudoshare_target)
533 target = max(target, share_info['bits'].target)
534 for aux_work in current_work.value['mm_chains'].itervalues():
535 target = max(target, aux_work['target'])
537 transactions = [generate_tx] + list(current_work2.value['transactions'])
538 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
539 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
541 getwork_time = time.time()
542 merkle_branch = current_work2.value['merkle_branch']
544 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
545 bitcoin_data.target_to_difficulty(target),
546 bitcoin_data.target_to_difficulty(share_info['bits'].target),
547 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
548 len(current_work2.value['transactions']),
551 ba = bitcoin_getwork.BlockAttempt(
552 version=current_work.value['version'],
553 previous_block=current_work.value['previous_block'],
554 merkle_root=merkle_root,
555 timestamp=current_work2.value['time'],
556 bits=current_work.value['bits'],
560 received_header_hashes = set()
562 def got_response(header, request):
563 assert header['merkle_root'] == merkle_root
565 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
566 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
567 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
570 if pow_hash <= header['bits'].target or p2pool.DEBUG:
571 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
572 if pow_hash <= header['bits'].target:
574 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
576 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
578 log.err(None, 'Error while processing potential block:')
580 for aux_work, index, hashes in mm_later:
582 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
583 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
584 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
585 bitcoin_data.aux_pow_type.pack(dict(
588 block_hash=header_hash,
589 merkle_branch=merkle_branch,
592 merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
594 parent_block_header=header,
599 if result != (pow_hash <= aux_work['target']):
600 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
602 print 'Merged block submittal result: %s' % (result,)
605 log.err(err, 'Error submitting merged block:')
607 log.err(None, 'Error while processing merged mining POW:')
609 if pow_hash <= share_info['bits'].target:
610 min_header = dict(header);del min_header['merkle_root']
611 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
612 share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
614 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
616 p2pool_data.format_hash(share.hash),
617 p2pool_data.format_hash(share.previous_hash),
618 time.time() - getwork_time,
619 ' DEAD ON ARRIVAL' if not on_time else '',
621 my_share_hashes.add(share.hash)
623 my_doa_share_hashes.add(share.hash)
627 tracker.verified.add(share)
631 if pow_hash <= header['bits'].target or p2pool.DEBUG:
632 for peer in p2p_node.peers.itervalues():
633 peer.sendShares([share])
634 shared_share_hashes.add(share.hash)
636 log.err(None, 'Error forwarding block solution:')
638 if pow_hash > target:
639 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
640 print ' Hash: %56x' % (pow_hash,)
641 print ' Target: %56x' % (target,)
642 elif header_hash in received_header_hashes:
643 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
645 received_header_hashes.add(header_hash)
647 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
648 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
649 while len(self.recent_shares_ts_work) > 50:
650 self.recent_shares_ts_work.pop(0)
651 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
655 return ba, got_response
657 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
659 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
660 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
662 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
664 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
671 @defer.inlineCallbacks
674 flag = factory.new_block.get_deferred()
676 yield set_real_work1()
679 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
684 print 'Started successfully!'
688 if hasattr(signal, 'SIGALRM'):
689 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
690 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
692 signal.siginterrupt(signal.SIGALRM, False)
693 task.LoopingCall(signal.alarm, 30).start(1)
695 if args.irc_announce:
696 from twisted.words.protocols import irc
697 class IRCClient(irc.IRCClient):
698 nickname = 'p2pool%02i' % (random.randrange(100),)
699 channel = net.ANNOUNCE_CHANNEL
700 def lineReceived(self, line):
702 irc.IRCClient.lineReceived(self, line)
704 irc.IRCClient.signedOn(self)
705 self.factory.resetDelay()
706 self.join(self.channel)
707 @defer.inlineCallbacks
708 def new_share(share):
709 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
710 yield deferral.sleep(random.expovariate(1/60))
711 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
712 if message not in self.recent_messages:
713 self.say(self.channel, message)
714 self._remember_message(message)
715 self.watch_id = tracker.verified.added.watch(new_share)
716 self.recent_messages = []
717 def _remember_message(self, message):
718 self.recent_messages.append(message)
719 while len(self.recent_message) > 100:
720 self.recent_messages.pop(0)
721 def privmsg(self, user, channel, message):
722 if channel == self.channel:
723 self._remember_message(message)
724 def connectionLost(self, reason):
725 tracker.verified.added.unwatch(self.watch_id)
726 print 'IRC connection lost:', reason.getErrorMessage()
727 class IRCClientFactory(protocol.ReconnectingClientFactory):
729 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
731 @defer.inlineCallbacks
736 yield deferral.sleep(3)
738 if time.time() > current_work2.value['last_update'] + 60:
739 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
741 height = tracker.get_height(current_work.value['best_share_hash'])
742 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
744 len(tracker.verified.shares),
747 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
748 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
750 datums, dt = local_rate_monitor.get_datums_in_last()
751 my_att_s = sum(datum['work']/dt for datum in datums)
752 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
753 math.format(int(my_att_s)),
755 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
756 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
760 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
761 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
762 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
764 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
765 shares, stale_orphan_shares, stale_doa_shares,
766 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
767 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
768 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
770 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
771 math.format(int(real_att_s)),
773 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
776 if this_str != last_str or time.time() > last_time + 15:
779 last_time = time.time()
785 log.err(None, 'Fatal error:')
788 class FixedArgumentParser(argparse.ArgumentParser):
789 def _read_args_from_files(self, arg_strings):
790 # expand arguments referencing files
792 for arg_string in arg_strings:
794 # for regular arguments, just add them back into the list
795 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
796 new_arg_strings.append(arg_string)
798 # replace arguments referencing files with the file content
801 args_file = open(arg_string[1:])
804 for arg_line in args_file.read().splitlines():
805 for arg in self.convert_arg_line_to_args(arg_line):
806 arg_strings.append(arg)
807 arg_strings = self._read_args_from_files(arg_strings)
808 new_arg_strings.extend(arg_strings)
812 err = sys.exc_info()[1]
815 # return the modified argument list
816 return new_arg_strings
818 def convert_arg_line_to_args(self, arg_line):
819 return [arg for arg in arg_line.split() if arg.strip()]
822 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
824 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
825 parser.add_argument('--version', action='version', version=p2pool.__version__)
826 parser.add_argument('--net',
827 help='use specified network (default: bitcoin)',
828 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
829 parser.add_argument('--testnet',
830 help='''use the network's testnet''',
831 action='store_const', const=True, default=False, dest='testnet')
832 parser.add_argument('--debug',
833 help='enable debugging mode',
834 action='store_const', const=True, default=False, dest='debug')
835 parser.add_argument('-a', '--address',
836 help='generate payouts to this address (default: <address requested from bitcoind>)',
837 type=str, action='store', default=None, dest='address')
838 parser.add_argument('--datadir',
839 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
840 type=str, action='store', default=None, dest='datadir')
841 parser.add_argument('--logfile',
842 help='''log to this file (default: data/<NET>/log)''',
843 type=str, action='store', default=None, dest='logfile')
844 parser.add_argument('--merged',
845 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
846 type=str, action='append', default=[], dest='merged_urls')
847 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
848 help='donate this percentage of work to author of p2pool (default: 0.5)',
849 type=float, action='store', default=0.5, dest='donation_percentage')
850 parser.add_argument('--irc-announce',
851 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
852 action='store_true', default=False, dest='irc_announce')
854 p2pool_group = parser.add_argument_group('p2pool interface')
855 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
856 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
857 type=int, action='store', default=None, dest='p2pool_port')
858 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
859 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
860 type=str, action='append', default=[], dest='p2pool_nodes')
861 parser.add_argument('--disable-upnp',
862 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
863 action='store_false', default=True, dest='upnp')
864 p2pool_group.add_argument('--max-conns', metavar='CONNS',
865 help='maximum incoming connections (default: 40)',
866 type=int, action='store', default=40, dest='p2pool_conns')
868 worker_group = parser.add_argument_group('worker interface')
869 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
870 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
871 type=str, action='store', default=None, dest='worker_endpoint')
872 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
873 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
874 type=float, action='store', default=0, dest='worker_fee')
876 bitcoind_group = parser.add_argument_group('bitcoind interface')
877 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
878 help='connect to this address (default: 127.0.0.1)',
879 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
880 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
881 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
882 type=int, action='store', default=None, dest='bitcoind_rpc_port')
883 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
884 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
885 type=int, action='store', default=None, dest='bitcoind_p2p_port')
887 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
888 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
889 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
891 args = parser.parse_args()
896 net_name = args.net_name + ('_testnet' if args.testnet else '')
897 net = networks.nets[net_name]
899 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
900 if not os.path.exists(datadir_path):
901 os.makedirs(datadir_path)
903 if len(args.bitcoind_rpc_userpass) > 2:
904 parser.error('a maximum of two arguments are allowed')
905 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
907 if args.bitcoind_rpc_password is None:
908 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
909 parser.error('This network has no configuration file function. Manually enter your RPC password.')
910 conf_path = net.PARENT.CONF_FILE_FUNC()
911 if not os.path.exists(conf_path):
912 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
913 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
916 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
917 with open(conf_path, 'rb') as f:
918 cp = ConfigParser.RawConfigParser()
919 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
920 for conf_name, var_name, var_type in [
921 ('rpcuser', 'bitcoind_rpc_username', str),
922 ('rpcpassword', 'bitcoind_rpc_password', str),
923 ('rpcport', 'bitcoind_rpc_port', int),
924 ('port', 'bitcoind_p2p_port', int),
926 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
927 setattr(args, var_name, var_type(cp.get('x', conf_name)))
928 if args.bitcoind_rpc_password is None:
929 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
931 if args.bitcoind_rpc_username is None:
932 args.bitcoind_rpc_username = ''
934 if args.bitcoind_rpc_port is None:
935 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
937 if args.bitcoind_p2p_port is None:
938 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
940 if args.p2pool_port is None:
941 args.p2pool_port = net.P2P_PORT
943 if args.worker_endpoint is None:
944 worker_endpoint = '', net.WORKER_PORT
945 elif ':' not in args.worker_endpoint:
946 worker_endpoint = '', int(args.worker_endpoint)
948 addr, port = args.worker_endpoint.rsplit(':', 1)
949 worker_endpoint = addr, int(port)
951 if args.address is not None:
953 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
955 parser.error('error parsing address: ' + repr(e))
957 args.pubkey_hash = None
959 def separate_url(url):
960 s = urlparse.urlsplit(url)
961 if '@' not in s.netloc:
962 parser.error('merged url netloc must contain an "@"')
963 userpass, new_netloc = s.netloc.rsplit('@', 1)
964 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
965 merged_urls = map(separate_url, args.merged_urls)
967 if args.logfile is None:
968 args.logfile = os.path.join(datadir_path, 'log')
970 logfile = logging.LogFile(args.logfile)
971 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
972 sys.stdout = logging.AbortPipe(pipe)
973 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
974 if hasattr(signal, "SIGUSR1"):
975 def sigusr1(signum, frame):
976 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
978 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
979 signal.signal(signal.SIGUSR1, sigusr1)
980 task.LoopingCall(logfile.reopen).start(5)
982 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)