1 from __future__ import division
15 from twisted.internet import defer, error, reactor, protocol, task
16 from twisted.web import server, resource
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, graphs
24 import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a work template from bitcoind via the getmemorypool RPC and
    normalize it into the dict the rest of p2pool consumes.

    Fires (via defer.returnValue) a dict with the template version, previous
    block hash as an int, decoded transactions, a precomputed merkle branch
    for the coinbase slot, subsidy, bits and coinbase flags. Retries up to 3
    times on RPC failure via the deferral.retry decorator.
    """
    work = yield bitcoind.rpc_getmemorypool()
    # Transactions arrive hex-encoded; decode into parsed tx structures.
    transactions = [bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=transactions,
        # Branch for index 0 (the coinbase built later); the placeholder 0
        # stands in for the not-yet-known coinbase hash.
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + [bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) for tx in transactions], 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],  # NOTE(review): entry restored from elided source; set_real_work1 reads work['time'] — confirm
        # 'bits' may be a little-endian hex string or already an int, depending on bitcoind version.
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        # Prefer explicit coinbaseflags; otherwise concatenate any coinbaseaux values; default to empty.
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
42 @defer.inlineCallbacks
43 def main(args, net, datadir_path, merged_urls):
45 print 'p2pool (version %s)' % (p2pool.__version__,)
51 print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
58 good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
60 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
62 temp_work = yield getwork(bitcoind)
64 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
67 # connect to bitcoind over bitcoin-p2p
68 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
69 factory = bitcoin_p2p.ClientFactory(net.PARENT)
70 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
71 yield factory.getProtocol() # waits until handshake is successful
75 if args.pubkey_hash is None:
76 print 'Getting payout address from bitcoind...'
77 my_script = yield deferral.retry('Error getting payout address from bitcoind:', 5)(defer.inlineCallbacks(lambda: defer.returnValue(
78 bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net.PARENT)))
81 print 'Computing payout script from provided address....'
82 my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
84 print ' Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
87 ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
89 my_share_hashes = set()
90 my_doa_share_hashes = set()
92 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
93 shared_share_hashes = set()
94 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
95 known_verified = set()
97 print "Loading shares..."
98 for i, (mode, contents) in enumerate(ss.get_shares()):
100 if contents.hash in tracker.shares:
102 shared_share_hashes.add(contents.hash)
103 contents.time_seen = 0
104 tracker.add(contents)
105 if len(tracker.shares) % 1000 == 0 and tracker.shares:
106 print " %i" % (len(tracker.shares),)
107 elif mode == 'verified_hash':
108 known_verified.add(contents)
110 raise AssertionError()
111 print " ...inserting %i verified shares..." % (len(known_verified),)
112 for h in known_verified:
113 if h not in tracker.shares:
114 ss.forget_verified_share(h)
116 tracker.verified.add(tracker.shares[h])
117 print " ...done loading %i shares!" % (len(tracker.shares),)
119 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
120 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
121 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
123 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
125 pre_current_work = variable.Variable(None)
126 pre_merged_work = variable.Variable({})
127 # information affecting work that should trigger a long-polling update
128 current_work = variable.Variable(None)
129 # information affecting work that should not trigger a long-polling update
130 current_work2 = variable.Variable(None)
132 requested = expiring_dict.ExpiringDict(300)
@defer.inlineCallbacks
def set_real_work1():
    """Poll bitcoind for a fresh work template and publish it into the two
    work variables (current_work2 then pre_current_work)."""
    work = yield getwork(bitcoind)
    # current_work2 holds data whose changes should NOT trigger a long-poll push.
    current_work2.set(dict(
        time=work['time'],  # NOTE(review): entry restored from elided source — confirm
        transactions=work['transactions'],
        merkle_branch=work['merkle_branch'],
        subsidy=work['subsidy'],
        # Offset between our wall clock and bitcoind's template timestamp.
        clock_offset=time.time() - work['time'],
        last_update=time.time(),
    )) # second set first because everything hooks on the first
    pre_current_work.set(dict(
        version=work['version'],
        previous_block=work['previous_block_hash'],
        bits=work['bits'],  # NOTE(review): entry restored from elided source; downstream reads current_work.value['bits'] — confirm
        coinbaseflags=work['coinbaseflags'],
    ))
152 def set_real_work2():
153 best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
155 t = dict(pre_current_work.value)
156 t['best_share_hash'] = best
157 t['mm_chains'] = pre_merged_work.value
161 for peer2, share_hash in desired:
162 if share_hash not in tracker.tails: # was received in the time tracker.think was running
164 last_request_time, count = requested.get(share_hash, (None, 0))
165 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
167 potential_peers = set()
168 for head in tracker.tails[share_hash]:
169 potential_peers.update(peer_heads.get(head, set()))
170 potential_peers = [peer for peer in potential_peers if peer.connected2]
171 if count == 0 and peer2 is not None and peer2.connected2:
174 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
178 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
182 stops=list(set(tracker.heads) | set(
183 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
186 requested[share_hash] = t, count + 1
187 pre_current_work.changed.watch(lambda _: set_real_work2())
189 print 'Initializing work...'
190 yield set_real_work1()
194 pre_merged_work.changed.watch(lambda _: set_real_work2())
195 ht.updated.watch(set_real_work2)
@defer.inlineCallbacks
def set_merged_work(merged_url, merged_userpass):
    """Poll one merged-mining daemon forever, folding its aux work into
    pre_merged_work (keyed by chain id) about once a second.

    merged_url/merged_userpass: connection details for this chain's daemon.
    """
    # BUG FIX: the proxy was built from `args.merged_url` instead of the
    # `merged_url` parameter, so with multiple merged chains every poller
    # hit the same URL (and broke entirely when args had no merged_url).
    merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
    while True:  # NOTE(review): loop line restored from elided source — the trailing sleep implies it
        auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
        # Merge this chain's latest aux block into the shared dict, keeping
        # entries for other chain ids intact.
        pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
            hash=int(auxblock['hash'], 16),
            target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
            merged_proxy=merged_proxy,
        )}))
        yield deferral.sleep(1)
for merged_url, merged_userpass in merged_urls:
    set_merged_work(merged_url, merged_userpass)
@pre_merged_work.changed.watch
def _(new_merged_work):
    # Purely informational; the separately-registered set_real_work2 watcher
    # is what folds new aux work into the share template.
    print 'Got new merged mining work!'
216 # setup p2p logic and join p2pool network
218 class Node(p2p.Node):
219 def handle_shares(self, shares, peer):
221 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
225 if share.hash in tracker.shares:
226 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
231 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
235 if shares and peer is not None:
236 peer_heads.setdefault(shares[0].hash, set()).add(peer)
242 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
244 def handle_share_hashes(self, hashes, peer):
247 for share_hash in hashes:
248 if share_hash in tracker.shares:
250 last_request_time, count = requested.get(share_hash, (None, 0))
251 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
253 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
254 get_hashes.append(share_hash)
255 requested[share_hash] = t, count + 1
257 if hashes and peer is not None:
258 peer_heads.setdefault(hashes[0], set()).add(peer)
260 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
262 def handle_get_shares(self, hashes, parents, stops, peer):
263 parents = min(parents, 1000//len(hashes))
266 for share_hash in hashes:
267 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
268 if share.hash in stops:
271 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
272 peer.sendShares(shares)
274 @tracker.verified.added.watch
276 if share.pow_hash <= share.header['bits'].target:
277 if factory.conn.value is not None:
278 factory.conn.value.send_block(block=share.as_block(tracker))
280 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
282 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
284 recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash) })
286 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
288 @defer.inlineCallbacks
291 ip, port = x.split(':')
292 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
294 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
297 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
299 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
301 print >>sys.stderr, "error reading addrs"
302 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
305 if addr not in addrs:
306 addrs[addr] = (0, time.time(), time.time())
310 connect_addrs = set()
311 for addr_df in map(parse, args.p2pool_nodes):
313 connect_addrs.add((yield addr_df))
318 best_share_hash_func=lambda: current_work.value['best_share_hash'],
319 port=args.p2pool_port,
322 connect_addrs=connect_addrs,
327 open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
328 task.LoopingCall(save_addrs).start(60)
330 # send share when the chain changes to their chain
331 def work_changed(new_work):
332 #print 'Work changed:', new_work
334 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
335 if share.hash in shared_share_hashes:
337 shared_share_hashes.add(share.hash)
340 for peer in p2p_node.peers.itervalues():
341 peer.sendShares([share for share in shares if share.peer is not peer])
343 current_work.changed.watch(work_changed)
346 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
348 if share.hash in tracker.verified.shares:
349 ss.add_verified_hash(share.hash)
350 task.LoopingCall(save_shares).start(60)
355 @defer.inlineCallbacks
359 is_lan, lan_ip = yield ipdiscover.get_local_ip()
361 pm = yield portmapper.get_port_mapper()
362 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
363 except defer.TimeoutError:
367 log.err(None, "UPnP error:")
368 yield deferral.sleep(random.expovariate(1/120))
373 # start listening for workers with a JSON-RPC server
375 print 'Listening for workers on port %i...' % (args.worker_port,)
377 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
378 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
379 vip_pass = f.read().strip('\r\n')
381 vip_pass = '%016x' % (random.randrange(2**64),)
382 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
384 print ' Worker password:', vip_pass, '(only required for generating graphs)'
# Counters of our shares punted off the end of the share chain while still
# counted as unstale: (total, orphan-announced, dead-announced).
removed_unstales_var = variable.Variable((0, 0, 0))
@tracker.verified.removed.watch
def _(share):  # NOTE(review): def line restored from elided source — required by the decorator
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
        removed_unstales_var.set((
            removed_unstales_var.value[0] + 1,
            # 253 marks an orphan-announced share, 254 a dead-announced one.
            removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
            removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
        ))
# Count of our dead-on-arrival shares removed from the chain while unstale.
removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # BUG FIX: original referenced the undefined name `removed_doa_unstales`
        # (the variable above is `removed_doa_unstales_var`), raising NameError
        # the first time one of our DOA shares fell off the chain.
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
    mine = len(my_share_hashes)
    mine_doa = len(my_doa_share_hashes)
    # Delta over the current best chain, plus whatever has already been
    # pruned off its end (tracked by the removal watchers above).
    delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
    removed_total, removed_orphans, removed_doas = removed_unstales_var.value
    in_chain = delta.my_count + removed_total
    doa_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
    orphans_recorded = delta.my_orphan_announce_count + removed_orphans
    doas_recorded = delta.my_dead_announce_count + removed_doas
    # Anything we produced that never made it into the chain is stale;
    # split it into orphans vs dead-on-arrival.
    not_in_chain = mine - in_chain
    doa_not_in_chain = mine_doa - doa_in_chain
    return (not_in_chain - doa_not_in_chain, doa_not_in_chain), mine, (orphans_recorded, doas_recorded)
420 class WorkerBridge(worker_interface.WorkerBridge):
422 worker_interface.WorkerBridge.__init__(self)
423 self.new_work_event = current_work.changed
425 self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
426 self.recent_shares_ts_work = []
428 def _get_payout_script_from_username(self, user):
432 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
435 return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
def preprocess_request(self, request):
    # Resolve the worker's payout script from its username; fall back to the
    # node operator's script when the username isn't a valid address, or with
    # probability worker_fee% (the operator's fee).
    chosen = self._get_payout_script_from_username(request.getUser())
    if chosen is None or random.uniform(0, 100) < args.worker_fee:
        chosen = my_script
    return chosen,
443 def get_work(self, payout_script):
444 if len(p2p_node.peers) == 0 and net.PERSIST:
445 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
446 if current_work.value['best_share_hash'] is None and net.PERSIST:
447 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
448 if time.time() > current_work2.value['last_update'] + 60:
449 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
451 if current_work.value['mm_chains']:
452 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
453 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
454 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
455 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
459 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
464 share_info, generate_tx = p2pool_data.generate_transaction(
467 previous_share_hash=current_work.value['best_share_hash'],
468 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
469 nonce=struct.pack('<Q', random.randrange(2**64)),
470 new_script=payout_script,
471 subsidy=current_work2.value['subsidy'],
472 donation=math.perfect_round(65535*args.donation_percentage/100),
473 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
474 253 if orphans > orphans_recorded_in_chain else
475 254 if doas > doas_recorded_in_chain else
477 )(*get_stale_counts()),
479 block_target=current_work.value['bits'].target,
480 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
484 target = 2**256//2**32 - 1
485 if len(self.recent_shares_ts_work) == 50:
486 hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
487 target = min(target, 2**256//(hash_rate * 5))
488 target = max(target, share_info['bits'].target)
489 for aux_work in current_work.value['mm_chains'].itervalues():
490 target = max(target, aux_work['target'])
492 transactions = [generate_tx] + list(current_work2.value['transactions'])
493 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
494 self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), mm_later, target, current_work2.value['merkle_branch']
496 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
497 bitcoin_data.target_to_difficulty(target),
498 bitcoin_data.target_to_difficulty(share_info['bits'].target),
499 (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
500 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
501 len(current_work2.value['transactions']),
504 return bitcoin_getwork.BlockAttempt(
505 version=current_work.value['version'],
506 previous_block=current_work.value['previous_block'],
507 merkle_root=merkle_root,
508 timestamp=current_work2.value['time'],
509 bits=current_work.value['bits'],
513 def got_response(self, header, request):
514 # match up with transactions
515 if header['merkle_root'] not in self.merkle_root_to_transactions:
516 print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
518 share_info, transactions, getwork_time, mm_later, target, merkle_branch = self.merkle_root_to_transactions[header['merkle_root']]
520 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
521 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
524 if pow_hash <= header['bits'].target or p2pool.DEBUG:
525 if factory.conn.value is not None:
526 factory.conn.value.send_block(block=dict(header=header, txs=transactions))
528 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
529 if pow_hash <= header['bits'].target:
531 print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
533 recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
535 log.err(None, 'Error while processing potential block:')
537 for aux_work, index, hashes in mm_later:
539 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
540 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
541 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
542 bitcoin_data.aux_pow_type.pack(dict(
545 block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
546 merkle_branch=merkle_branch,
549 merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
551 parent_block_header=header,
556 if result != (pow_hash <= aux_work['target']):
557 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
559 print 'Merged block submittal result: %s' % (result,)
562 log.err(err, 'Error submitting merged block:')
564 log.err(None, 'Error while processing merged mining POW:')
566 if pow_hash <= share_info['bits'].target:
567 share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
568 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
570 p2pool_data.format_hash(share.hash),
571 p2pool_data.format_hash(share.previous_hash),
572 time.time() - getwork_time,
573 ' DEAD ON ARRIVAL' if not on_time else '',
575 my_share_hashes.add(share.hash)
577 my_doa_share_hashes.add(share.hash)
581 tracker.verified.add(share)
585 if pow_hash <= header['bits'].target or p2pool.DEBUG:
586 for peer in p2p_node.peers.itervalues():
587 peer.sendShares([share])
588 shared_share_hashes.add(share.hash)
590 log.err(None, 'Error forwarding block solution:')
592 if pow_hash <= target:
593 reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
594 if request.getPassword() == vip_pass:
595 reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
596 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
597 while len(self.recent_shares_ts_work) > 50:
598 self.recent_shares_ts_work.pop(0)
600 if pow_hash > target:
601 print 'Worker submitted share with hash > target:'
602 print ' Hash: %56x' % (pow_hash,)
603 print ' Target: %56x' % (target,)
607 web_root = resource.Resource()
608 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
611 if tracker.get_height(current_work.value['best_share_hash']) < 720:
612 return json.dumps(None)
613 return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
614 / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
617 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
618 weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
620 for script in sorted(weights, key=lambda s: weights[s]):
621 res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
622 return json.dumps(res)
def get_current_txouts():
    # Rebuild the generate transaction for the current best share and report
    # its outputs as a script -> value mapping (pending payouts).
    best = tracker.shares[current_work.value['best_share_hash']]
    _share_info, gentx = p2pool_data.generate_transaction(tracker, best.share_info['share_data'], best.header['bits'].target, best.share_info['timestamp'], best.net)
    return dict((out['script'], out['value']) for out in gentx['tx_outs'])
629 def get_current_scaled_txouts(scale, trunc=0):
630 txouts = get_current_txouts()
631 total = sum(txouts.itervalues())
632 results = dict((script, value*scale//total) for script, value in txouts.iteritems())
636 for s in sorted(results, key=results.__getitem__):
637 total_random += results[s]
639 if total_random >= trunc and results[s] >= trunc:
641 winner = math.weighted_choice((script, results[script]) for script in random_set)
642 for script in random_set:
644 results[winner] = total_random
645 if sum(results.itervalues()) < int(scale):
646 results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
def get_current_payouts():
    # JSON mapping of human-readable payout address -> pending payout in whole coins.
    payouts = {}
    for script, value in get_current_txouts().iteritems():
        payouts[bitcoin_data.script2_to_human(script, net.PARENT)] = value/1e8
    return json.dumps(payouts)
652 def get_patron_sendmany(this):
655 this, trunc = this.split('/', 1)
658 return json.dumps(dict(
659 (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
660 for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
661 if bitcoin_data.script2_to_address(script, net.PARENT) is not None
664 return json.dumps(None)
666 def get_global_stats():
667 # averaged over last hour
668 lookbehind = 3600//net.SHARE_PERIOD
669 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
672 nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
673 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
674 return json.dumps(dict(
675 pool_nonstale_hash_rate=nonstale_hash_rate,
676 pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
677 pool_stale_prop=stale_prop,
680 def get_local_stats():
681 lookbehind = 3600//net.SHARE_PERIOD
682 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
685 global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
687 my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
688 my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
689 my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
690 my_share_count = my_unstale_count + my_orphan_count + my_doa_count
691 my_stale_count = my_orphan_count + my_doa_count
693 my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
695 my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
696 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
697 if share.hash in my_share_hashes)
698 actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
699 tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
700 share_att_s = my_work / actual_time
702 return json.dumps(dict(
703 my_hash_rates_in_last_hour=dict(
704 nonstale=share_att_s,
705 rewarded=share_att_s/(1 - global_stale_prop),
706 actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
708 my_share_counts_in_last_hour=dict(
709 shares=my_share_count,
710 unstale_shares=my_unstale_count,
711 stale_shares=my_stale_count,
712 orphan_stale_shares=my_orphan_count,
713 doa_stale_shares=my_doa_count,
715 my_stale_proportions_in_last_hour=dict(
717 orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
718 dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
def get_peer_addresses():
    # Space-separated list of connected peers; the port is only shown when it
    # differs from the network's default P2P port.
    parts = []
    for peer in p2p_node.peers.itervalues():
        endpoint = peer.transport.getPeer()
        suffix = ':' + str(endpoint.port) if endpoint.port != net.P2P_PORT else ''
        parts.append(endpoint.host + suffix)
    return ' '.join(parts)
class WebInterface(resource.Resource):
    """Tiny twisted.web resource adapter: serves the result of calling func
    with the named GET query parameters, under a fixed Content-Type."""
    def __init__(self, func, mime_type, *fields):
        # fields names the query-string parameters forwarded positionally to func.
        self.func, self.mime_type, self.fields = func, mime_type, fields
    
    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        # Allow cross-origin reads so external dashboards can poll the stats endpoints.
        request.setHeader('Access-Control-Allow-Origin', '*')
        return self.func(*(request.args[field][0] for field in self.fields))
734 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
735 web_root.putChild('users', WebInterface(get_users, 'application/json'))
736 web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
737 web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
738 web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
739 web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
740 web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
741 web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
742 web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
743 web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
745 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
747 grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
748 web_root.putChild('graphs', grapher.get_resource())
750 if tracker.get_height(current_work.value['best_share_hash']) < 720:
752 nonstalerate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
753 poolrate = nonstalerate / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720))
754 grapher.add_poolrate_point(poolrate, poolrate - nonstalerate)
755 task.LoopingCall(add_point).start(100)
def attempt_listen():
    # Bind the worker HTTP port; if the port is still held (e.g. by a previous
    # instance shutting down), retry once a second instead of crashing.
    try:  # NOTE(review): try line restored from elided source — implied by the except below
        reactor.listenTCP(args.worker_port, server.Site(web_root))
    except error.CannotListenError, e:
        print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
        reactor.callLater(1, attempt_listen)
764 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
772 @defer.inlineCallbacks
775 flag = factory.new_block.get_deferred()
777 yield set_real_work1()
780 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
785 print 'Started successfully!'
789 if hasattr(signal, 'SIGALRM'):
790 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
791 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
793 signal.siginterrupt(signal.SIGALRM, False)
794 task.LoopingCall(signal.alarm, 30).start(1)
796 if args.irc_announce:
797 from twisted.words.protocols import irc
798 class IRCClient(irc.IRCClient):
800 def lineReceived(self, line):
802 irc.IRCClient.lineReceived(self, line)
804 irc.IRCClient.signedOn(self)
805 self.factory.resetDelay()
807 self.watch_id = tracker.verified.added.watch(self._new_share)
808 self.announced_hashes = set()
809 def _new_share(self, share):
810 if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes:
811 self.announced_hashes.add(share.header_hash)
812 self.say('#p2pool', '\x02BLOCK FOUND by %s! http://blockexplorer.com/block/%064x' % (bitcoin_data.script2_to_address(share.new_script, net.PARENT), share.header_hash))
813 def connectionLost(self, reason):
814 tracker.verified.added.unwatch(self.watch_id)
815 class IRCClientFactory(protocol.ReconnectingClientFactory):
817 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
# Periodic status reporter: sleeps 3 s between iterations, warns when
# bitcoind has been silent for 60 s, and prints a pool/miner statistics
# line when it changes (or at least every 15 s).
# NOTE(review): listing numbers jump (819->824, 837->838->840, ...) — the
# loop header, the generator function's def line, try/except scaffolding
# and several tuple elements are elided from this view.
819 @defer.inlineCallbacks
824 yield deferral.sleep(3)
# No getwork update for 60 s -> bitcoind is probably frozen or dead.
826 if time.time() > current_work2.value['last_update'] + 60:
827 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
828 if current_work.value['best_share_hash'] is not None:
829 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
# Pool rate measured over at most the last 720 shares (height - 1:
# the oldest share has no predecessor to measure an interval against).
831 att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
832 weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
833 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
834 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
# Scale the observed rate up by the stale proportion to estimate the
# pool's real hashrate.
835 real_att_s = att_s / (1 - stale_prop)
# Our own share of the pool hashrate, weighted by payout weight.
836 my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
837 this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i (%i incoming)' % (
838 math.format(int(real_att_s)),
840 len(tracker.verified.shares),
842 weights.get(my_script, 0)/total_weight*100,
843 math.format(int(my_att_s)),
848 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
# In debug mode, also report reactor file-descriptor counts.
849 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
# Expected time between blocks found by the whole pool, in days.
850 this_str += '\nAverage time between blocks: %.2f days' % (
851 2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
853 this_str += '\nPool stales: %i%% Own: %s Own efficiency: %s' % (
854 int(100*stale_prop+.5),
# 95% binomial confidence interval on our own stale rate...
855 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
# ...and on our efficiency relative to the pool-wide stale proportion.
856 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
# Only print when something changed or 15 s elapsed, to limit log spam.
858 if this_str != last_str or time.time() > last_time + 15:
861 last_time = time.time()
# Top-level catch for main(): log any fatal startup error.
866 log.err(None, 'Fatal error:')
# argparse.ArgumentParser subclass whose @file argument expansion is
# extended: argument files may reference further argument files (handled by
# recursion below), and one file line may hold several whitespace-separated
# arguments.
870 class FixedArgumentParser(argparse.ArgumentParser):
871 def _read_args_from_files(self, arg_strings):
872 # expand arguments referencing files
# NOTE(review): listing gaps (873, 875, 879, 881-885, 891-896) elide the
# `new_arg_strings` initialisation and the else/try/except scaffolding
# around the file handling — confirm against upstream before editing.
874 for arg_string in arg_strings:
876 # for regular arguments, just add them back into the list
877 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
878 new_arg_strings.append(arg_string)
880 # replace arguments referencing files with the file content
# arg_string[0] is the '@' prefix; the rest is the file path.
883 args_file = open(arg_string[1:])
886 for arg_line in args_file.read().splitlines():
887 for arg in self.convert_arg_line_to_args(arg_line):
888 arg_strings.append(arg)
# Recurse so that files referenced from an argument file are expanded too.
889 arg_strings = self._read_args_from_files(arg_strings)
890 new_arg_strings.extend(arg_strings)
# Fragment of the except handler: capture the IOError for reporting.
894 err = sys.exc_info()[1]
897 # return the modified argument list
898 return new_arg_strings
900 def convert_arg_line_to_args(self, arg_line):
# Split one argument-file line into individual non-blank arguments.
901 return [arg for arg in arg_line.split() if arg.strip()]
# Command-line interface. FixedArgumentParser (defined above) adds support
# for @filename argument files on top of plain argparse.
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
# --merged-url/--merged-userpass are kept only for backwards compatibility;
# new configurations should use --merged (see handling further below).
# Fix: help text previously misspelled "DEPRECATED" as "DEPRECIATED".
parser.add_argument('--merged-url',
    help='DEPRECATED, use --merged',
    type=str, action='store', default=None, dest='merged_url')
parser.add_argument('--merged-userpass',
    help='DEPRECATED, use --merged',
    type=str, action='store', default=None, dest='merged_userpass')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
# p2pool peer-to-peer interface options.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    action='store', type=int, dest='p2pool_port', default=None)
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    action='append', type=str, dest='p2pool_nodes', default=[])
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', dest='upnp', default=True)

# Miner-facing worker (RPC) interface options.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
    action='store', type=int, dest='worker_port', default=None)
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    action='store', type=float, dest='worker_fee', default=0)

# Upstream bitcoind connection options.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    action='store', type=str, dest='bitcoind_address', default='127.0.0.1')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
    action='store', type=int, dest='bitcoind_rpc_port', default=None)
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    action='store', type=int, dest='bitcoind_p2p_port', default=None)
# Positional: optional "username password" pair for the RPC interface.
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    nargs='*', action='store', type=str, dest='bitcoind_rpc_userpass', default=[])
970 args = parser.parse_args()
# NOTE(review): listing lines 971-974 are elided (presumably the
# args.debug -> p2pool.DEBUG handling) — confirm against upstream.
# Select the concrete network object, optionally its testnet variant.
975 net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
# Per-network data directory next to the executable; created on first run.
977 datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
978 if not os.path.exists(datadir_path):
979 os.makedirs(datadir_path)
981 if len(args.bitcoind_rpc_userpass) > 2:
982 parser.error('a maximum of two arguments are allowed')
# Left-pad with Nones so a single argument is taken as the password only
# (username then defaults to '' below).
983 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No password given on the command line -> read credentials/ports from the
# parent coin's configuration file (e.g. bitcoin.conf).
985 if args.bitcoind_rpc_password is None:
986 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
987 parser.error('This network has no configuration file function. Manually enter your RPC password.')
988 conf_path = net.PARENT.CONF_FILE_FUNC()
989 if not os.path.exists(conf_path):
990 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
991 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
# NOTE(review): listing lines 992-993 (middle of this suggested-config
# message) are elided.
994 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
# Prepend a dummy section header so ConfigParser accepts the flat
# key=value format of bitcoin.conf.
995 with open(conf_path, 'rb') as f:
996 cp = ConfigParser.RawConfigParser()
997 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
# Fill in any option still unset from the corresponding conf entry.
998 for conf_name, var_name, var_type in [
999 ('rpcuser', 'bitcoind_rpc_username', str),
1000 ('rpcpassword', 'bitcoind_rpc_password', str),
1001 ('rpcport', 'bitcoind_rpc_port', int),
1002 ('port', 'bitcoind_p2p_port', int),
# NOTE(review): listing line 1003 (the closing `]:` of this loop header)
# is elided.
1004 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1005 setattr(args, var_name, var_type(cp.get('x', conf_name)))
# Remaining defaults: empty RPC username and the network's standard ports.
1007 if args.bitcoind_rpc_username is None:
1008 args.bitcoind_rpc_username = ''
1010 if args.bitcoind_rpc_port is None:
1011 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1013 if args.bitcoind_p2p_port is None:
1014 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1016 if args.p2pool_port is None:
1017 args.p2pool_port = net.P2P_PORT
1019 if args.worker_port is None:
1020 args.worker_port = net.WORKER_PORT
# Validate any payout address eagerly so a typo fails at startup.
1022 if args.address is not None:
# NOTE(review): listing lines 1023 (`try:`) and 1027 (`else:`) appear to
# be elided around this try/except — confirm against upstream.
1024 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1025 except Exception, e:
1026 parser.error('error parsing address: ' + repr(e))
1028 args.pubkey_hash = None
def separate_url(url):
    # Split a merged-mining URL of the form scheme://user:pass@host:port/
    # into (url-with-credentials-removed, 'user:pass').
    # Aborts via parser.error if the URL carries no credentials at all.
    split = urlparse.urlsplit(url)
    if '@' not in split.netloc:
        parser.error('merged url netloc must contain an "@"')
    # rpartition splits on the LAST '@', so passwords containing '@' survive.
    credentials, _, bare_netloc = split.netloc.rpartition('@')
    bare_url = urlparse.urlunsplit(split._replace(netloc=bare_netloc))
    return bare_url, credentials
# Normalise all --merged URLs into (bare_url, userpass) pairs.
1036 merged_urls = map(separate_url, args.merged_urls)
# Backwards compatibility for the deprecated --merged-url/--merged-userpass
# option pair: warn, pause, then append it to merged_urls.
1038 if args.merged_url is not None or args.merged_userpass is not None:
1039 print '--merged-url and --merged-userpass are depreciated! Use --merged http://USER:PASS@HOST:PORT/ instead!'
1040 print 'Pausing 10 seconds...'
# NOTE(review): listing lines 1041-1042 (presumably the actual 10 s sleep)
# are elided from this view.
1043 if args.merged_url is None or args.merged_userpass is None:
1044 parser.error('must specify both --merged-url and --merged-userpass')
1046 merged_urls = merged_urls + [(args.merged_url, args.merged_userpass)]
# Default logfile lives inside the per-network data directory.
1049 if args.logfile is None:
1050 args.logfile = os.path.join(datadir_path, 'log')
# Tee stdout/stderr to both the console and the logfile, timestamping every
# line; stderr output is additionally prefixed with '> '.
1052 logfile = logging.LogFile(args.logfile)
1053 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1054 sys.stdout = logging.AbortPipe(pipe)
1055 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# On platforms that have it, SIGUSR1 reopens the logfile so external log
# rotation can work.
1056 if hasattr(signal, "SIGUSR1"):
1057 def sigusr1(signum, frame):
1058 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
# NOTE(review): listing line 1059 (presumably logfile.reopen()) is elided.
1060 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1061 signal.signal(signal.SIGUSR1, sigusr1)
# Also reopen the logfile every 5 seconds.
1062 task.LoopingCall(logfile.reopen).start(5)
# Schedule the asynchronous main() once the reactor starts running.
1064 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls)