1 from __future__ import division
16 from twisted.internet import defer, error, reactor, protocol, task
17 from twisted.web import server, resource
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface
23 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
24 from . import p2p, networks, graphs
25 import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    '''Fetch a block template from bitcoind and normalize it into a dict.

    Calls the (pre-BIP22) getmemorypool RPC, unpacks the hex-encoded
    transactions, and precomputes the merkle branch for slot 0 where the
    generation transaction will later be inserted. Retried up to 3 times
    on RPC failure by the decorator. Fires its Deferred with the dict.
    '''
    work = yield bitcoind.rpc_getmemorypool()
    transactions = [bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=transactions,
        # branch for index 0 - the generation-transaction slot (0 placeholder)
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + [bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)) for tx in transactions], 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],  # consumers derive clock_offset from this
        # 'bits' may arrive as a hex string (little-endian on the wire) or
        # already as a packed integer, depending on bitcoind version
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        # older bitcoinds supply 'coinbaseflags'; newer ones a 'coinbaseaux' map
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
43 @defer.inlineCallbacks
44 def main(args, net, datadir_path, merged_urls):
46 print 'p2pool (version %s)' % (p2pool.__version__,)
52 print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
55 # connect to bitcoind over JSON-RPC and do initial getmemorypool
56 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
57 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
58 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
59 good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
61 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
63 temp_work = yield getwork(bitcoind)
65 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
68 # connect to bitcoind over bitcoin-p2p
69 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
70 factory = bitcoin_p2p.ClientFactory(net.PARENT)
71 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
72 yield factory.getProtocol() # waits until handshake is successful
76 print 'Determining payout script...'
77 if args.pubkey_hash is None:
78 address_path = os.path.join(datadir_path, 'cached_payout_address')
80 if os.path.exists(address_path):
81 with open(address_path, 'rb') as f:
82 address = f.read().strip('\r\n')
83 print ' Loaded cached address: %s...' % (address,)
87 if address is not None:
88 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
89 if not res['isvalid'] or not res['ismine']:
90 print ' Cached address is either invalid or not controlled by local bitcoind!'
94 print ' Getting payout address from bitcoind...'
95 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
97 with open(address_path, 'wb') as f:
100 my_script = bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net.PARENT))
102 print ' ...Computing payout script from provided address...'
103 my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
104 print ' ...success! Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
107 my_share_hashes = set()
108 my_doa_share_hashes = set()
110 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
111 shared_share_hashes = set()
112 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
113 known_verified = set()
115 print "Loading shares..."
116 for i, (mode, contents) in enumerate(ss.get_shares()):
118 if contents.hash in tracker.shares:
120 shared_share_hashes.add(contents.hash)
121 contents.time_seen = 0
122 tracker.add(contents)
123 if len(tracker.shares) % 1000 == 0 and tracker.shares:
124 print " %i" % (len(tracker.shares),)
125 elif mode == 'verified_hash':
126 known_verified.add(contents)
128 raise AssertionError()
129 print " ...inserting %i verified shares..." % (len(known_verified),)
130 for h in known_verified:
131 if h not in tracker.shares:
132 ss.forget_verified_share(h)
134 tracker.verified.add(tracker.shares[h])
135 print " ...done loading %i shares!" % (len(tracker.shares),)
137 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
138 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
139 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
141 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
143 pre_current_work = variable.Variable(None)
144 pre_merged_work = variable.Variable({})
145 # information affecting work that should trigger a long-polling update
146 current_work = variable.Variable(None)
147 # information affecting work that should not trigger a long-polling update
148 current_work2 = variable.Variable(None)
150 requested = expiring_dict.ExpiringDict(300)
152 @defer.inlineCallbacks
153 def set_real_work1():
154 work = yield getwork(bitcoind)
155 current_work2.set(dict(
157 transactions=work['transactions'],
158 merkle_branch=work['merkle_branch'],
159 subsidy=work['subsidy'],
160 clock_offset=time.time() - work['time'],
161 last_update=time.time(),
162 )) # second set first because everything hooks on the first
163 pre_current_work.set(dict(
164 version=work['version'],
165 previous_block=work['previous_block_hash'],
167 coinbaseflags=work['coinbaseflags'],
170 if '\ngetblock ' in (yield bitcoind.rpc_help()):
171 height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((yield bitcoind.rpc_getblock('%x' % (block_hash,)))['blockcount'])))
172 def get_height_rel_highest(block_hash):
173 return height_cacher.call_now(block_hash, 0) - height_cacher.call_now(pre_current_work.value['previous_block'], 1000000000)
175 get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory).get_height_rel_highest
177 def set_real_work2():
178 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'])
180 t = dict(pre_current_work.value)
181 t['best_share_hash'] = best
182 t['mm_chains'] = pre_merged_work.value
186 for peer2, share_hash in desired:
187 if share_hash not in tracker.tails: # was received in the time tracker.think was running
189 last_request_time, count = requested.get(share_hash, (None, 0))
190 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
192 potential_peers = set()
193 for head in tracker.tails[share_hash]:
194 potential_peers.update(peer_heads.get(head, set()))
195 potential_peers = [peer for peer in potential_peers if peer.connected2]
196 if count == 0 and peer2 is not None and peer2.connected2:
199 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
203 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
207 stops=list(set(tracker.heads) | set(
208 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
211 requested[share_hash] = t, count + 1
212 pre_current_work.changed.watch(lambda _: set_real_work2())
214 print 'Initializing work...'
215 yield set_real_work1()
219 pre_merged_work.changed.watch(lambda _: set_real_work2())
222 @defer.inlineCallbacks
223 def set_merged_work(merged_url, merged_userpass):
224 merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
226 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
227 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
228 hash=int(auxblock['hash'], 16),
229 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
230 merged_proxy=merged_proxy,
232 yield deferral.sleep(1)
233 for merged_url, merged_userpass in merged_urls:
234 set_merged_work(merged_url, merged_userpass)
236 @pre_merged_work.changed.watch
237 def _(new_merged_work):
238 print 'Got new merged mining work!'
240 # setup p2p logic and join p2pool network
242 class Node(p2p.Node):
243 def handle_shares(self, shares, peer):
245 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
249 if share.hash in tracker.shares:
250 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
255 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
259 if shares and peer is not None:
260 peer_heads.setdefault(shares[0].hash, set()).add(peer)
266 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
268 def handle_share_hashes(self, hashes, peer):
271 for share_hash in hashes:
272 if share_hash in tracker.shares:
274 last_request_time, count = requested.get(share_hash, (None, 0))
275 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
277 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
278 get_hashes.append(share_hash)
279 requested[share_hash] = t, count + 1
281 if hashes and peer is not None:
282 peer_heads.setdefault(hashes[0], set()).add(peer)
284 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
286 def handle_get_shares(self, hashes, parents, stops, peer):
287 parents = min(parents, 1000//len(hashes))
290 for share_hash in hashes:
291 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
292 if share.hash in stops:
295 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
296 peer.sendShares(shares)
298 @tracker.verified.added.watch
300 if share.pow_hash <= share.header['bits'].target:
301 if factory.conn.value is not None:
302 factory.conn.value.send_block(block=share.as_block(tracker))
304 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
306 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
308 recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash) })
310 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
312 @defer.inlineCallbacks
315 ip, port = x.split(':')
316 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
318 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
321 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
323 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
325 print >>sys.stderr, "error reading addrs"
326 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
329 if addr not in addrs:
330 addrs[addr] = (0, time.time(), time.time())
334 connect_addrs = set()
335 for addr_df in map(parse, args.p2pool_nodes):
337 connect_addrs.add((yield addr_df))
342 best_share_hash_func=lambda: current_work.value['best_share_hash'],
343 port=args.p2pool_port,
346 connect_addrs=connect_addrs,
351 open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
352 task.LoopingCall(save_addrs).start(60)
354 # send share when the chain changes to their chain
355 def work_changed(new_work):
356 #print 'Work changed:', new_work
358 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
359 if share.hash in shared_share_hashes:
361 shared_share_hashes.add(share.hash)
364 for peer in p2p_node.peers.itervalues():
365 peer.sendShares([share for share in shares if share.peer is not peer])
367 current_work.changed.watch(work_changed)
370 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
372 if share.hash in tracker.verified.shares:
373 ss.add_verified_hash(share.hash)
374 task.LoopingCall(save_shares).start(60)
379 @defer.inlineCallbacks
383 is_lan, lan_ip = yield ipdiscover.get_local_ip()
385 pm = yield portmapper.get_port_mapper()
386 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
387 except defer.TimeoutError:
391 log.err(None, "UPnP error:")
392 yield deferral.sleep(random.expovariate(1/120))
397 # start listening for workers with a JSON-RPC server
399 print 'Listening for workers on port %i...' % (args.worker_port,)
401 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
402 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
403 vip_pass = f.read().strip('\r\n')
405 vip_pass = '%016x' % (random.randrange(2**64),)
406 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
408 print ' Worker password:', vip_pass, '(only required for generating graphs)'
412 removed_unstales_var = variable.Variable((0, 0, 0))
413 @tracker.verified.removed.watch
415 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
416 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
417 removed_unstales_var.set((
418 removed_unstales_var.value[0] + 1,
419 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
420 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
423 removed_doa_unstales_var = variable.Variable(0)
424 @tracker.verified.removed.watch
426 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
427 removed_doa_unstales.set(removed_doa_unstales.value + 1)
429 def get_stale_counts():
430 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
431 my_shares = len(my_share_hashes)
432 my_doa_shares = len(my_doa_share_hashes)
433 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
434 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
435 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
436 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
437 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
439 my_shares_not_in_chain = my_shares - my_shares_in_chain
440 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
442 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
445 recent_shares_ts_work2 = []
447 class WorkerBridge(worker_interface.WorkerBridge):
449 worker_interface.WorkerBridge.__init__(self)
450 self.new_work_event = current_work.changed
452 self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
453 self.recent_shares_ts_work = []
455 def _get_payout_script_from_username(self, user):
459 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
462 return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
464 def preprocess_request(self, request):
465 payout_script = self._get_payout_script_from_username(request.getUser())
466 if payout_script is None or random.uniform(0, 100) < args.worker_fee:
467 payout_script = my_script
468 return payout_script,
470 def get_work(self, payout_script):
471 if len(p2p_node.peers) == 0 and net.PERSIST:
472 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
473 if current_work.value['best_share_hash'] is None and net.PERSIST:
474 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
475 if time.time() > current_work2.value['last_update'] + 60:
476 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
478 if current_work.value['mm_chains']:
479 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
480 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
481 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
482 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
486 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
491 share_info, generate_tx = p2pool_data.generate_transaction(
494 previous_share_hash=current_work.value['best_share_hash'],
495 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
496 nonce=struct.pack('<Q', random.randrange(2**64)),
497 new_script=payout_script,
498 subsidy=current_work2.value['subsidy'],
499 donation=math.perfect_round(65535*args.donation_percentage/100),
500 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
501 253 if orphans > orphans_recorded_in_chain else
502 254 if doas > doas_recorded_in_chain else
504 )(*get_stale_counts()),
506 block_target=current_work.value['bits'].target,
507 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
511 target = 2**256//2**32 - 1
512 if len(self.recent_shares_ts_work) == 50:
513 hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
514 target = min(target, 2**256//(hash_rate * 5))
515 target = max(target, share_info['bits'].target)
516 for aux_work in current_work.value['mm_chains'].itervalues():
517 target = max(target, aux_work['target'])
519 transactions = [generate_tx] + list(current_work2.value['transactions'])
520 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(bitcoin_data.tx_type.pack(generate_tx)), 0, current_work2.value['merkle_branch'])
521 self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), mm_later, target, current_work2.value['merkle_branch']
523 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
524 bitcoin_data.target_to_difficulty(target),
525 bitcoin_data.target_to_difficulty(share_info['bits'].target),
526 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
527 len(current_work2.value['transactions']),
530 return bitcoin_getwork.BlockAttempt(
531 version=current_work.value['version'],
532 previous_block=current_work.value['previous_block'],
533 merkle_root=merkle_root,
534 timestamp=current_work2.value['time'],
535 bits=current_work.value['bits'],
539 def got_response(self, header, request):
540 # match up with transactions
541 if header['merkle_root'] not in self.merkle_root_to_transactions:
542 print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
544 share_info, transactions, getwork_time, mm_later, target, merkle_branch = self.merkle_root_to_transactions[header['merkle_root']]
546 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
547 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
550 if pow_hash <= header['bits'].target or p2pool.DEBUG:
551 if factory.conn.value is not None:
552 factory.conn.value.send_block(block=dict(header=header, txs=transactions))
554 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
555 if pow_hash <= header['bits'].target:
557 print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),)
559 recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),) })
561 log.err(None, 'Error while processing potential block:')
563 for aux_work, index, hashes in mm_later:
565 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
566 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
567 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
568 bitcoin_data.aux_pow_type.pack(dict(
571 block_hash=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)),
572 merkle_branch=merkle_branch,
575 merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
577 parent_block_header=header,
582 if result != (pow_hash <= aux_work['target']):
583 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
585 print 'Merged block submittal result: %s' % (result,)
588 log.err(err, 'Error submitting merged block:')
590 log.err(None, 'Error while processing merged mining POW:')
592 if pow_hash <= share_info['bits'].target:
593 share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
594 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
596 p2pool_data.format_hash(share.hash),
597 p2pool_data.format_hash(share.previous_hash),
598 time.time() - getwork_time,
599 ' DEAD ON ARRIVAL' if not on_time else '',
601 my_share_hashes.add(share.hash)
603 my_doa_share_hashes.add(share.hash)
607 tracker.verified.add(share)
611 if pow_hash <= header['bits'].target or p2pool.DEBUG:
612 for peer in p2p_node.peers.itervalues():
613 peer.sendShares([share])
614 shared_share_hashes.add(share.hash)
616 log.err(None, 'Error forwarding block solution:')
618 if pow_hash <= target:
619 reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
620 if request.getPassword() == vip_pass:
621 reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
622 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
623 while len(self.recent_shares_ts_work) > 50:
624 self.recent_shares_ts_work.pop(0)
625 recent_shares_ts_work2.append((time.time(), bitcoin_data.target_to_average_attempts(target), not on_time))
628 if pow_hash > target:
629 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
630 print ' Hash: %56x' % (pow_hash,)
631 print ' Target: %56x' % (target,)
635 web_root = resource.Resource()
636 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
639 if tracker.get_height(current_work.value['best_share_hash']) < 720:
640 return json.dumps(None)
641 return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
642 / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
645 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
646 weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
648 for script in sorted(weights, key=lambda s: weights[s]):
649 res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
650 return json.dumps(res)
652 def get_current_txouts():
653 share = tracker.shares[current_work.value['best_share_hash']]
654 share_info, gentx = p2pool_data.generate_transaction(tracker, share.share_info['share_data'], share.header['bits'].target, share.share_info['timestamp'], share.net)
655 return dict((out['script'], out['value']) for out in gentx['tx_outs'])
657 def get_current_scaled_txouts(scale, trunc=0):
658 txouts = get_current_txouts()
659 total = sum(txouts.itervalues())
660 results = dict((script, value*scale//total) for script, value in txouts.iteritems())
664 for s in sorted(results, key=results.__getitem__):
665 if results[s] >= trunc:
667 total_random += results[s]
670 winner = math.weighted_choice((script, results[script]) for script in random_set)
671 for script in random_set:
673 results[winner] = total_random
674 if sum(results.itervalues()) < int(scale):
675 results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
678 def get_current_payouts():
679 return json.dumps(dict((bitcoin_data.script2_to_human(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))
681 def get_patron_sendmany(this):
684 this, trunc = this.split('/', 1)
687 return json.dumps(dict(
688 (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
689 for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
690 if bitcoin_data.script2_to_address(script, net.PARENT) is not None
693 return json.dumps(None)
695 def get_global_stats():
696 # averaged over last hour
697 lookbehind = 3600//net.SHARE_PERIOD
698 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
701 nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
702 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
703 return json.dumps(dict(
704 pool_nonstale_hash_rate=nonstale_hash_rate,
705 pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
706 pool_stale_prop=stale_prop,
709 def get_local_stats():
710 lookbehind = 3600//net.SHARE_PERIOD
711 if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
714 global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
716 my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
717 my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
718 my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
719 my_share_count = my_unstale_count + my_orphan_count + my_doa_count
720 my_stale_count = my_orphan_count + my_doa_count
722 my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
724 my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
725 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
726 if share.hash in my_share_hashes)
727 actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
728 tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
729 share_att_s = my_work / actual_time
731 return json.dumps(dict(
732 my_hash_rates_in_last_hour=dict(
733 nonstale=share_att_s,
734 rewarded=share_att_s/(1 - global_stale_prop),
735 actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
737 my_share_counts_in_last_hour=dict(
738 shares=my_share_count,
739 unstale_shares=my_unstale_count,
740 stale_shares=my_stale_count,
741 orphan_stale_shares=my_orphan_count,
742 doa_stale_shares=my_doa_count,
744 my_stale_proportions_in_last_hour=dict(
746 orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
747 dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
751 def get_peer_addresses():
752 return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())
754 class WebInterface(resource.Resource):
755 def __init__(self, func, mime_type, *fields):
756 self.func, self.mime_type, self.fields = func, mime_type, fields
758 def render_GET(self, request):
759 request.setHeader('Content-Type', self.mime_type)
760 request.setHeader('Access-Control-Allow-Origin', '*')
761 return self.func(*(request.args[field][0] for field in self.fields))
763 web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
764 web_root.putChild('users', WebInterface(get_users, 'application/json'))
765 web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
766 web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
767 web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
768 web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
769 web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
770 web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
771 web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
772 web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
774 web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
776 grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
777 web_root.putChild('graphs', grapher.get_resource())
779 if tracker.get_height(current_work.value['best_share_hash']) < 720:
781 nonstalerate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
782 poolrate = nonstalerate / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720))
783 grapher.add_poolrate_point(poolrate, poolrate - nonstalerate)
784 task.LoopingCall(add_point).start(100)
786 def attempt_listen():
788 reactor.listenTCP(args.worker_port, server.Site(web_root))
789 except error.CannotListenError, e:
790 print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
791 reactor.callLater(1, attempt_listen)
793 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
801 @defer.inlineCallbacks
804 flag = factory.new_block.get_deferred()
806 yield set_real_work1()
809 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
814 print 'Started successfully!'
818 if hasattr(signal, 'SIGALRM'):
819 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
820 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
822 signal.siginterrupt(signal.SIGALRM, False)
823 task.LoopingCall(signal.alarm, 30).start(1)
825 if args.irc_announce:
826 from twisted.words.protocols import irc
827 class IRCClient(irc.IRCClient):
829 def lineReceived(self, line):
831 irc.IRCClient.lineReceived(self, line)
833 irc.IRCClient.signedOn(self)
834 self.factory.resetDelay()
836 self.watch_id = tracker.verified.added.watch(self._new_share)
837 self.announced_hashes = set()
838 def _new_share(self, share):
839 if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes:
840 self.announced_hashes.add(share.header_hash)
841 self.say('#p2pool', '\x02BLOCK FOUND by %s! http://blockexplorer.com/block/%064x' % (bitcoin_data.script2_to_address(share.new_script, net.PARENT), share.header_hash))
842 def connectionLost(self, reason):
843 tracker.verified.added.unwatch(self.watch_id)
844 print 'IRC connection lost:', reason.getErrorMessage()
845 class IRCClientFactory(protocol.ReconnectingClientFactory):
847 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
# Status-reporting loop fragment (the function's 'def' line and its while
# loop are not visible here): periodically builds a status string and prints
# it when it changes or every 15 seconds.
849 @defer.inlineCallbacks
852 first_pseudoshare_time = None
# Poll every 3 seconds.
857 yield deferral.sleep(3)
# Warn when getwork results have gone stale - bitcoind may be hung or dead.
859 if time.time() > current_work2.value['last_update'] + 60:
860 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
862 height = tracker.get_height(current_work.value['best_share_hash'])
# Chain/peer summary line; FD counts only in debug mode.
863 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
865 len(tracker.verified.shares),
868 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
869 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
# Local hashrate: keep only pseudoshares within the averaging window, then
# divide total work by the elapsed (capped) window length.
871 if first_pseudoshare_time is None and recent_shares_ts_work2:
872 first_pseudoshare_time = recent_shares_ts_work2[0][0]
873 while recent_shares_ts_work2 and recent_shares_ts_work2[0][0] < time.time() - average_period:
874 recent_shares_ts_work2.pop(0)
875 my_att_s = sum(work for ts, work, dead in recent_shares_ts_work2)/min(time.time() - first_pseudoshare_time, average_period) if first_pseudoshare_time is not None else 0
876 this_str += '\n Local: %sH/s (%.f min avg) Local dead on arrival: %s Expected time to share: %s' % (
877 math.format(int(my_att_s)),
878 (min(time.time() - first_pseudoshare_time, average_period) if first_pseudoshare_time is not None else 0)/60,
# NOTE(review): the loop variable 'tx' here shadows nothing and is really the
# timestamp ('ts' elsewhere); harmless, but inconsistent naming.
879 math.format_binomial_conf(sum(1 for tx, work, dead in recent_shares_ts_work2 if dead), len(recent_shares_ts_work2), 0.95),
880 '%.1f min' % (2**256 / tracker.shares[current_work.value['best_share_hash']].target / my_att_s / 60,) if my_att_s else '???',
# Share/stale statistics over (up to) the last 720 shares; efficiency is the
# local good-share rate relative to the pool-wide stale rate.
884 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
885 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
886 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
888 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
889 shares, stale_orphan_shares, stale_doa_shares,
890 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
891 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
892 get_current_txouts().get(my_script, 0)*1e-8, net.PARENT.SYMBOL,
894 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Average time between blocks: %.2f days' % (
895 math.format(int(real_att_s)),
897 2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
# Throttle output: print only on change, or at least every 15 seconds.
900 if this_str != last_str or time.time() > last_time + 15:
903 last_time = time.time()
908 log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    """ArgumentParser whose '@file' arguments are expanded recursively.

    Each line of a referenced file may contain several whitespace-separated
    arguments (stock argparse treats a whole line as one argument), and a
    file may itself reference further '@file' arguments.
    """

    def _read_args_from_files(self, arg_strings):
        """Return arg_strings with every '@file' argument replaced by the
        arguments read from that file.

        Raises nothing directly; an unreadable file is reported through
        self.error(), which prints usage and exits.
        """
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            else:
                try:
                    # 'with' guarantees the file is closed even if parsing
                    # fails partway through (the visible original leaked the
                    # handle on error).
                    with open(arg_string[1:]) as args_file:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                    # recurse so that files can reference other files
                    arg_strings = self._read_args_from_files(arg_strings)
                    new_arg_strings.extend(arg_strings)
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        # return the modified argument list
        return new_arg_strings

    def convert_arg_line_to_args(self, arg_line):
        """Split one line of an argument file into individual arguments."""
        return [arg for arg in arg_line.split() if arg.strip()]
# Production (non-testnet) networks; used to build per-network defaults in
# the help strings below.
946 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
# Top-level parser; '@file' arguments are expanded by FixedArgumentParser.
948 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
949 parser.add_argument('--version', action='version', version=p2pool.__version__)
950 parser.add_argument('--net',
951 help='use specified network (default: bitcoin)',
952 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
953 parser.add_argument('--testnet',
954 help='''use the network's testnet''',
955 action='store_const', const=True, default=False, dest='testnet')
956 parser.add_argument('--debug',
957 help='enable debugging mode',
958 action='store_const', const=True, default=False, dest='debug')
959 parser.add_argument('-a', '--address',
960 help='generate payouts to this address (default: <address requested from bitcoind>)',
961 type=str, action='store', default=None, dest='address')
962 parser.add_argument('--logfile',
963 help='''log to this file (default: data/<NET>/log)''',
964 type=str, action='store', default=None, dest='logfile')
# --merged may be given multiple times (action='append'); the two options
# below it are the deprecated split form of the same information.
965 parser.add_argument('--merged',
966 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
967 type=str, action='append', default=[], dest='merged_urls')
968 parser.add_argument('--merged-url',
969 help='DEPRECATED, use --merged',
970 type=str, action='store', default=None, dest='merged_url')
971 parser.add_argument('--merged-userpass',
972 help='DEPRECATED, use --merged',
973 type=str, action='store', default=None, dest='merged_userpass')
974 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
975 help='donate this percentage of work to author of p2pool (default: 0.5)',
976 type=float, action='store', default=0.5, dest='donation_percentage')
977 parser.add_argument('--irc-announce',
978 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
979 action='store_true', default=False, dest='irc_announce')
# p2pool peer-to-peer interface options.
981 p2pool_group = parser.add_argument_group('p2pool interface')
982 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
983 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
984 type=int, action='store', default=None, dest='p2pool_port')
985 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
986 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
987 type=str, action='append', default=[], dest='p2pool_nodes')
# store_false with default=True: passing the flag disables UPnP.
988 parser.add_argument('--disable-upnp',
989 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
990 action='store_false', default=True, dest='upnp')
# Miner-facing worker RPC interface options.
992 worker_group = parser.add_argument_group('worker interface')
993 worker_group.add_argument('-w', '--worker-port', metavar='PORT',
994 help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
995 type=int, action='store', default=None, dest='worker_port')
996 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
997 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
998 type=float, action='store', default=0, dest='worker_fee')
# bitcoind connection options; ports default to None so bitcoin.conf values
# or network defaults can fill them in later.
1000 bitcoind_group = parser.add_argument_group('bitcoind interface')
1001 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
1002 help='connect to this address (default: 127.0.0.1)',
1003 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
1004 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
1005 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
1006 type=int, action='store', default=None, dest='bitcoind_rpc_port')
1007 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
1008 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
1009 type=int, action='store', default=None, dest='bitcoind_p2p_port')
# Positional: zero to two values (username [password]); resolved below.
1011 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
1012 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
1013 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
1015 args = parser.parse_args()
# Resolve the concrete network, optionally its '_testnet' variant.
1020 net_name = args.net_name + ('_testnet' if args.testnet else '')
1021 net = networks.nets[net_name]
# Per-network data directory next to the executable; create it if absent.
1023 datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net_name)
1024 if not os.path.exists(datadir_path):
1025 os.makedirs(datadir_path)
# BITCOIND_RPCUSERPASS: 0, 1 (password only) or 2 (user, password) values.
# Left-padding with [None, None] and slicing the last two handles all cases.
1027 if len(args.bitcoind_rpc_userpass) > 2:
1028 parser.error('a maximum of two arguments are allowed')
1029 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No password given: fall back to reading the parent coin's config file.
1031 if args.bitcoind_rpc_password is None:
1032 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
1033 parser.error('This network has no configuration file function. Manually enter your RPC password.')
1034 conf_path = net.PARENT.CONF_FILE_FUNC()
1035 if not os.path.exists(conf_path):
# The suggested rpcpassword is a random 128-bit hex value.
1036 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
1037 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
1040 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
# bitcoin.conf has no section headers; prepend a dummy '[x]' section so
# RawConfigParser can parse it.
1041 with open(conf_path, 'rb') as f:
1042 cp = ConfigParser.RawConfigParser()
1043 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
# Copy any conf-file value into args unless it was given on the command line,
# converting to the expected type.
1044 for conf_name, var_name, var_type in [
1045 ('rpcuser', 'bitcoind_rpc_username', str),
1046 ('rpcpassword', 'bitcoind_rpc_password', str),
1047 ('rpcport', 'bitcoind_rpc_port', int),
1048 ('port', 'bitcoind_p2p_port', int),
1050 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1051 setattr(args, var_name, var_type(cp.get('x', conf_name)))
1053 if args.bitcoind_rpc_username is None:
1054 args.bitcoind_rpc_username = ''
# Remaining port defaults come from the selected network definition.
1056 if args.bitcoind_rpc_port is None:
1057 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1059 if args.bitcoind_p2p_port is None:
1060 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1062 if args.p2pool_port is None:
1063 args.p2pool_port = net.P2P_PORT
1065 if args.worker_port is None:
1066 args.worker_port = net.WORKER_PORT
# Decode the payout address now so a bad address fails fast at startup.
1068 if args.address is not None:
1070 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1071 except Exception, e:
1072 parser.error('error parsing address: ' + repr(e))
# No --address: the pubkey hash is requested from bitcoind later.
1074 args.pubkey_hash = None
def separate_url(url):
    """Split a merged-mining URL into (url without credentials, 'user:pass').

    Exits via parser.error() when the netloc carries no '@' separator.
    """
    parts = urlparse.urlsplit(url)
    if '@' not in parts.netloc:
        parser.error('merged url netloc must contain an "@"')
    # rsplit on the last '@' so passwords containing '@' still work
    credentials, bare_netloc = parts.netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=bare_netloc))
    return stripped_url, credentials
# Split every --merged URL into (url, userpass) pairs.
1082 merged_urls = map(separate_url, args.merged_urls)
# Backwards compatibility: fold the deprecated --merged-url/--merged-userpass
# pair into merged_urls, warning (and pausing) so the operator notices.
1084 if args.merged_url is not None or args.merged_userpass is not None:
1085 print '--merged-url and --merged-userpass are deprecated! Use --merged http://USER:PASS@HOST:PORT/ instead!'
1086 print 'Pausing 10 seconds...'
1089 if args.merged_url is None or args.merged_userpass is None:
1090 parser.error('must specify both --merged-url and --merged-userpass')
1092 merged_urls = merged_urls + [(args.merged_url, args.merged_userpass)]
1095 if args.logfile is None:
1096 args.logfile = os.path.join(datadir_path, 'log')
# Tee stdout/stderr through a timestamping pipe into both the console and
# the logfile; stderr lines get a '> ' prefix.
1098 logfile = logging.LogFile(args.logfile)
1099 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1100 sys.stdout = logging.AbortPipe(pipe)
1101 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 reopens the logfile (logrotate support) where the platform has it;
# the LoopingCall below also reopens every 5 seconds as a fallback.
1102 if hasattr(signal, "SIGUSR1"):
1103 def sigusr1(signum, frame):
1104 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1106 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1107 signal.signal(signal.SIGUSR1, sigusr1)
1108 task.LoopingCall(logfile.reopen).start(5)
# Schedule main() to run once the Twisted reactor starts.
1110 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls)