1 from __future__ import division
16 from twisted.internet import defer, error, reactor, protocol, task
17 from twisted.web import server, resource
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface
23 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
24 from . import p2p, networks, graphs
25 import p2pool, p2pool.data as p2pool_data
27 @deferral.retry('Error getting work from bitcoind:', 3)
28 @defer.inlineCallbacks
29 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 transactions = [bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']]
32 defer.returnValue(dict(
33 version=work['version'],
34 previous_block_hash=int(work['previousblockhash'], 16),
35 transactions=transactions,
36 merkle_branch=bitcoin_data.calculate_merkle_branch([0] + [bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) for tx in transactions], 0),
37 subsidy=work['coinbasevalue'],
39 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
40 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
43 @defer.inlineCallbacks
44 def main(args, net, datadir_path, merged_urls):
46 print 'p2pool (version %s)' % (p2pool.__version__,)
52 print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
55 # connect to bitcoind over JSON-RPC and do initial getmemorypool
56 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
57 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
58 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
59 good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
61 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
63 temp_work = yield getwork(bitcoind)
65 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
68 # connect to bitcoind over bitcoin-p2p
69 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
70 factory = bitcoin_p2p.ClientFactory(net.PARENT)
71 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
72 yield factory.getProtocol() # waits until handshake is successful
76 if args.pubkey_hash is None:
77 print 'Getting payout address from bitcoind...'
78 my_script = yield deferral.retry('Error getting payout address from bitcoind:', 5)(defer.inlineCallbacks(lambda: defer.returnValue(
79 bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net.PARENT)))
82 print 'Computing payout script from provided address....'
83 my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
85 print ' Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
88 ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
90 my_share_hashes = set()
91 my_doa_share_hashes = set()
93 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
94 shared_share_hashes = set()
95 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
96 known_verified = set()
98 print "Loading shares..."
99 for i, (mode, contents) in enumerate(ss.get_shares()):
101 if contents.hash in tracker.shares:
103 shared_share_hashes.add(contents.hash)
104 contents.time_seen = 0
105 tracker.add(contents)
106 if len(tracker.shares) % 1000 == 0 and tracker.shares:
107 print " %i" % (len(tracker.shares),)
108 elif mode == 'verified_hash':
109 known_verified.add(contents)
111 raise AssertionError()
112 print " ...inserting %i verified shares..." % (len(known_verified),)
113 for h in known_verified:
114 if h not in tracker.shares:
115 ss.forget_verified_share(h)
117 tracker.verified.add(tracker.shares[h])
118 print " ...done loading %i shares!" % (len(tracker.shares),)
120 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
121 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
122 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
124 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
126 pre_current_work = variable.Variable(None)
127 pre_merged_work = variable.Variable({})
128 # information affecting work that should trigger a long-polling update
129 current_work = variable.Variable(None)
130 # information affecting work that should not trigger a long-polling update
131 current_work2 = variable.Variable(None)
133 requested = expiring_dict.ExpiringDict(300)
135 @defer.inlineCallbacks
136 def set_real_work1():
137 work = yield getwork(bitcoind)
138 current_work2.set(dict(
140 transactions=work['transactions'],
141 merkle_branch=work['merkle_branch'],
142 subsidy=work['subsidy'],
143 clock_offset=time.time() - work['time'],
144 last_update=time.time(),
145 )) # second set first because everything hooks on the first
146 pre_current_work.set(dict(
147 version=work['version'],
148 previous_block=work['previous_block_hash'],
150 coinbaseflags=work['coinbaseflags'],
153 def set_real_work2():
154 best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
156 t = dict(pre_current_work.value)
157 t['best_share_hash'] = best
158 t['mm_chains'] = pre_merged_work.value
162 for peer2, share_hash in desired:
163 if share_hash not in tracker.tails: # was received in the time tracker.think was running
165 last_request_time, count = requested.get(share_hash, (None, 0))
166 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
168 potential_peers = set()
169 for head in tracker.tails[share_hash]:
170 potential_peers.update(peer_heads.get(head, set()))
171 potential_peers = [peer for peer in potential_peers if peer.connected2]
172 if count == 0 and peer2 is not None and peer2.connected2:
175 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
179 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
183 stops=list(set(tracker.heads) | set(
184 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
187 requested[share_hash] = t, count + 1
188 pre_current_work.changed.watch(lambda _: set_real_work2())
190 print 'Initializing work...'
191 yield set_real_work1()
195 pre_merged_work.changed.watch(lambda _: set_real_work2())
196 ht.updated.watch(set_real_work2)
199 @defer.inlineCallbacks
200 def set_merged_work(merged_url, merged_userpass):
201 merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
203 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
204 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
205 hash=int(auxblock['hash'], 16),
206 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
207 merged_proxy=merged_proxy,
209 yield deferral.sleep(1)
210 for merged_url, merged_userpass in merged_urls:
211 set_merged_work(merged_url, merged_userpass)
213 @pre_merged_work.changed.watch
214 def _(new_merged_work):
215 print 'Got new merged mining work!'
217 # setup p2p logic and join p2pool network
219 class Node(p2p.Node):
220 def handle_shares(self, shares, peer):
222 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
226 if share.hash in tracker.shares:
227 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
232 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
236 if shares and peer is not None:
237 peer_heads.setdefault(shares[0].hash, set()).add(peer)
243 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
245 def handle_share_hashes(self, hashes, peer):
248 for share_hash in hashes:
249 if share_hash in tracker.shares:
251 last_request_time, count = requested.get(share_hash, (None, 0))
252 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
254 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
255 get_hashes.append(share_hash)
256 requested[share_hash] = t, count + 1
258 if hashes and peer is not None:
259 peer_heads.setdefault(hashes[0], set()).add(peer)
261 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
263 def handle_get_shares(self, hashes, parents, stops, peer):
264 parents = min(parents, 1000//len(hashes))
267 for share_hash in hashes:
268 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
269 if share.hash in stops:
272 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
273 peer.sendShares(shares)
275 @tracker.verified.added.watch
277 if share.pow_hash <= share.header['bits'].target:
278 if factory.conn.value is not None:
279 factory.conn.value.send_block(block=share.as_block(tracker))
281 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
283 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
285 recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash) })
287 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
289 @defer.inlineCallbacks
292 ip, port = x.split(':')
293 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
295 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
298 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
300 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
302 print >>sys.stderr, "error reading addrs"
303 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
306 if addr not in addrs:
307 addrs[addr] = (0, time.time(), time.time())
311 connect_addrs = set()
312 for addr_df in map(parse, args.p2pool_nodes):
314 connect_addrs.add((yield addr_df))
319 best_share_hash_func=lambda: current_work.value['best_share_hash'],
320 port=args.p2pool_port,
323 connect_addrs=connect_addrs,
328 open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
329 task.LoopingCall(save_addrs).start(60)
331 # send share when the chain changes to their chain
332 def work_changed(new_work):
333 #print 'Work changed:', new_work
335 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
336 if share.hash in shared_share_hashes:
338 shared_share_hashes.add(share.hash)
341 for peer in p2p_node.peers.itervalues():
342 peer.sendShares([share for share in shares if share.peer is not peer])
344 current_work.changed.watch(work_changed)
347 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
349 if share.hash in tracker.verified.shares:
350 ss.add_verified_hash(share.hash)
351 task.LoopingCall(save_shares).start(60)
356 @defer.inlineCallbacks
360 is_lan, lan_ip = yield ipdiscover.get_local_ip()
362 pm = yield portmapper.get_port_mapper()
363 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
364 except defer.TimeoutError:
368 log.err(None, "UPnP error:")
369 yield deferral.sleep(random.expovariate(1/120))
374 # start listening for workers with a JSON-RPC server
376 print 'Listening for workers on port %i...' % (args.worker_port,)
378 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
379 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
380 vip_pass = f.read().strip('\r\n')
382 vip_pass = '%016x' % (random.randrange(2**64),)
383 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
385 print ' Worker password:', vip_pass, '(only required for generating graphs)'
389 removed_unstales_var = variable.Variable((0, 0, 0))
390 @tracker.verified.removed.watch
392 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
393 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
394 removed_unstales_var.set((
395 removed_unstales_var.value[0] + 1,
396 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
397 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
400 removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    # Keep counting my dead-on-arrival shares after they are pruned off the
    # verified chain, so get_stale_counts() still accounts for them.
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # BUG FIX: original referenced undefined name 'removed_doa_unstales';
        # the Variable is named removed_doa_unstales_var (defined just above,
        # and read as removed_doa_unstales_var.value in get_stale_counts).
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
    # Everything this instance has ever produced.
    my_shares = len(my_share_hashes)
    my_doa_shares = len(my_doa_share_hashes)
    # Counts over the current best chain; the removed_*_var accumulators
    # (maintained by the tracker.verified.removed watchers above) add back
    # shares that have already been pruned off the tracker's tail.
    delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
    my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
    my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
    orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
    doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]

    # Shares we made that never landed in the chain at all.
    my_shares_not_in_chain = my_shares - my_shares_in_chain
    my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain

    # Orphans are the not-in-chain shares that were not DOA.
    return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
421 class WorkerBridge(worker_interface.WorkerBridge):
423 worker_interface.WorkerBridge.__init__(self)
424 self.new_work_event = current_work.changed
426 self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
427 self.recent_shares_ts_work = []
429 def _get_payout_script_from_username(self, user):
433 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
436 return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
def preprocess_request(self, request):
    # Pick the payout script credited for this worker's shares: the script
    # derived from the username, unless it is unparsable or the worker-fee
    # roll says the pool operator's script (my_script) gets this share.
    chosen = self._get_payout_script_from_username(request.getUser())
    # NOTE: short-circuit order preserved — random.uniform is only consumed
    # when a user script was actually parsed.
    if chosen is None or random.uniform(0, 100) < args.worker_fee:
        return my_script,
    return chosen,
444 def get_work(self, payout_script):
445 if len(p2p_node.peers) == 0 and net.PERSIST:
446 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
447 if current_work.value['best_share_hash'] is None and net.PERSIST:
448 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
449 if time.time() > current_work2.value['last_update'] + 60:
450 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
452 if current_work.value['mm_chains']:
453 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
454 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
455 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
456 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
460 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
465 share_info, generate_tx = p2pool_data.generate_transaction(
468 previous_share_hash=current_work.value['best_share_hash'],
469 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
470 nonce=struct.pack('<Q', random.randrange(2**64)),
471 new_script=payout_script,
472 subsidy=current_work2.value['subsidy'],
473 donation=math.perfect_round(65535*args.donation_percentage/100),
474 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
475 253 if orphans > orphans_recorded_in_chain else
476 254 if doas > doas_recorded_in_chain else
478 )(*get_stale_counts()),
480 block_target=current_work.value['bits'].target,
481 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
485 target = 2**256//2**32 - 1
486 if len(self.recent_shares_ts_work) == 50:
487 hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
488 target = min(target, 2**256//(hash_rate * 5))
489 target = max(target, share_info['bits'].target)
490 for aux_work in current_work.value['mm_chains'].itervalues():
491 target = max(target, aux_work['target'])
493 transactions = [generate_tx] + list(current_work2.value['transactions'])
494 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
495 self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), mm_later, target, current_work2.value['merkle_branch']
497 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
498 bitcoin_data.target_to_difficulty(target),
499 bitcoin_data.target_to_difficulty(share_info['bits'].target),
500 (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
501 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
502 len(current_work2.value['transactions']),
505 return bitcoin_getwork.BlockAttempt(
506 version=current_work.value['version'],
507 previous_block=current_work.value['previous_block'],
508 merkle_root=merkle_root,
509 timestamp=current_work2.value['time'],
510 bits=current_work.value['bits'],
514 def got_response(self, header, request):
515 # match up with transactions
516 if header['merkle_root'] not in self.merkle_root_to_transactions:
517 print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
519 share_info, transactions, getwork_time, mm_later, target, merkle_branch = self.merkle_root_to_transactions[header['merkle_root']]
521 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
522 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
525 if pow_hash <= header['bits'].target or p2pool.DEBUG:
526 if factory.conn.value is not None:
527 factory.conn.value.send_block(block=dict(header=header, txs=transactions))
529 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
530 if pow_hash <= header['bits'].target:
532 print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
534 recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
536 log.err(None, 'Error while processing potential block:')
538 for aux_work, index, hashes in mm_later:
540 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
541 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
542 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
543 bitcoin_data.aux_pow_type.pack(dict(
546 block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
547 merkle_branch=merkle_branch,
550 merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
552 parent_block_header=header,
557 if result != (pow_hash <= aux_work['target']):
558 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
560 print 'Merged block submittal result: %s' % (result,)
563 log.err(err, 'Error submitting merged block:')
565 log.err(None, 'Error while processing merged mining POW:')
567 if pow_hash <= share_info['bits'].target:
568 share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
569 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
571 p2pool_data.format_hash(share.hash),
572 p2pool_data.format_hash(share.previous_hash),
573 time.time() - getwork_time,
574 ' DEAD ON ARRIVAL' if not on_time else '',
576 my_share_hashes.add(share.hash)
578 my_doa_share_hashes.add(share.hash)
582 tracker.verified.add(share)
586 if pow_hash <= header['bits'].target or p2pool.DEBUG:
587 for peer in p2p_node.peers.itervalues():
588 peer.sendShares([share])
589 shared_share_hashes.add(share.hash)
591 log.err(None, 'Error forwarding block solution:')
593 if pow_hash <= target:
594 reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
595 if request.getPassword() == vip_pass:
596 reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
597 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
598 while len(self.recent_shares_ts_work) > 50:
599 self.recent_shares_ts_work.pop(0)
601 if pow_hash > target:
602 print 'Worker submitted share with hash > target:'
603 print ' Hash: %56x' % (pow_hash,)
604 print ' Target: %56x' % (target,)
608 web_root = resource.Resource()
609 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
612 if tracker.get_height(current_work.value['best_share_hash']) < 720:
613 return json.dumps(None)
614 return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
615 / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
618 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
619 weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
621 for script in sorted(weights, key=lambda s: weights[s]):
622 res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
623 return json.dumps(res)
def get_current_txouts():
    # Rebuild the generate transaction implied by the current best share and
    # return a mapping of payout script -> value (base units).
    best_share = tracker.shares[current_work.value['best_share_hash']]
    _, gentx = p2pool_data.generate_transaction(tracker, best_share.share_info['share_data'], best_share.header['bits'].target, best_share.share_info['timestamp'], best_share.net)
    payouts = {}
    for txout in gentx['tx_outs']:
        payouts[txout['script']] = txout['value']
    return payouts
630 def get_current_scaled_txouts(scale, trunc=0):
631 txouts = get_current_txouts()
632 total = sum(txouts.itervalues())
633 results = dict((script, value*scale//total) for script, value in txouts.iteritems())
637 for s in sorted(results, key=results.__getitem__):
638 total_random += results[s]
640 if total_random >= trunc and results[s] >= trunc:
642 winner = math.weighted_choice((script, results[script]) for script in random_set)
643 for script in random_set:
645 results[winner] = total_random
646 if sum(results.itervalues()) < int(scale):
647 results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
def get_current_payouts():
    # JSON object mapping human-readable payout address to amount in whole
    # coins (values are stored in base units, hence /1e8).
    payouts = {}
    for script, value in get_current_txouts().iteritems():
        payouts[bitcoin_data.script2_to_human(script, net.PARENT)] = value/1e8
    return json.dumps(payouts)
653 def get_patron_sendmany(this):
656 this, trunc = this.split('/', 1)
659 return json.dumps(dict(
660 (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
661 for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
662 if bitcoin_data.script2_to_address(script, net.PARENT) is not None
665 return json.dumps(None)
667 def get_global_stats():
668 # averaged over last hour
669 lookbehind = 3600//net.SHARE_PERIOD
670 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
673 nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
674 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
675 return json.dumps(dict(
676 pool_nonstale_hash_rate=nonstale_hash_rate,
677 pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
678 pool_stale_prop=stale_prop,
681 def get_local_stats():
682 lookbehind = 3600//net.SHARE_PERIOD
683 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
686 global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
688 my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
689 my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
690 my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
691 my_share_count = my_unstale_count + my_orphan_count + my_doa_count
692 my_stale_count = my_orphan_count + my_doa_count
694 my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
696 my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
697 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
698 if share.hash in my_share_hashes)
699 actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
700 tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
701 share_att_s = my_work / actual_time
703 return json.dumps(dict(
704 my_hash_rates_in_last_hour=dict(
705 nonstale=share_att_s,
706 rewarded=share_att_s/(1 - global_stale_prop),
707 actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
709 my_share_counts_in_last_hour=dict(
710 shares=my_share_count,
711 unstale_shares=my_unstale_count,
712 stale_shares=my_stale_count,
713 orphan_stale_shares=my_orphan_count,
714 doa_stale_shares=my_doa_count,
716 my_stale_proportions_in_last_hour=dict(
718 orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
719 dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
def get_peer_addresses():
    # Space-separated "host" or "host:port" for every connected peer; the
    # port is shown only when it differs from the network default, so the
    # output can be fed back as bootstrap addresses.
    entries = []
    for peer in p2p_node.peers.itervalues():
        remote = peer.transport.getPeer()
        if remote.port != net.P2P_PORT:
            entries.append(remote.host + ':' + str(remote.port))
        else:
            entries.append(remote.host)
    return ' '.join(entries)
class WebInterface(resource.Resource):
    # Tiny adapter that exposes a callable as an HTTP GET endpoint.
    def __init__(self, func, mime_type, *fields):
        # func: callable producing the response body as a string.
        # fields: names of GET query parameters; the first value of each is
        # passed positionally to func.
        # NOTE(review): resource.Resource.__init__ is not invoked here —
        # confirm this is acceptable for the twisted version in use.
        self.func, self.mime_type, self.fields = func, mime_type, fields

    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        # Allow cross-origin reads so external dashboards can poll the stats.
        request.setHeader('Access-Control-Allow-Origin', '*')
        return self.func(*(request.args[field][0] for field in self.fields))
735 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
736 web_root.putChild('users', WebInterface(get_users, 'application/json'))
737 web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
738 web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
739 web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
740 web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
741 web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
742 web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
743 web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
744 web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
746 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
748 grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
749 web_root.putChild('graphs', grapher.get_resource())
751 if tracker.get_height(current_work.value['best_share_hash']) < 720:
753 nonstalerate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
754 poolrate = nonstalerate / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720))
755 grapher.add_poolrate_point(poolrate, poolrate - nonstalerate)
756 task.LoopingCall(add_point).start(100)
def attempt_listen():
    # Bind the worker HTTP port; if the port is still in use (e.g. a previous
    # instance shutting down), report and retry once a second until it frees.
    try:
        reactor.listenTCP(args.worker_port, server.Site(web_root))
    except error.CannotListenError, e:
        print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
        reactor.callLater(1, attempt_listen)
765 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
773 @defer.inlineCallbacks
776 flag = factory.new_block.get_deferred()
778 yield set_real_work1()
781 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
786 print 'Started successfully!'
790 if hasattr(signal, 'SIGALRM'):
791 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
792 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
794 signal.siginterrupt(signal.SIGALRM, False)
795 task.LoopingCall(signal.alarm, 30).start(1)
797 if args.irc_announce:
798 from twisted.words.protocols import irc
799 class IRCClient(irc.IRCClient):
801 def lineReceived(self, line):
803 irc.IRCClient.lineReceived(self, line)
805 irc.IRCClient.signedOn(self)
806 self.factory.resetDelay()
808 self.watch_id = tracker.verified.added.watch(self._new_share)
809 self.announced_hashes = set()
def _new_share(self, share):
    # Announce on #p2pool when a verified share also satisfies the bitcoin
    # block target, at most once per block hash.
    if share.pow_hash > share.header['bits'].target:
        return
    if share.header_hash in self.announced_hashes:
        return
    self.announced_hashes.add(share.header_hash)
    self.say('#p2pool', '\x02BLOCK FOUND by %s! http://blockexplorer.com/block/%064x' % (bitcoin_data.script2_to_address(share.new_script, net.PARENT), share.header_hash))
def connectionLost(self, reason):
    # Detach the share watcher so a dead IRC connection stops announcing.
    tracker.verified.added.unwatch(self.watch_id)
816 class IRCClientFactory(protocol.ReconnectingClientFactory):
818 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
@defer.inlineCallbacks
# NOTE(review): the decorated `def status_thread():` header, its
# `while True:` loop, the enclosing try/except, and several statements
# (share counts, peer totals, the print of this_str) are missing from this
# excerpt -- the lines below are the body of the periodic console status
# printer. Verify the structure against the full source before editing.
yield deferral.sleep(3)   # pause between status updates
# Warn loudly if work polling has not succeeded for a full minute.
if time.time() > current_work2.value['last_update'] + 60:
    print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    # Pool-wide attempts/second estimated over up to the last 720 shares.
    att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
    weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
    (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
    stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
    # Correct the observed rate for the measured stale proportion.
    real_att_s = att_s / (1 - stale_prop)
    # This node's estimated share of the pool rate, by payout weight.
    my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
    # NOTE(review): the format string below expects more fields than are
    # visible here; several tuple members appear to be missing from this
    # excerpt.
    this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i (%i incoming)' % (
        math.format(int(real_att_s)),
        len(tracker.verified.shares),
        weights.get(my_script, 0)/total_weight*100,
        math.format(int(my_att_s)),
        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
    # Expected time between blocks at the corrected pool rate (days).
    this_str += '\nAverage time between blocks: %.2f days' % (
        2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
    this_str += '\nPool stales: %i%% Own: %s Own efficiency: %s' % (
        int(100*stale_prop+.5),
        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
        # Efficiency: own stale rate relative to the pool-wide stale rate.
        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
    # Only (re)print when something changed or at least every 15 seconds.
    if this_str != last_str or time.time() > last_time + 15:
        last_time = time.time()
log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser variant whose @file arguments may themselves contain
    further @file references (stock argparse does not recurse)."""
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        # NOTE(review): several lines of this method appear to be missing
        # from this excerpt (the `new_arg_strings` initialisation, the
        # `else:` branch, and the try/except/finally around the file
        # handling) -- the code below is not complete as shown. Verify
        # against the full source before editing.
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            args_file = open(arg_string[1:])
            for arg_line in args_file.read().splitlines():
                for arg in self.convert_arg_line_to_args(arg_line):
                    arg_strings.append(arg)
            # recurse so an @file may itself reference further @files
            arg_strings = self._read_args_from_files(arg_strings)
            new_arg_strings.extend(arg_strings)
            # presumably part of an except clause -- TODO confirm placement
            err = sys.exc_info()[1]
        # return the modified argument list
        return new_arg_strings
901 def convert_arg_line_to_args(self, arg_line):
902 return [arg for arg in arg_line.split() if arg.strip()]
# Command-line interface. fromfile_prefix_chars='@' lets users keep their
# options in a file and pass @filename on the command line.
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
# Repeatable: each --merged adds one merged-mining upstream URL.
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
# Deprecated aliases for --merged, kept for backward compatibility; they are
# folded into merged_urls after parsing.
# Fix: the help text misspelled "DEPRECATED" as "DEPRECIATED".
parser.add_argument('--merged-url',
    help='DEPRECATED, use --merged',
    type=str, action='store', default=None, dest='merged_url')
parser.add_argument('--merged-userpass',
    help='DEPRECATED, use --merged',
    type=str, action='store', default=None, dest='merged_userpass')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')

# Options for p2pool's own peer-to-peer network.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# store_false: passing --disable-upnp turns args.upnp off.
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')

# Options for the miner-facing worker interface.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='worker_port')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# Options for connecting to the local bitcoind.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')
# Positional: zero, one, or two values (username then password).
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
args = parser.parse_args()

# Resolve the network object; testnet variants are registered under
# '<name>_testnet'.
net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]

# Per-network data directory next to the executable.
datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
# Pad on the left so a single positional value is treated as the password
# (username defaults to None, later '').
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

if args.bitcoind_rpc_password is None:
    # No password on the command line: fall back to reading bitcoin.conf.
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        # NOTE(review): some lines of this sample-config error message
        # (e.g. the rpcuser line) appear to be missing from this excerpt.
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        # bitcoin.conf has no section headers; prepend a dummy [x] section
        # so RawConfigParser accepts it.
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # Only fill in settings the user did not supply explicitly.
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
            # NOTE(review): the closing `]:` of this list appears to be
            # missing from this excerpt.
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))

if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

# Remaining ports default to the network's built-in values.
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

if args.worker_port is None:
    args.worker_port = net.WORKER_PORT

if args.address is not None:
    # NOTE(review): the `try:` opening this block and the `else:` before
    # the final `args.pubkey_hash = None` appear to be missing from this
    # excerpt -- confirm against the full source.
    args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
    args.pubkey_hash = None
def separate_url(url):
    """Split a merged-mining URL with embedded credentials.

    Given e.g. http://user:pass@host:port/, return a pair of
    (url without credentials, 'user:pass'). Aborts via parser.error when
    the URL carries no credentials at all.
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # rsplit: the credentials themselves may contain '@'; the host part is
    # everything after the last one.
    userpass, host = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=host))
    return stripped_url, userpass
# Normalise all --merged URLs into (url, userpass) pairs.
merged_urls = map(separate_url, args.merged_urls)

# Backwards compatibility for the old --merged-url/--merged-userpass pair.
if args.merged_url is not None or args.merged_userpass is not None:
    print '--merged-url and --merged-userpass are depreciated! Use --merged http://USER:PASS@HOST:PORT/ instead!'
    print 'Pausing 10 seconds...'
    # NOTE(review): the actual sleep call appears to be missing from this
    # excerpt.

    if args.merged_url is None or args.merged_userpass is None:
        parser.error('must specify both --merged-url and --merged-userpass')

    merged_urls = merged_urls + [(args.merged_url, args.merged_userpass)]
if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Tee stdout/stderr through a timestamping pipe into both the console and
# the log file; stderr lines get a '> ' prefix.
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 reopens the log file so external log rotation works (skipped on
# platforms without SIGUSR1, e.g. Windows).
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        # NOTE(review): the logfile.reopen() call appears to be missing
        # from this excerpt.
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
# Also reopen the log periodically as a safety net.
task.LoopingCall(logfile.reopen).start(5)

# Start the application once the twisted reactor is running.
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls)