4 from __future__ import division
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface
25 from util import expiring_dict, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, graphs
27 import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    # Fetch a block template via the (pre-BIP22) getmemorypool RPC and
    # normalize the fields into plain Python values for the rest of p2pool.
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),  # hex string -> int
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        # 'bits' may arrive as a byte-reversed hex string or as an int, depending on the daemon
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        # prefer explicit coinbaseflags; otherwise concatenate any coinbaseaux blobs; else empty
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
@defer.inlineCallbacks
def main(args, net, datadir_path):
    # Main entry point: connect to bitcoind (RPC + P2P), load stored shares,
    # join the p2pool network, and serve work to miners.
    my_share_hashes = set()  # hashes of shares generated by this instance
    my_doa_share_hashes = set()  # our shares that were dead on arrival
    # expose our share sets to the tracker delta class so stale accounting works
    p2pool_data.OkayTrackerDelta.my_share_hashes = my_share_hashes
    p2pool_data.OkayTrackerDelta.my_doa_share_hashes = my_doa_share_hashes
    
    print 'p2pool (version %s)' % (p2pool.__version__,)
    print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
    
    # connect to bitcoind over JSON-RPC and do initial getmemorypool
    url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
    print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
    bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
    # RPC_CHECK verifies we are talking to the right coin daemon for this network
    good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
    print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
    temp_work = yield getwork(bitcoind)
    print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
    
    # connect to bitcoind over bitcoin-p2p
    print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
    factory = bitcoin_p2p.ClientFactory(net.PARENT)
    reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
    yield factory.getProtocol() # waits until handshake is successful
    
    if args.pubkey_hash is None:
        # no payout address supplied on the command line; ask bitcoind for one
        print 'Getting payout address from bitcoind...'
        my_script = yield deferral.retry('Error getting payout address from bitcoind:', 5)(defer.inlineCallbacks(lambda: defer.returnValue(
            bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net.PARENT)))
        print 'Computing payout script from provided address....'
        my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
    print ' Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
    
    ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
    
    tracker = p2pool_data.OkayTracker(net)
    shared_share_hashes = set()  # shares we have already relayed to peers
    ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
    known_verified = set()
    print "Loading shares..."
    for i, (mode, contents) in enumerate(ss.get_shares()):
        if contents.hash in tracker.shares:
        shared_share_hashes.add(contents.hash)
        contents.time_seen = 0  # loaded from disk, so treat as seen "forever ago"
        tracker.add(contents)
        if len(tracker.shares) % 1000 == 0 and tracker.shares:
            print " %i" % (len(tracker.shares),)
        elif mode == 'verified_hash':
            known_verified.add(contents)
        raise AssertionError()
    print " ...inserting %i verified shares..." % (len(known_verified),)
    for h in known_verified:
        if h not in tracker.shares:
            # verified hash without a matching stored share -- drop the record
            ss.forget_verified_share(h)
        tracker.verified.add(tracker.shares[h])
    print " ...done loading %i shares!" % (len(tracker.shares),)
    
    # when the tracker prunes a share, drop it from disk and relay bookkeeping too
    tracker.removed.watch(lambda share: ss.forget_share(share.hash))
    tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
    tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
    
    peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
    
    pre_current_work = variable.Variable(None)
    pre_merged_work = variable.Variable(None)
    # information affecting work that should trigger a long-polling update
    current_work = variable.Variable(None)
    # information affecting work that should not trigger a long-polling update
    current_work2 = variable.Variable(None)
    
    requested = expiring_dict.ExpiringDict(300)  # share_hash -> (last request time, request count)
@defer.inlineCallbacks
def set_real_work1():
    # Poll bitcoind for a fresh block template and publish it to the work variables.
    work = yield getwork(bitcoind)
    current_work2.set(dict(
        transactions=work['transactions'],
        subsidy=work['subsidy'],
        clock_offset=time.time() - work['time'],  # local clock minus template time
        last_update=time.time(),
    )) # second set first because everything hooks on the first
    pre_current_work.set(dict(
        version=work['version'],
        previous_block=work['previous_block_hash'],
        coinbaseflags=work['coinbaseflags'],

def set_real_work2():
    # Recompute the best share to build on and request any missing parent shares.
    best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
    
    t = dict(pre_current_work.value)
    t['best_share_hash'] = best
    t['aux_work'] = pre_merged_work.value
    
    for peer2, share_hash in desired:
        if share_hash not in tracker.tails: # was received in the time tracker.think was running
        # back off on shares we have asked for recently (exponential in request count)
        # NOTE(review): 't' is expected to hold a timestamp at this point in the full
        # source -- confirm against the unabridged file.
        last_request_time, count = requested.get(share_hash, (None, 0))
        if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
        potential_peers = set()
        for head in tracker.tails[share_hash]:
            potential_peers.update(peer_heads.get(head, set()))
        potential_peers = [peer for peer in potential_peers if peer.connected2]
        if count == 0 and peer2 is not None and peer2.connected2:
            # usually ask a random knowing peer, but sometimes the original announcer
            peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
        
        print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
        stops=list(set(tracker.heads) | set(
            tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
        requested[share_hash] = t, count + 1
pre_current_work.changed.watch(lambda _: set_real_work2())

print 'Initializing work...'
yield set_real_work1()
pre_merged_work.changed.watch(lambda _: set_real_work2())
ht.updated.watch(set_real_work2)
# merged mining is optional; merged_proxy stays None when no --merged-url is given
merged_proxy = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,)) if args.merged_url else None

@defer.inlineCallbacks
def set_merged_work():
    # Poll the merged-mining daemon for aux work and publish it.
    auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
    pre_merged_work.set(dict(
        hash=int(auxblock['hash'], 16),
        target=bitcoin_data.IntType(256).unpack(auxblock['target'].decode('hex')),
        chain_id=auxblock['chainid'],
    yield deferral.sleep(1)
if merged_proxy is not None:

@pre_merged_work.changed.watch
def _(new_merged_work):
    print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
# setup p2p logic and join p2pool network

class Node(p2p.Node):
    def handle_shares(self, shares, peer):
        # Ingest shares received from the network, remembering which peer knew them.
        print 'Processing %i shares...' % (len(shares),)
        if share.hash in tracker.shares:
            #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
        #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
        if shares and peer is not None:
            peer_heads.setdefault(shares[0].hash, set()).add(peer)
        print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
    
    def handle_share_hashes(self, hashes, peer):
        # Request any announced share hashes we don't have, with backoff on repeats.
        for share_hash in hashes:
            if share_hash in tracker.shares:
            last_request_time, count = requested.get(share_hash, (None, 0))
            if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
            print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
            get_hashes.append(share_hash)
            requested[share_hash] = t, count + 1
        if hashes and peer is not None:
            peer_heads.setdefault(hashes[0], set()).add(peer)
        peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
    
    def handle_get_shares(self, hashes, parents, stops, peer):
        # Serve a peer's request for shares; cap total shares sent at ~1000.
        parents = min(parents, 1000//len(hashes))
        for share_hash in hashes:
            for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                if share.hash in stops:
        print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
        peer.sendShares(shares)

@tracker.verified.added.watch
    if share.pow_hash <= share.header['bits'].target:
        # this verified share satisfies the block target -- submit it to bitcoind
        if factory.conn.value is not None:
            factory.conn.value.send_block(block=share.as_block(tracker))
            print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
        print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
        recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash) })
print 'Joining p2pool network using port %i...' % (args.p2pool_port,)

@defer.inlineCallbacks
    # resolve "host:port" (or bare "host", defaulting to net.P2P_PORT) to (ip, port)
    ip, port = x.split(':')
    defer.returnValue(((yield reactor.resolve(ip)), int(port)))
    defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))

if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
    # addrs.txt holds repr()ed address entries, one per line; best-effort load
    addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
    print >>sys.stderr, "error reading addrs"
for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
    if addr not in addrs:
        addrs[addr] = (0, time.time(), time.time())

connect_addrs = set()
for addr_df in map(parse, args.p2pool_nodes):
    connect_addrs.add((yield addr_df))

    best_share_hash_func=lambda: current_work.value['best_share_hash'],
    port=args.p2pool_port,
    connect_addrs=connect_addrs,

    # persist the current address book so restarts can rejoin quickly
    open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
task.LoopingCall(save_addrs).start(60)
# send share when the chain changes to their chain
def work_changed(new_work):
    #print 'Work changed:', new_work
    # walk the new best chain and broadcast any shares peers haven't seen yet
    for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
        if share.hash in shared_share_hashes:
        shared_share_hashes.add(share.hash)
    for peer in p2p_node.peers.itervalues():
        peer.sendShares([share for share in shares if share.peer is not peer])

current_work.changed.watch(work_changed)

    # periodically persist verified-share hashes for the current chain
    for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
        if share.hash in tracker.verified.shares:
            ss.add_verified_hash(share.hash)
task.LoopingCall(save_shares).start(60)

@defer.inlineCallbacks
    # best-effort UPnP mapping of our p2p port on the local router
    is_lan, lan_ip = yield ipdiscover.get_local_ip()
    pm = yield portmapper.get_port_mapper()
    yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
    except defer.TimeoutError:
    log.err(None, "UPnP error:")
    yield deferral.sleep(random.expovariate(1/120))

# start listening for workers with a JSON-RPC server

print 'Listening for workers on port %i...' % (args.worker_port,)

if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
    with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
        vip_pass = f.read().strip('\r\n')
    # no stored password -- generate a random one and persist it
    vip_pass = '%016x' % (random.randrange(2**64),)
    with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
print ' Worker password:', vip_pass, '(only required for generating graphs)'
# (total, orphan, doa) counts of our shares pruned from the tracker while
# still part of the current chain
removed_unstales_var = variable.Variable((0, 0, 0))
@tracker.verified.removed.watch
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
        removed_unstales_var.set((
            removed_unstales_var.value[0] + 1,
            removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),  # 253 = orphan
            removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),  # 254 = dead on arrival
# count of our dead-on-arrival shares pruned while still in the current chain
removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    # Keep removed_doa_unstales_var in sync when one of our DOA shares falls off
    # the verified tracker but is still an ancestor of the current best share.
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # BUG FIX: previously read 'removed_doa_unstales' (a NameError) --
        # the variable defined above is 'removed_doa_unstales_var'.
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
    produced = len(my_share_hashes)
    produced_doa = len(my_doa_share_hashes)
    
    # shares of ours still credited on the current chain, plus ones pruned
    # from the tracker while still relevant
    removed_total, removed_orphans, removed_doas = removed_unstales_var.value
    delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
    
    in_chain = delta.my_count + removed_total
    in_chain_doa = delta.my_doa_count + removed_doa_unstales_var.value
    announced_orphans = delta.my_orphan_announce_count + removed_orphans
    announced_doas = delta.my_dead_announce_count + removed_doas
    
    missing = produced - in_chain
    missing_doa = produced_doa - in_chain_doa
    
    return (missing - missing_doa, missing_doa), produced, (announced_orphans, announced_doas)
class WorkerBridge(worker_interface.WorkerBridge):
    # Bridges miner getwork requests to p2pool share generation and handles
    # submitted proof-of-work.
        worker_interface.WorkerBridge.__init__(self)
        self.new_work_event = current_work.changed
        # merkle_root -> (share_info, transactions, time issued, aux_work, target),
        # used to match a submitted header back to the work we handed out
        self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        self.recent_shares_ts_work = []  # (timestamp, attempts) pairs for local rate estimation
    
    def _get_payout_script_from_username(self, user):
        # Interpret the worker username as a payout address, if it parses as one.
        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
        return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
    
    def preprocess_request(self, request):
        # Choose the payout script for this request; in worker_fee percent of
        # requests the node operator's script is used instead.
        payout_script = self._get_payout_script_from_username(request.getUser())
        if payout_script is None or random.uniform(0, 100) < args.worker_fee:
            payout_script = my_script
        return payout_script,
    
    def get_work(self, payout_script):
        # Build a getwork job: refuse while disconnected or out of date, then
        # assemble the generate transaction, share_info, and merkle root.
        if len(p2p_node.peers) == 0 and net.PERSIST:
            raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
        if current_work.value['best_share_hash'] is None and net.PERSIST:
            raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
        if time.time() > current_work2.value['last_update'] + 60:
            raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
        
        share_info, generate_tx = p2pool_data.generate_transaction(
            previous_share_hash=current_work.value['best_share_hash'],
            # merged-mining commitment ('\xfa\xbemm' magic + aux hash) is
            # prepended to the coinbase when aux work is present; capped at 100 bytes
            coinbase=(('' if current_work.value['aux_work'] is None else
                '\xfa\xbemm' + bitcoin_data.IntType(256, 'big').pack(current_work.value['aux_work']['hash']) + struct.pack('<ii', 1, 0)) + current_work.value['coinbaseflags'])[:100],
            nonce=struct.pack('<Q', random.randrange(2**64)),
            new_script=payout_script,
            subsidy=current_work2.value['subsidy'],
            donation=math.perfect_round(65535*args.donation_percentage/100),
            # stale_info markers: 253 = orphan, 254 = dead on arrival
            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                253 if orphans > orphans_recorded_in_chain else
                254 if doas > doas_recorded_in_chain else
            )(*get_stale_counts()),
            block_target=current_work.value['bits'].target,
            desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
        
        # choose the pseudoshare target from recent local hashrate
        target = 2**256//2**32 - 1
        if len(self.recent_shares_ts_work) == 50:
            hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
            target = min(target, 2**256//(hash_rate * 5))
        target = max(target, share_info['bits'].target)
        if current_work.value['aux_work']:
            target = max(target, current_work.value['aux_work']['target'])
        
        transactions = [generate_tx] + list(current_work2.value['transactions'])
        merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
        self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), current_work.value['aux_work'], target
        
        print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
            bitcoin_data.target_to_difficulty(target),
            bitcoin_data.target_to_difficulty(share_info['bits'].target),
            (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
            current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
            len(current_work2.value['transactions']),
        
        return bitcoin_getwork.BlockAttempt(
            version=current_work.value['version'],
            previous_block=current_work.value['previous_block'],
            merkle_root=merkle_root,
            timestamp=current_work2.value['time'],
            bits=current_work.value['bits'],
    
    def got_response(self, header, request):
        # match up with transactions
        if header['merkle_root'] not in self.merkle_root_to_transactions:
            print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
        share_info, transactions, getwork_time, aux_work, target = self.merkle_root_to_transactions[header['merkle_root']]
        
        pow_hash = net.PARENT.POW_FUNC(header)
        # on_time means the share still extends the current best chain
        on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
        
        if pow_hash <= header['bits'].target or p2pool.DEBUG:
            # solved a full block (or debugging) -- submit it to bitcoind
            if factory.conn.value is not None:
                factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
            if pow_hash <= header['bits'].target:
                print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.block_header_type.hash256(header),)
                recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.block_header_type.hash256(header),) })
            log.err(None, 'Error while processing potential block:')
        
        if aux_work is not None and (pow_hash <= aux_work['target'] or p2pool.DEBUG):
            # the solution also satisfies the merged chain -- submit the aux POW;
            # the coinbase must embed the aux hash at bytes [4:36]
            assert bitcoin_data.IntType(256, 'big').pack(aux_work['hash']).encode('hex') == transactions[0]['tx_ins'][0]['script'][4:4+32].encode('hex')
            df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(merged_proxy.rpc_getauxblock)(
                bitcoin_data.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                bitcoin_data.aux_pow_type.pack(dict(
                    block_hash=bitcoin_data.block_header_type.hash256(header),
                    merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                    parent_block_header=header,
            if result != (pow_hash <= aux_work['target']):
                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                print 'Merged block submittal result: %s' % (result,)
            log.err(err, 'Error submitting merged block:')
            log.err(None, 'Error while processing merged mining POW:')
        
        if pow_hash <= share_info['bits'].target:
            # good enough for a p2pool share -- record it and hand it to the node
            share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
            print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                p2pool_data.format_hash(share.hash),
                p2pool_data.format_hash(share.previous_hash),
                time.time() - getwork_time,
                ' DEAD ON ARRIVAL' if not on_time else '',
            my_share_hashes.add(share.hash)
            my_doa_share_hashes.add(share.hash)
            p2p_node.handle_shares([share], None)
            
            if pow_hash <= header['bits'].target:
                # the share is also a block -- push it to all peers immediately
                for peer in p2p_node.peers.itervalues():
                    peer.sendShares([share])
                shared_share_hashes.add(share.hash)
            log.err(None, 'Error forwarding block solution:')
        
        if pow_hash <= target:
            # counts as a pseudoshare for local rate tracking / graphs
            reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
            if request.getPassword() == vip_pass:
                reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
            self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
            while len(self.recent_shares_ts_work) > 50:
                self.recent_shares_ts_work.pop(0)
        
        if pow_hash > target:
            print 'Worker submitted share with hash > target:'
            print ' Hash: %56x' % (pow_hash,)
            print ' Target: %56x' % (target,)
web_root = resource.Resource()
worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
    # pool hashrate estimate over 720 shares; JSON null until enough history exists
    if tracker.get_height(current_work.value['best_share_hash']) < 720:
        return json.dumps(None)
    return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
        / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
    
    # per-user share of the pool, as human-readable script -> weight fraction
    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
    weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
    for script in sorted(weights, key=lambda s: weights[s]):
        res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
    return json.dumps(res)
def get_current_txouts():
    # Generate a probe piece of work with a unique fake payout script, then read
    # back the generate transaction's outputs to learn everyone's current payout.
    # The probe output's own value is redistributed proportionally over the rest.
    probe_script = str(random.randrange(2**64))
    work = wb.get_work(probe_script)
    outs = wb.merkle_root_to_transactions[work.merkle_root][1][0]['tx_outs']
    
    grand_total = sum(o['value'] for o in outs)
    probe_value = grand_total - sum(o['value'] for o in outs if o['script'] != probe_script)
    
    result = {}
    for o in outs:
        if o['script'] == probe_script or not o['value']:
            continue
        result[o['script']] = o['value'] + math.perfect_round(o['value']*probe_value/grand_total)
    return result
def get_current_scaled_txouts(scale, trunc=0):
    # Scale the current payouts so they sum to 'scale'; payouts below 'trunc'
    # are pooled and granted to a single weighted-random winner.
    txouts = get_current_txouts()
    total = sum(txouts.itervalues())
    results = dict((script, value*scale//total) for script, value in txouts.iteritems())
    for s in sorted(results, key=results.__getitem__):
        total_random += results[s]
        if total_random >= trunc and results[s] >= trunc:
    winner = math.weighted_choice((script, results[script]) for script in random_set)
    for script in random_set:
    results[winner] = total_random
    if sum(results.itervalues()) < int(scale):
        # integer-division rounding shortfall goes to one weighted-random recipient
        results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
def get_current_payouts():
    # JSON object mapping human-readable payout script -> pending payout in whole coins.
    payouts = {}
    for script, value in get_current_txouts().iteritems():
        payouts[bitcoin_data.script2_to_human(script, net.PARENT)] = value/1e8
    return json.dumps(payouts)
def get_patron_sendmany(this):
    # Render payouts as a bitcoind 'sendmany'-compatible JSON object.
    # 'this' is "<total>" or "<total>/<trunc>", amounts in whole coins.
    this, trunc = this.split('/', 1)
    return json.dumps(dict(
        (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
        for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
        if bitcoin_data.script2_to_address(script, net.PARENT) is not None
    return json.dumps(None)
def get_global_stats():
    # averaged over last hour
    lookbehind = 3600//net.SHARE_PERIOD
    if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
    
    nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
    stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
    return json.dumps(dict(
        pool_nonstale_hash_rate=nonstale_hash_rate,
        pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),  # gross rate, adding stales back in
        pool_stale_prop=stale_prop,
def get_local_stats():
    # This node's own hashrate and stale statistics over roughly the last hour.
    lookbehind = 3600//net.SHARE_PERIOD
    if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
    
    global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
    
    # classify our shares in the window: 253 = orphan, 254 = dead on arrival
    my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
    my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
    my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
    my_share_count = my_unstale_count + my_orphan_count + my_doa_count
    my_stale_count = my_orphan_count + my_doa_count
    
    my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
    
    my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
        for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
        if share.hash in my_share_hashes)
    actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
        tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
    share_att_s = my_work / actual_time  # our attempts per second over the window
    
    return json.dumps(dict(
        my_hash_rates_in_last_hour=dict(
            nonstale=share_att_s,
            rewarded=share_att_s/(1 - global_stale_prop),
            actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
        my_share_counts_in_last_hour=dict(
            shares=my_share_count,
            unstale_shares=my_unstale_count,
            stale_shares=my_stale_count,
            orphan_stale_shares=my_orphan_count,
            doa_stale_shares=my_doa_count,
        my_stale_proportions_in_last_hour=dict(
            orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
            dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
def get_peer_addresses():
    # Space-separated list of connected peers, rendered as "host" or "host:port"
    # (the port is included only when it differs from the network default).
    entries = []
    for peer in p2p_node.peers.itervalues():
        endpoint = peer.transport.getPeer()
        if endpoint.port != net.P2P_PORT:
            entries.append(endpoint.host + ':' + str(endpoint.port))
        else:
            entries.append(endpoint.host)
    return ' '.join(entries)
class WebInterface(resource.Resource):
    '''Leaf web resource that serves func(*selected query args) with a fixed MIME type.'''
    def __init__(self, func, mime_type, *fields):
        # BUG FIX: initialize the parent Resource; twisted's Resource.__init__
        # sets up internal state (e.g. the children mapping) that inherited
        # child-lookup methods rely on.
        resource.Resource.__init__(self)
        self.func, self.mime_type, self.fields = func, mime_type, fields
    
    def render_GET(self, request):
        request.setHeader('Content-Type', self.mime_type)
        request.setHeader('Access-Control-Allow-Origin', '*')  # allow cross-origin dashboards
        return self.func(*(request.args[field][0] for field in self.fields))
# expose the JSON/plain-text status endpoints defined above
web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
web_root.putChild('users', WebInterface(get_users, 'application/json'))
web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))

grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
web_root.putChild('graphs', grapher.get_resource())
    # periodically record the pool rate (and its stale portion) into the graphs
    if tracker.get_height(current_work.value['best_share_hash']) < 720:
    nonstalerate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
    poolrate = nonstalerate / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720))
    grapher.add_poolrate_point(poolrate, poolrate - nonstalerate)
task.LoopingCall(add_point).start(100)

reactor.listenTCP(args.worker_port, server.Site(web_root))
@defer.inlineCallbacks
    # refresh work from bitcoind; woken early when a new block arrives over p2p
    flag = factory.new_block.get_deferred()
    yield set_real_work1()
    yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)

print 'Started successfully!'

if hasattr(signal, 'SIGALRM'):
    # watchdog: if the reactor stalls long enough for the 30s alarm to fire,
    # dump a stack trace to stderr (the alarm is re-armed every second)
    signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
        sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
    signal.siginterrupt(signal.SIGALRM, False)
    task.LoopingCall(signal.alarm, 30).start(1)
if args.irc_announce:
    from twisted.words.protocols import irc
    class IRCClient(irc.IRCClient):
        def lineReceived(self, line):
            irc.IRCClient.lineReceived(self, line)
            irc.IRCClient.signedOn(self)
            self.factory.resetDelay()
            self.watch_id = current_work.changed.watch(self._work_changed)
            self.announced_hashes = set()  # block hashes already announced this session
        def _work_changed(self, new_work):
            share = tracker.shares[new_work['best_share_hash']]
            # announce only shares that are actual blocks, once each
            if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes:
                self.say('#p2pool', '\x033,4BLOCK FOUND! http://blockexplorer.com/block/' + bitcoin_data.IntType(256, 'big').pack(share.header_hash).encode('hex'))
        def connectionLost(self, reason):
            # stop watching work updates once the IRC connection drops
            current_work.changed.unwatch(self.watch_id)
    class IRCClientFactory(protocol.ReconnectingClientFactory):
    reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
@defer.inlineCallbacks
    # periodic console status line
    yield deferral.sleep(3)
    if time.time() > current_work2.value['last_update'] + 60:
        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
    if current_work.value['best_share_hash'] is not None:
        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
        att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
        real_att_s = att_s / (1 - stale_prop)  # gross rate including stale shares
        my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
        this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i (%i incoming)' % (
            math.format(int(real_att_s)),
            len(tracker.verified.shares),
            weights.get(my_script, 0)/total_weight*100,
            math.format(int(my_att_s)),
            sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
        this_str += '\nAverage time between blocks: %.2f days' % (
            2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
        this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
        # 95% binomial confidence interval on our own stale rate
        stale_center, stale_radius = math.binomial_conf_center_radius(stale_orphan_shares + stale_doa_shares, shares, 0.95)
        this_str += u' Own: %i±%i%%' % (int(100*stale_center+.5), int(100*stale_radius+.5))
        this_str += u' Own efficiency: %i±%i%%' % (int(100*(1 - stale_center)/(1 - stale_prop)+.5), int(100*stale_radius/(1 - stale_prop)+.5))
        # only reprint when the line changed or 15 seconds passed
        if this_str != last_str or time.time() > last_time + 15:
            last_time = time.time()
log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    '''ArgumentParser whose @file arguments may contain several
    whitespace-separated arguments per line (argparse's default treats each
    line of the file as exactly one argument). Files may themselves contain
    further @file references, which are expanded recursively.
    '''
    
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
                continue
            # replace arguments referencing files with the file content;
            # 'with' guarantees the handle is closed even if reading fails
            # (the original leaked the file on a read error)
            try:
                with open(arg_string[1:]) as args_file:
                    file_args = [arg
                        for arg_line in args_file.read().splitlines()
                        for arg in self.convert_arg_line_to_args(arg_line)]
            except IOError as err:
                self.error(str(err))
            else:
                # recurse so an args file can itself reference @files
                new_arg_strings.extend(self._read_args_from_files(file_args))
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        # split each line on whitespace instead of argparse's default of
        # one argument per line; drop empty tokens/blank lines
        return [arg for arg in arg_line.split() if arg.strip()]
# NOTE(review): non-contiguous excerpt (single-line gaps are blank lines in
# the original); these statements build the command-line interface.
# FixedArgumentParser lets users keep options in @files, several per line.
881 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
882 parser.add_argument('--version', action='version', version=p2pool.__version__)
# --- general options: network selection, debugging, payouts, logging ---
883 parser.add_argument('--net',
884 help='use specified network (default: bitcoin)',
885 action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
886 parser.add_argument('--testnet',
887 help='''use the network's testnet''',
888 action='store_const', const=True, default=False, dest='testnet')
889 parser.add_argument('--debug',
890 help='enable debugging mode',
891 action='store_const', const=True, default=False, dest='debug')
892 parser.add_argument('-a', '--address',
893 help='generate payouts to this address (default: <address requested from bitcoind>)',
894 type=str, action='store', default=None, dest='address')
895 parser.add_argument('--logfile',
896 help='''log to this file (default: data/<NET>/log)''',
897 type=str, action='store', default=None, dest='logfile')
# merged-mining options: url and userpass must be given together (enforced
# after parsing with an XOR check)
898 parser.add_argument('--merged-url',
899 help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
900 type=str, action='store', default=None, dest='merged_url')
901 parser.add_argument('--merged-userpass',
902 help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
903 type=str, action='store', default=None, dest='merged_userpass')
904 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
905 help='donate this percentage of work to author of p2pool (default: 0.5)',
906 type=float, action='store', default=0.5, dest='donation_percentage')
907 parser.add_argument('--irc-announce',
908 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
909 action='store_true', default=False, dest='irc_announce')
# --- p2pool P2P interface: node-to-node port and bootstrap peers ---
911 p2pool_group = parser.add_argument_group('p2pool interface')
912 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
913 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
914 type=int, action='store', default=None, dest='p2pool_port')
915 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
916 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
917 type=str, action='append', default=[], dest='p2pool_nodes')
# note: store_false with default=True, so passing the flag disables UPnP
918 parser.add_argument('--disable-upnp',
919 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
920 action='store_false', default=True, dest='upnp')
# --- worker (miner-facing) interface ---
922 worker_group = parser.add_argument_group('worker interface')
923 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
924 help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
925 type=int, action='store', default=None, dest='worker_port')
926 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
927 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
928 type=float, action='store', default=0, dest='worker_fee')
# --- bitcoind connection parameters; port defaults of None are later
# filled from bitcoin.conf or the network definition ---
930 bitcoind_group = parser.add_argument_group('bitcoind interface')
931 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
932 help='connect to this address (default: 127.0.0.1)',
933 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
934 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
935 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
936 type=int, action='store', default=None, dest='bitcoind_rpc_port')
937 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
938 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
939 type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional argument (no option string): zero to two tokens giving RPC
# username and password
941 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
942 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
943 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
# NOTE(review): non-contiguous excerpt — several structural lines fall in
# gaps (e.g. the ']:' closing the option list after 977, the 'try:' before
# 999, the 'else:' before 1003, and the logfile.reopen() call at ~1019).
945 args = parser.parse_args()
# resolve the network object; testnet variants are registered under the
# same name with a '_testnet' suffix
950 net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
# per-network data directory lives next to the script; create it on first run
952 datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
953 if not os.path.exists(datadir_path):
954 os.makedirs(datadir_path)
# split the positional userpass tokens: left-pad with Nones so that zero,
# one, or two tokens map to (None, None), ('', pass)-style, or (user, pass)
956 if len(args.bitcoind_rpc_userpass) > 2:
957 parser.error('a maximum of two arguments are allowed')
958 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# no password on the command line: fall back to reading bitcoin.conf
960 if args.bitcoind_rpc_password is None:
961 if not hasattr(net, 'CONF_FILE_FUNC'):
962 parser.error('This network has no configuration file function. Manually enter your RPC password.')
963 conf_path = net.CONF_FILE_FUNC()
964 if not os.path.exists(conf_path):
965 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
966 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
968 '''server=true\r\n'''
969 '''rpcpassword=%x # (randomly generated for your convenience)''' % (conf_path, random.randrange(2**128)))
# bitcoin.conf has no section headers, so prepend a dummy '[x]' section to
# make it parseable by RawConfigParser
970 with open(conf_path, 'rb') as f:
971 cp = ConfigParser.RawConfigParser()
972 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
# copy conf values into args, but only for options not already given on
# the command line (command line wins)
973 for conf_name, var_name, var_type in [
974 ('rpcuser', 'bitcoind_rpc_username', str),
975 ('rpcpassword', 'bitcoind_rpc_password', str),
976 ('rpcport', 'bitcoind_rpc_port', int),
977 ('port', 'bitcoind_p2p_port', int),
979 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
980 setattr(args, var_name, var_type(cp.get('x', conf_name)))
982 if args.bitcoind_rpc_username is None:
983 args.bitcoind_rpc_username = ''
# remaining unset ports fall back to the network definition's defaults
985 if args.bitcoind_rpc_port is None:
986 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
988 if args.bitcoind_p2p_port is None:
989 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
991 if args.p2pool_port is None:
992 args.p2pool_port = net.P2P_PORT
994 if args.worker_port is None:
995 args.worker_port = net.WORKER_PORT
# decode the payout address to a pubkey hash, turning any decoding error
# into a parser error (the enclosing try: is in a gap of this excerpt)
997 if args.address is not None:
999 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1000 except Exception, e:
1001 parser.error('error parsing address: ' + repr(e))
1003 args.pubkey_hash = None
# merged mining needs both url and userpass; XOR detects exactly-one-given
1005 if (args.merged_url is None) ^ (args.merged_userpass is None):
1006 parser.error('must specify --merged-url and --merged-userpass')
1009 if args.logfile is None:
1010 args.logfile = os.path.join(datadir_path, 'log')
# tee all output to stderr and the logfile, timestamped; replace stdout and
# stderr (and twisted's log target) with pipes that abort on write failure
1012 logfile = logging.LogFile(args.logfile)
1013 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1014 sys.stdout = logging.AbortPipe(pipe)
1015 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 reopens the logfile so external log rotation works (the actual
# reopen call sits in a gap of this excerpt); also reopen every 5 seconds
1016 if hasattr(signal, "SIGUSR1"):
1017 def sigusr1(signum, frame):
1018 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1020 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1021 signal.signal(signal.SIGUSR1, sigusr1)
1022 task.LoopingCall(logfile.reopen).start(5)
# start the node proper once the twisted reactor is running
1024 reactor.callWhenRunning(main, args, net, datadir_path)