1 from __future__ import division
17 from twisted.internet import iocpreactor
22 print 'Using IOCP reactor!'
23 from twisted.internet import defer, reactor, protocol, task
24 from twisted.web import server
25 from twisted.python import log
26 from nattraverso import portmapper, ipdiscover
28 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
29 from bitcoin import worker_interface, height_tracker
30 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
31 from . import p2p, networks, web
32 import p2pool, p2pool.data as p2pool_data
# NOTE(review): extraction artifact — original file line numbers are baked into
# each line below and interior lines are elided (e.g. the `try:` matching the
# `except` clause, the time/height fields, and the closing parentheses of the
# returned dict). Do not treat this span as runnable as-is.
34 @deferral.retry('Error getting work from bitcoind:', 3)
35 @defer.inlineCallbacks
36 def getwork(bitcoind):
# Fetch a block template from bitcoind via the pre-BIP22 `getmemorypool` RPC
# and normalize it into the plain dict the rest of p2pool consumes.
38 work = yield bitcoind.rpc_getmemorypool()
# JSON-RPC error -32601 means the method is unknown, i.e. bitcoind predates
# getmemorypool; retry silently after warning the operator.
39 except jsonrpc.Error, e:
40 if e.code == -32601: # Method not found
41 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
42 raise deferral.RetrySilentlyException()
# Transactions arrive hex-encoded; decode once and reuse the packed bytes for
# both the parsed tx objects and the merkle link over their hashes.
44 packed_transactions = [x.decode('hex') for x in work['transactions']]
45 defer.returnValue(dict(
46 version=work['version'],
47 previous_block_hash=int(work['previousblockhash'], 16),
48 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
49 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
50 subsidy=work['coinbasevalue'],
# 'bits' may be a little-endian hex string (older bitcoind) or a raw int;
# normalize either representation to a FloatingInteger.
52 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
53 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
56 @defer.inlineCallbacks
57 def main(args, net, datadir_path, merged_urls, worker_endpoint):
59 print 'p2pool (version %s)' % (p2pool.__version__,)
62 # connect to bitcoind over JSON-RPC and do initial getmemorypool
63 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
64 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
65 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
66 @deferral.retry('Error while checking Bitcoin connection:', 1)
67 @defer.inlineCallbacks
69 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
70 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
71 raise deferral.RetrySilentlyException()
72 temp_work = yield getwork(bitcoind)
73 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
74 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
75 raise deferral.RetrySilentlyException()
76 defer.returnValue(temp_work)
77 temp_work = yield check()
79 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
82 # connect to bitcoind over bitcoin-p2p
83 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
84 factory = bitcoin_p2p.ClientFactory(net.PARENT)
85 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
86 yield factory.getProtocol() # waits until handshake is successful
90 print 'Determining payout address...'
91 if args.pubkey_hash is None:
92 address_path = os.path.join(datadir_path, 'cached_payout_address')
94 if os.path.exists(address_path):
95 with open(address_path, 'rb') as f:
96 address = f.read().strip('\r\n')
97 print ' Loaded cached address: %s...' % (address,)
101 if address is not None:
102 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
103 if not res['isvalid'] or not res['ismine']:
104 print ' Cached address is either invalid or not controlled by local bitcoind!'
108 print ' Getting payout address from bitcoind...'
109 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
111 with open(address_path, 'wb') as f:
114 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
116 my_pubkey_hash = args.pubkey_hash
117 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
120 my_share_hashes = set()
121 my_doa_share_hashes = set()
123 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
124 shared_share_hashes = set()
125 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
126 known_verified = set()
127 print "Loading shares..."
128 for i, (mode, contents) in enumerate(ss.get_shares()):
130 if contents.hash in tracker.shares:
132 shared_share_hashes.add(contents.hash)
133 contents.time_seen = 0
134 tracker.add(contents)
135 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136 print " %i" % (len(tracker.shares),)
137 elif mode == 'verified_hash':
138 known_verified.add(contents)
140 raise AssertionError()
141 print " ...inserting %i verified shares..." % (len(known_verified),)
142 for h in known_verified:
143 if h not in tracker.shares:
144 ss.forget_verified_share(h)
146 tracker.verified.add(tracker.shares[h])
147 print " ...done loading %i shares!" % (len(tracker.shares),)
149 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
150 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
151 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
153 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155 pre_current_work = variable.Variable(None)
156 pre_merged_work = variable.Variable({})
157 # information affecting work that should trigger a long-polling update
158 current_work = variable.Variable(None)
159 # information affecting work that should not trigger a long-polling update
160 current_work2 = variable.Variable(None)
162 requested = expiring_dict.ExpiringDict(300)
164 print 'Initializing work...'
165 @defer.inlineCallbacks
166 def set_real_work1():
167 work = yield getwork(bitcoind)
168 current_work2.set(dict(
170 transactions=work['transactions'],
171 merkle_link=work['merkle_link'],
172 subsidy=work['subsidy'],
173 clock_offset=time.time() - work['time'],
174 last_update=time.time(),
175 )) # second set first because everything hooks on the first
176 pre_current_work.set(dict(
177 version=work['version'],
178 previous_block=work['previous_block_hash'],
180 coinbaseflags=work['coinbaseflags'],
182 yield set_real_work1()
184 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
186 def set_real_work2():
187 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
189 t = dict(pre_current_work.value)
190 t['best_share_hash'] = best
191 t['mm_chains'] = pre_merged_work.value
195 for peer2, share_hash in desired:
196 if share_hash not in tracker.tails: # was received in the time tracker.think was running
198 last_request_time, count = requested.get(share_hash, (None, 0))
199 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
201 potential_peers = set()
202 for head in tracker.tails[share_hash]:
203 potential_peers.update(peer_heads.get(head, set()))
204 potential_peers = [peer for peer in potential_peers if peer.connected2]
205 if count == 0 and peer2 is not None and peer2.connected2:
208 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
212 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
216 stops=list(set(tracker.heads) | set(
217 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
220 requested[share_hash] = t, count + 1
221 pre_current_work.changed.watch(lambda _: set_real_work2())
222 pre_merged_work.changed.watch(lambda _: set_real_work2())
228 @defer.inlineCallbacks
229 def set_merged_work(merged_url, merged_userpass):
230 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
232 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
233 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
234 hash=int(auxblock['hash'], 16),
235 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
236 merged_proxy=merged_proxy,
238 yield deferral.sleep(1)
239 for merged_url, merged_userpass in merged_urls:
240 set_merged_work(merged_url, merged_userpass)
242 @pre_merged_work.changed.watch
243 def _(new_merged_work):
244 print 'Got new merged mining work!'
246 # setup p2p logic and join p2pool network
248 class Node(p2p.Node):
249 def handle_shares(self, shares, peer):
251 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
255 if share.hash in tracker.shares:
256 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
261 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
265 if shares and peer is not None:
266 peer_heads.setdefault(shares[0].hash, set()).add(peer)
272 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
274 def handle_share_hashes(self, hashes, peer):
277 for share_hash in hashes:
278 if share_hash in tracker.shares:
280 last_request_time, count = requested.get(share_hash, (None, 0))
281 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
283 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
284 get_hashes.append(share_hash)
285 requested[share_hash] = t, count + 1
287 if hashes and peer is not None:
288 peer_heads.setdefault(hashes[0], set()).add(peer)
290 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
292 def handle_get_shares(self, hashes, parents, stops, peer):
293 parents = min(parents, 1000//len(hashes))
296 for share_hash in hashes:
297 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
298 if share.hash in stops:
301 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    # Hand a solved block directly to bitcoind over the bitcoin P2P link.
    # Retried (via the decorator) because the connection may be flapping.
    if factory.conn.value is None:
        # BUGFIX: the original referenced `header_hash`, which is not defined
        # in this scope (it is local to got_response below), so this branch
        # raised NameError instead of logging. Compute the hash from the
        # block being submitted instead.
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
        raise deferral.RetrySilentlyException()
    factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    # Submit the solved block over JSON-RPC (getmemorypool with a data
    # argument, the pre-BIP22 submission form).
    success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
    # Submission is expected to succeed iff the header actually meets the
    # network target; warn whenever outcome and expectation disagree.
    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
    if (not success and success_expected and not ignore_failure) or (success and not success_expected):
        # BUGFIX: the original printed `result` / `expected_result`, which are
        # undefined names here — the locals are `success` / `success_expected`.
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
# Submit a solved block through both channels at once: the raw P2P
# connection (fastest) and the JSON-RPC interface (which also reports
# acceptance, checked inside submit_block_rpc).
319 def submit_block(block, ignore_failure):
320 submit_block_p2p(block)
321 submit_block_rpc(block, ignore_failure)
323 @tracker.verified.added.watch
325 if share.pow_hash <= share.header['bits'].target:
326 submit_block(share.as_block(tracker), ignore_failure=True)
328 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
330 if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
331 broadcast_share(share.hash)
333 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
335 @defer.inlineCallbacks
338 ip, port = x.split(':')
339 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
341 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
344 if os.path.exists(os.path.join(datadir_path, 'addrs')):
346 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
347 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
349 print >>sys.stderr, 'error parsing addrs'
350 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
352 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
354 print >>sys.stderr, "error reading addrs.txt"
355 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
358 if addr not in addrs:
359 addrs[addr] = (0, time.time(), time.time())
363 connect_addrs = set()
364 for addr_df in map(parse, args.p2pool_nodes):
366 connect_addrs.add((yield addr_df))
371 best_share_hash_func=lambda: current_work.value['best_share_hash'],
372 port=args.p2pool_port,
375 connect_addrs=connect_addrs,
376 max_incoming_conns=args.p2pool_conns,
381 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
382 f.write(json.dumps(p2p_node.addr_store.items()))
383 task.LoopingCall(save_addrs).start(60)
385 def broadcast_share(share_hash):
387 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
388 if share.hash in shared_share_hashes:
390 shared_share_hashes.add(share.hash)
393 for peer in p2p_node.peers.itervalues():
394 peer.sendShares([share for share in shares if share.peer is not peer])
396 # send share when the chain changes to their chain
397 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
400 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
402 if share.hash in tracker.verified.shares:
403 ss.add_verified_hash(share.hash)
404 task.LoopingCall(save_shares).start(60)
410 @defer.inlineCallbacks
414 is_lan, lan_ip = yield ipdiscover.get_local_ip()
416 pm = yield portmapper.get_port_mapper()
417 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
418 except defer.TimeoutError:
422 log.err(None, 'UPnP error:')
423 yield deferral.sleep(random.expovariate(1/120))
426 # start listening for workers with a JSON-RPC server
428 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
432 removed_unstales_var = variable.Variable((0, 0, 0))
433 removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    # NOTE(review): the `def _(share):` header and the tuple-closing `))` were
    # elided in this extract and are reconstructed here to match the watcher
    # pattern used elsewhere in this file (e.g. @pre_merged_work.changed.watch).
    #
    # When one of OUR shares is pruned off the verified chain, fold it into the
    # removed_* counters so get_stale_counts() still accounts for it.
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
        removed_unstales_var.set((
            removed_unstales_var.value[0] + 1,
            removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
            removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
        ))
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # BUGFIX: the original called `removed_doa_unstales.set(...)`, an
        # undefined name — the Variable declared above is
        # `removed_doa_unstales_var`.
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
446 def get_stale_counts():
447 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
# Total shares this instance has produced, stale or not.
448 my_shares = len(my_share_hashes)
449 my_doa_shares = len(my_doa_share_hashes)
# Combine the live chain delta since our best share with the removed_*
# counters, which remember shares already pruned from the tracker.
450 delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
451 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
452 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
453 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
454 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
# Anything we produced that never made it into the chain is stale; dead-on-
# arrival shares are split out from plain orphans in the returned tuple.
456 my_shares_not_in_chain = my_shares - my_shares_in_chain
457 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
459 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
462 pseudoshare_received = variable.Event()
463 share_received = variable.Event()
464 local_rate_monitor = math.RateMonitor(10*60)
466 class WorkerBridge(worker_interface.WorkerBridge):
468 worker_interface.WorkerBridge.__init__(self)
469 self.new_work_event = current_work.changed
470 self.recent_shares_ts_work = []
472 def get_user_details(self, request):
473 user = request.getUser() if request.getUser() is not None else ''
475 desired_pseudoshare_target = None
477 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
479 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
483 desired_share_target = 2**256 - 1
485 user, min_diff_str = user.rsplit('/', 1)
487 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
491 if random.uniform(0, 100) < args.worker_fee:
492 pubkey_hash = my_pubkey_hash
495 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
497 pubkey_hash = my_pubkey_hash
499 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
501 def preprocess_request(self, request):
502 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
503 return pubkey_hash, desired_share_target, desired_pseudoshare_target
505 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
506 if len(p2p_node.peers) == 0 and net.PERSIST:
507 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
508 if current_work.value['best_share_hash'] is None and net.PERSIST:
509 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
510 if time.time() > current_work2.value['last_update'] + 60:
511 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
513 if current_work.value['mm_chains']:
514 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
515 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
516 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
517 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
521 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
527 share_info, generate_tx = p2pool_data.Share.generate_transaction(
530 previous_share_hash=current_work.value['best_share_hash'],
531 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
532 nonce=random.randrange(2**32),
533 pubkey_hash=pubkey_hash,
534 subsidy=current_work2.value['subsidy'],
535 donation=math.perfect_round(65535*args.donation_percentage/100),
536 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
537 253 if orphans > orphans_recorded_in_chain else
538 254 if doas > doas_recorded_in_chain else
540 )(*get_stale_counts()),
543 block_target=current_work.value['bits'].target,
544 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
545 desired_target=desired_share_target,
546 ref_merkle_link=dict(branch=[], index=0),
550 target = net.PARENT.SANE_MAX_TARGET
551 if desired_pseudoshare_target is None:
552 if len(self.recent_shares_ts_work) == 50:
553 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
554 target = min(target, 4*2**256//hash_rate)
556 target = min(target, desired_pseudoshare_target)
557 target = max(target, share_info['bits'].target)
558 for aux_work in current_work.value['mm_chains'].itervalues():
559 target = max(target, aux_work['target'])
561 transactions = [generate_tx] + list(current_work2.value['transactions'])
562 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
563 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
565 getwork_time = time.time()
566 merkle_link = current_work2.value['merkle_link']
568 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
569 bitcoin_data.target_to_difficulty(target),
570 bitcoin_data.target_to_difficulty(share_info['bits'].target),
571 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
572 len(current_work2.value['transactions']),
575 ba = bitcoin_getwork.BlockAttempt(
576 version=current_work.value['version'],
577 previous_block=current_work.value['previous_block'],
578 merkle_root=merkle_root,
579 timestamp=current_work2.value['time'],
580 bits=current_work.value['bits'],
584 received_header_hashes = set()
586 def got_response(header, request):
587 user, _, _, _ = self.get_user_details(request)
588 assert header['merkle_root'] == merkle_root
590 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
591 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
592 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
595 if pow_hash <= header['bits'].target or p2pool.DEBUG:
596 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
597 if pow_hash <= header['bits'].target:
599 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
602 log.err(None, 'Error while processing potential block:')
604 for aux_work, index, hashes in mm_later:
606 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
607 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
608 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
609 bitcoin_data.aux_pow_type.pack(dict(
612 block_hash=header_hash,
613 merkle_link=merkle_link,
615 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
616 parent_block_header=header,
621 if result != (pow_hash <= aux_work['target']):
622 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
624 print 'Merged block submittal result: %s' % (result,)
627 log.err(err, 'Error submitting merged block:')
629 log.err(None, 'Error while processing merged mining POW:')
631 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
632 min_header = dict(header);del min_header['merkle_root']
633 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
634 share = p2pool_data.Share(net, None, dict(
635 min_header=min_header, share_info=share_info, hash_link=hash_link,
636 ref_merkle_link=dict(branch=[], index=0),
637 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
639 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
641 p2pool_data.format_hash(share.hash),
642 p2pool_data.format_hash(share.previous_hash),
643 time.time() - getwork_time,
644 ' DEAD ON ARRIVAL' if not on_time else '',
646 my_share_hashes.add(share.hash)
648 my_doa_share_hashes.add(share.hash)
652 tracker.verified.add(share)
656 if pow_hash <= header['bits'].target or p2pool.DEBUG:
657 for peer in p2p_node.peers.itervalues():
658 peer.sendShares([share])
659 shared_share_hashes.add(share.hash)
661 log.err(None, 'Error forwarding block solution:')
663 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
665 if pow_hash > target:
666 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
667 print ' Hash: %56x' % (pow_hash,)
668 print ' Target: %56x' % (target,)
669 elif header_hash in received_header_hashes:
670 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
672 received_header_hashes.add(header_hash)
674 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
675 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
676 while len(self.recent_shares_ts_work) > 50:
677 self.recent_shares_ts_work.pop(0)
678 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
682 return ba, got_response
684 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
686 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
687 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
689 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
691 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
698 @defer.inlineCallbacks
701 flag = factory.new_block.get_deferred()
703 yield set_real_work1()
706 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
711 print 'Started successfully!'
712 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
713 if args.donation_percentage > 0.51:
714 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
715 elif args.donation_percentage < 0.49:
716 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
718 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
719 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
723 if hasattr(signal, 'SIGALRM'):
724 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
725 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
727 signal.siginterrupt(signal.SIGALRM, False)
728 task.LoopingCall(signal.alarm, 30).start(1)
730 if args.irc_announce:
731 from twisted.words.protocols import irc
732 class IRCClient(irc.IRCClient):
733 nickname = 'p2pool%02i' % (random.randrange(100),)
734 channel = net.ANNOUNCE_CHANNEL
735 def lineReceived(self, line):
738 irc.IRCClient.lineReceived(self, line)
740 irc.IRCClient.signedOn(self)
741 self.factory.resetDelay()
742 self.join(self.channel)
743 @defer.inlineCallbacks
744 def new_share(share):
745 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
746 yield deferral.sleep(random.expovariate(1/60))
747 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
748 if message not in self.recent_messages:
749 self.say(self.channel, message)
750 self._remember_message(message)
751 self.watch_id = tracker.verified.added.watch(new_share)
752 self.recent_messages = []
753 def _remember_message(self, message):
754 self.recent_messages.append(message)
755 while len(self.recent_messages) > 100:
756 self.recent_messages.pop(0)
757 def privmsg(self, user, channel, message):
758 if channel == self.channel:
759 self._remember_message(message)
760 def connectionLost(self, reason):
761 tracker.verified.added.unwatch(self.watch_id)
762 print 'IRC connection lost:', reason.getErrorMessage()
763 class IRCClientFactory(protocol.ReconnectingClientFactory):
765 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
767 @defer.inlineCallbacks
772 yield deferral.sleep(3)
774 if time.time() > current_work2.value['last_update'] + 60:
775 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
777 height = tracker.get_height(current_work.value['best_share_hash'])
778 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
780 len(tracker.verified.shares),
783 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
784 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
786 datums, dt = local_rate_monitor.get_datums_in_last()
787 my_att_s = sum(datum['work']/dt for datum in datums)
788 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
789 math.format(int(my_att_s)),
791 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
792 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
796 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
797 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
798 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
800 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
801 shares, stale_orphan_shares, stale_doa_shares,
802 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
803 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
804 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
806 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
807 math.format(int(real_att_s)),
809 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
812 for warning in p2pool_data.get_warnings(tracker, current_work):
813 print >>sys.stderr, '#'*40
814 print >>sys.stderr, '>>> Warning: ' + warning
815 print >>sys.stderr, '#'*40
817 if this_str != last_str or time.time() > last_time + 15:
820 last_time = time.time()
826 log.err(None, 'Fatal error:')
# ArgumentParser variant whose @file expansion splits file lines on whitespace
# (see convert_arg_line_to_args below) and recurses into nested @files.
# NOTE(review): extraction artifact — original line numbers are baked into each
# line and interior lines are elided here (the `new_arg_strings = []`
# initializer, the try/finally around the open file, the IOError handler whose
# tail is the `err = ...` line, and the loop's continue statements).
829 class FixedArgumentParser(argparse.ArgumentParser):
830 def _read_args_from_files(self, arg_strings):
831 # expand arguments referencing files
833 for arg_string in arg_strings:
835 # for regular arguments, just add them back into the list
836 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
837 new_arg_strings.append(arg_string)
839 # replace arguments referencing files with the file content
842 args_file = open(arg_string[1:])
845 for arg_line in args_file.read().splitlines():
846 for arg in self.convert_arg_line_to_args(arg_line):
847 arg_strings.append(arg)
# Recurse so an @file may itself reference further @files.
848 arg_strings = self._read_args_from_files(arg_strings)
849 new_arg_strings.extend(arg_strings)
# presumably part of an IOError handler reporting the unreadable @file —
# the surrounding except clause is elided; verify against the full source.
853 err = sys.exc_info()[1]
856 # return the modified argument list
857 return new_arg_strings
def convert_arg_line_to_args(self, arg_line):
    """Split one line of an @argfile into separate argument tokens.

    Every whitespace-delimited token on the line becomes its own argument;
    blank lines produce no arguments. (str.split() already discards
    whitespace-only tokens, so the strip() check is purely defensive.)
    """
    tokens = []
    for token in arg_line.split():
        if token.strip():
            tokens.append(token)
    return tokens
# ---- command-line interface and configuration resolution ----

# All "real" (non-testnet) networks; testnet variants are registered in
# networks.nets under names containing '_testnet'.
realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# fromfile_prefix_chars='@' lets users keep their options in an @args file
# (expanded by FixedArgumentParser above).
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')

# p2pool's own peer-to-peer interface
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')

# miner-facing RPC interface
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# upstream bitcoind connection settings
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# positional: zero, one, or two values (username then password)
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()

# resolve the concrete network object from --net/--testnet
net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

# per-network data directory; created on first run
datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
# left-pad with Nones so a single value is treated as password-only and two
# values as username+password
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

if args.bitcoind_rpc_password is None:
    # no password on the command line: fall back to reading bitcoin.conf
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        # NOTE(review): additional message lines appear elided from this
        # excerpt between the second and third string fragments
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    # bitcoin.conf has no section headers, so prefix a dummy [x] section to
    # make RawConfigParser accept it
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # copy conf-file settings into args only where the command line did
        # not already provide a value
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
            # NOTE(review): the list closer (']:') appears elided here
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

# fill remaining ports from the network's defaults
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

# worker endpoint is an (addr, port) pair; '' address means all interfaces
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
    # NOTE(review): an `else:` line appears elided before the next two lines
    addr, port = args.worker_endpoint.rsplit(':', 1)
    worker_endpoint = addr, int(port)

if args.address is not None:
    # NOTE(review): the try/except wrapping this address parse (and its
    # `else:` branch) appears elided from this excerpt
    args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    parser.error('error parsing address: ' + repr(e))
args.pubkey_hash = None
def separate_url(url):
    # Split a merged-mining URL of the form scheme://user:pass@host:port/path
    # into (url-with-credentials-removed, "user:pass"). Aborts via
    # parser.error() when the URL carries no credentials.
    split = urlparse.urlsplit(url)
    if '@' not in split.netloc:
        parser.error('merged url netloc must contain an "@"')
    credentials, bare_netloc = split.netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(split._replace(netloc=bare_netloc))
    return stripped_url, credentials
# strip credentials out of each --merged url; yields (url, userpass) pairs
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# tee everything written to stdout/stderr into both the original stderr and
# the timestamped logfile; AbortPipe presumably guards against writes after
# shutdown -- confirm in util.logging
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
if hasattr(signal, "SIGUSR1"):
    # SIGUSR1 closes and reopens the logfile so external log rotation works
    # (guarded because the signal does not exist on Windows)
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        # NOTE(review): the logfile.reopen() call appears elided from this
        # excerpt between the two prints
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
# also reopen the logfile every 5 seconds, so rotation is picked up even
# without a signal
task.LoopingCall(logfile.reopen).start(5)

# defer entry into main() until the reactor is running
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)