1 from __future__ import division
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface, height_tracker
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 except jsonrpc.Error, e:
32 if e.code == -32601: # Method not found
33 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34 raise deferral.RetrySilentlyException()
36 packed_transactions = [x.decode('hex') for x in work['transactions']]
37 defer.returnValue(dict(
38 version=work['version'],
39 previous_block_hash=int(work['previousblockhash'], 16),
40 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
42 subsidy=work['coinbasevalue'],
44 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
51 print 'p2pool (version %s)' % (p2pool.__version__,)
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
58 @deferral.retry('Error while checking Bitcoin connection:', 1)
59 @defer.inlineCallbacks
61 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
62 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
63 raise deferral.RetrySilentlyException()
64 temp_work = yield getwork(bitcoind)
65 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
66 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
67 raise deferral.RetrySilentlyException()
68 defer.returnValue(temp_work)
69 temp_work = yield check()
71 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
74 # connect to bitcoind over bitcoin-p2p
75 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
76 factory = bitcoin_p2p.ClientFactory(net.PARENT)
77 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
78 yield factory.getProtocol() # waits until handshake is successful
82 print 'Determining payout address...'
83 if args.pubkey_hash is None:
84 address_path = os.path.join(datadir_path, 'cached_payout_address')
86 if os.path.exists(address_path):
87 with open(address_path, 'rb') as f:
88 address = f.read().strip('\r\n')
89 print ' Loaded cached address: %s...' % (address,)
93 if address is not None:
94 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
95 if not res['isvalid'] or not res['ismine']:
96 print ' Cached address is either invalid or not controlled by local bitcoind!'
100 print ' Getting payout address from bitcoind...'
101 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
103 with open(address_path, 'wb') as f:
106 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
108 my_pubkey_hash = args.pubkey_hash
109 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
112 my_share_hashes = set()
113 my_doa_share_hashes = set()
115 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
116 shared_share_hashes = set()
117 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
118 known_verified = set()
119 print "Loading shares..."
120 for i, (mode, contents) in enumerate(ss.get_shares()):
122 if contents.hash in tracker.shares:
124 shared_share_hashes.add(contents.hash)
125 contents.time_seen = 0
126 tracker.add(contents)
127 if len(tracker.shares) % 1000 == 0 and tracker.shares:
128 print " %i" % (len(tracker.shares),)
129 elif mode == 'verified_hash':
130 known_verified.add(contents)
132 raise AssertionError()
133 print " ...inserting %i verified shares..." % (len(known_verified),)
134 for h in known_verified:
135 if h not in tracker.shares:
136 ss.forget_verified_share(h)
138 tracker.verified.add(tracker.shares[h])
139 print " ...done loading %i shares!" % (len(tracker.shares),)
141 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
142 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
143 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
145 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
147 pre_current_work = variable.Variable(None)
148 pre_merged_work = variable.Variable({})
149 # information affecting work that should trigger a long-polling update
150 current_work = variable.Variable(None)
151 # information affecting work that should not trigger a long-polling update
152 current_work2 = variable.Variable(None)
154 requested = expiring_dict.ExpiringDict(300)
156 print 'Initializing work...'
157 @defer.inlineCallbacks
158 def set_real_work1():
159 work = yield getwork(bitcoind)
160 current_work2.set(dict(
162 transactions=work['transactions'],
163 merkle_link=work['merkle_link'],
164 subsidy=work['subsidy'],
165 clock_offset=time.time() - work['time'],
166 last_update=time.time(),
167 )) # second set first because everything hooks on the first
168 pre_current_work.set(dict(
169 version=work['version'],
170 previous_block=work['previous_block_hash'],
172 coinbaseflags=work['coinbaseflags'],
174 yield set_real_work1()
176 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
178 def set_real_work2():
179 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
181 t = dict(pre_current_work.value)
182 t['best_share_hash'] = best
183 t['mm_chains'] = pre_merged_work.value
187 for peer2, share_hash in desired:
188 if share_hash not in tracker.tails: # was received in the time tracker.think was running
190 last_request_time, count = requested.get(share_hash, (None, 0))
191 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
193 potential_peers = set()
194 for head in tracker.tails[share_hash]:
195 potential_peers.update(peer_heads.get(head, set()))
196 potential_peers = [peer for peer in potential_peers if peer.connected2]
197 if count == 0 and peer2 is not None and peer2.connected2:
200 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
204 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
208 stops=list(set(tracker.heads) | set(
209 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
212 requested[share_hash] = t, count + 1
213 pre_current_work.changed.watch(lambda _: set_real_work2())
214 pre_merged_work.changed.watch(lambda _: set_real_work2())
220 @defer.inlineCallbacks
221 def set_merged_work(merged_url, merged_userpass):
222 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
224 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
225 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
226 hash=int(auxblock['hash'], 16),
227 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
228 merged_proxy=merged_proxy,
230 yield deferral.sleep(1)
231 for merged_url, merged_userpass in merged_urls:
232 set_merged_work(merged_url, merged_userpass)
234 @pre_merged_work.changed.watch
235 def _(new_merged_work):
236 print 'Got new merged mining work!'
238 # setup p2p logic and join p2pool network
240 class Node(p2p.Node):
241 def handle_shares(self, shares, peer):
243 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
247 if share.hash in tracker.shares:
248 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
253 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
257 if shares and peer is not None:
258 peer_heads.setdefault(shares[0].hash, set()).add(peer)
264 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
266 def handle_share_hashes(self, hashes, peer):
269 for share_hash in hashes:
270 if share_hash in tracker.shares:
272 last_request_time, count = requested.get(share_hash, (None, 0))
273 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
275 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
276 get_hashes.append(share_hash)
277 requested[share_hash] = t, count + 1
279 if hashes and peer is not None:
280 peer_heads.setdefault(hashes[0], set()).add(peer)
282 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
284 def handle_get_shares(self, hashes, parents, stops, peer):
285 parents = min(parents, 1000//len(hashes))
288 for share_hash in hashes:
289 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
290 if share.hash in stops:
293 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
296 @deferral.retry('Error submitting block: (will retry)', 10, 10)
297 @defer.inlineCallbacks
298 def submit_block(block, ignore_failure):
299 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
300 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
301 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
302 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
304 @tracker.verified.added.watch
306 if share.pow_hash <= share.header['bits'].target:
307 submit_block(share.as_block(tracker), ignore_failure=True)
309 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
312 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
314 @defer.inlineCallbacks
317 ip, port = x.split(':')
318 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
320 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
323 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
325 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
327 print >>sys.stderr, "error reading addrs"
328 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
331 if addr not in addrs:
332 addrs[addr] = (0, time.time(), time.time())
336 connect_addrs = set()
337 for addr_df in map(parse, args.p2pool_nodes):
339 connect_addrs.add((yield addr_df))
344 best_share_hash_func=lambda: current_work.value['best_share_hash'],
345 port=args.p2pool_port,
348 connect_addrs=connect_addrs,
349 max_incoming_conns=args.p2pool_conns,
353 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
355 # send share when the chain changes to their chain
356 def work_changed(new_work):
357 #print 'Work changed:', new_work
359 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
360 if share.hash in shared_share_hashes:
362 shared_share_hashes.add(share.hash)
365 for peer in p2p_node.peers.itervalues():
366 peer.sendShares([share for share in shares if share.peer is not peer])
368 current_work.changed.watch(work_changed)
371 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
373 if share.hash in tracker.verified.shares:
374 ss.add_verified_hash(share.hash)
375 task.LoopingCall(save_shares).start(60)
381 @defer.inlineCallbacks
385 is_lan, lan_ip = yield ipdiscover.get_local_ip()
387 pm = yield portmapper.get_port_mapper()
388 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
389 except defer.TimeoutError:
393 log.err(None, 'UPnP error:')
394 yield deferral.sleep(random.expovariate(1/120))
397 # start listening for workers with a JSON-RPC server
399 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
403 removed_unstales_var = variable.Variable((0, 0, 0))
404 removed_doa_unstales_var = variable.Variable(0)
405 @tracker.verified.removed.watch
407 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
408 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
409 removed_unstales_var.set((
410 removed_unstales_var.value[0] + 1,
411 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
412 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
414 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
415 removed_doa_unstales.set(removed_doa_unstales.value + 1)
417 def get_stale_counts():
418 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
419 my_shares = len(my_share_hashes)
420 my_doa_shares = len(my_doa_share_hashes)
421 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
422 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
423 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
424 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
425 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
427 my_shares_not_in_chain = my_shares - my_shares_in_chain
428 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
430 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
433 pseudoshare_received = variable.Event()
434 share_received = variable.Event()
435 local_rate_monitor = math.RateMonitor(10*60)
437 class WorkerBridge(worker_interface.WorkerBridge):
439 worker_interface.WorkerBridge.__init__(self)
440 self.new_work_event = current_work.changed
441 self.recent_shares_ts_work = []
443 def preprocess_request(self, request):
444 user = request.getUser() if request.getUser() is not None else ''
446 desired_pseudoshare_target = None
448 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
450 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
454 desired_share_target = 2**256 - 1
456 user, min_diff_str = user.rsplit('/', 1)
458 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
462 if random.uniform(0, 100) < args.worker_fee:
463 pubkey_hash = my_pubkey_hash
466 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
468 pubkey_hash = my_pubkey_hash
470 return pubkey_hash, desired_share_target, desired_pseudoshare_target
472 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
473 if len(p2p_node.peers) == 0 and net.PERSIST:
474 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
475 if current_work.value['best_share_hash'] is None and net.PERSIST:
476 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
477 if time.time() > current_work2.value['last_update'] + 60:
478 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
480 if current_work.value['mm_chains']:
481 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
482 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
483 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
484 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
488 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
494 share_info, generate_tx = p2pool_data.Share.generate_transaction(
497 previous_share_hash=current_work.value['best_share_hash'],
498 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
499 nonce=random.randrange(2**32),
500 pubkey_hash=pubkey_hash,
501 subsidy=current_work2.value['subsidy'],
502 donation=math.perfect_round(65535*args.donation_percentage/100),
503 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
504 253 if orphans > orphans_recorded_in_chain else
505 254 if doas > doas_recorded_in_chain else
507 )(*get_stale_counts()),
510 block_target=current_work.value['bits'].target,
511 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
512 desired_target=desired_share_target,
513 ref_merkle_link=dict(branch=[], index=0),
517 target = net.PARENT.SANE_MAX_TARGET
518 if desired_pseudoshare_target is None:
519 if len(self.recent_shares_ts_work) == 50:
520 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
521 target = min(target, 2**256//hash_rate)
523 target = min(target, desired_pseudoshare_target)
524 target = max(target, share_info['bits'].target)
525 for aux_work in current_work.value['mm_chains'].itervalues():
526 target = max(target, aux_work['target'])
528 transactions = [generate_tx] + list(current_work2.value['transactions'])
529 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
530 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
532 getwork_time = time.time()
533 merkle_link = current_work2.value['merkle_link']
535 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
536 bitcoin_data.target_to_difficulty(target),
537 bitcoin_data.target_to_difficulty(share_info['bits'].target),
538 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
539 len(current_work2.value['transactions']),
542 ba = bitcoin_getwork.BlockAttempt(
543 version=current_work.value['version'],
544 previous_block=current_work.value['previous_block'],
545 merkle_root=merkle_root,
546 timestamp=current_work2.value['time'],
547 bits=current_work.value['bits'],
551 received_header_hashes = set()
553 def got_response(header, request):
554 assert header['merkle_root'] == merkle_root
556 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
557 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
558 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
561 if pow_hash <= header['bits'].target or p2pool.DEBUG:
562 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
563 if pow_hash <= header['bits'].target:
565 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
568 log.err(None, 'Error while processing potential block:')
570 for aux_work, index, hashes in mm_later:
572 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
573 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
574 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
575 bitcoin_data.aux_pow_type.pack(dict(
578 block_hash=header_hash,
579 merkle_link=merkle_link,
581 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
582 parent_block_header=header,
587 if result != (pow_hash <= aux_work['target']):
588 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
590 print 'Merged block submittal result: %s' % (result,)
593 log.err(err, 'Error submitting merged block:')
595 log.err(None, 'Error while processing merged mining POW:')
597 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
598 min_header = dict(header);del min_header['merkle_root']
599 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
600 share = p2pool_data.Share(net, None, dict(
601 min_header=min_header, share_info=share_info, hash_link=hash_link,
602 ref_merkle_link=dict(branch=[], index=0),
603 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
605 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
607 p2pool_data.format_hash(share.hash),
608 p2pool_data.format_hash(share.previous_hash),
609 time.time() - getwork_time,
610 ' DEAD ON ARRIVAL' if not on_time else '',
612 my_share_hashes.add(share.hash)
614 my_doa_share_hashes.add(share.hash)
618 tracker.verified.add(share)
622 if pow_hash <= header['bits'].target or p2pool.DEBUG:
623 for peer in p2p_node.peers.itervalues():
624 peer.sendShares([share])
625 shared_share_hashes.add(share.hash)
627 log.err(None, 'Error forwarding block solution:')
629 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
631 if pow_hash > target:
632 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
633 print ' Hash: %56x' % (pow_hash,)
634 print ' Target: %56x' % (target,)
635 elif header_hash in received_header_hashes:
636 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
638 received_header_hashes.add(header_hash)
640 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser())
641 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
642 while len(self.recent_shares_ts_work) > 50:
643 self.recent_shares_ts_work.pop(0)
644 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
648 return ba, got_response
650 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
652 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
653 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
655 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
657 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
664 @defer.inlineCallbacks
667 flag = factory.new_block.get_deferred()
669 yield set_real_work1()
672 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
677 print 'Started successfully!'
678 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
679 if args.donation_percentage > 0.51:
680 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
681 elif args.donation_percentage < 0.49:
682 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
684 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
685 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
689 if hasattr(signal, 'SIGALRM'):
690 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
691 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
693 signal.siginterrupt(signal.SIGALRM, False)
694 task.LoopingCall(signal.alarm, 30).start(1)
696 if args.irc_announce:
697 from twisted.words.protocols import irc
698 class IRCClient(irc.IRCClient):
699 nickname = 'p2pool%02i' % (random.randrange(100),)
700 channel = net.ANNOUNCE_CHANNEL
701 def lineReceived(self, line):
704 irc.IRCClient.lineReceived(self, line)
706 irc.IRCClient.signedOn(self)
707 self.factory.resetDelay()
708 self.join(self.channel)
709 @defer.inlineCallbacks
710 def new_share(share):
711 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
712 yield deferral.sleep(random.expovariate(1/60))
713 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
714 if message not in self.recent_messages:
715 self.say(self.channel, message)
716 self._remember_message(message)
717 self.watch_id = tracker.verified.added.watch(new_share)
718 self.recent_messages = []
719 def _remember_message(self, message):
720 self.recent_messages.append(message)
721 while len(self.recent_messages) > 100:
722 self.recent_messages.pop(0)
723 def privmsg(self, user, channel, message):
724 if channel == self.channel:
725 self._remember_message(message)
726 def connectionLost(self, reason):
727 tracker.verified.added.unwatch(self.watch_id)
728 print 'IRC connection lost:', reason.getErrorMessage()
729 class IRCClientFactory(protocol.ReconnectingClientFactory):
731 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
733 @defer.inlineCallbacks
738 yield deferral.sleep(3)
740 if time.time() > current_work2.value['last_update'] + 60:
741 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
743 height = tracker.get_height(current_work.value['best_share_hash'])
744 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
746 len(tracker.verified.shares),
749 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
750 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
752 datums, dt = local_rate_monitor.get_datums_in_last()
753 my_att_s = sum(datum['work']/dt for datum in datums)
754 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
755 math.format(int(my_att_s)),
757 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
758 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
762 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
763 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
764 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
766 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
767 shares, stale_orphan_shares, stale_doa_shares,
768 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
769 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
770 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
772 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
773 math.format(int(real_att_s)),
775 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
778 desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
779 majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
780 if majority_desired_version not in [0, 1]:
781 print >>sys.stderr, '#'*40
782 print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
783 majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
784 print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
785 print >>sys.stderr, '#'*40
787 if this_str != last_str or time.time() > last_time + 15:
790 last_time = time.time()
796 log.err(None, 'Fatal error:')
799 class FixedArgumentParser(argparse.ArgumentParser):
800 def _read_args_from_files(self, arg_strings):
801 # expand arguments referencing files
803 for arg_string in arg_strings:
805 # for regular arguments, just add them back into the list
806 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
807 new_arg_strings.append(arg_string)
809 # replace arguments referencing files with the file content
812 args_file = open(arg_string[1:])
815 for arg_line in args_file.read().splitlines():
816 for arg in self.convert_arg_line_to_args(arg_line):
817 arg_strings.append(arg)
818 arg_strings = self._read_args_from_files(arg_strings)
819 new_arg_strings.extend(arg_strings)
823 err = sys.exc_info()[1]
826 # return the modified argument list
827 return new_arg_strings
829 def convert_arg_line_to_args(self, arg_line):
830 return [arg for arg in arg_line.split() if arg.strip()]
833 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
835 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
836 parser.add_argument('--version', action='version', version=p2pool.__version__)
837 parser.add_argument('--net',
838 help='use specified network (default: bitcoin)',
839 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
840 parser.add_argument('--testnet',
841 help='''use the network's testnet''',
842 action='store_const', const=True, default=False, dest='testnet')
843 parser.add_argument('--debug',
844 help='enable debugging mode',
845 action='store_const', const=True, default=False, dest='debug')
846 parser.add_argument('-a', '--address',
847 help='generate payouts to this address (default: <address requested from bitcoind>)',
848 type=str, action='store', default=None, dest='address')
849 parser.add_argument('--datadir',
850 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
851 type=str, action='store', default=None, dest='datadir')
852 parser.add_argument('--logfile',
853 help='''log to this file (default: data/<NET>/log)''',
854 type=str, action='store', default=None, dest='logfile')
855 parser.add_argument('--merged',
856 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
857 type=str, action='append', default=[], dest='merged_urls')
858 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
859 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
860 type=float, action='store', default=0.5, dest='donation_percentage')
861 parser.add_argument('--irc-announce',
862 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
863 action='store_true', default=False, dest='irc_announce')
865 p2pool_group = parser.add_argument_group('p2pool interface')
866 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
867 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
868 type=int, action='store', default=None, dest='p2pool_port')
869 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
870 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
871 type=str, action='append', default=[], dest='p2pool_nodes')
872 parser.add_argument('--disable-upnp',
873 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
874 action='store_false', default=True, dest='upnp')
875 p2pool_group.add_argument('--max-conns', metavar='CONNS',
876 help='maximum incoming connections (default: 40)',
877 type=int, action='store', default=40, dest='p2pool_conns')
879 worker_group = parser.add_argument_group('worker interface')
880 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
881 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
882 type=str, action='store', default=None, dest='worker_endpoint')
883 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
884 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
885 type=float, action='store', default=0, dest='worker_fee')
887 bitcoind_group = parser.add_argument_group('bitcoind interface')
888 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
889 help='connect to this address (default: 127.0.0.1)',
890 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
891 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
892 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
893 type=int, action='store', default=None, dest='bitcoind_rpc_port')
894 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
895 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
896 type=int, action='store', default=None, dest='bitcoind_p2p_port')
898 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
899 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
900 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
902 args = parser.parse_args()
907 net_name = args.net_name + ('_testnet' if args.testnet else '')
908 net = networks.nets[net_name]
910 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
911 if not os.path.exists(datadir_path):
912 os.makedirs(datadir_path)
914 if len(args.bitcoind_rpc_userpass) > 2:
915 parser.error('a maximum of two arguments are allowed')
916 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
918 if args.bitcoind_rpc_password is None:
919 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
920 parser.error('This network has no configuration file function. Manually enter your RPC password.')
921 conf_path = net.PARENT.CONF_FILE_FUNC()
922 if not os.path.exists(conf_path):
923 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
924 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
927 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
928 with open(conf_path, 'rb') as f:
929 cp = ConfigParser.RawConfigParser()
930 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
931 for conf_name, var_name, var_type in [
932 ('rpcuser', 'bitcoind_rpc_username', str),
933 ('rpcpassword', 'bitcoind_rpc_password', str),
934 ('rpcport', 'bitcoind_rpc_port', int),
935 ('port', 'bitcoind_p2p_port', int),
937 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
938 setattr(args, var_name, var_type(cp.get('x', conf_name)))
939 if args.bitcoind_rpc_password is None:
940 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
942 if args.bitcoind_rpc_username is None:
943 args.bitcoind_rpc_username = ''
945 if args.bitcoind_rpc_port is None:
946 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
948 if args.bitcoind_p2p_port is None:
949 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
951 if args.p2pool_port is None:
952 args.p2pool_port = net.P2P_PORT
954 if args.worker_endpoint is None:
955 worker_endpoint = '', net.WORKER_PORT
956 elif ':' not in args.worker_endpoint:
957 worker_endpoint = '', int(args.worker_endpoint)
959 addr, port = args.worker_endpoint.rsplit(':', 1)
960 worker_endpoint = addr, int(port)
962 if args.address is not None:
964 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
966 parser.error('error parsing address: ' + repr(e))
968 args.pubkey_hash = None
970 def separate_url(url):
971 s = urlparse.urlsplit(url)
972 if '@' not in s.netloc:
973 parser.error('merged url netloc must contain an "@"')
974 userpass, new_netloc = s.netloc.rsplit('@', 1)
975 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
976 merged_urls = map(separate_url, args.merged_urls)
978 if args.logfile is None:
979 args.logfile = os.path.join(datadir_path, 'log')
981 logfile = logging.LogFile(args.logfile)
982 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
983 sys.stdout = logging.AbortPipe(pipe)
984 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
985 if hasattr(signal, "SIGUSR1"):
986 def sigusr1(signum, frame):
987 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
989 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
990 signal.signal(signal.SIGUSR1, sigusr1)
991 task.LoopingCall(logfile.reopen).start(5)
993 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)