4 from __future__ import division
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface
25 from util import expiring_dict, jsonrpc, variable, deferral, math
26 from . import p2p, networks, graphs
27 import p2pool, p2pool.data as p2pool_data
# Fetch a block template from bitcoind via the (pre-BIP22) getmemorypool RPC
# and normalize the fields into the plain dict shape the work-setting logic expects.
# Retries up to 3 times on failure via deferral.retry.
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),  # hex string -> int
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        # NOTE(review): at least one dict entry (the block 'time') appears elided from this excerpt.
        # 'bits' may arrive as a hex string (byte-reversed on the wire) or already as an integer.
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        # coinbaseflags: explicit field if present, else concatenation of coinbaseaux values, else empty.
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        # NOTE(review): the closing of this dict/returnValue call is elided in this excerpt.
@defer.inlineCallbacks
def main(args, net, datadir_path):
    """Entry point: connect to bitcoind, join the p2pool network, and serve miners.

    args -- parsed command-line namespace; net -- network definition object;
    datadir_path -- directory for persistent state (shares, addrs, graphs).
    NOTE(review): many lines are elided throughout this excerpt; some guard
    clauses around the statements below are not visible.
    """
    # Sets of hashes of shares we generated locally (and the dead-on-arrival subset);
    # exposed on the tracker delta class so stale accounting can recognize our shares.
    my_share_hashes = set()
    my_doa_share_hashes = set()
    p2pool_data.OkayTrackerDelta.my_share_hashes = my_share_hashes
    p2pool_data.OkayTrackerDelta.my_doa_share_hashes = my_doa_share_hashes
    print 'p2pool (version %s)' % (p2pool.__version__,)
    # NOTE(review): presumably printed when graphing imports fail -- the ImportError handler is elided here.
    print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
    # connect to bitcoind over JSON-RPC and do initial getmemorypool
    url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
    print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
    bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
    good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
    # NOTE(review): the 'if not good:' guard around this error message appears elided.
    print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
    temp_work = yield getwork(bitcoind)
    print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
    # connect to bitcoind over bitcoin-p2p
    print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
    factory = bitcoin_p2p.ClientFactory(net.PARENT)
    reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
    yield factory.getProtocol() # waits until handshake is successful
    # Determine where block rewards for our shares should be paid.
    if args.pubkey_hash is None:
        # No payout address supplied; ask bitcoind for one on its 'p2pool' account.
        print 'Getting payout address from bitcoind...'
        my_script = yield deferral.retry('Error getting payout address from bitcoind:', 5)(defer.inlineCallbacks(lambda: defer.returnValue(
            bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net.PARENT)))
        # NOTE(review): the closing of the call chain above and the 'else:' branch header are elided.
        print 'Computing payout script from provided address....'
        my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
    print ' Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
    # Tracks bitcoind block heights so share-chain work can be validated against them.
    ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
    tracker = p2pool_data.OkayTracker(net)
    shared_share_hashes = set()  # hashes we have already exchanged with peers
    ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
    known_verified = set()
    print "Loading shares..."
    for i, (mode, contents) in enumerate(ss.get_shares()):
        # NOTE(review): the "if mode == 'share':" branch header appears elided here.
            if contents.hash in tracker.shares:
                # NOTE(review): duplicate-skip ('continue') line appears elided.
            shared_share_hashes.add(contents.hash)
            contents.time_seen = 0  # loaded from disk, so no meaningful receive time
            tracker.add(contents)
            # progress indicator every 1000 shares
            if len(tracker.shares) % 1000 == 0 and tracker.shares:
                print " %i" % (len(tracker.shares),)
        elif mode == 'verified_hash':
            known_verified.add(contents)  # in this mode 'contents' is a hash
        # NOTE(review): 'else:' header appears elided.
            raise AssertionError()
    print " ...inserting %i verified shares..." % (len(known_verified),)
    for h in known_verified:
        if h not in tracker.shares:
            # verified hash with no matching share on disk -- drop it from the store
            ss.forget_verified_share(h)
        # NOTE(review): 'else:' header appears elided.
            tracker.verified.add(tracker.shares[h])
    print " ...done loading %i shares!" % (len(tracker.shares),)
    # Keep the on-disk store and the shared-set consistent as the tracker prunes shares.
    tracker.removed.watch(lambda share: ss.forget_share(share.hash))
    tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
    tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
    peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it

    # pre_* hold raw upstream state; current_work/current_work2 are the derived views.
    pre_current_work = variable.Variable(None)
    pre_merged_work = variable.Variable(None)
    # information affecting work that should trigger a long-polling update
    current_work = variable.Variable(None)
    # information affecting work that should not trigger a long-polling update
    current_work2 = variable.Variable(None)

    requested = expiring_dict.ExpiringDict(300)  # share_hash -> (last_request_time, count)

    @defer.inlineCallbacks
    def set_real_work1():
        # Poll bitcoind for fresh work and publish it into the two work variables.
        work = yield getwork(bitcoind)
        current_work2.set(dict(
            # NOTE(review): a 'time' entry appears elided here (read elsewhere as current_work2.value['time']).
            transactions=work['transactions'],
            subsidy=work['subsidy'],
            clock_offset=time.time() - work['time'],  # our clock minus bitcoind's template time
            last_update=time.time(),
        )) # second set first because everything hooks on the first
        pre_current_work.set(dict(
            version=work['version'],
            previous_block=work['previous_block_hash'],
            # NOTE(review): a 'bits' entry and the closing of this call appear elided.
            coinbaseflags=work['coinbaseflags'],
    def set_real_work2():
        # Derive the share-chain view of the current work and request missing parent shares.
        best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
        t = dict(pre_current_work.value)
        t['best_share_hash'] = best
        t['aux_work'] = pre_merged_work.value
        # NOTE(review): the current_work.set(t) publish and a 't = time.time()' rebinding
        # appear elided here; 't' below is used as a timestamp.
        for peer2, share_hash in desired:
            if share_hash not in tracker.tails: # was received in the time tracker.think was running
            last_request_time, count = requested.get(share_hash, (None, 0))
            # back off exponentially on repeated requests for the same share
            if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                # NOTE(review): 'continue' line appears elided.
            potential_peers = set()
            for head in tracker.tails[share_hash]:
                potential_peers.update(peer_heads.get(head, set()))
            potential_peers = [peer for peer in potential_peers if peer.connected2]
            if count == 0 and peer2 is not None and peer2.connected2:
                # NOTE(review): first-attempt peer selection line appears elided.
            peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
            print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
            # 'stops' bounds how far back the remote walks when serving parents
            stops=list(set(tracker.heads) | set(
                tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
            requested[share_hash] = t, count + 1
    pre_current_work.changed.watch(lambda _: set_real_work2())

    print 'Initializing work...'
    yield set_real_work1()
    pre_merged_work.changed.watch(lambda _: set_real_work2())
    ht.updated.watch(set_real_work2)
    # Merged-mining (aux chain) support: poll the merged daemon for aux work, if configured.
    merged_proxy = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,)) if args.merged_url else None

    @defer.inlineCallbacks
    def set_merged_work():
        # NOTE(review): the polling loop header appears elided.
        auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
        pre_merged_work.set(dict(
            hash=int(auxblock['hash'], 16),
            target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
            chain_id=auxblock['chainid'],
        # NOTE(review): the closing of this call appears elided.
        yield deferral.sleep(1)
    if merged_proxy is not None:
        # NOTE(review): the set_merged_work() start call appears elided.

    @pre_merged_work.changed.watch
    def _(new_merged_work):
        print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)

    start_time = time.time() - current_work2.value['clock_offset']  # start time on bitcoind's clock
    # setup p2p logic and join p2pool network
    class Node(p2p.Node):
        def handle_shares(self, shares, peer):
            # Add received shares to the tracker, remembering which peer knew them.
            print 'Processing %i shares...' % (len(shares),)
            # NOTE(review): the loop header over 'shares' and the new_count accounting are elided.
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)

        def handle_share_hashes(self, hashes, peer):
            # Request any advertised share hashes we don't already have (rate-limited).
            for share_hash in hashes:
                if share_hash in tracker.shares:
                last_request_time, count = requested.get(share_hash, (None, 0))
                # NOTE(review): 't' here is presumably a time.time() bound in an elided line.
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            if hashes and peer is not None:
                peer_heads.setdefault(hashes[0], set()).add(peer)
            # NOTE(review): an 'if get_hashes:' guard appears elided.
            peer.send_getshares(hashes=get_hashes, parents=0, stops=[])

        def handle_get_shares(self, hashes, parents, stops, peer):
            # Serve at most ~1000 shares total: each requested hash plus up to 'parents' ancestors.
            parents = min(parents, 1000//len(hashes))
            for share_hash in hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
    @tracker.verified.added.watch
    # NOTE(review): the 'def _(share):' header is elided in this excerpt.
        if share.pow_hash <= share.header['bits'].target:
            # This share's proof-of-work also solves a bitcoin block -- submit it upstream.
            if factory.conn.value is not None:
                factory.conn.value.send_block(block=share.as_block(tracker))
            # NOTE(review): 'else:' header appears elided.
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
            print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
            recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash) })
    print 'Joining p2pool network using port %i...' % (args.p2pool_port,)

    # NOTE(review): the 'def parse(x):' helper header and its try/except are elided;
    # it turns 'host' or 'host:port' strings into (host, port) tuples.
        ip, port = x.split(':')
        # bare addresses fall back to the network's default P2P port
        return x, net.P2P_PORT

    # Load previously seen peer addresses, then mix in the builtin bootstrap list.
    if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
        # addrs.txt holds repr()'d (addr, info) tuples, one per line; eval is trusted local state
        addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
        # NOTE(review): except-handler header around the read above appears elided.
        print >>sys.stderr, "error reading addrs"
    for addr in map(parse, net.BOOTSTRAP_ADDRS):
        if addr not in addrs:
            addrs[addr] = (0, time.time(), time.time())

    # NOTE(review): the 'p2p_node = Node(...)' constructor header is elided; these are its arguments.
        best_share_hash_func=lambda: current_work.value['best_share_hash'],
        port=args.p2pool_port,
        connect_addrs=set(map(parse, args.p2pool_nodes)),

    # Periodically persist the peer address book ('def save_addrs():' header elided).
        open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
    task.LoopingCall(save_addrs).start(60)
    # send share when the chain changes to their chain
    def work_changed(new_work):
        #print 'Work changed:', new_work
        # Walk the new best chain and broadcast any of its shares peers haven't seen yet.
        for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
            if share.hash in shared_share_hashes:
                # NOTE(review): loop-exit line appears elided.
            shared_share_hashes.add(share.hash)
        # NOTE(review): accumulation of the 'shares' list and its broadcast guard are elided.
        for peer in p2p_node.peers.itervalues():
            # don't echo a share back to the peer it came from
            peer.sendShares([share for share in shares if share.peer is not peer])
    current_work.changed.watch(work_changed)

    # Periodically flush tracker state to the share store ('def save_shares():' header elided).
        for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
            # NOTE(review): an ss.add_share(...) line appears elided here.
            if share.hash in tracker.verified.shares:
                ss.add_verified_hash(share.hash)
    task.LoopingCall(save_shares).start(60)
    @defer.inlineCallbacks
    # NOTE(review): the 'def upnp_thread():' header and its loop/try lines are elided.
        # Best-effort: discover our LAN IP and ask the gateway (UPnP) to forward the P2P port.
        is_lan, lan_ip = yield ipdiscover.get_local_ip()
        pm = yield portmapper.get_port_mapper()
        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
        except defer.TimeoutError:
            # gateway didn't answer; silently retry on the next cycle
        log.err(None, "UPnP error:")
        # randomized retry delay (exponential distribution, mean 120s)
        yield deferral.sleep(random.expovariate(1/120))
    # start listening for workers with a JSON-RPC server
    print 'Listening for workers on port %i...' % (args.worker_port,)

    # The "VIP pass" lets a worker tag its submissions for per-miner graphing.
    if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
        with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
            vip_pass = f.read().strip('\r\n')
    # NOTE(review): 'else:' header elided -- generate and persist a random password.
        vip_pass = '%016x' % (random.randrange(2**64),)
        with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
            # NOTE(review): the f.write(vip_pass) line appears elided.
    print ' Worker password:', vip_pass, '(only required for generating graphs)'
    # Track our shares that were pruned off the chain while still being ancestors of
    # the current best share ("unstales"): (total_removed, orphan_count, doa_count).
    removed_unstales_var = variable.Variable((0, 0, 0))
    @tracker.verified.removed.watch
    # NOTE(review): the 'def _(share):' header is elided in this excerpt.
        if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
            assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
            removed_unstales_var.set((
                removed_unstales_var.value[0] + 1,
                removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),  # orphan
                removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),  # doa
391 removed_doa_unstales_var = variable.Variable(0)
392 @tracker.verified.removed.watch
394 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
395 removed_doa_unstales.set(removed_doa_unstales.value + 1)
397 def get_stale_counts():
398 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
399 my_shares = len(my_share_hashes)
400 my_doa_shares = len(my_doa_share_hashes)
401 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
402 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
403 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
404 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
405 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
407 my_shares_not_in_chain = my_shares - my_shares_in_chain
408 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
410 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
    class WorkerBridge(worker_interface.WorkerBridge):
        # Bridges miner getwork requests to the p2pool share chain.
        # NOTE(review): the 'def __init__(self):' header is elided in this excerpt.
            worker_interface.WorkerBridge.__init__(self)
            self.new_work_event = current_work.changed  # long-poll trigger for miners
            # merkle_root -> (share_info, transactions, getwork_time, aux_work, target)
            self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)

        def _get_payout_script_from_username(self, user):
            # Interpret the worker's username as a payout address, if possible.
            # NOTE(review): the try/except (and any None fallback) around this is elided.
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
            return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)

        def preprocess_request(self, request):
            payout_script = self._get_payout_script_from_username(request.getUser())
            # Fall back to our own payout script; also claim worker_fee percent of requests.
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            return payout_script,

        def get_work(self, payout_script):
            # Refuse to hand out work when we're clearly not in a fit state.
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if current_work.value['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')

            # Build the generate (coinbase) transaction and the share metadata.
            share_info, generate_tx = p2pool_data.generate_transaction(
                previous_share_hash=current_work.value['best_share_hash'],
                # merged-mining commitment ('\xfa\xbemm' magic) is prepended when aux work exists
                coinbase=(('' if current_work.value['aux_work'] is None else
                    '\xfa\xbemm' + bitcoin_data.HashType().pack(current_work.value['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0)) + current_work.value['coinbaseflags'])[:100],
                nonce=struct.pack('<Q', random.randrange(2**64)),
                new_script=payout_script,
                subsidy=current_work2.value['subsidy'],
                donation=math.perfect_round(65535*args.donation_percentage/100),
                # stale_info flags: 253 = orphans not yet recorded in chain, 254 = doas not yet recorded
                stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                    253 if orphans > orphans_recorded_in_chain else
                    254 if doas > doas_recorded_in_chain else
                    # NOTE(review): the final arm of this conditional appears elided.
                )(*get_stale_counts()),
                block_target=current_work.value['bits'].target,
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
            # NOTE(review): the closing of the generate_transaction call appears elided.

            print 'New work for worker! Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
                current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                len(current_work2.value['transactions']),

            # Pseudoshare target: easy enough for frequent miner feedback, but never
            # easier than the real share target (or the aux chain target, if any).
            target = 2**256//2**32//8 - 1
            target = max(target, share_info['bits'].target)
            if current_work.value['aux_work']:
                target = max(target, current_work.value['aux_work']['target'])

            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
            self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), current_work.value['aux_work'], target

            return bitcoin_getwork.BlockAttempt(
                version=current_work.value['version'],
                previous_block=current_work.value['previous_block'],
                merkle_root=merkle_root,
                timestamp=current_work2.value['time'],
                bits=current_work.value['bits'],
            # NOTE(review): remaining argument(s) and the closing of this call appear elided.

        def got_response(self, header, request):
            # match up with transactions
            if header['merkle_root'] not in self.merkle_root_to_transactions:
                print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
            share_info, transactions, getwork_time, aux_work, target = self.merkle_root_to_transactions[header['merkle_root']]

            pow_hash = net.PARENT.POW_FUNC(header)
            # a share is "on time" if it still extends the current best share
            on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']

            # Bitcoin block submission (try/except wrapper lines elided).
            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                # NOTE(review): 'else:' header appears elided.
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                if pow_hash <= header['bits'].target:
                    print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.block_header_type.hash256(header),)
                    recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.block_header_type.hash256(header),) })
            log.err(None, 'Error while processing potential block:')

            # Merged (aux chain) block submission.
            if aux_work is not None and (pow_hash <= aux_work['target'] or p2pool.DEBUG):
                # the aux hash must be committed to in the coinbase script we built in get_work
                assert bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex') == transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex')
                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(merged_proxy.rpc_getauxblock)(
                    bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex'),
                    bitcoin_data.aux_pow_type.pack(dict(
                        block_hash=bitcoin_data.block_header_type.hash256(header),
                        merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                        parent_block_header=header,
                # NOTE(review): the callback handler header appears elided.
                    if result != (pow_hash <= aux_work['target']):
                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                    # NOTE(review): 'else:' header appears elided.
                        print 'Merged block submittal result: %s' % (result,)
                # NOTE(review): the errback handler header appears elided.
                    log.err(err, 'Error submitting merged block:')
            log.err(None, 'Error while processing merged mining POW:')

            # Share submission: hash meets the share target -> record and broadcast.
            if pow_hash <= share_info['bits'].target:
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                    # NOTE(review): one format-argument line appears elided here.
                    p2pool_data.format_hash(share.hash),
                    p2pool_data.format_hash(share.previous_hash),
                    time.time() - getwork_time,
                    ' DEAD ON ARRIVAL' if not on_time else '',
                my_share_hashes.add(share.hash)
                # NOTE(review): the DOA-tracking guard ('if not on_time:') appears elided.
                my_doa_share_hashes.add(share.hash)
                p2p_node.handle_shares([share], None)

            # Pseudoshare accounting for the local rate graphs.
            if pow_hash <= target:
                reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                if request.getPassword() == vip_pass:
                    reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
            if pow_hash > target:
                print 'Worker submitted share with hash > target:'
                print ' Hash: %56x' % (pow_hash,)
                print ' Target: %56x' % (target,)
    web_root = resource.Resource()
    worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)

    # JSON endpoint: estimated pool hash rate ('def get_rate():' header elided).
        if tracker.get_height(current_work.value['best_share_hash']) < 720:
            return json.dumps(None)  # not enough chain history yet
        return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
            / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))

    # JSON endpoint: relative payout weight per user ('def get_users():' header elided).
        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
        # NOTE(review): the 'res = {}' initialization appears elided.
        for script in sorted(weights, key=lambda s: weights[s]):
            res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
        return json.dumps(res)

    def get_current_txouts():
        # Build a fresh block template tagged with a throwaway script, then strip
        # the tag and spread its value proportionally over the real outputs.
        # NOTE(review): 'wb' is presumably the WorkerBridge instance; its binding is elided.
        tmp_tag = str(random.randrange(2**64))
        outputs = wb.merkle_root_to_transactions[wb.get_work(tmp_tag).merkle_root][1][0]['tx_outs']
        total = sum(out['value'] for out in outputs)
        total_without_tag = sum(out['value'] for out in outputs if out['script'] != tmp_tag)
        total_diff = total - total_without_tag
        return dict((out['script'], out['value'] + math.perfect_round(out['value']*total_diff/total)) for out in outputs if out['script'] != tmp_tag and out['value'])

    def get_current_scaled_txouts(scale, trunc=0):
        # Scale current payouts to 'scale' total units, truncating dust below 'trunc'
        # by randomly awarding the truncated amounts to one of the small recipients.
        txouts = get_current_txouts()
        total = sum(txouts.itervalues())
        results = dict((script, value*scale//total) for script, value in txouts.iteritems())
        # NOTE(review): initialization of total_random / random_set appears elided.
        for s in sorted(results, key=results.__getitem__):
            total_random += results[s]
            # NOTE(review): a random_set.add(s) line appears elided.
            if total_random >= trunc and results[s] >= trunc:
                # NOTE(review): a 'break' line appears elided.
        winner = math.weighted_choice((script, results[script]) for script in random_set)
        for script in random_set:
            # NOTE(review): removal of the truncated entries appears elided.
        results[winner] = total_random
        # hand any integer-division remainder to a weighted-random recipient
        if sum(results.itervalues()) < int(scale):
            results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
        # NOTE(review): the 'return results' line appears elided.
    def get_current_payouts():
        # Human-readable address -> payout in whole coins (values stored in 1e-8 units).
        return json.dumps(dict((bitcoin_data.script2_to_human(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))

    def get_patron_sendmany(this):
        # Build a bitcoind 'sendmany'-shaped dict for a patron paying 'this' coins,
        # with an optional '/trunc' suffix. Surrounding parse/except lines are elided.
        this, trunc = this.split('/', 1)
        return json.dumps(dict(
            (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
            for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
            if bitcoin_data.script2_to_address(script, net.PARENT) is not None
        # NOTE(review): the closing of the call above and an except-handler header are elided.
        return json.dumps(None)

    def get_global_stats():
        # averaged over last hour
        lookbehind = 3600//net.SHARE_PERIOD
        if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
            # NOTE(review): the early-return line appears elided.
        nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
        return json.dumps(dict(
            pool_nonstale_hash_rate=nonstale_hash_rate,
            pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),  # gross rate including stales
            pool_stale_prop=stale_prop,
        # NOTE(review): the closing of this call appears elided.
    def get_local_stats():
        # Stats for shares we produced, over the last hour of share-chain history.
        lookbehind = 3600//net.SHARE_PERIOD
        if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
            # NOTE(review): the early-return line appears elided.
        global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)

        my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
        my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
        my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
        my_share_count = my_unstale_count + my_orphan_count + my_doa_count
        my_stale_count = my_orphan_count + my_doa_count

        my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None

        # Estimate our hash rate from the work our chained shares represent over real time.
        my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
            for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
            if share.hash in my_share_hashes)
        actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
            tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
        share_att_s = my_work / actual_time  # attempts/second attributable to us

        return json.dumps(dict(
            my_hash_rates_in_last_hour=dict(
                nonstale=share_att_s,
                rewarded=share_att_s/(1 - global_stale_prop),
                actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
            my_share_counts_in_last_hour=dict(
                shares=my_share_count,
                unstale_shares=my_unstale_count,
                stale_shares=my_stale_count,
                orphan_stale_shares=my_orphan_count,
                doa_stale_shares=my_doa_count,
            my_stale_proportions_in_last_hour=dict(
                # NOTE(review): an overall 'stale' entry appears elided here.
                orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
        # NOTE(review): the closing lines of this call appear elided.

    def get_peer_addresses():
        # Space-separated host[:port] list; port omitted when it is the network default.
        return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())

    class WebInterface(resource.Resource):
        # Minimal twisted.web resource exposing a callable as a GET endpoint.
        def __init__(self, func, mime_type, *fields):
            # NOTE(review): a resource.Resource.__init__(self) call appears elided.
            self.func, self.mime_type, self.fields = func, mime_type, fields

        def render_GET(self, request):
            request.setHeader('Content-Type', self.mime_type)
            request.setHeader('Access-Control-Allow-Origin', '*')
            # pass the named query-string fields through as positional arguments
            return self.func(*(request.args[field][0] for field in self.fields))
    # Wire up the JSON/text web endpoints.
    web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
    web_root.putChild('users', WebInterface(get_users, 'application/json'))
    web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
    web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
    web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
    web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
    web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
    web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
    web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
    web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
    # NOTE(review): chain_img is presumably guarded by a drawing-libraries check (guard elided).
    web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))

    grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
    web_root.putChild('graphs', grapher.get_resource())
    # Feed the pool-rate graph every 100s ('def add_point():' header elided).
        if tracker.get_height(current_work.value['best_share_hash']) < 720:
            # NOTE(review): the early-return line appears elided.
        grapher.add_poolrate_point(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
            / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
    task.LoopingCall(add_point).start(100)

    reactor.listenTCP(args.worker_port, server.Site(web_root))
    @defer.inlineCallbacks
    # NOTE(review): the 'def work_poller():' header and its loop/try lines are elided.
        # capture the new-block signal before fetching so a block can't slip between the two
        flag = factory.new_block.get_deferred()
        yield set_real_work1()
        # wake on new block or after 15s, whichever comes first
        yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)

    print 'Started successfully!'

    # Watchdog: dump a stack trace to stderr if SIGALRM fires (reactor stalled ~30s).
    if hasattr(signal, 'SIGALRM'):
        signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
            sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
        signal.siginterrupt(signal.SIGALRM, False)
        task.LoopingCall(signal.alarm, 30).start(1)

    @defer.inlineCallbacks
    # NOTE(review): the 'def status_thread():' header and its loop lines are elided.
        yield deferral.sleep(3)
        # warn when work has gone stale
        if time.time() > current_work2.value['last_update'] + 60:
            print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
        if current_work.value['best_share_hash'] is not None:
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
            real_att_s = att_s / (1 - stale_prop)  # gross pool rate including stales
            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i (%i incoming)' % (
                math.format(int(real_att_s)),
                # NOTE(review): several argument lines of this format tuple are elided.
                len(tracker.verified.shares),
                weights.get(my_script, 0)/total_weight*100,
                math.format(int(my_att_s)),
                sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
            this_str += '\nAverage time between blocks: %.2f days' % (
                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
            this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
            # Confidence interval on our own stale rate ('conf' setup lines elided).
            stale_shares = stale_orphan_shares + stale_doa_shares
            stale_center, stale_radius = math.binomial_conf_center_radius(stale_shares, shares, conf)
            this_str += u' Own: %i±%i%%' % (int(100*stale_center+.5), int(100*stale_radius+.5))
            # efficiency = our non-stale fraction relative to the pool's
            eff_center = (1 - stale_center)/(1 - stale_prop)
            eff_radius = stale_radius/(1 - stale_prop)
            this_str += u' Own efficiency: %i±%i%%' % (int(100*eff_center+.5), int(100*eff_radius+.5))
            # only print when the line changed or 15s have passed, to limit log spam
            if this_str != last_str or time.time() > last_time + 15:
                # NOTE(review): the print and 'last_str = this_str' lines appear elided.
                last_time = time.time()
    # NOTE(review): surrounding error-handler lines appear elided.
    log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    # ArgumentParser whose @file arguments are expanded recursively, with one
    # argument per whitespace-separated token (argparse's default is one per line).
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        # NOTE(review): the 'new_arg_strings = []' initialization appears elided.
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            # NOTE(review): 'else:' / 'try:' lines appear elided.
                args_file = open(arg_string[1:])
                # NOTE(review): an 'arg_strings = []' reset and try/finally close appear elided.
                for arg_line in args_file.read().splitlines():
                    for arg in self.convert_arg_line_to_args(arg_line):
                        arg_strings.append(arg)
                arg_strings = self._read_args_from_files(arg_strings)  # recurse into nested @files
                new_arg_strings.extend(arg_strings)
        # NOTE(review): the except-handler header appears elided; this captures the error.
        err = sys.exc_info()[1]

        # return the modified argument list
        return new_arg_strings

    def convert_arg_line_to_args(self, arg_line):
        # split each line on whitespace instead of treating the whole line as one arg
        return [arg for arg in arg_line.split() if arg.strip()]
# Command-line interface. fromfile_prefix_chars='@' lets users keep options in
# a file and pass '@filename' on the command line (expanded recursively by
# FixedArgumentParser).
838 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
839 parser.add_argument('--version', action='version', version=p2pool.__version__)
840 parser.add_argument('--net',
841 help='use specified network (default: bitcoin)',
842 action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
# --testnet is combined with --net below ('<net>_testnet' lookup in networks.nets)
843 parser.add_argument('--testnet',
844 help='''use the network's testnet''',
845 action='store_const', const=True, default=False, dest='testnet')
846 parser.add_argument('--debug',
847 help='enable debugging mode',
848 action='store_const', const=True, default=False, dest='debug')
# payout address; None means it is requested from bitcoind later
849 parser.add_argument('-a', '--address',
850 help='generate payouts to this address (default: <address requested from bitcoind>)',
851 type=str, action='store', default=None, dest='address')
852 parser.add_argument('--logfile',
853 help='''log to this file (default: data/<NET>/log)''',
854 type=str, action='store', default=None, dest='logfile')
# merged mining: --merged-url and --merged-userpass must be given together
# (enforced after parse_args with an XOR check)
855 parser.add_argument('--merged-url',
856 help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
857 type=str, action='store', default=None, dest='merged_url')
858 parser.add_argument('--merged-userpass',
859 help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
860 type=str, action='store', default=None, dest='merged_userpass')
861 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
862 help='donate this percentage of work to author of p2pool (default: 0.5)',
863 type=float, action='store', default=0.5, dest='donation_percentage')
# --- p2pool P2P interface options ---
865 p2pool_group = parser.add_argument_group('p2pool interface')
866 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
867 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
868 type=int, action='store', default=None, dest='p2pool_port')
869 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
870 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
871 type=str, action='append', default=[], dest='p2pool_nodes')
# store_false: flag is named --disable-upnp but dest 'upnp' defaults to True
872 parser.add_argument('--disable-upnp',
873 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
874 action='store_false', default=True, dest='upnp')
# --- miner-facing worker interface options ---
876 worker_group = parser.add_argument_group('worker interface')
877 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
878 help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
879 type=int, action='store', default=None, dest='worker_port')
880 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
881 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
882 type=float, action='store', default=0, dest='worker_fee')
# --- bitcoind connection options; None ports are filled with per-network
# defaults after parsing ---
884 bitcoind_group = parser.add_argument_group('bitcoind interface')
885 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
886 help='connect to this address (default: 127.0.0.1)',
887 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
888 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
889 help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
890 type=int, action='store', default=None, dest='bitcoind_rpc_port')
891 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
892 help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
893 type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional arguments (no flag name): RPC username is optional (nargs='?'),
# password is required
895 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
896 help='bitcoind RPC interface username (default: <empty>)',
897 type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
898 bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
899 help='bitcoind RPC interface password',
900 type=str, action='store', dest='bitcoind_rpc_password')
902 args = parser.parse_args()
# Resolve the network object; '--testnet' selects the '<name>_testnet' variant
# registered in networks.nets.
907 net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
# Per-network data directory lives beside the script: data/<NET>/
909 datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
910 if not os.path.exists(datadir_path):
911 os.makedirs(datadir_path)
# Default logfile location is inside the network's data directory.
913 if args.logfile is None:
914 args.logfile = os.path.join(datadir_path, 'log')
916 class EncodeReplacerPipe(object):
# File-like wrapper that makes unicode writes safe for a byte-oriented
# console: text is encoded with errors='replace' so logging never dies
# with UnicodeEncodeError.
917 def __init__(self, inner_file):
918 self.inner_file = inner_file
920 def write(self, data):
921 if isinstance(data, unicode):
# prefer the wrapped file's own encoding; the ASCII encode below is
# presumably the fallback arm of an elided try/except -- TODO confirm
# against the full source
923 data = data.encode(self.inner_file.encoding, 'replace')
925 data = data.encode('ascii', 'replace')
926 self.inner_file.write(data)
# (flush() method header is on an elided line)
928 self.inner_file.flush()
929 class LogFile(object):
# Append-mode UTF-8 log file that can be reopened at runtime (used for log
# rotation via SIGUSR1 and the periodic LoopingCall below) and that trims
# itself to roughly the last megabyte once it exceeds ~100 MB.
930 def __init__(self, filename):
931 self.filename = filename
# opened lazily by reopen(); None means "not open yet"
932 self.inner_file = None
# (reopen() method header is on an elided line)
935 if self.inner_file is not None:
936 self.inner_file.close()
# make sure the file exists before reading it back for the size check
937 open(self.filename, 'a').close()
938 f = open(self.filename, 'rb')
939 f.seek(0, os.SEEK_END)
# 'length' is presumably f.tell() from the elided line above -- confirm
941 if length > 100*1000*1000:
# keep only the final 1 MB, scanning forward to a newline so the
# retained data starts on a whole line
942 f.seek(-1000*1000, os.SEEK_END)
944 if f.read(1) in ('', '\n'):
# rewrite the file with just the retained tail (elided lines)
948 f = open(self.filename, 'wb')
951 self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
952 def write(self, data):
953 self.inner_file.write(data)
# (flush() method header is on an elided line)
955 self.inner_file.flush()
956 class TeePipe(object):
# Broadcasts every write() (and flush()) to each file-like object in a list;
# used below to send log output to both the console and the log file.
957 def __init__(self, outputs):
958 self.outputs = outputs
959 def write(self, data):
960 for output in self.outputs:
# (per-output write, the flush() header, and its loop body are on
# elided lines)
963 for output in self.outputs:
965 class TimestampingPipe(object):
# Line-buffering wrapper that prefixes each complete line with the current
# datetime before handing it to the wrapped file.
966 def __init__(self, inner_file):
967 self.inner_file = inner_file
# (initialization of self.buf is on an elided line; write() relies on it
# holding the trailing partial line between calls)
970 def write(self, data):
971 buf = self.buf + data
972 lines = buf.split('\n')
# everything before the final '\n' is a complete line and gets a
# timestamp; the remainder stays buffered (the self.buf assignment is on
# an elided line)
973 for line in lines[:-1]:
974 self.inner_file.write('%s %s\n' % (datetime.datetime.now(), line))
975 self.inner_file.flush()
979 class AbortPipe(object):
# Wrapper that, if writing to the inner file fails, restores the real
# stdout/stderr so the failure itself can still be reported (the try/except
# around the write is on elided lines -- TODO confirm against full source).
980 def __init__(self, inner_file):
981 self.inner_file = inner_file
983 def write(self, data):
985 self.inner_file.write(data)
# error path: put the original interpreter streams back before the
# exception propagates
987 sys.stdout = sys.__stdout__
988 log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
# (flush() method header is on an elided line)
991 self.inner_file.flush()
992 class PrefixPipe(object):
# Line-buffering wrapper that prepends a fixed prefix to every complete line
# written through it (used below to mark stderr output with '> ').
993 def __init__(self, inner_file, prefix):
994 self.inner_file = inner_file
# (storage of 'prefix' and initialization of self.buf are on elided lines)
998 def write(self, data):
999 buf = self.buf + data
1000 lines = buf.split('\n')
1001 for line in lines[:-1]:
1002 self.inner_file.write(self.prefix + line + '\n')
1003 self.inner_file.flush()
# keep the trailing partial line for the next write() call
1004 self.buf = lines[-1]
# Wire up logging: all output flows through a timestamping pipe that tees to
# the console (unicode-safe via EncodeReplacerPipe) and to the rotating log
# file; stderr output is additionally prefixed with '> '.
1007 logfile = LogFile(args.logfile)
1008 pipe = TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))
1009 sys.stdout = AbortPipe(pipe)
1010 sys.stderr = log.DefaultObserver.stderr = AbortPipe(PrefixPipe(pipe, '> '))
# SIGUSR1 (where the platform provides it; not on Windows) forces a log
# reopen so external rotation tools can move the file out from under us
# (the logfile.reopen() call itself is on an elided line).
1011 if hasattr(signal, "SIGUSR1"):
1012 def sigusr1(signum, frame):
1013 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1015 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1016 signal.signal(signal.SIGUSR1, sigusr1)
# also reopen every 5 seconds so the size-based trim in LogFile.reopen runs
1017 task.LoopingCall(logfile.reopen).start(5)
# Fill in per-network defaults for any port the user did not specify.
1019 if args.bitcoind_rpc_port is None:
1020 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1022 if args.bitcoind_p2p_port is None:
1023 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1025 if args.p2pool_port is None:
1026 args.p2pool_port = net.P2P_PORT
1028 if args.worker_port is None:
1029 args.worker_port = net.WORKER_PORT
# Resolve the payout address into a pubkey hash up front so a bad address is
# reported as a parser error before anything starts (the try: line matching
# the except below is elided in this listing). pubkey_hash None means "ask
# bitcoind for an address later".
1031 if args.address is not None:
1033 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1034 except Exception, e:
1035 parser.error('error parsing address: ' + repr(e))
1037 args.pubkey_hash = None
# XOR: exactly one of the two merged-mining options was supplied -- they are
# only valid as a pair.
1039 if (args.merged_url is None) ^ (args.merged_userpass is None):
1040 parser.error('must specify --merged-url and --merged-userpass')
# Start the node proper once the Twisted reactor is running.
1042 reactor.callWhenRunning(main, args, net, datadir_path)