1 from __future__ import division
16 if '--iocp' in sys.argv:
17 from twisted.internet import iocpreactor
19 from twisted.internet import defer, reactor, protocol, task
20 from twisted.web import server
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface, height_tracker
26 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
27 from . import p2p, networks, web
28 import p2pool, p2pool.data as p2pool_data
30 @deferral.retry('Error getting work from bitcoind:', 3)
31 @defer.inlineCallbacks
32 def getwork(bitcoind):
34 work = yield bitcoind.rpc_getmemorypool()
35 except jsonrpc.Error, e:
36 if e.code == -32601: # Method not found
37 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
38 raise deferral.RetrySilentlyException()
40 packed_transactions = [x.decode('hex') for x in work['transactions']]
41 defer.returnValue(dict(
42 version=work['version'],
43 previous_block_hash=int(work['previousblockhash'], 16),
44 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
45 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
46 subsidy=work['coinbasevalue'],
48 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
49 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
52 @defer.inlineCallbacks
53 def main(args, net, datadir_path, merged_urls, worker_endpoint):
55 print 'p2pool (version %s)' % (p2pool.__version__,)
58 # connect to bitcoind over JSON-RPC and do initial getmemorypool
59 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
60 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
61 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
62 @deferral.retry('Error while checking Bitcoin connection:', 1)
63 @defer.inlineCallbacks
65 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
66 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
67 raise deferral.RetrySilentlyException()
68 temp_work = yield getwork(bitcoind)
69 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
70 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
71 raise deferral.RetrySilentlyException()
72 defer.returnValue(temp_work)
73 temp_work = yield check()
75 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
78 # connect to bitcoind over bitcoin-p2p
79 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
80 factory = bitcoin_p2p.ClientFactory(net.PARENT)
81 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
82 yield factory.getProtocol() # waits until handshake is successful
86 print 'Determining payout address...'
87 if args.pubkey_hash is None:
88 address_path = os.path.join(datadir_path, 'cached_payout_address')
90 if os.path.exists(address_path):
91 with open(address_path, 'rb') as f:
92 address = f.read().strip('\r\n')
93 print ' Loaded cached address: %s...' % (address,)
97 if address is not None:
98 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
99 if not res['isvalid'] or not res['ismine']:
100 print ' Cached address is either invalid or not controlled by local bitcoind!'
104 print ' Getting payout address from bitcoind...'
105 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
107 with open(address_path, 'wb') as f:
110 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
112 my_pubkey_hash = args.pubkey_hash
113 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
116 my_share_hashes = set()
117 my_doa_share_hashes = set()
119 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
120 shared_share_hashes = set()
121 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
122 known_verified = set()
123 print "Loading shares..."
124 for i, (mode, contents) in enumerate(ss.get_shares()):
126 if contents.hash in tracker.shares:
128 shared_share_hashes.add(contents.hash)
129 contents.time_seen = 0
130 tracker.add(contents)
131 if len(tracker.shares) % 1000 == 0 and tracker.shares:
132 print " %i" % (len(tracker.shares),)
133 elif mode == 'verified_hash':
134 known_verified.add(contents)
136 raise AssertionError()
137 print " ...inserting %i verified shares..." % (len(known_verified),)
138 for h in known_verified:
139 if h not in tracker.shares:
140 ss.forget_verified_share(h)
142 tracker.verified.add(tracker.shares[h])
143 print " ...done loading %i shares!" % (len(tracker.shares),)
145 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
146 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
147 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
149 print 'Initializing work...'
154 bitcoind_work = variable.Variable(None)
156 @defer.inlineCallbacks
158 work = yield getwork(bitcoind)
159 bitcoind_work.set(dict(
160 version=work['version'],
161 previous_block=work['previous_block_hash'],
163 coinbaseflags=work['coinbaseflags'],
165 transactions=work['transactions'],
166 merkle_link=work['merkle_link'],
167 subsidy=work['subsidy'],
168 clock_offset=time.time() - work['time'],
169 last_update=time.time(),
171 yield poll_bitcoind()
173 @defer.inlineCallbacks
176 flag = factory.new_block.get_deferred()
178 yield poll_bitcoind()
181 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
186 merged_work = variable.Variable({})
188 @defer.inlineCallbacks
189 def set_merged_work(merged_url, merged_userpass):
190 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
192 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
193 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
194 hash=int(auxblock['hash'], 16),
195 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
196 merged_proxy=merged_proxy,
198 yield deferral.sleep(1)
199 for merged_url, merged_userpass in merged_urls:
200 set_merged_work(merged_url, merged_userpass)
202 @merged_work.changed.watch
203 def _(new_merged_work):
204 print 'Got new merged mining work!'
208 current_work = variable.Variable(None)
210 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
211 requested = expiring_dict.ExpiringDict(300)
212 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
214 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
216 t = dict(bitcoind_work.value)
217 t['best_share_hash'] = best
218 t['mm_chains'] = merged_work.value
222 for peer2, share_hash in desired:
223 if share_hash not in tracker.tails: # was received in the time tracker.think was running
225 last_request_time, count = requested.get(share_hash, (None, 0))
226 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
228 potential_peers = set()
229 for head in tracker.tails[share_hash]:
230 potential_peers.update(peer_heads.get(head, set()))
231 potential_peers = [peer for peer in potential_peers if peer.connected2]
232 if count == 0 and peer2 is not None and peer2.connected2:
235 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
239 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
243 stops=list(set(tracker.heads) | set(
244 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
247 requested[share_hash] = t, count + 1
248 bitcoind_work.changed.watch(lambda _: compute_work())
249 merged_work.changed.watch(lambda _: compute_work())
254 lp_signal = variable.Event()
256 @current_work.transitioned.watch
257 def _(before, after):
258 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']):
265 # setup p2p logic and join p2pool network
267 class Node(p2p.Node):
268 def handle_shares(self, shares, peer):
270 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
274 if share.hash in tracker.shares:
275 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
280 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
284 if shares and peer is not None:
285 peer_heads.setdefault(shares[0].hash, set()).add(peer)
291 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
293 def handle_share_hashes(self, hashes, peer):
296 for share_hash in hashes:
297 if share_hash in tracker.shares:
299 last_request_time, count = requested.get(share_hash, (None, 0))
300 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
302 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
303 get_hashes.append(share_hash)
304 requested[share_hash] = t, count + 1
306 if hashes and peer is not None:
307 peer_heads.setdefault(hashes[0], set()).add(peer)
309 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
311 def handle_get_shares(self, hashes, parents, stops, peer):
312 parents = min(parents, 1000//len(hashes))
315 for share_hash in hashes:
316 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
317 if share.hash in stops:
320 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
323 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
324 def submit_block_p2p(block):
325 if factory.conn.value is None:
326 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
327 raise deferral.RetrySilentlyException()
328 factory.conn.value.send_block(block=block)
330 @deferral.retry('Error submitting block: (will retry)', 10, 10)
331 @defer.inlineCallbacks
332 def submit_block_rpc(block, ignore_failure):
333 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
334 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
335 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
336 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
338 def submit_block(block, ignore_failure):
339 submit_block_p2p(block)
340 submit_block_rpc(block, ignore_failure)
342 @tracker.verified.added.watch
344 if share.pow_hash <= share.header['bits'].target:
345 submit_block(share.as_block(tracker), ignore_failure=True)
347 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
350 if (get_height_rel_highest(share.header['previous_block']) > -5 or
351 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
352 broadcast_share(share.hash)
354 reactor.callLater(5, spread) # so get_height_rel_highest can update
356 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
358 @defer.inlineCallbacks
361 ip, port = x.split(':')
362 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
364 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
367 if os.path.exists(os.path.join(datadir_path, 'addrs')):
369 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
370 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
372 print >>sys.stderr, 'error parsing addrs'
373 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
375 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
377 print >>sys.stderr, "error reading addrs.txt"
378 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
381 if addr not in addrs:
382 addrs[addr] = (0, time.time(), time.time())
386 connect_addrs = set()
387 for addr_df in map(parse, args.p2pool_nodes):
389 connect_addrs.add((yield addr_df))
394 best_share_hash_func=lambda: current_work.value['best_share_hash'],
395 port=args.p2pool_port,
398 connect_addrs=connect_addrs,
399 max_incoming_conns=args.p2pool_conns,
404 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
405 f.write(json.dumps(p2p_node.addr_store.items()))
406 task.LoopingCall(save_addrs).start(60)
408 best_block = variable.Variable(None)
409 @bitcoind_work.changed.watch
411 best_block.set(work['previous_block'])
412 @best_block.changed.watch
413 @defer.inlineCallbacks
415 header = yield factory.conn.value.get_block_header(block_hash)
416 for peer in p2p_node.peers.itervalues():
417 peer.send_bestblock(header=header)
419 def broadcast_share(share_hash):
421 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
422 if share.hash in shared_share_hashes:
424 shared_share_hashes.add(share.hash)
427 for peer in p2p_node.peers.itervalues():
428 peer.sendShares([share for share in shares if share.peer is not peer])
430 # send share when the chain changes to their chain
431 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
434 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
436 if share.hash in tracker.verified.shares:
437 ss.add_verified_hash(share.hash)
438 task.LoopingCall(save_shares).start(60)
444 @defer.inlineCallbacks
448 is_lan, lan_ip = yield ipdiscover.get_local_ip()
450 pm = yield portmapper.get_port_mapper()
451 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
452 except defer.TimeoutError:
456 log.err(None, 'UPnP error:')
457 yield deferral.sleep(random.expovariate(1/120))
460 # start listening for workers with a JSON-RPC server
462 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
466 removed_unstales_var = variable.Variable((0, 0, 0))
467 removed_doa_unstales_var = variable.Variable(0)
468 @tracker.verified.removed.watch
470 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
471 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
472 removed_unstales_var.set((
473 removed_unstales_var.value[0] + 1,
474 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
475 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
477 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
478 removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
480 def get_stale_counts():
481 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
482 my_shares = len(my_share_hashes)
483 my_doa_shares = len(my_doa_share_hashes)
484 delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
485 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
486 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
487 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
488 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
490 my_shares_not_in_chain = my_shares - my_shares_in_chain
491 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
493 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
496 pseudoshare_received = variable.Event()
497 share_received = variable.Event()
498 local_rate_monitor = math.RateMonitor(10*60)
500 class WorkerBridge(worker_interface.WorkerBridge):
502 worker_interface.WorkerBridge.__init__(self)
503 self.new_work_event = lp_signal
504 self.recent_shares_ts_work = []
506 def get_user_details(self, request):
507 user = request.getUser() if request.getUser() is not None else ''
509 desired_pseudoshare_target = None
511 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
513 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
517 desired_share_target = 2**256 - 1
519 user, min_diff_str = user.rsplit('/', 1)
521 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
525 if random.uniform(0, 100) < args.worker_fee:
526 pubkey_hash = my_pubkey_hash
529 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
531 pubkey_hash = my_pubkey_hash
533 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
535 def preprocess_request(self, request):
536 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
537 return pubkey_hash, desired_share_target, desired_pseudoshare_target
539 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
540 if len(p2p_node.peers) == 0 and net.PERSIST:
541 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
542 if current_work.value['best_share_hash'] is None and net.PERSIST:
543 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
544 if time.time() > current_work.value['last_update'] + 60:
545 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
547 if current_work.value['mm_chains']:
548 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
549 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
550 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
551 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
555 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
561 share_info, generate_tx = p2pool_data.Share.generate_transaction(
564 previous_share_hash=current_work.value['best_share_hash'],
565 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
566 nonce=random.randrange(2**32),
567 pubkey_hash=pubkey_hash,
568 subsidy=current_work.value['subsidy'],
569 donation=math.perfect_round(65535*args.donation_percentage/100),
570 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
571 'orphan' if orphans > orphans_recorded_in_chain else
572 'doa' if doas > doas_recorded_in_chain else
574 )(*get_stale_counts()),
577 block_target=current_work.value['bits'].target,
578 desired_timestamp=int(time.time() - current_work.value['clock_offset']),
579 desired_target=desired_share_target,
580 ref_merkle_link=dict(branch=[], index=0),
584 if desired_pseudoshare_target is None:
586 if len(self.recent_shares_ts_work) == 50:
587 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
589 target = min(target, int(2**256/hash_rate))
591 target = desired_pseudoshare_target
592 target = max(target, share_info['bits'].target)
593 for aux_work in current_work.value['mm_chains'].itervalues():
594 target = max(target, aux_work['target'])
595 target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
597 transactions = [generate_tx] + list(current_work.value['transactions'])
598 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
599 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work.value['merkle_link'])
601 getwork_time = time.time()
602 merkle_link = current_work.value['merkle_link']
604 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
605 bitcoin_data.target_to_difficulty(target),
606 bitcoin_data.target_to_difficulty(share_info['bits'].target),
607 current_work.value['subsidy']*1e-8, net.PARENT.SYMBOL,
608 len(current_work.value['transactions']),
611 bits = current_work.value['bits']
612 previous_block = current_work.value['previous_block']
613 ba = bitcoin_getwork.BlockAttempt(
614 version=current_work.value['version'],
615 previous_block=current_work.value['previous_block'],
616 merkle_root=merkle_root,
617 timestamp=current_work.value['time'],
618 bits=current_work.value['bits'],
622 received_header_hashes = set()
624 def got_response(header, request):
625 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
626 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
628 if pow_hash <= header['bits'].target or p2pool.DEBUG:
629 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
630 if pow_hash <= header['bits'].target:
632 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
635 log.err(None, 'Error while processing potential block:')
637 user, _, _, _ = self.get_user_details(request)
638 assert header['merkle_root'] == merkle_root
639 assert header['previous_block'] == previous_block
640 assert header['bits'] == bits
642 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
644 for aux_work, index, hashes in mm_later:
646 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
647 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
648 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
649 bitcoin_data.aux_pow_type.pack(dict(
652 block_hash=header_hash,
653 merkle_link=merkle_link,
655 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
656 parent_block_header=header,
661 if result != (pow_hash <= aux_work['target']):
662 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
664 print 'Merged block submittal result: %s' % (result,)
667 log.err(err, 'Error submitting merged block:')
669 log.err(None, 'Error while processing merged mining POW:')
671 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
672 min_header = dict(header);del min_header['merkle_root']
673 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
674 share = p2pool_data.Share(net, None, dict(
675 min_header=min_header, share_info=share_info, hash_link=hash_link,
676 ref_merkle_link=dict(branch=[], index=0),
677 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
679 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
681 p2pool_data.format_hash(share.hash),
682 p2pool_data.format_hash(share.previous_hash),
683 time.time() - getwork_time,
684 ' DEAD ON ARRIVAL' if not on_time else '',
686 my_share_hashes.add(share.hash)
688 my_doa_share_hashes.add(share.hash)
692 tracker.verified.add(share)
696 if pow_hash <= header['bits'].target or p2pool.DEBUG:
697 for peer in p2p_node.peers.itervalues():
698 peer.sendShares([share])
699 shared_share_hashes.add(share.hash)
701 log.err(None, 'Error forwarding block solution:')
703 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
705 if pow_hash > target:
706 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
707 print ' Hash: %56x' % (pow_hash,)
708 print ' Target: %56x' % (target,)
709 elif header_hash in received_header_hashes:
710 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
712 received_header_hashes.add(header_hash)
714 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
715 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
716 while len(self.recent_shares_ts_work) > 50:
717 self.recent_shares_ts_work.pop(0)
718 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
722 return ba, got_response
724 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
726 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
727 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
729 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
731 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
739 print 'Started successfully!'
740 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
741 if args.donation_percentage > 0.51:
742 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
743 elif args.donation_percentage < 0.49:
744 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
746 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
747 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
751 if hasattr(signal, 'SIGALRM'):
752 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
753 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
755 signal.siginterrupt(signal.SIGALRM, False)
756 task.LoopingCall(signal.alarm, 30).start(1)
758 if args.irc_announce:
759 from twisted.words.protocols import irc
760 class IRCClient(irc.IRCClient):
761 nickname = 'p2pool%02i' % (random.randrange(100),)
762 channel = net.ANNOUNCE_CHANNEL
763 def lineReceived(self, line):
766 irc.IRCClient.lineReceived(self, line)
768 irc.IRCClient.signedOn(self)
769 self.factory.resetDelay()
770 self.join(self.channel)
771 @defer.inlineCallbacks
772 def new_share(share):
773 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
774 yield deferral.sleep(random.expovariate(1/60))
775 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
776 if message not in self.recent_messages:
777 self.say(self.channel, message)
778 self._remember_message(message)
779 self.watch_id = tracker.verified.added.watch(new_share)
780 self.recent_messages = []
781 def _remember_message(self, message):
782 self.recent_messages.append(message)
783 while len(self.recent_messages) > 100:
784 self.recent_messages.pop(0)
785 def privmsg(self, user, channel, message):
786 if channel == self.channel:
787 self._remember_message(message)
788 def connectionLost(self, reason):
789 tracker.verified.added.unwatch(self.watch_id)
790 print 'IRC connection lost:', reason.getErrorMessage()
791 class IRCClientFactory(protocol.ReconnectingClientFactory):
793 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
795 @defer.inlineCallbacks
800 yield deferral.sleep(3)
802 if time.time() > current_work.value['last_update'] + 60:
803 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
805 height = tracker.get_height(current_work.value['best_share_hash'])
806 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
808 len(tracker.verified.shares),
811 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
812 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
814 datums, dt = local_rate_monitor.get_datums_in_last()
815 my_att_s = sum(datum['work']/dt for datum in datums)
816 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
817 math.format(int(my_att_s)),
819 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
820 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
824 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
825 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
826 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
828 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
829 shares, stale_orphan_shares, stale_doa_shares,
830 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
831 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
832 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
834 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
835 math.format(int(real_att_s)),
837 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
840 for warning in p2pool_data.get_warnings(tracker, current_work, net):
841 print >>sys.stderr, '#'*40
842 print >>sys.stderr, '>>> Warning: ' + warning
843 print >>sys.stderr, '#'*40
845 if this_str != last_str or time.time() > last_time + 15:
848 last_time = time.time()
854 log.err(None, 'Fatal error:')
857 class FixedArgumentParser(argparse.ArgumentParser):
858 def _read_args_from_files(self, arg_strings):
859 # expand arguments referencing files
861 for arg_string in arg_strings:
863 # for regular arguments, just add them back into the list
864 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
865 new_arg_strings.append(arg_string)
867 # replace arguments referencing files with the file content
870 args_file = open(arg_string[1:])
873 for arg_line in args_file.read().splitlines():
874 for arg in self.convert_arg_line_to_args(arg_line):
875 arg_strings.append(arg)
876 arg_strings = self._read_args_from_files(arg_strings)
877 new_arg_strings.extend(arg_strings)
881 err = sys.exc_info()[1]
884 # return the modified argument list
885 return new_arg_strings
887 def convert_arg_line_to_args(self, arg_line):
888 return [arg for arg in arg_line.split() if arg.strip()]
891 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
893 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
894 parser.add_argument('--version', action='version', version=p2pool.__version__)
895 parser.add_argument('--net',
896 help='use specified network (default: bitcoin)',
897 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
898 parser.add_argument('--testnet',
899 help='''use the network's testnet''',
900 action='store_const', const=True, default=False, dest='testnet')
901 parser.add_argument('--debug',
902 help='enable debugging mode',
903 action='store_const', const=True, default=False, dest='debug')
904 parser.add_argument('-a', '--address',
905 help='generate payouts to this address (default: <address requested from bitcoind>)',
906 type=str, action='store', default=None, dest='address')
907 parser.add_argument('--datadir',
908 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
909 type=str, action='store', default=None, dest='datadir')
910 parser.add_argument('--logfile',
911 help='''log to this file (default: data/<NET>/log)''',
912 type=str, action='store', default=None, dest='logfile')
913 parser.add_argument('--merged',
914 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
915 type=str, action='append', default=[], dest='merged_urls')
916 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
917 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
918 type=float, action='store', default=0.5, dest='donation_percentage')
919 parser.add_argument('--iocp',
920 help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
921 action='store_true', default=False, dest='iocp')
922 parser.add_argument('--irc-announce',
923 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
924 action='store_true', default=False, dest='irc_announce')
925 parser.add_argument('--no-bugreport',
926 help='disable submitting caught exceptions to the author',
927 action='store_true', default=False, dest='no_bugreport')
929 p2pool_group = parser.add_argument_group('p2pool interface')
930 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
931 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
932 type=int, action='store', default=None, dest='p2pool_port')
933 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
934 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
935 type=str, action='append', default=[], dest='p2pool_nodes')
936 parser.add_argument('--disable-upnp',
937 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
938 action='store_false', default=True, dest='upnp')
939 p2pool_group.add_argument('--max-conns', metavar='CONNS',
940 help='maximum incoming connections (default: 40)',
941 type=int, action='store', default=40, dest='p2pool_conns')
943 worker_group = parser.add_argument_group('worker interface')
944 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
945 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
946 type=str, action='store', default=None, dest='worker_endpoint')
947 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
948 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
949 type=float, action='store', default=0, dest='worker_fee')
951 bitcoind_group = parser.add_argument_group('bitcoind interface')
952 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
953 help='connect to this address (default: 127.0.0.1)',
954 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
955 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
956 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
957 type=int, action='store', default=None, dest='bitcoind_rpc_port')
958 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
959 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
960 type=int, action='store', default=None, dest='bitcoind_p2p_port')
962 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
963 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
964 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
966 args = parser.parse_args()
971 net_name = args.net_name + ('_testnet' if args.testnet else '')
972 net = networks.nets[net_name]
974 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
975 if not os.path.exists(datadir_path):
976 os.makedirs(datadir_path)
978 if len(args.bitcoind_rpc_userpass) > 2:
979 parser.error('a maximum of two arguments are allowed')
980 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
982 if args.bitcoind_rpc_password is None:
983 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
984 parser.error('This network has no configuration file function. Manually enter your RPC password.')
985 conf_path = net.PARENT.CONF_FILE_FUNC()
986 if not os.path.exists(conf_path):
987 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
988 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
991 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
992 with open(conf_path, 'rb') as f:
993 cp = ConfigParser.RawConfigParser()
994 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
995 for conf_name, var_name, var_type in [
996 ('rpcuser', 'bitcoind_rpc_username', str),
997 ('rpcpassword', 'bitcoind_rpc_password', str),
998 ('rpcport', 'bitcoind_rpc_port', int),
999 ('port', 'bitcoind_p2p_port', int),
1001 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1002 setattr(args, var_name, var_type(cp.get('x', conf_name)))
1003 if args.bitcoind_rpc_password is None:
1004 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
1006 if args.bitcoind_rpc_username is None:
1007 args.bitcoind_rpc_username = ''
1009 if args.bitcoind_rpc_port is None:
1010 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1012 if args.bitcoind_p2p_port is None:
1013 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1015 if args.p2pool_port is None:
1016 args.p2pool_port = net.P2P_PORT
1018 if args.worker_endpoint is None:
1019 worker_endpoint = '', net.WORKER_PORT
1020 elif ':' not in args.worker_endpoint:
1021 worker_endpoint = '', int(args.worker_endpoint)
1023 addr, port = args.worker_endpoint.rsplit(':', 1)
1024 worker_endpoint = addr, int(port)
1026 if args.address is not None:
1028 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1029 except Exception, e:
1030 parser.error('error parsing address: ' + repr(e))
1032 args.pubkey_hash = None
1034 def separate_url(url):
1035 s = urlparse.urlsplit(url)
1036 if '@' not in s.netloc:
1037 parser.error('merged url netloc must contain an "@"')
1038 userpass, new_netloc = s.netloc.rsplit('@', 1)
1039 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
1040 merged_urls = map(separate_url, args.merged_urls)
1042 if args.logfile is None:
1043 args.logfile = os.path.join(datadir_path, 'log')
1045 logfile = logging.LogFile(args.logfile)
1046 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1047 sys.stdout = logging.AbortPipe(pipe)
1048 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1049 if hasattr(signal, "SIGUSR1"):
1050 def sigusr1(signum, frame):
1051 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1053 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1054 signal.signal(signal.SIGUSR1, sigusr1)
1055 task.LoopingCall(logfile.reopen).start(5)
1057 class ErrorReporter(object):
1059 self.last_sent = None
1061 def emit(self, eventDict):
1062 if not eventDict["isError"]:
1065 if self.last_sent is not None and time.time() < self.last_sent + 5:
1067 self.last_sent = time.time()
1069 if 'failure' in eventDict:
1070 text = ((eventDict.get('why') or 'Unhandled Error')
1071 + '\n' + eventDict['failure'].getTraceback())
1073 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
1075 from twisted.web import client
1077 url='http://u.forre.st/p2pool_error.cgi',
1079 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
1081 ).addBoth(lambda x: None)
1082 if not args.no_bugreport:
1083 log.addObserver(ErrorReporter().emit)
1085 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)