from __future__ import division

import ConfigParser
import StringIO
import argparse
import os
import random
import signal
import sys
import time
import traceback
import urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
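# Rough sketch of what getmemorypool returns and what getwork() yields from it
# (field names as used in the call above; the values are illustrative only):
#
#     {'version': 1, 'previousblockhash': '00000000...', 'transactions': ['01000000...', ...],
#      'coinbasevalue': 5000000000, 'time': 1327549440, 'bits': '1a0abbcf'}
#
# getwork() normalizes this into unpacked transaction objects, an integer
# previous-block hash, a merkle branch for the (yet to be built) coinbase at
# index 0, and a FloatingInteger for 'bits', so downstream code never handles
# raw hex.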
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = [] # blocks found by this node or its peers, newest appended last
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
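        # How these are populated by set_real_work1/set_real_work2 below:
        # current_work ends up holding version/previous_block/bits/coinbaseflags
        # plus best_share_hash and mm_chains, and any change to it wakes up
        # long-polling miners; current_work2 holds the bulkier per-template data
        # (transactions, merkle_branch, subsidy, time, clock_offset, last_update)
        # that changes constantly and should not trigger long polls by itself.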
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
            @deferral.DeferredCacher
            @defer.inlineCallbacks
            def height_cacher(block_hash):
                x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
                defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
            best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
            def get_height_rel_highest(block_hash):
                this_height = height_cacher.call_now(block_hash, 0)
                best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
                best_height_cached.set(max(best_height_cached.value, this_height, best_height))
                return this_height - best_height_cached.value
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
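        # Two strategies for mapping block hashes to heights: if bitcoind's help
        # text advertises a getblock RPC, heights are fetched over JSON-RPC and
        # memoized; otherwise a HeightTracker reconstructs relative heights from
        # headers fetched over the bitcoin P2P connection. Either way,
        # get_height_rel_highest() reports a height relative to the best block
        # seen so far (roughly 0 or negative), so callers never need an absolute
        # chain height.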
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
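        # Requests for missing parent shares back off exponentially: a request is
        # suppressed while t falls inside (last_request_time - 5,
        # last_request_time + 10 * 1.5**count), so retries are spaced roughly
        # 10s, 15s, 22.5s, ... apart as the attempt count grows. Since 'requested'
        # is an ExpiringDict(300), the backoff state is forgotten after five
        # minutes for shares that never arrived.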
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
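        # Each merged-mining daemon is polled about once a second; pre_merged_work
        # maps the chain ID reported by getauxblock to that chain's current block
        # hash, share target, and proxy, so one p2pool instance can merge-mine
        # several auxiliary chains at once. Any change re-runs set_real_work2()
        # via the watches registered above.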
        # setup p2p logic and join p2pool network
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
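        # The three handlers above implement the share-exchange protocol: peers
        # push full shares (handle_shares), advertise hashes they have (which we
        # turn into getshares requests using the same backoff as set_real_work2),
        # and serve chains of at most 1000/len(hashes) parents per request,
        # stopping early at any hash the requester already has (the 'stops' set).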
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
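        # Note: calling getmemorypool *with* a hex-encoded block submits that
        # block; this was the pre-BIP 22 submission interface (later superseded
        # by submitblock). The boolean result is compared against what our own
        # proof-of-work check predicts so that silent rejections get logged.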
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, 'UPnP error:')
                yield deferral.sleep(random.expovariate(1/120))
        if args.upnp:
            upnp_thread()
        # start listening for workers with a JSON-RPC server
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        # track stale shares of ours that fall off the end of the chain
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
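        # Worked example of the arithmetic above (illustrative numbers): with
        # my_shares = 100, my_doa_shares = 10, my_shares_in_chain = 90, and
        # my_doa_shares_in_chain = 4, we get my_shares_not_in_chain = 10 and
        # my_doa_shares_not_in_chain = 6, so the function reports (4, 6), 100,
        # (...): 4 shares presumed orphaned and 6 dead on arrival out of 100
        # created.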
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # not a valid address; fall back to this node's own
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
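            # Username syntax accepted above (sketch; '1Miner...' stands for any
            # valid address): 'ADDRESS/SHARE_DIFF+PSEUDOSHARE_DIFF', e.g.
            # '1Miner.../500+8' mines to that address, requests shares of
            # difficulty at least 500, and pseudoshares of difficulty 8. Both
            # suffixes are optional, and with probability worker_fee percent the
            # payout is redirected to this node's own address instead.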
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    net=net,
                )
                
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
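                # Pseudoshare target selection, summarized: start from the
                # network's sanity cap, then either honor an explicit +DIFF
                # request or, once 50 pseudoshares have been observed, retarget
                # to the measured hash rate so attempts-per-pseudoshare
                # (2**256/target) is about one second of work. The target is
                # then kept at least as easy (numerically as large) as the real
                # share target and every merged chain's target, so nothing a
                # miner finds that could be a share or merged block is filtered
                # out.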
                received_header_hashes = set()
                
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result, aux_work=aux_work): # bind aux_work now; the deferred may fire after the loop has moved on
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
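                    # The aux proof-of-work submitted above bundles: the parent
                    # coinbase (which embeds the '\xfa\xbemm' commitment built in
                    # get_work), a merkle branch linking it into the parent block,
                    # the branch for this chain's slot in the auxpow tree, and the
                    # parent header itself, which is enough for the merged chain
                    # to verify the work without ever seeing the full block.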
                    if pow_hash <= share_info['bits'].target:
                        min_header = dict(header); del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        print 'Started successfully!'
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
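        # Watchdog mechanism: the LoopingCall re-arms a 30-second alarm once a
        # second from the reactor thread, so the alarm only fires if the reactor
        # stops servicing timed calls for 30 seconds, at which point SIGALRM
        # dumps the current stack to stderr rather than killing the process
        # (siginterrupt(False) keeps blocking syscalls from being aborted).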
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    # if another node announces the same block first, cancel our delayed announcement
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
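    # FixedArgumentParser overrides argparse's file-argument handling so one
    # @file argument can hold several options per line. For example
    # (hypothetical file contents), running 'run_p2pool.py @p2pool.conf' where
    # p2pool.conf contains:
    #
    #     --net bitcoin --give-author 1
    #     --irc-announce
    #
    # expands into the same argv as passing those tokens on the command line.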
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
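    # Typical invocation (illustrative values; run_p2pool.py is the usual entry
    # point, per the --datadir help text above):
    #
    #     python run_p2pool.py --net bitcoin -a 1YourPayoutAddress rpcuser rpcpass
    #
    # mines to a fixed address with explicit RPC credentials, while a plain
    # 'python run_p2pool.py' reads what it can from bitcoin.conf and asks
    # bitcoind for a payout address.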
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()