from __future__ import division

import argparse, ConfigParser, os, random, signal, StringIO, sys, time, traceback, urlparse

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data
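
# getwork: fetch a block template from bitcoind via the (pre-BIP22)
# getmemorypool RPC and normalize it into the dict the rest of p2pool uses.
# The merkle branch is computed with a zero placeholder at index 0 so any
# generate (coinbase) tx can later be slotted in without recomputing it.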
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
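        
        # Two ways to measure a block's height relative to the best chain tip:
        # if this bitcoind's RPC interface has getblock, cache heights via RPC;
        # otherwise fall back to a HeightTracker over the bitcoin P2P connection.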
        if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
            @deferral.DeferredCacher
            @defer.inlineCallbacks
            def height_cacher(block_hash):
                try:
                    x = yield bitcoind.rpc_getblock('%x' % (block_hash,))
                except jsonrpc.Error, e:
                    if e.code == -5 and not p2pool.DEBUG:
                        raise deferral.RetrySilentlyException()
                    raise
                defer.returnValue(x['blockcount'] if 'blockcount' in x else x['height'])
            best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
            def get_height_rel_highest(block_hash):
                this_height = height_cacher.call_now(block_hash, 0)
                best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
                best_height_cached.set(max(best_height_cached.value, this_height, best_height))
                return this_height - best_height_cached.value
        else:
            get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
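        
        # set_real_work2: re-evaluate the best share with tracker.think and publish
        # it (plus merged-mining chains) to current_work; also request any desired
        # but missing parent shares from likely peers, backing off per share hash.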
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
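        
        # submit_block hands a solved block to bitcoind as the hex-encoded argument
        # of getmemorypool (the submission form of that pre-BIP22 RPC) and logs a
        # warning whenever the result disagrees with whether the header actually
        # meets the block target.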
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net, addr_store=addrs,
            connect_addrs=connect_addrs,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print '    Worker password:', vip_pass, '(only required for generating graphs)'
        
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
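        
        # Stale accounting: our shares that fall out of the share chain are either
        # orphans or DOAs (dead on arrival); stale_info encodes these as 253 and
        # 254. Shares expired out of the tracker keep contributing through the
        # removed_*_var accumulators above so get_stale_counts stays accurate.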
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
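            
            # Miner usernames are parsed as ADDRESS[/SHARE_DIFF][+PSEUDOSHARE_DIFF]:
            # '+' picks the pseudoshare (getwork) difficulty, '/' sets a minimum
            # difficulty for real shares, and an unparseable address falls back to
            # this node's payout address.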
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except:
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
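            
            # get_work assembles one unit of miner work: it checks that p2pool is
            # in a workable state, embeds any merged-mining commitment in the
            # coinbase, builds the share's generate tx, and returns a BlockAttempt
            # plus the got_response callback that grades the miner's result.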
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_hashes = []
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    net=net,
                )
                
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
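                
                # got_response grades a returned header against several targets, from
                # hardest to easiest: block target (submit to bitcoind), each merged
                # chain's target (submit an auxpow), share target (add to the share
                # chain), and finally the pseudoshare target (local rate stats).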
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result, aux_work=aux_work): # bind now; the loop variable changes before the callback fires
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        min_header = dict(header); del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        # poll bitcoind for work: refresh when it announces a new block, or at least every 15 seconds
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        print 'Started successfully!'
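        
        # Watchdog: re-arm a 30-second SIGALRM from the reactor once per second.
        # If the event loop stalls long enough for the alarm to fire, the handler
        # writes a stack trace to stderr so the hang can be located.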
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
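
# FixedArgumentParser works around argparse's stock @file handling: it expands
# file-referencing arguments recursively and lets one line hold several
# arguments (see convert_arg_line_to_args below).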
class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            else:
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        return [arg for arg in arg_line.split() if arg.strip()]

realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()

if args.debug:
    p2pool.DEBUG = True

net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

if args.bitcoind_rpc_password is None:
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''\r\n'''
            '''server=1\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
else:
    addr, port = args.worker_endpoint.rsplit(':', 1)
    worker_endpoint = addr, int(port)

if args.address is not None:
    try:
        args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
else:
    args.pubkey_hash = None

def separate_url(url):
    s = urlparse.urlsplit(url)
    if '@' not in s.netloc:
        parser.error('merged url netloc must contain an "@"')
    userpass, new_netloc = s.netloc.rsplit('@', 1)
    return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
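# e.g. separate_url('http://ncuser:ncpass@127.0.0.1:10332/')
#     -> ('http://127.0.0.1:10332/', 'ncuser:ncpass')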
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')
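
# Tee stdout/stderr through a timestamping pipe into both the real stderr and
# the logfile; stderr output is additionally prefixed with '> ' in the log.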
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        logfile.reopen()
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
task.LoopingCall(logfile.reopen).start(5)

reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
reactor.run()