1 from __future__ import division
15 from twisted.internet import defer, error, reactor, protocol, task
16 from twisted.web import server, resource
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, graphs
24 import p2pool, p2pool.data as p2pool_data
# Fetch a fresh block template from bitcoind (pre-BIP22 getmemorypool RPC)
# and normalize it into the dict shape the rest of p2pool consumes.
# Retries up to 3 times on RPC failure before propagating the error.
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    # Transactions arrive hex-encoded; unpack each into a structured tx object.
    transactions = [bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=transactions,
        # Merkle branch for index 0 (the coinbase slot); the leading 0 is a
        # placeholder hash standing in for the not-yet-built coinbase tx.
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + [bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) for tx in transactions], 0),
        subsidy=work['coinbasevalue'],
        # 'bits' may be delivered as a byte-reversed hex string or already as
        # an integer depending on the bitcoind version -- accept both forms.
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        # Extra coinbase payload: prefer 'coinbaseflags', fall back to joining
        # all 'coinbaseaux' entries, else empty.
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        # NOTE(review): the remaining dict entries and the closing parens are
        # elided from this excerpt of the file.
42 @defer.inlineCallbacks
43 def main(args, net, datadir_path):
45 print 'p2pool (version %s)' % (p2pool.__version__,)
51 print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
58 good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
60 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
62 temp_work = yield getwork(bitcoind)
64 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
67 # connect to bitcoind over bitcoin-p2p
68 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
69 factory = bitcoin_p2p.ClientFactory(net.PARENT)
70 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
71 yield factory.getProtocol() # waits until handshake is successful
75 if args.pubkey_hash is None:
76 print 'Getting payout address from bitcoind...'
77 my_script = yield deferral.retry('Error getting payout address from bitcoind:', 5)(defer.inlineCallbacks(lambda: defer.returnValue(
78 bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net.PARENT)))
81 print 'Computing payout script from provided address....'
82 my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
84 print ' Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
87 ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
89 my_share_hashes = set()
90 my_doa_share_hashes = set()
92 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
93 shared_share_hashes = set()
94 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
95 known_verified = set()
97 print "Loading shares..."
98 for i, (mode, contents) in enumerate(ss.get_shares()):
100 if contents.hash in tracker.shares:
102 shared_share_hashes.add(contents.hash)
103 contents.time_seen = 0
104 tracker.add(contents)
105 if len(tracker.shares) % 1000 == 0 and tracker.shares:
106 print " %i" % (len(tracker.shares),)
107 elif mode == 'verified_hash':
108 known_verified.add(contents)
110 raise AssertionError()
111 print " ...inserting %i verified shares..." % (len(known_verified),)
112 for h in known_verified:
113 if h not in tracker.shares:
114 ss.forget_verified_share(h)
116 tracker.verified.add(tracker.shares[h])
117 print " ...done loading %i shares!" % (len(tracker.shares),)
119 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
120 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
121 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
123 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
125 pre_current_work = variable.Variable(None)
126 pre_merged_work = variable.Variable(None)
127 # information affecting work that should trigger a long-polling update
128 current_work = variable.Variable(None)
129 # information affecting work that should not trigger a long-polling update
130 current_work2 = variable.Variable(None)
132 requested = expiring_dict.ExpiringDict(300)
@defer.inlineCallbacks
def set_real_work1():
    # Poll bitcoind for fresh work and publish it into the two work
    # variables: current_work2 (details that should NOT trigger a
    # long-polling push) first, then pre_current_work (details that do).
    work = yield getwork(bitcoind)
    current_work2.set(dict(
        transactions=work['transactions'],
        merkle_branch=work['merkle_branch'],
        subsidy=work['subsidy'],
        # Local clock minus bitcoind's template time; used later to emit
        # timestamps in bitcoind's frame of reference.
        clock_offset=time.time() - work['time'],
        last_update=time.time(),
    )) # second set first because everything hooks on the first
    pre_current_work.set(dict(
        version=work['version'],
        previous_block=work['previous_block_hash'],
        coinbaseflags=work['coinbaseflags'],
        # NOTE(review): additional dict entries (e.g. 'bits') and the closing
        # parens are elided from this excerpt.
152 def set_real_work2():
153 best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
155 t = dict(pre_current_work.value)
156 t['best_share_hash'] = best
157 t['aux_work'] = pre_merged_work.value
161 for peer2, share_hash in desired:
162 if share_hash not in tracker.tails: # was received in the time tracker.think was running
164 last_request_time, count = requested.get(share_hash, (None, 0))
165 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
167 potential_peers = set()
168 for head in tracker.tails[share_hash]:
169 potential_peers.update(peer_heads.get(head, set()))
170 potential_peers = [peer for peer in potential_peers if peer.connected2]
171 if count == 0 and peer2 is not None and peer2.connected2:
174 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
178 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
182 stops=list(set(tracker.heads) | set(
183 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
186 requested[share_hash] = t, count + 1
187 pre_current_work.changed.watch(lambda _: set_real_work2())
189 print 'Initializing work...'
190 yield set_real_work1()
194 pre_merged_work.changed.watch(lambda _: set_real_work2())
195 ht.updated.watch(set_real_work2)
197 merged_proxy = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,)) if args.merged_url else None
199 @defer.inlineCallbacks
200 def set_merged_work():
202 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
203 pre_merged_work.set(dict(
204 hash=int(auxblock['hash'], 16),
205 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
206 chain_id=auxblock['chainid'],
208 yield deferral.sleep(1)
209 if merged_proxy is not None:
212 @pre_merged_work.changed.watch
213 def _(new_merged_work):
214 print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
216 # setup p2p logic and join p2pool network
218 class Node(p2p.Node):
219 def handle_shares(self, shares, peer):
221 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
225 if share.hash in tracker.shares:
226 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
231 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
235 if shares and peer is not None:
236 peer_heads.setdefault(shares[0].hash, set()).add(peer)
242 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
244 def handle_share_hashes(self, hashes, peer):
247 for share_hash in hashes:
248 if share_hash in tracker.shares:
250 last_request_time, count = requested.get(share_hash, (None, 0))
251 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
253 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
254 get_hashes.append(share_hash)
255 requested[share_hash] = t, count + 1
257 if hashes and peer is not None:
258 peer_heads.setdefault(hashes[0], set()).add(peer)
260 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
262 def handle_get_shares(self, hashes, parents, stops, peer):
263 parents = min(parents, 1000//len(hashes))
266 for share_hash in hashes:
267 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
268 if share.hash in stops:
271 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
272 peer.sendShares(shares)
274 @tracker.verified.added.watch
276 if share.pow_hash <= share.header['bits'].target:
277 if factory.conn.value is not None:
278 factory.conn.value.send_block(block=share.as_block(tracker))
280 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
282 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
284 recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash) })
286 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
288 @defer.inlineCallbacks
291 ip, port = x.split(':')
292 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
294 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
297 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
299 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
301 print >>sys.stderr, "error reading addrs"
302 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
305 if addr not in addrs:
306 addrs[addr] = (0, time.time(), time.time())
310 connect_addrs = set()
311 for addr_df in map(parse, args.p2pool_nodes):
313 connect_addrs.add((yield addr_df))
318 best_share_hash_func=lambda: current_work.value['best_share_hash'],
319 port=args.p2pool_port,
322 connect_addrs=connect_addrs,
327 open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
328 task.LoopingCall(save_addrs).start(60)
330 # send share when the chain changes to their chain
331 def work_changed(new_work):
332 #print 'Work changed:', new_work
334 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
335 if share.hash in shared_share_hashes:
337 shared_share_hashes.add(share.hash)
340 for peer in p2p_node.peers.itervalues():
341 peer.sendShares([share for share in shares if share.peer is not peer])
343 current_work.changed.watch(work_changed)
346 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
348 if share.hash in tracker.verified.shares:
349 ss.add_verified_hash(share.hash)
350 task.LoopingCall(save_shares).start(60)
355 @defer.inlineCallbacks
359 is_lan, lan_ip = yield ipdiscover.get_local_ip()
361 pm = yield portmapper.get_port_mapper()
362 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
363 except defer.TimeoutError:
367 log.err(None, "UPnP error:")
368 yield deferral.sleep(random.expovariate(1/120))
373 # start listening for workers with a JSON-RPC server
375 print 'Listening for workers on port %i...' % (args.worker_port,)
377 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
378 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
379 vip_pass = f.read().strip('\r\n')
381 vip_pass = '%016x' % (random.randrange(2**64),)
382 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
384 print ' Worker password:', vip_pass, '(only required for generating graphs)'
388 removed_unstales_var = variable.Variable((0, 0, 0))
389 @tracker.verified.removed.watch
391 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
392 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
393 removed_unstales_var.set((
394 removed_unstales_var.value[0] + 1,
395 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
396 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
# Running count of our dead-on-arrival shares that fell out of the verified
# tracker's window while still being ancestors of the current best share.
# Companion to removed_unstales_var, which tracks orphan/dead counts by kind.
removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    # Only count shares this instance produced that are still on the best
    # chain at the moment the tracker evicts them.
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # Fix: original referenced the undefined name 'removed_doa_unstales'
        # (a NameError at runtime); the Variable is 'removed_doa_unstales_var',
        # which get_stale_counts() below reads.
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
    # Totals of every share/DOA share this instance ever produced.
    my_shares = len(my_share_hashes)
    my_doa_shares = len(my_doa_share_hashes)
    # Aggregated per-chain counters up to the current best share.
    delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
    # Add back shares that the tracker already evicted from its window
    # (accumulated by the tracker.verified.removed watchers above).
    my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
    my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
    orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
    doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
    # Anything we produced that never made it onto the chain is stale;
    # of those, DOAs are counted separately and the rest are orphans.
    my_shares_not_in_chain = my_shares - my_shares_in_chain
    my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
    return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
420 class WorkerBridge(worker_interface.WorkerBridge):
422 worker_interface.WorkerBridge.__init__(self)
423 self.new_work_event = current_work.changed
425 self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
426 self.recent_shares_ts_work = []
428 def _get_payout_script_from_username(self, user):
432 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
435 return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
437 def preprocess_request(self, request):
438 payout_script = self._get_payout_script_from_username(request.getUser())
439 if payout_script is None or random.uniform(0, 100) < args.worker_fee:
440 payout_script = my_script
441 return payout_script,
443 def get_work(self, payout_script):
444 if len(p2p_node.peers) == 0 and net.PERSIST:
445 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
446 if current_work.value['best_share_hash'] is None and net.PERSIST:
447 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
448 if time.time() > current_work2.value['last_update'] + 60:
449 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
451 share_info, generate_tx = p2pool_data.generate_transaction(
454 previous_share_hash=current_work.value['best_share_hash'],
455 coinbase=(('' if current_work.value['aux_work'] is None else
456 '\xfa\xbemm' + pack.IntType(256, 'big').pack(current_work.value['aux_work']['hash']) + struct.pack('<ii', 1, 0)) + current_work.value['coinbaseflags'])[:100],
457 nonce=struct.pack('<Q', random.randrange(2**64)),
458 new_script=payout_script,
459 subsidy=current_work2.value['subsidy'],
460 donation=math.perfect_round(65535*args.donation_percentage/100),
461 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
462 253 if orphans > orphans_recorded_in_chain else
463 254 if doas > doas_recorded_in_chain else
465 )(*get_stale_counts()),
467 block_target=current_work.value['bits'].target,
468 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
472 target = 2**256//2**32 - 1
473 if len(self.recent_shares_ts_work) == 50:
474 hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
475 target = min(target, 2**256//(hash_rate * 5))
476 target = max(target, share_info['bits'].target)
477 if current_work.value['aux_work']:
478 target = max(target, current_work.value['aux_work']['target'])
480 transactions = [generate_tx] + list(current_work2.value['transactions'])
481 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
482 self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), current_work.value['aux_work'], target, current_work2.value['merkle_branch']
484 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
485 bitcoin_data.target_to_difficulty(target),
486 bitcoin_data.target_to_difficulty(share_info['bits'].target),
487 (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
488 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
489 len(current_work2.value['transactions']),
492 return bitcoin_getwork.BlockAttempt(
493 version=current_work.value['version'],
494 previous_block=current_work.value['previous_block'],
495 merkle_root=merkle_root,
496 timestamp=current_work2.value['time'],
497 bits=current_work.value['bits'],
501 def got_response(self, header, request):
502 # match up with transactions
503 if header['merkle_root'] not in self.merkle_root_to_transactions:
504 print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
506 share_info, transactions, getwork_time, aux_work, target, merkle_branch = self.merkle_root_to_transactions[header['merkle_root']]
508 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
509 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
512 if pow_hash <= header['bits'].target or p2pool.DEBUG:
513 if factory.conn.value is not None:
514 factory.conn.value.send_block(block=dict(header=header, txs=transactions))
516 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
517 if pow_hash <= header['bits'].target:
519 print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
521 recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
523 log.err(None, 'Error while processing potential block:')
526 if aux_work is not None and (pow_hash <= aux_work['target'] or p2pool.DEBUG):
527 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(merged_proxy.rpc_getauxblock)(
528 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
529 bitcoin_data.aux_pow_type.pack(dict(
532 block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
533 merkle_branch=merkle_branch,
538 parent_block_header=header,
543 if result != (pow_hash <= aux_work['target']):
544 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
546 print 'Merged block submittal result: %s' % (result,)
549 log.err(err, 'Error submitting merged block:')
551 log.err(None, 'Error while processing merged mining POW:')
553 if pow_hash <= share_info['bits'].target:
554 share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
555 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
557 p2pool_data.format_hash(share.hash),
558 p2pool_data.format_hash(share.previous_hash),
559 time.time() - getwork_time,
560 ' DEAD ON ARRIVAL' if not on_time else '',
562 my_share_hashes.add(share.hash)
564 my_doa_share_hashes.add(share.hash)
565 p2p_node.handle_shares([share], None)
567 if pow_hash <= header['bits'].target:
568 for peer in p2p_node.peers.itervalues():
569 peer.sendShares([share])
570 shared_share_hashes.add(share.hash)
572 log.err(None, 'Error forwarding block solution:')
574 if pow_hash <= target:
575 reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
576 if request.getPassword() == vip_pass:
577 reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
578 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
579 while len(self.recent_shares_ts_work) > 50:
580 self.recent_shares_ts_work.pop(0)
582 if pow_hash > target:
583 print 'Worker submitted share with hash > target:'
584 print ' Hash: %56x' % (pow_hash,)
585 print ' Target: %56x' % (target,)
589 web_root = resource.Resource()
590 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
593 if tracker.get_height(current_work.value['best_share_hash']) < 720:
594 return json.dumps(None)
595 return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
596 / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
599 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
600 weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
602 for script in sorted(weights, key=lambda s: weights[s]):
603 res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
604 return json.dumps(res)
def get_current_txouts():
    """Return {payout_script: value} for the generate tx of the current best share."""
    best_share = tracker.shares[current_work.value['best_share_hash']]
    # Rebuild the generate transaction exactly as it was built for this share.
    _, gen_tx = p2pool_data.generate_transaction(tracker, best_share.share_info['share_data'], best_share.header['bits'].target, best_share.share_info['timestamp'], best_share.net)
    payouts = {}
    for txout in gen_tx['tx_outs']:
        payouts[txout['script']] = txout['value']
    return payouts
611 def get_current_scaled_txouts(scale, trunc=0):
612 txouts = get_current_txouts()
613 total = sum(txouts.itervalues())
614 results = dict((script, value*scale//total) for script, value in txouts.iteritems())
618 for s in sorted(results, key=results.__getitem__):
619 total_random += results[s]
621 if total_random >= trunc and results[s] >= trunc:
623 winner = math.weighted_choice((script, results[script]) for script in random_set)
624 for script in random_set:
626 results[winner] = total_random
627 if sum(results.itervalues()) < int(scale):
628 results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
def get_current_payouts():
    """JSON object mapping human-readable payout script to its value in whole coins."""
    payouts = {}
    for script, value in get_current_txouts().iteritems():
        payouts[bitcoin_data.script2_to_human(script, net.PARENT)] = value/1e8
    return json.dumps(payouts)
634 def get_patron_sendmany(this):
637 this, trunc = this.split('/', 1)
640 return json.dumps(dict(
641 (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
642 for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
643 if bitcoin_data.script2_to_address(script, net.PARENT) is not None
646 return json.dumps(None)
648 def get_global_stats():
649 # averaged over last hour
650 lookbehind = 3600//net.SHARE_PERIOD
651 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
654 nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
655 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
656 return json.dumps(dict(
657 pool_nonstale_hash_rate=nonstale_hash_rate,
658 pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
659 pool_stale_prop=stale_prop,
662 def get_local_stats():
663 lookbehind = 3600//net.SHARE_PERIOD
664 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
667 global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
669 my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
670 my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
671 my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
672 my_share_count = my_unstale_count + my_orphan_count + my_doa_count
673 my_stale_count = my_orphan_count + my_doa_count
675 my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
677 my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
678 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
679 if share.hash in my_share_hashes)
680 actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
681 tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
682 share_att_s = my_work / actual_time
684 return json.dumps(dict(
685 my_hash_rates_in_last_hour=dict(
686 nonstale=share_att_s,
687 rewarded=share_att_s/(1 - global_stale_prop),
688 actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
690 my_share_counts_in_last_hour=dict(
691 shares=my_share_count,
692 unstale_shares=my_unstale_count,
693 stale_shares=my_stale_count,
694 orphan_stale_shares=my_orphan_count,
695 doa_stale_shares=my_doa_count,
697 my_stale_proportions_in_last_hour=dict(
699 orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
700 dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
704 def get_peer_addresses():
705 return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())
class WebInterface(resource.Resource):
    # Minimal twisted.web resource: serves func(*selected request args) as a
    # GET response with a fixed MIME type.
    def __init__(self, func, mime_type, *fields):
        # fields names the query-string parameters to pull out of
        # request.args and pass positionally to func.
        self.func, self.mime_type, self.fields = func, mime_type, fields
    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        # Allow cross-origin reads so external dashboards can query stats.
        request.setHeader('Access-Control-Allow-Origin', '*')
        # request.args values are lists; take the first occurrence of each field.
        return self.func(*(request.args[field][0] for field in self.fields))
716 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
717 web_root.putChild('users', WebInterface(get_users, 'application/json'))
718 web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
719 web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
720 web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
721 web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
722 web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
723 web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
724 web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
725 web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
727 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
729 grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
730 web_root.putChild('graphs', grapher.get_resource())
732 if tracker.get_height(current_work.value['best_share_hash']) < 720:
734 nonstalerate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
735 poolrate = nonstalerate / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720))
736 grapher.add_poolrate_point(poolrate, poolrate - nonstalerate)
737 task.LoopingCall(add_point).start(100)
739 def attempt_listen():
741 reactor.listenTCP(args.worker_port, server.Site(web_root))
742 except error.CannotListenError, e:
743 print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
744 reactor.callLater(1, attempt_listen)
746 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
754 @defer.inlineCallbacks
757 flag = factory.new_block.get_deferred()
759 yield set_real_work1()
762 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
767 print 'Started successfully!'
771 if hasattr(signal, 'SIGALRM'):
772 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
773 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
775 signal.siginterrupt(signal.SIGALRM, False)
776 task.LoopingCall(signal.alarm, 30).start(1)
778 if args.irc_announce:
779 from twisted.words.protocols import irc
780 class IRCClient(irc.IRCClient):
782 def lineReceived(self, line):
784 irc.IRCClient.lineReceived(self, line)
786 irc.IRCClient.signedOn(self)
787 self.factory.resetDelay()
789 self.watch_id = tracker.verified.added.watch(self._new_share)
790 self.announced_hashes = set()
791 def _new_share(self, share):
792 if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes:
793 self.announced_hashes.add(share.header_hash)
794 self.say('#p2pool', '\x02BLOCK FOUND by %s! http://blockexplorer.com/block/%064x' % (bitcoin_data.script2_to_address(share.new_script, net.PARENT), share.header_hash))
795 def connectionLost(self, reason):
796 tracker.verified.added.unwatch(self.watch_id)
797 class IRCClientFactory(protocol.ReconnectingClientFactory):
799 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
@defer.inlineCallbacks
# NOTE(review): gapped excerpt — the decorated `def status():` header, its
# `while True:` loop, and some try/except scaffolding between the lines below
# are not visible in this view; indentation here is reconstructed.
yield deferral.sleep(3)  # poll interval between status reports
# If getwork results haven't refreshed in 60s, bitcoind is likely hung or dead.
if time.time() > current_work2.value['last_update'] + 60:
    print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
if current_work.value['best_share_hash'] is not None:
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    # Pool rate estimated over (up to) the last 720 shares; height-1 because
    # the oldest share has no predecessor to measure an interval against.
    att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
    weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
    (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
    stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
    # Scale the observed share rate up by the stale fraction to estimate the
    # real hash rate being pointed at the pool.
    real_att_s = att_s / (1 - stale_prop)
    # Own estimated hash rate = pool rate weighted by our share of payouts.
    my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
    this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i (%i incoming)' % (
        math.format(int(real_att_s)),
        # NOTE(review): several tuple items of the original format call
        # (share/peer counts) are missing between these lines in this excerpt.
        len(tracker.verified.shares),
        weights.get(my_script, 0)/total_weight*100,
        math.format(int(my_att_s)),
        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
    # Expected interval between pool-found blocks at the current target.
    this_str += '\nAverage time between blocks: %.2f days' % (
        2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
    # NOTE(review): the closing paren of the tuple above is not visible here.
    this_str += '\nPool stales: %i%% Own: %s Own efficiency: %s' % (
        int(100*stale_prop+.5),
        # 95% binomial confidence interval on own stale rate, and the same
        # normalized against the pool-wide stale rate ("efficiency").
        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
    # Rate-limit output: only print when the line changed or every 15 seconds.
    if this_str != last_str or time.time() > last_time + 15:
        last_time = time.time()
log.err(None, 'Fatal error:')  # NOTE(review): except-handler of the enclosing function; its try: is not visible in this excerpt
class FixedArgumentParser(argparse.ArgumentParser):
    '''ArgumentParser whose @-file handling splits each line of the referenced
    file into whitespace-separated arguments (stock argparse treats one line
    as one argument) and expands @-references recursively.
    '''
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        # NOTE(review): gapped excerpt — the original's `new_arg_strings = []`
        # initializer and the else/try/finally/except scaffolding around the
        # file read are not visible here; indentation is reconstructed.
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            args_file = open(arg_string[1:])
            for arg_line in args_file.read().splitlines():
                for arg in self.convert_arg_line_to_args(arg_line):
                    arg_strings.append(arg)
            # recurse so @-files may themselves reference further @-files
            arg_strings = self._read_args_from_files(arg_strings)
            new_arg_strings.extend(arg_strings)
        err = sys.exc_info()[1]  # NOTE(review): belongs to an except handler whose header is not visible here
        # return the modified argument list
        return new_arg_strings
882 def convert_arg_line_to_args(self, arg_line):
883 return [arg for arg in arg_line.split() if arg.strip()]
# Build the command-line interface. fromfile_prefix_chars='@' lets users keep
# their options in a file and pass `@filename` (expanded by FixedArgumentParser).
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged-url',
    help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
    type=str, action='store', default=None, dest='merged_url')
parser.add_argument('--merged-userpass',
    help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
    type=str, action='store', default=None, dest='merged_userpass')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')

# Options for p2pool's own peer-to-peer network.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')

# Options for the miner-facing (getwork) RPC interface.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='worker_port')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# Options for talking to the upstream bitcoind (JSON-RPC and P2P).
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# Positional: zero, one, or two values — [username, password].
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
args = parser.parse_args()

# NOTE(review): a few lines between parse_args() and the net lookup (debug-flag
# handling, per the surrounding style) are not visible in this excerpt.
# Map --net/--testnet onto the concrete network definition object.
net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]

# Per-network data directory next to the script, e.g. data/bitcoin/.
datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
# Left-pad with Nones so one value is treated as the password alone and zero
# values leave both unset (triggering the bitcoin.conf fallback below).
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

if args.bitcoind_rpc_password is None:
    # No password given: try to read credentials from the coin's conf file.
    if not hasattr(net, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            # NOTE(review): one or two literal lines of the suggested conf text are missing from this excerpt.
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        # Prepend a dummy [x] section so RawConfigParser accepts bitcoin.conf's
        # bare key=value format.
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # Only fill in values the user did not supply on the command line.
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        # NOTE(review): the closing `]:` of this list is not visible in this excerpt.
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))

if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

# Fall back to the network's default ports for anything still unset.
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

if args.worker_port is None:
    args.worker_port = net.WORKER_PORT

if args.address is not None:
    # NOTE(review): the `try:` matching the except below is not visible in this excerpt.
    args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
    # NOTE(review): an `else:` line is missing here in the excerpt; None means
    # "ask bitcoind for a payout address later".
    args.pubkey_hash = None

# XOR: exactly one of the two merged-mining options given is an error — they
# must be supplied together or not at all.
if (args.merged_url is None) ^ (args.merged_userpass is None):
    parser.error('must specify --merged-url and --merged-userpass')

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Tee stdout/stderr through a timestamping pipe into the log file; AbortPipe
# presumably shuts the process down on write failure — TODO confirm in logging module.
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
if hasattr(signal, "SIGUSR1"):
    # SIGUSR1 re-opens the log file so external log rotation works (guarded:
    # the signal does not exist on Windows).
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        # NOTE(review): the logfile.reopen() call between these prints is not visible in this excerpt.
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
# Also reopen every 5 seconds as a safety net for missed signals.
task.LoopingCall(logfile.reopen).start(5)

# Start the main coroutine once the Twisted reactor is running.
reactor.callWhenRunning(main, args, net, datadir_path)