#!/usr/bin/python
# -*- coding: utf-8 -*-

from __future__ import division

import argparse
import codecs
import datetime
import json
import os
import random
import signal
import struct
import sys
import time
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks, graphs
import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))
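# Illustrative shape of the dict getwork() resolves to (values are made up;
# the real ones come from bitcoind's getmemorypool response):
#     dict(version=1, previous_block_hash=0x000000000019d6689c08...,
#          transactions=[...], subsidy=5000000000, time=1327000000,
#          bits=<FloatingInteger 0x1a0abbcf>)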
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net2):
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    validate_response = yield bitcoind.rpc_validateaddress(address)
    if 'pubkey' not in validate_response:
        print '    Pubkey request failed. Falling back to payout to address.'
        defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net2)))
    pubkey = validate_response['pubkey'].decode('hex')
    assert bitcoin_data.pubkey_to_address(pubkey, net2) == address
    defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))
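# Note: pubkey_to_script2 yields a pay-to-pubkey script (<pubkey> OP_CHECKSIG)
# while pubkey_hash_to_script2 yields the standard pay-to-pubkey-hash form
# (OP_DUP OP_HASH160 <hash160> OP_EQUALVERIFY OP_CHECKSIG); either works as a
# payout script in the share chain.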
@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
        print
        
        bitcoin_data.Type.enable_caching()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net.PARENT)
        else:
            print 'Computing payout script from provided address....'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
            else:
                tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
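        # `requested` throttles repeated getshares requests: for each share
        # hash it stores (last_request_time, count), and a hash is only
        # re-requested after 10 * 1.5**count seconds (10s, 15s, 22.5s, ...);
        # see set_real_work2 and p2p_share_hashes below.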
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        merged_proxy = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,)) if args.merged_url else None
        
        @defer.inlineCallbacks
        def set_merged_work():
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        if merged_proxy is not None:
            set_merged_work()
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
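        # Merged mining flow, in brief: poll the aux chain's getauxblock about
        # once a second for (hash, target, chainid); embed the aux hash in our
        # coinbase (see get_work below); and, when a solution beats the aux
        # target, submit an aux proof-of-work back through getauxblock (see
        # got_response below).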
        recent_blocks = [] # blocks found by this node or its peers; needed by the handlers below and served at /recent_blocks
        start_time = time.time() - current_work2.value['clock_offset']
        # setup p2p logic and join p2pool network
        
        def p2p_shares(shares, peer=None):
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
                print
                recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash,) })
        def p2p_share_hashes(share_hashes, peer):
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        def p2p_get_shares(share_hashes, parents, stops, peer):
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        
        p2p_node = p2p.Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | set(map(parse, net.BOOTSTRAP_ADDRS)),
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
        task.LoopingCall(save_addrs).start(60)
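        # addrs.txt holds one repr()'d (addr, info) pair per line, matching
        # the eval() loader above; an illustrative line (the value layout is
        # whatever p2p.Node keeps in its addr_store):
        #     (('192.0.2.1', 9333), (1, 1327000000.0, 1327000000.0))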
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        removed_unstales_var = variable.Variable((0, 0, 0))
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
        
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        stale_counter = skiplists.SumSkipList(tracker, lambda share: (
            1 if share.hash in my_share_hashes else 0,
            1 if share.hash in my_doa_share_hashes else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 253 else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 254 else 0,
        ), (0, 0, 0, 0), math.add_tuples)
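        # stale_counter sums, over any suffix of the share chain, 4-tuples of
        # (my shares, my DOA shares, my orphan-flagged shares, my DOA-flagged
        # shares); the skip list lets get_stale_counts query a whole chain
        # range without walking every share.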
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            my_shares_in_chain, my_doa_shares_in_chain, orphans_recorded_in_chain, doas_recorded_in_chain = stale_counter(
                current_work.value['best_share_hash'],
                tracker.verified.get_height(current_work.value['best_share_hash']),
            )
            my_shares_in_chain += removed_unstales_var.value[0]
            my_doa_shares_in_chain += removed_doa_unstales_var.value
            orphans_recorded_in_chain += removed_unstales_var.value[1]
            doas_recorded_in_chain += removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
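        # Worked example: if this instance made 10 shares (1 of them DOA), 8
        # of which are in the current chain with 1 orphan and 0 DOAs recorded
        # there, then my_shares_not_in_chain = 2 and
        # my_doa_shares_not_in_chain = 1, so the function returns
        # ((1, 1), 10, (1, 0)).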
        my_share_hashes = set()
        my_doa_share_hashes = set()
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                
                self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
            def _get_payout_script_from_username(self, user):
                if user is None:
                    return None
                try:
                    pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                except: # not a valid address
                    return None
                return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
            def preprocess_request(self, request):
                payout_script = self._get_payout_script_from_username(request.getUser())
                if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                    payout_script = my_script
                return payout_script,
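            # The fee works by redirecting a random fraction of getwork
            # requests to the node operator's script: with --fee 1, about 1%
            # of uniform(0, 100) draws fall below 1, so about 1% of the
            # worker's shares pay my_script instead of the worker.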
            def get_work(self, payout_script):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase='' if current_work.value['aux_work'] is None else
                            '\xfa\xbemm' + bitcoin_data.HashType().pack(current_work.value['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                        nonce=struct.pack('<Q', random.randrange(2**64)),
                        new_script=payout_script,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    net=net,
                )
                
                print 'New work for worker! Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                target = 2**256//2**32//8 - 1 # pseudoshare target, roughly difficulty 8
                target = max(target, share_info['bits'].target)
                if current_work.value['aux_work']:
                    target = max(target, current_work.value['aux_work']['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
                self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), current_work.value['aux_work'], target
                
                return bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
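            # Note on get_work's target clamping: the target starts at
            # roughly the difficulty-8 target (2**224 // 8) and max() then
            # relaxes it (a numerically larger target is easier) to the
            # p2pool share target and the merged-mining target if those are
            # easier, so submissions come back often enough to feed the
            # hashrate graphs and no share or aux solution is ever missed.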
            def got_response(self, header, request):
                # match up with transactions
                if header['merkle_root'] not in self.merkle_root_to_transactions:
                    print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time, aux_work, target = self.merkle_root_to_transactions[header['merkle_root']]
                
                pow_hash = net.PARENT.POW_FUNC(header)
                on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                
                try:
                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
                        if factory.conn.value is not None:
                            factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                        else:
                            print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                        if pow_hash <= header['bits'].target:
                            print
                            print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.block_header_type.hash256(header),)
                            print
                            recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.block_header_type.hash256(header),) })
                except:
                    log.err(None, 'Error while processing potential block:')
                
                try:
                    if aux_work is not None and (pow_hash <= aux_work['target'] or p2pool.DEBUG):
                        assert bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex') == transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex')
                        df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(merged_proxy.rpc_getauxblock)(
                            bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex'),
                            bitcoin_data.aux_pow_type.pack(dict(
                                merkle_tx=dict(
                                    tx=transactions[0],
                                    block_hash=bitcoin_data.block_header_type.hash256(header),
                                    merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                                    index=0,
                                ),
                                merkle_branch=[],
                                index=0,
                                parent_block_header=header,
                            )).encode('hex'),
                        )
                        @df.addCallback
                        def _(result):
                            if result != (pow_hash <= aux_work['target']):
                                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                            else:
                                print 'Merged block submittal result: %s' % (result,)
                        @df.addErrback
                        def _(err):
                            log.err(err, 'Error submitting merged block:')
                except:
                    log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash <= share_info['bits'].target:
                    share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                    print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                        request.getUser(),
                        p2pool_data.format_hash(share.hash),
                        p2pool_data.format_hash(share.previous_hash),
                        time.time() - getwork_time,
                        ' DEAD ON ARRIVAL' if not on_time else '',
                    )
                    my_share_hashes.add(share.hash)
                    if not on_time:
                        my_doa_share_hashes.add(share.hash)
                    p2p_shares([share])
                
                if pow_hash <= target:
                    reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:'
                    print '    Hash:   %56x' % (pow_hash,)
                    print '    Target: %56x' % (target,)
                
                return on_time
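            # Target hierarchy for a returned header, hardest to easiest:
            # bitcoin block target -> aux-chain target -> p2pool share target
            # -> pseudoshare target; each threshold crossed triggers its own
            # action above (block submit, merged submit, share broadcast,
            # local-rate bookkeeping).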
        web_root = resource.Resource()
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        def get_rate():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return json.dumps(None)
            return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        def get_current_txouts():
            wb = WorkerBridge()
            tmp_tag = str(random.randrange(2**64))
            outputs = wb.merkle_root_to_transactions[wb.get_work(tmp_tag).merkle_root][1][0]['tx_outs']
            total = sum(out['value'] for out in outputs)
            total_without_tag = sum(out['value'] for out in outputs if out['script'] != tmp_tag)
            total_diff = total - total_without_tag
            return dict((out['script'], out['value'] + math.perfect_round(out['value']*total_diff/total)) for out in outputs if out['script'] != tmp_tag and out['value'])
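        # The tmp_tag trick: request work paying to a random sentinel
        # "script", find the generate transaction it produced, then drop the
        # sentinel's output and redistribute its value across the real
        # outputs in proportion to their existing values.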
        def get_current_scaled_txouts(scale, trunc=0):
            txouts = get_current_txouts()
            total = sum(txouts.itervalues())
            results = dict((script, value*scale//total) for script, value in txouts.iteritems())
            if trunc > 0:
                total_random = 0
                random_set = set()
                for s in sorted(results, key=results.__getitem__):
                    total_random += results[s]
                    random_set.add(s)
                    if total_random >= trunc and results[s] >= trunc:
                        break
                winner = math.weighted_choice((script, results[script]) for script in random_set)
                for script in random_set:
                    del results[script]
                results[winner] = total_random
            if sum(results.itervalues()) < int(scale):
                results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
            return results
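        # trunc merges dust: payouts are first scaled to `scale`, then the
        # smallest outputs are pooled until both the cumulative amount and the
        # current output reach `trunc`, and one weighted-random winner among
        # the pooled scripts receives the entire pooled amount, keeping
        # emitted payouts above the truncation threshold.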
        def get_current_payouts():
            return json.dumps(dict((bitcoin_data.script2_to_human(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))
        def get_patron_sendmany(this):
            try:
                if '/' in this:
                    this, trunc = this.split('/', 1)
                else:
                    trunc = '0'
                return json.dumps(dict(
                    (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
                    for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
                    if bitcoin_data.script2_to_address(script, net.PARENT) is not None
                ))
            except:
                return json.dumps(None)
        def get_global_stats():
            # averaged over last hour
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            return json.dumps(dict(
                pool_nonstale_hash_rate=nonstale_hash_rate,
                pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
                pool_stale_prop=stale_prop,
            ))
        def get_local_stats():
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            
            my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
            my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
            my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
            my_share_count = my_unstale_count + my_orphan_count + my_doa_count
            my_stale_count = my_orphan_count + my_doa_count
            
            my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
            
            my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
                for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
                if share.hash in my_share_hashes)
            actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
                tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
            share_att_s = my_work / actual_time
            
            return json.dumps(dict(
                my_hash_rates_in_last_hour=dict(
                    nonstale=share_att_s,
                    rewarded=share_att_s/(1 - global_stale_prop),
                    actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
                ),
                my_share_counts_in_last_hour=dict(
                    shares=my_share_count,
                    unstale_shares=my_unstale_count,
                    stale_shares=my_stale_count,
                    orphan_stale_shares=my_orphan_count,
                    doa_stale_shares=my_doa_count,
                ),
                my_stale_proportions_in_last_hour=dict(
                    stale=my_stale_prop,
                    orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                    dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
                ),
            ))
        def get_peer_addresses():
            return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type, *fields):
                resource.Resource.__init__(self)
                self.func, self.mime_type, self.fields = func, mime_type, fields
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                request.setHeader('Access-Control-Allow-Origin', '*')
                return self.func(*(request.args[field][0] for field in self.fields))
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
        web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
        web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
        web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
        web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
        web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
        web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
        web_root.putChild('graphs', grapher.get_resource())
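        # Example query against a running node, assuming bitcoin's default
        # worker port 9332:
        #     curl http://127.0.0.1:9332/local_stats
        # returns the JSON built by get_local_stats above.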
        def add_point():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return
            grapher.add_poolrate_point(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        task.LoopingCall(add_point).start(100)
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        print 'Started successfully!'
        print
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
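        # Watchdog sketch: the LoopingCall re-arms a 30-second alarm every
        # second from the reactor thread, so SIGALRM only fires (and dumps a
        # stack trace) if the reactor stalls for more than 30 seconds.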
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                            real_att_s = att_s / (1 - stale_prop)
                            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                                math.format(int(real_att_s)),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(my_att_s)),
                                shares,
                                stale_orphan_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            this_str += '\nAverage time between blocks: %.2f days' % (
                                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                            )
                            this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
                            conf = 0.95
                            if shares:
                                stale_shares = stale_orphan_shares + stale_doa_shares
                                this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if stale_prop < .99:
                                    this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - stale_prop) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            
            # replace arguments referencing files with the file content
            else:
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        
        # return the modified argument list
        return new_arg_strings
    def convert_arg_line_to_args(self, arg_line):
        return [arg for arg in arg_line.split() if arg.strip()]
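# With fromfile_prefix_chars='@' below, arguments can be read from a file
# whose lines hold whitespace-separated arguments, e.g. (hypothetical config
# file name):
#     python run_p2pool.py @p2pool.conf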
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged-url',
    help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
    type=str, action='store', default=None, dest='merged_url')
parser.add_argument('--merged-userpass',
    help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
    type=str, action='store', default=None, dest='merged_userpass')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT',
    help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='worker_port')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
    help='bitcoind RPC interface username (default: <empty>)',
    type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
    help='bitcoind RPC interface password',
    type=str, action='store', dest='bitcoind_rpc_password')
args = parser.parse_args()

if args.debug:
    p2pool.DEBUG = True

net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]

datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')
class EncodeReplacerPipe(object):
    def __init__(self, inner_file):
        self.inner_file = inner_file
        self.softspace = 0
    def write(self, data):
        if isinstance(data, unicode):
            try:
                data = data.encode(self.inner_file.encoding, 'replace')
            except:
                data = data.encode('ascii', 'replace')
        self.inner_file.write(data)
    def flush(self):
        self.inner_file.flush()
class LogFile(object):
    def __init__(self, filename):
        self.filename = filename
        self.inner_file = None
        self.reopen()
    def reopen(self):
        if self.inner_file is not None:
            self.inner_file.close()
        open(self.filename, 'a').close()
        f = open(self.filename, 'rb')
        f.seek(0, os.SEEK_END)
        length = f.tell()
        if length > 100*1000*1000:
            f.seek(-1000*1000, os.SEEK_END)
            while True:
                if f.read(1) in ('', '\n'):
                    break
            data = f.read()
            f.close()
            f = open(self.filename, 'wb')
            f.write(data)
        f.close()
        self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
    def write(self, data):
        self.inner_file.write(data)
    def flush(self):
        self.inner_file.flush()
class TeePipe(object):
    def __init__(self, outputs):
        self.outputs = outputs
    def write(self, data):
        for output in self.outputs:
            output.write(data)
    def flush(self):
        for output in self.outputs:
            output.flush()
class TimestampingPipe(object):
    def __init__(self, inner_file):
        self.inner_file = inner_file
        self.buf = ''
        self.softspace = 0
    def write(self, data):
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write('%s %s\n' % (datetime.datetime.now(), line))
            self.inner_file.flush()
        self.buf = lines[-1]
    def flush(self):
        pass
class AbortPipe(object):
    def __init__(self, inner_file):
        self.inner_file = inner_file
        self.softspace = 0
    def write(self, data):
        try:
            self.inner_file.write(data)
        except:
            sys.stdout = sys.__stdout__
            log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
            raise
    def flush(self):
        self.inner_file.flush()
class PrefixPipe(object):
    def __init__(self, inner_file, prefix):
        self.inner_file = inner_file
        self.prefix = prefix
        self.buf = ''
        self.softspace = 0
    def write(self, data):
        buf = self.buf + data
        lines = buf.split('\n')
        for line in lines[:-1]:
            self.inner_file.write(self.prefix + line + '\n')
            self.inner_file.flush()
        self.buf = lines[-1]
    def flush(self):
        pass

logfile = LogFile(args.logfile)
pipe = TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = AbortPipe(PrefixPipe(pipe, '> '))
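# Logging pipeline: stdout/stderr writes pass through AbortPipe (which
# restores the real streams and re-raises if logging itself fails), then
# TimestampingPipe (prefixes each completed line with a datetime), then
# TeePipe (fans out to the console via EncodeReplacerPipe and to the
# LogFile); stderr lines additionally get a '> ' prefix from PrefixPipe.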
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        logfile.reopen()
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
task.LoopingCall(logfile.reopen).start(5)
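# Log rotation: LogFile.reopen (called every 5 seconds and on SIGUSR1) keeps
# only roughly the last megabyte once the file exceeds 100 MB; sending
# SIGUSR1 forces a reopen so external tools can rotate the file, e.g.:
#     kill -USR1 <p2pool pid>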
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

if args.worker_port is None:
    args.worker_port = net.WORKER_PORT

if args.address is not None:
    try:
        args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
else:
    args.pubkey_hash = None

if (args.merged_url is None) ^ (args.merged_userpass is None):
    parser.error('must specify --merged-url and --merged-userpass')

reactor.callWhenRunning(main, args, net, datadir_path)
reactor.run()