# -*- coding: utf-8 -*-
from __future__ import division

import argparse
import codecs
import datetime
import json
import os
import random
import signal
import struct
import sys
import time
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks, graphs
import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))
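# For reference, a minimal sketch of the getmemorypool reply this function
# consumes (the field values below are illustrative, not from a real daemon):
#
#     {
#         "version": 1,
#         "previousblockhash": "000000000000...",
#         "transactions": ["01000000...", ...],  # hex-encoded, coinbase excluded
#         "coinbasevalue": 5000000000,           # subsidy plus fees, in satoshis
#         "time": 1327000000,
#         "bits": "1a0abbcd",                    # compact target; some daemons return an int
#     }
#
# The isinstance() check on work['bits'] above exists because of that last
# inconsistency between bitcoind versions.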
@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
        print
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield deferral.retry('Error getting payout address from bitcoind:', 5)(defer.inlineCallbacks(lambda: defer.returnValue(
                bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net.PARENT)))
            ))()
        else:
            print 'Computing payout script from provided address....'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
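        # Note on the re-request policy above: a share hash is only re-requested
        # once its backoff window has passed. With the window ending at
        # last_request_time + 10 * 1.5**count, successive attempts wait roughly
        # 10s, 15s, 22.5s, 33.75s, ... - exponential backoff that keeps a flaky
        # peer from being hammered while the 300-second ExpiringDict entry lives.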
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        merged_proxy = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,)) if args.merged_url else None
        
        @defer.inlineCallbacks
        def set_merged_work():
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        if merged_proxy is not None:
            set_merged_work()
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        recent_blocks = [] # blocks found by this node or its peers, served by the web interface
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares...' % (len(shares),)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
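        # Note: handle_get_shares above walks up to `parents` ancestors of each
        # requested hash and stops early at any hash the requester already has
        # (`stops`), so a peer can sync a branch without re-downloading the
        # history it already shares with us.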
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
                print
                recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash,) })
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr in map(parse, net.BOOTSTRAP_ADDRS):
            if addr not in addrs:
                addrs[addr] = (0, time.time(), time.time())
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=set(map(parse, args.p2pool_nodes)),
        )
        p2p_node.start()
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
        task.LoopingCall(save_addrs).start(60)
        # broadcast new shares to peers when the best chain changes
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                # exponential distribution with a 120-second mean, so remapping
                # attempts are jittered instead of synchronized across restarts
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        removed_unstales_var = variable.Variable((0, 0, 0))
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
        
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        stale_counter = skiplists.SumSkipList(tracker, lambda share: (
            1 if share.hash in my_share_hashes else 0,
            1 if share.hash in my_doa_share_hashes else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 253 else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 254 else 0,
        ), (0, 0, 0, 0), math.add_tuples)
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            my_shares_in_chain, my_doa_shares_in_chain, orphans_recorded_in_chain, doas_recorded_in_chain = stale_counter(
                current_work.value['best_share_hash'],
                tracker.verified.get_height(current_work.value['best_share_hash']),
            )
            my_shares_in_chain += removed_unstales_var.value[0]
            my_doa_shares_in_chain += removed_doa_unstales_var.value
            orphans_recorded_in_chain += removed_unstales_var.value[1]
            doas_recorded_in_chain += removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
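        # A quick worked example of the accounting above (numbers are made up):
        # if this node made 100 shares total, 90 of them appear in the current
        # chain, and 4 of the missing 10 were flagged dead-on-arrival, then
        # get_stale_counts() returns (6, 4), 100, (..., ...) - 6 orphans, 4 DOAs.
        # 253 and 254 are the stale_info byte values recorded in share_data for
        # orphaned and dead shares respectively; 0 means the share was on time.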
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                
                self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
            
            def _get_payout_script_from_username(self, user):
                if user is None:
                    return None
                try:
                    pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                except:
                    return None
                return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
            
            def preprocess_request(self, request):
                payout_script = self._get_payout_script_from_username(request.getUser())
                if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                    payout_script = my_script
                return payout_script,
            def get_work(self, payout_script):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        # '\xfa\xbemm' is the merged-mining magic; the aux chain's
                        # hash is embedded in the coinbase so aux proof-of-work can
                        # later be tied to this block
                        coinbase='' if current_work.value['aux_work'] is None else
                            '\xfa\xbemm' + bitcoin_data.HashType().pack(current_work.value['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                        nonce=struct.pack('<Q', random.randrange(2**64)),
                        new_script=payout_script,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    net=net,
                )
                
                print 'New work for worker! Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                target = 2**256//2**32//8 - 1 # easy pseudo-share target so even slow miners report results often
                target = max(target, share_info['bits'].target)
                if current_work.value['aux_work']:
                    target = max(target, current_work.value['aux_work']['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
                self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), current_work.value['aux_work'], target
                
                return bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
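            # The miner hashes against the easiest ("largest") of the targets
            # combined above. Every result it returns passes through
            # got_response(), which then checks it against each real target in
            # turn: the block target (submit to bitcoind), the aux target
            # (submit to the merged chain), and the share target (add to the
            # share chain).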
            def got_response(self, header, request):
                # match up with transactions
                if header['merkle_root'] not in self.merkle_root_to_transactions:
                    print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time, aux_work, target = self.merkle_root_to_transactions[header['merkle_root']]
                
                pow_hash = net.PARENT.POW_FUNC(header)
                on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                
                try:
                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
                        if factory.conn.value is not None:
                            factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                        else:
                            print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                        if pow_hash <= header['bits'].target:
                            print
                            print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.block_header_type.hash256(header),)
                            print
                            recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.block_header_type.hash256(header),) })
                except:
                    log.err(None, 'Error while processing potential block:')
                
                try:
                    if aux_work is not None and (pow_hash <= aux_work['target'] or p2pool.DEBUG):
                        # the coinbase must commit to the aux chain's hash for the
                        # aux proof-of-work to be valid
                        assert bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex') == transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex')
                        df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(merged_proxy.rpc_getauxblock)(
                            bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex'),
                            bitcoin_data.aux_pow_type.pack(dict(
                                merkle_tx=dict(
                                    tx=transactions[0],
                                    block_hash=bitcoin_data.block_header_type.hash256(header),
                                    merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                                    index=0,
                                ),
                                merkle_branch=[],
                                index=0,
                                parent_block_header=header,
                            )).encode('hex'),
                        )
                        @df.addCallback
                        def _(result):
                            if result != (pow_hash <= aux_work['target']):
                                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                            else:
                                print 'Merged block submittal result: %s' % (result,)
                        @df.addErrback
                        def _(err):
                            log.err(err, 'Error submitting merged block:')
                except:
                    log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash <= share_info['bits'].target:
                    share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                    print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                        request.getUser(),
                        p2pool_data.format_hash(share.hash),
                        p2pool_data.format_hash(share.previous_hash),
                        time.time() - getwork_time,
                        ' DEAD ON ARRIVAL' if not on_time else '',
                    )
                    my_share_hashes.add(share.hash)
                    if not on_time:
                        my_doa_share_hashes.add(share.hash)
                    p2p_node.handle_shares([share], None)
                
                if pow_hash <= target:
                    reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:'
                    print '    Hash:   %56x' % (pow_hash,)
                    print '    Target: %56x' % (target,)
                
                return on_time
        web_root = resource.Resource()
        wb = WorkerBridge()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        def get_rate():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return json.dumps(None)
            return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        def get_current_txouts():
            # generate a throwaway work unit paying to a unique fake script, then
            # read the generated coinbase outputs back out of the cache
            tmp_tag = str(random.randrange(2**64))
            outputs = wb.merkle_root_to_transactions[wb.get_work(tmp_tag).merkle_root][1][0]['tx_outs']
            total = sum(out['value'] for out in outputs)
            total_without_tag = sum(out['value'] for out in outputs if out['script'] != tmp_tag)
            total_diff = total - total_without_tag
            return dict((out['script'], out['value'] + math.perfect_round(out['value']*total_diff/total)) for out in outputs if out['script'] != tmp_tag and out['value'])
        def get_current_scaled_txouts(scale, trunc=0):
            txouts = get_current_txouts()
            total = sum(txouts.itervalues())
            results = dict((script, value*scale//total) for script, value in txouts.iteritems())
            if trunc > 0:
                total_random = 0
                random_set = set()
                for s in sorted(results, key=results.__getitem__):
                    total_random += results[s]
                    random_set.add(s)
                    if total_random >= trunc and results[s] >= trunc:
                        break
                winner = math.weighted_choice((script, results[script]) for script in random_set)
                for script in random_set:
                    del results[script]
                results[winner] = total_random
            if sum(results.itervalues()) < int(scale):
                results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
            return results
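        # Truncation note: payouts below `trunc` are not discarded - the smallest
        # outputs are pooled and one of them, chosen with probability proportional
        # to its value, receives the whole pooled amount. The expected payout per
        # script is unchanged; only the variance goes up for dust-sized shares.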
        def get_current_payouts():
            return json.dumps(dict((bitcoin_data.script2_to_human(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))
        
        def get_patron_sendmany(this):
            try:
                if '/' in this:
                    this, trunc = this.split('/', 1)
                else:
                    trunc = '0.01'
                return json.dumps(dict(
                    (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
                    for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
                    if bitcoin_data.script2_to_address(script, net.PARENT) is not None
                ))
            except:
                return json.dumps(None)
        def get_global_stats():
            # averaged over last hour
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            return json.dumps(dict(
                pool_nonstale_hash_rate=nonstale_hash_rate,
                pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
                pool_stale_prop=stale_prop,
            ))
        def get_local_stats():
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            
            my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
            my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
            my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
            my_share_count = my_unstale_count + my_orphan_count + my_doa_count
            my_stale_count = my_orphan_count + my_doa_count
            
            my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
            
            my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
                for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
                if share.hash in my_share_hashes)
            actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
                tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
            share_att_s = my_work / actual_time
            
            return json.dumps(dict(
                my_hash_rates_in_last_hour=dict(
                    nonstale=share_att_s,
                    rewarded=share_att_s/(1 - global_stale_prop),
                    actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
                ),
                my_share_counts_in_last_hour=dict(
                    shares=my_share_count,
                    unstale_shares=my_unstale_count,
                    stale_shares=my_stale_count,
                    orphan_stale_shares=my_orphan_count,
                    doa_stale_shares=my_doa_count,
                ),
                my_stale_proportions_in_last_hour=dict(
                    stale=my_stale_prop,
                    orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                    dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
                ),
            ))
        def get_peer_addresses():
            return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type, *fields):
                resource.Resource.__init__(self)
                self.func, self.mime_type, self.fields = func, mime_type, fields
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                request.setHeader('Access-Control-Allow-Origin', '*')
                return self.func(*(request.args[field][0] for field in self.fields))
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
        web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
        web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
        web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
        web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
        web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
        web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
        web_root.putChild('graphs', grapher.get_resource())
        def add_point():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return
            grapher.add_poolrate_point(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        task.LoopingCall(add_point).start(100)
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        # done!
        
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        print 'Started successfully!'
        print
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
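        # The alarm is re-armed every second but only fires after 30 seconds,
        # so SIGALRM is delivered only if the reactor stalls: a healthy event
        # loop keeps pushing the deadline forward and the stack dump never runs.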
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                            real_att_s = att_s / (1 - stale_prop)
                            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i (%i incoming)' % (
                                math.format(int(real_att_s)),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(my_att_s)),
                                shares,
                                stale_orphan_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                                sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            this_str += '\nAverage time between blocks: %.2f days' % (
                                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                            )
                            this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
                            conf = 0.95
                            if shares:
                                stale_shares = stale_orphan_shares + stale_doa_shares
                                this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if stale_prop < .99:
                                    this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - stale_prop) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            else:
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        return [arg for arg in arg_line.split() if arg.strip()]
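# This subclass exists so that an argument file given with the '@' prefix can
# hold several whitespace-separated arguments per line (the stock
# ArgumentParser reads exactly one argument per line). A hypothetical
# p2pool.conf could contain, for example:
#
#     --net bitcoin
#     --give-author 1
#     rpcuser rpcpass
#
# and be passed on the command line as: @p2pool.conf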
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged-url',
    help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
    type=str, action='store', default=None, dest='merged_url')
parser.add_argument('--merged-userpass',
    help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
    type=str, action='store', default=None, dest='merged_userpass')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='worker_port')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
    help='bitcoind RPC interface username (default: <empty>)',
    type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')
args = parser.parse_args()

if args.debug:
    p2pool.DEBUG = True

net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]

datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')
class EncodeReplacerPipe(object):
    def __init__(self, inner_file):
        self.inner_file = inner_file
        self.softspace = 0
    def write(self, data):
        if isinstance(data, unicode):
            try:
                data = data.encode(self.inner_file.encoding, 'replace')
            except:
                data = data.encode('ascii', 'replace')
        self.inner_file.write(data)
    def flush(self):
        self.inner_file.flush()
class LogFile(object):
    def __init__(self, filename):
        self.filename = filename
        self.inner_file = None
        self.reopen()
    def reopen(self):
        if self.inner_file is not None:
            self.inner_file.close()
        open(self.filename, 'a').close()
        f = open(self.filename, 'rb')
        f.seek(0, os.SEEK_END)
        length = f.tell()
        if length > 100*1000*1000:
            f.seek(-1000*1000, os.SEEK_END)
            while True:
                if f.read(1) in ('', '\n'):
                    break
            data = f.read()
            f.close()
            f = open(self.filename, 'wb')
            f.write(data)
        f.close()
        self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
    def write(self, data):
        self.inner_file.write(data)
    def flush(self):
        self.inner_file.flush()
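# Rotation policy: when the log grows past ~100 MB, reopen() keeps only the
# final ~1 MB, first scanning forward to the next newline so the retained
# tail starts on a line boundary rather than mid-line.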
class TeePipe(object):
    def __init__(self, outputs):
        self.outputs = outputs
    def write(self, data):
        for output in self.outputs:
            output.write(data)
    def flush(self):
        for output in self.outputs:
            output.flush()
class TimestampingPipe(object):
    def __init__(self, inner_file):
        self.inner_file = inner_file
        self.buf = ''
        self.softspace = 0
    def write(self, data):
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write('%s %s\n' % (datetime.datetime.now(), line))
            self.inner_file.flush()
        self.buf = lines[-1]
    def flush(self):
        pass
class AbortPipe(object):
    def __init__(self, inner_file):
        self.inner_file = inner_file
        self.softspace = 0
    def write(self, data):
        try:
            self.inner_file.write(data)
        except:
            sys.stdout = sys.__stdout__
            log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
            raise
    def flush(self):
        self.inner_file.flush()
class PrefixPipe(object):
    def __init__(self, inner_file, prefix):
        self.inner_file = inner_file
        self.prefix = prefix
        self.buf = ''
        self.softspace = 0
    def write(self, data):
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write(self.prefix + line + '\n')
            self.inner_file.flush()
        self.buf = lines[-1]
    def flush(self):
        pass
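# The pipes compose like decorators: stdout becomes
# AbortPipe(TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))),
# so every print is timestamped once, then fanned out to both the console
# (with unicode degraded via 'replace') and the utf-8 log file; stderr gets
# the same stack behind a '> ' PrefixPipe so its lines stay distinguishable.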
logfile = LogFile(args.logfile)
pipe = TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = AbortPipe(PrefixPipe(pipe, '> '))
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        logfile.reopen()
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
task.LoopingCall(logfile.reopen).start(5)
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

if args.worker_port is None:
    args.worker_port = net.WORKER_PORT

if args.address is not None:
    try:
        args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
else:
    args.pubkey_hash = None

if (args.merged_url is None) ^ (args.merged_userpass is None):
    parser.error('must specify both --merged-url and --merged-userpass, or neither')
reactor.callWhenRunning(main, args, net, datadir_path)
reactor.run()