from __future__ import division

import ConfigParser, StringIO, argparse, os, random, signal, sys, time, traceback, urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data
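
# getwork() wraps bitcoind's getmemorypool call and normalizes the result into the
# fields the rest of this module expects: parsed transactions, the merkle branch for
# the coinbase slot, the subsidy, bits, and any coinbase flags to embed.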
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
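        
        # Determine which address receives this node's payouts: either the one passed
        # with -a/--address, or one requested from the local bitcoind and cached in the
        # data directory for later runs.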
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
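        
        # Share-chain state: the tracker holds the p2pool share chain in memory, while
        # the ShareStore persists shares (and which ones were verified) to disk so they
        # survive restarts.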
        my_share_hashes = set()
        my_doa_share_hashes = set()
        recent_blocks = [] # blocks found locally or received from peers, shown on the web interface
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
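        
        # set_real_work1 pulls fresh work from bitcoind and splits it across the
        # variables above: current_work2 gets the fields that should not wake up
        # long-polling miners, while pre_current_work feeds set_real_work2, which in
        # turn publishes current_work (the fields that should).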
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        
        yield set_real_work1()
        if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
            @deferral.DeferredCacher
            @defer.inlineCallbacks
            def height_cacher(block_hash):
                try:
                    x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
                except jsonrpc.Error, e:
                    if e.code == -5 and not p2pool.DEBUG: # -5: block not found
                        raise deferral.RetrySilentlyException()
                    raise
                defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
            best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
            def get_height_rel_highest(block_hash):
                this_height = height_cacher.call_now(block_hash, 0)
                best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
                best_height_cached.set(max(best_height_cached.value, this_height, best_height))
                return this_height - best_height_cached.value
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
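        
        # set_real_work2 re-evaluates which share is the best chain head (via
        # tracker.think) and publishes the combined bitcoind + merged-mining + share
        # chain state into current_work; it also requests any parent shares we are
        # missing from peers that are known to have them.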
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
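        
        # For each --merged url, poll that daemon's getauxblock in a loop and merge the
        # returned aux work into pre_merged_work, keyed by chain id, so it can be
        # committed to in the coinbases of the shares we generate.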
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    new_count += 1
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                if new_count:
                    set_real_work2()
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
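        
        # submit_block hands a solved block back to bitcoind. The pre-BIP22
        # getmemorypool interface doubles as the submission call when given the
        # hex-encoded block as an argument; the result is compared against whether the
        # header actually meets the block target so unexpected rejections get logged.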
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        # when the best share chain changes, push the new shares at its head to our peers
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, 'UPnP error:')
                yield deferral.sleep(random.expovariate(1/120))
        if args.upnp:
            upnp_thread()
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
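        
        # Shares of ours that the tracker prunes while they are still ancestors of the
        # current best share are counted here as (total, orphan-announced, doa-announced)
        # so the stale-rate statistics in get_stale_counts() stay correct after pruning.
        # stale_info codes: 0 = none, 253 = orphan, 254 = dead on arrival (the same
        # encoding used when shares are created in get_work below).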
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
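        
        # The WorkerBridge below is the glue between miners speaking getwork and the
        # share chain: pseudoshare_received fires for every accepted pseudoshare (used
        # for graphs), and local_rate_monitor keeps a 10-minute window of submitted work
        # for the local hash-rate estimate shown in the status line.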
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                # username syntax: ADDRESS[/MIN_SHARE_DIFFICULTY][+PSEUDOSHARE_DIFFICULTY]
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # not a valid address; mine to the node's own address instead
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
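            
            # get_work builds one unit of getwork for a miner: it assembles the share's
            # generate transaction via p2pool_data.generate_transaction (embedding any
            # merged-mining commitment in the coinbase), picks a pseudoshare target, and
            # returns a BlockAttempt plus the got_response callback that handles whatever
            # the miner eventually submits.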
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    net=net,
                )
                
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
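                
                # got_response is invoked by the worker interface for every header the
                # miner submits against this work unit: it submits any solution that
                # meets the bitcoin or merged-mining targets, turns solutions that meet
                # the share target into p2pool shares, and records the pseudoshare for
                # local rate/stale statistics.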
                received_header_hashes = set()
                
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result, aux_work=aux_work): # bind aux_work per iteration
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        min_header = dict(header)
                        del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
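        
        # Keep the work fresh: re-poll bitcoind either when its P2P connection announces
        # a new block or after 15 seconds, whichever comes first.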
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        print 'Started successfully!'
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    if p2pool.DEBUG: print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
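        
        # Periodically print a status line to the console: share-chain height, peer
        # counts, local and pool hash rates, stale/efficiency estimates, and the current
        # expected payout; only reprinted when it changes or every 15 seconds.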
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
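
# FixedArgumentParser extends argparse.ArgumentParser so that @file arguments are
# expanded into whitespace-separated tokens from each line of the file (the stock
# fromfile behavior treats every line as a single argument). run() builds the parser,
# fills in defaults from bitcoin.conf where possible, sets up logging, and hands
# control to main() under the Twisted reactor.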
def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
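    
    # Parse the command line, then backfill anything not given explicitly: RPC
    # credentials and ports come from bitcoin.conf when possible, and remaining ports
    # fall back to the selected network's defaults.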
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()