from __future__ import division

import ConfigParser
import StringIO
import argparse
import os
import random
import struct
import sys
import time
import json
import signal
import traceback
import urlparse
from twisted.internet import defer, error, reactor, protocol, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, graphs
import p2pool, p2pool.data as p2pool_data
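
# getwork() below polls bitcoind's getmemorypool RPC and normalizes the result
# into the work dict used throughout this module. A getmemorypool response
# looks roughly like this (illustrative values only):
#   {'version': 1, 'previousblockhash': '0000...', 'transactions': ['0100...', ...],
#    'coinbasevalue': 5000000000, 'time': 1325000000, 'bits': '1d00ffff'}
# ('bits' may arrive as a hex string or as an integer; both cases are handled.)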
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
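
# main() wires everything together: the bitcoind RPC and P2P connections, the
# share tracker and store, the p2pool P2P node, the worker (miner) interface,
# and the HTTP status endpoints.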
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
        print

        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print

        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print

        print 'Determining payout script...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')

            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None

            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None

            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()

            with open(address_path, 'wb') as f:
                f.write(address)

            my_script = bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net.PARENT))
        else:
            print '    ...Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success! Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print

        my_share_hashes = set()
        my_doa_share_hashes = set()

        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
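
        # Warm start: replay shares and verified-share hashes from the on-disk
        # ShareStore so a restart doesn't have to redownload the share chain.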
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print

        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))

        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it

        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)

        requested = expiring_dict.ExpiringDict(300)
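
        # set_real_work1 polls bitcoind for fresh block work; set_real_work2
        # (below) recomputes the best share-chain head. The pre_* variables let
        # merged-mining and share-chain updates be folded in before current_work
        # is published to long-polling workers.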
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # set the second variable first, because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))

        if '\ngetblock ' in (yield bitcoind.rpc_help()):
            height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((yield bitcoind.rpc_getblock('%x' % (block_hash,)))['blockcount'])))
            def get_height_rel_highest(block_hash):
                return height_cacher.call_now(block_hash, 0) - height_cacher.call_now(pre_current_work.value['previous_block'], 1000000000)
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory).get_height_rel_highest
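
        # (The yield inside the lambda above is a Python 2 quirk: it turns the
        # lambda into a generator, which inlineCallbacks can drive.)
        # set_real_work2 asks the tracker for the best verified chain head and,
        # for any desired-but-missing parent shares, picks a peer to request
        # them from, backing off exponentially via the `requested` expiring dict.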
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])

            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)

            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue

                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000, # ancestor depth to request (exact value assumed)
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())

        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print

        pre_merged_work.changed.watch(lambda _: set_real_work2())

        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)

        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'

        # setup p2p logic and join p2pool network

        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)

                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue

                    new_count += 1

                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)

                    tracker.add(share)

                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)

                if new_count:
                    set_real_work2()

                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)

            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1

                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])

            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
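
        # A verified share whose proof-of-work also meets the block target is a
        # complete bitcoin block: submit it to bitcoind and remember it for the
        # /recent_blocks web endpoint.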
        recent_blocks = []
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
                recent_blocks.append({'ts': share.timestamp, 'hash': '%x' % (share.header_hash,)})

        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)

        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))

        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()

        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()

        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()

        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
        task.LoopingCall(save_addrs).start(60)

        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)

            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])

        current_work.changed.watch(work_changed)

        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)

        start_time = time.time()

        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))

        if args.upnp:
            upnp_thread()

        # start listening for workers with a JSON-RPC server

        print 'Listening for workers on port %i...' % (args.worker_port,)

        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'

        removed_unstales_var = variable.Variable((0, 0, 0))
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))

        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
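
        # Shares evicted from the tracker would otherwise disappear from the
        # stale-rate accounting, so the two watchers above fold them into the
        # removed_*_var counters that get_stale_counts() adds back in.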

        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]

            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain

            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)

        recent_shares_ts_work2 = []
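
        # WorkerBridge translates miner getwork traffic into share attempts:
        # preprocess_request picks the payout script, get_work builds a block
        # attempt, and got_response checks the returned proof-of-work against
        # the block, merged-mining, share, and pseudoshare targets in turn.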
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed

                self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
                self.recent_shares_ts_work = []

            def _get_payout_script_from_username(self, user):
                if user is None:
                    return None
                try:
                    pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                except: # username was not a valid address
                    return None
                return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)

            def preprocess_request(self, request):
                payout_script = self._get_payout_script_from_username(request.getUser())
                if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                    payout_script = my_script
                return payout_script,

            def get_work(self, payout_script):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')

                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []

                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=struct.pack('<Q', random.randrange(2**64)),
                        new_script=payout_script,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    net=net,
                )
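
                # The pseudoshare target adapts to local hashrate (aiming for
                # roughly one pseudoshare per five seconds) but is never made
                # harder than the real share target or any merged-mining target.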
                target = 2**256//2**32 - 1
                if len(self.recent_shares_ts_work) == 50:
                    hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                    target = min(target, 2**256//(hash_rate * 5))
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])

                transactions = [generate_tx] + list(current_work2.value['transactions'])
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
                self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), mm_later, target, current_work2.value['merkle_branch']

                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )

                return bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )

            def got_response(self, header, request):
                # match up with transactions
                if header['merkle_root'] not in self.merkle_root_to_transactions:
                    print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time, mm_later, target, merkle_branch = self.merkle_root_to_transactions[header['merkle_root']]

                pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']

                try:
                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
                        if factory.conn.value is not None:
                            factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                        else:
                            print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                        if pow_hash <= header['bits'].target:
                            print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
                            recent_blocks.append({'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)})
                except:
                    log.err(None, 'Error while processing potential block:')

                for aux_work, index, hashes in mm_later:
                    try:
                        if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                            df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                bitcoin_data.aux_pow_type.pack(dict(
                                    merkle_tx=dict(
                                        tx=transactions[0],
                                        block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
                                        merkle_branch=merkle_branch,
                                        index=0,
                                    ),
                                    merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                    index=index,
                                    parent_block_header=header,
                                )).encode('hex'),
                            )
                            @df.addCallback
                            def _(result):
                                if result != (pow_hash <= aux_work['target']):
                                    print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                else:
                                    print 'Merged block submittal result: %s' % (result,)
                            @df.addErrback
                            def _(err):
                                log.err(err, 'Error submitting merged block:')
                    except:
                        log.err(None, 'Error while processing merged mining POW:')

                if pow_hash <= share_info['bits'].target:
                    share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                    print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                        request.getUser(),
                        p2pool_data.format_hash(share.hash),
                        p2pool_data.format_hash(share.previous_hash),
                        time.time() - getwork_time,
                        ' DEAD ON ARRIVAL' if not on_time else '',
                    )
                    my_share_hashes.add(share.hash)
                    if not on_time:
                        my_doa_share_hashes.add(share.hash)

                    tracker.add(share)
                    if not p2pool.DEBUG:
                        tracker.verified.add(share)
                    set_real_work2()

                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            for peer in p2p_node.peers.itervalues():
                                peer.sendShares([share])
                            shared_share_hashes.add(share.hash)
                    except:
                        log.err(None, 'Error forwarding block solution:')

                if pow_hash <= target:
                    reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                    if request.getPassword() == vip_pass:
                        reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
                    self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                    while len(self.recent_shares_ts_work) > 50:
                        self.recent_shares_ts_work.pop(0)
                    recent_shares_ts_work2.append((time.time(), bitcoin_data.target_to_average_attempts(target), not on_time))

                if pow_hash > target:
                    print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                    print '    Hash:   %56x' % (pow_hash,)
                    print '    Target: %56x' % (target,)

                return on_time

        web_root = resource.Resource()
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)

        def get_rate():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return json.dumps(None)
            return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))

        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
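
        # Like get_rate and get_users above, the helpers below back the JSON
        # and plaintext status endpoints registered on web_root further down
        # (/current_payouts, /patron_sendmany, /global_stats, /local_stats, ...).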
        def get_current_txouts():
            share = tracker.shares[current_work.value['best_share_hash']]
            share_info, gentx = p2pool_data.generate_transaction(tracker, share.share_info['share_data'], share.header['bits'].target, share.share_info['timestamp'], share.net)
            return dict((out['script'], out['value']) for out in gentx['tx_outs'])

        def get_current_scaled_txouts(scale, trunc=0):
            txouts = get_current_txouts()
            total = sum(txouts.itervalues())
            results = dict((script, value*scale//total) for script, value in txouts.iteritems())
            if trunc > 0:
                total_random = 0
                random_set = set()
                for s in sorted(results, key=results.__getitem__):
                    if results[s] >= trunc:
                        break
                    total_random += results[s]
                    random_set.add(s)
                if total_random:
                    winner = math.weighted_choice((script, results[script]) for script in random_set)
                    for script in random_set:
                        del results[script]
                    results[winner] = total_random
            if sum(results.itervalues()) < int(scale):
                results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
            return results

        def get_current_payouts():
            return json.dumps(dict((bitcoin_data.script2_to_human(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))

        def get_patron_sendmany(this):
            try:
                if '/' in this:
                    this, trunc = this.split('/', 1)
                else:
                    trunc = '0.01' # default truncation threshold (value assumed)
                return json.dumps(dict(
                    (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
                    for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
                    if bitcoin_data.script2_to_address(script, net.PARENT) is not None
                ))
            except:
                return json.dumps(None)

        def get_global_stats():
            # averaged over last hour
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)

            nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            return json.dumps(dict(
                pool_nonstale_hash_rate=nonstale_hash_rate,
                pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
                pool_stale_prop=stale_prop,
            ))

        def get_local_stats():
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)

            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)

            my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
            my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
            my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
            my_share_count = my_unstale_count + my_orphan_count + my_doa_count
            my_stale_count = my_orphan_count + my_doa_count

            my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None

            my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
                for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
                if share.hash in my_share_hashes)
            actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
                tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
            share_att_s = my_work / actual_time

            return json.dumps(dict(
                my_hash_rates_in_last_hour=dict(
                    nonstale=share_att_s,
                    rewarded=share_att_s/(1 - global_stale_prop),
                    actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
                ),
                my_share_counts_in_last_hour=dict(
                    shares=my_share_count,
                    unstale_shares=my_unstale_count,
                    stale_shares=my_stale_count,
                    orphan_stale_shares=my_orphan_count,
                    doa_stale_shares=my_doa_count,
                ),
                my_stale_proportions_in_last_hour=dict(
                    stale=my_stale_prop,
                    orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                    dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
                ),
            ))

        def get_peer_addresses():
            return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())

        def get_uptime():
            return json.dumps(time.time() - start_time)

        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type, *fields):
                self.func, self.mime_type, self.fields = func, mime_type, fields

            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                request.setHeader('Access-Control-Allow-Origin', '*')
                return self.func(*(request.args[field][0] for field in self.fields))
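
        # Register the status endpoints. WebInterface adapts a plain function
        # into a Twisted resource, passing selected GET arguments through
        # positionally and setting the Content-Type and CORS headers.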
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
        web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
        web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
        web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
        web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
        web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
        web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
        web_root.putChild('uptime', WebInterface(get_uptime, 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))

        grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
        web_root.putChild('graphs', grapher.get_resource())
        def add_point():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return
            nonstalerate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
            poolrate = nonstalerate / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720))
            grapher.add_poolrate_point(poolrate, poolrate - nonstalerate)
        task.LoopingCall(add_point).start(100)

        def attempt_listen():
            try:
                reactor.listenTCP(args.worker_port, server.Site(web_root))
            except error.CannotListenError, e:
                print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
                reactor.callLater(1, attempt_listen)
        attempt_listen()

        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass

        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()

        print 'Started successfully!'
        print

        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)

        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),) # random nick (exact format assumed)
                def lineReceived(self, line):
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join('#p2pool')
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes:
                        self.announced_hashes.add(share.header_hash)
                        self.say('#p2pool', '\x02BLOCK FOUND by %s! http://blockexplorer.com/block/%064x' % (bitcoin_data.script2_to_address(share.share_data['new_script'], net.PARENT), share.header_hash))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
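
        # Periodic console status line: share-chain height, peer counts, local
        # and pool hashrate estimates, stale rates, and the expected payout at
        # the current share weighting.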
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            first_pseudoshare_time = None
            average_period = 600 # seconds of pseudoshare history to average over (value assumed)
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''

                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')

                    if first_pseudoshare_time is None and recent_shares_ts_work2:
                        first_pseudoshare_time = recent_shares_ts_work2[0][0]
                    while recent_shares_ts_work2 and recent_shares_ts_work2[0][0] < time.time() - average_period:
                        recent_shares_ts_work2.pop(0)
                    my_att_s = sum(work for ts, work, dead in recent_shares_ts_work2)/min(time.time() - first_pseudoshare_time, average_period) if first_pseudoshare_time is not None else 0
                    this_str += '\n Local: %sH/s (%.f min avg) Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        (min(time.time() - first_pseudoshare_time, average_period) if first_pseudoshare_time is not None else 0)/60,
                        math.format_binomial_conf(sum(1 for ts, work, dead in recent_shares_ts_work2 if dead), len(recent_shares_ts_work2), 0.95),
                        '%.1f min' % (2**256 / tracker.shares[current_work.value['best_share_hash']].target / my_att_s / 60,) if my_att_s else '???',
                    )

                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)

                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(my_script, 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Average time between blocks: %.2f days' % (
                            math.format(int(real_att_s)),
                            stale_prop*1e2,
                            2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                        )

                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            else:
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))

        # return the modified argument list
        return new_arg_strings

    def convert_arg_line_to_args(self, arg_line):
        return [arg for arg in arg_line.split() if arg.strip()]
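
# FixedArgumentParser mirrors argparse's fromfile expansion, ensuring @file
# references found inside files are themselves expanded (which some argparse
# versions did not do) and splitting each line on whitespace so an options
# file can hold several options per line.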

def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--merged-url',
        help='DEPRECATED, use --merged',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='DEPRECATED, use --merged',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')

    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')

    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')

    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')

    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (providing only one counts as the password, with an empty username; providing neither makes P2Pool read both from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

    args = parser.parse_args()

    if args.debug:
        p2pool.DEBUG = True

    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]

    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)

    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))

    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''

    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT

    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT

    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT

    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT

    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None

    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)

    if args.merged_url is not None or args.merged_userpass is not None:
        print '--merged-url and --merged-userpass are deprecated! Use --merged http://USER:PASS@HOST:PORT/ instead!'
        print 'Pausing 10 seconds...'
        time.sleep(10)

        if args.merged_url is None or args.merged_userpass is None:
            parser.error('must specify both --merged-url and --merged-userpass')

        merged_urls = merged_urls + [(args.merged_url, args.merged_userpass)]

    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')

    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)

    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls)
    reactor.run()