1 from __future__ import division
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 except jsonrpc.Error, e:
32 if e.code == -32601: # Method not found
33 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34 raise deferral.RetrySilentlyException()
36 packed_transactions = [x.decode('hex') for x in work['transactions']]
37 defer.returnValue(dict(
38 version=work['version'],
39 previous_block_hash=int(work['previousblockhash'], 16),
40 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41 merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
42 subsidy=work['coinbasevalue'],
44 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
51 print 'p2pool (version %s)' % (p2pool.__version__,)
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
58 good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
60 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
62 temp_work = yield getwork(bitcoind)
64 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
67 # connect to bitcoind over bitcoin-p2p
68 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
69 factory = bitcoin_p2p.ClientFactory(net.PARENT)
70 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
71 yield factory.getProtocol() # waits until handshake is successful
75 print 'Determining payout address...'
76 if args.pubkey_hash is None:
77 address_path = os.path.join(datadir_path, 'cached_payout_address')
79 if os.path.exists(address_path):
80 with open(address_path, 'rb') as f:
81 address = f.read().strip('\r\n')
82 print ' Loaded cached address: %s...' % (address,)
86 if address is not None:
87 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
88 if not res['isvalid'] or not res['ismine']:
89 print ' Cached address is either invalid or not controlled by local bitcoind!'
93 print ' Getting payout address from bitcoind...'
94 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
96 with open(address_path, 'wb') as f:
99 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
101 my_pubkey_hash = args.pubkey_hash
102 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
105 my_share_hashes = set()
106 my_doa_share_hashes = set()
108 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
109 shared_share_hashes = set()
110 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
111 known_verified = set()
113 print "Loading shares..."
114 for i, (mode, contents) in enumerate(ss.get_shares()):
116 if contents.hash in tracker.shares:
118 shared_share_hashes.add(contents.hash)
119 contents.time_seen = 0
120 tracker.add(contents)
121 if len(tracker.shares) % 1000 == 0 and tracker.shares:
122 print " %i" % (len(tracker.shares),)
123 elif mode == 'verified_hash':
124 known_verified.add(contents)
126 raise AssertionError()
127 print " ...inserting %i verified shares..." % (len(known_verified),)
128 for h in known_verified:
129 if h not in tracker.shares:
130 ss.forget_verified_share(h)
132 tracker.verified.add(tracker.shares[h])
133 print " ...done loading %i shares!" % (len(tracker.shares),)
135 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
136 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
137 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
139 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
141 pre_current_work = variable.Variable(None)
142 pre_merged_work = variable.Variable({})
143 # information affecting work that should trigger a long-polling update
144 current_work = variable.Variable(None)
145 # information affecting work that should not trigger a long-polling update
146 current_work2 = variable.Variable(None)
148 requested = expiring_dict.ExpiringDict(300)
150 print 'Initializing work...'
151 @defer.inlineCallbacks
152 def set_real_work1():
153 work = yield getwork(bitcoind)
154 current_work2.set(dict(
156 transactions=work['transactions'],
157 merkle_branch=work['merkle_branch'],
158 subsidy=work['subsidy'],
159 clock_offset=time.time() - work['time'],
160 last_update=time.time(),
161 )) # second set first because everything hooks on the first
162 pre_current_work.set(dict(
163 version=work['version'],
164 previous_block=work['previous_block_hash'],
166 coinbaseflags=work['coinbaseflags'],
168 yield set_real_work1()
170 if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
171 height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((yield bitcoind.rpc_getblock('%x' % (block_hash,)))['blockcount'])))
172 best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
def get_height_rel_highest(block_hash):
    '''Return the height of block_hash minus the highest height seen so far.

    Heights come from height_cacher.call_now(hash, 0); the 0 is presumably
    the value used while a height is not cached yet (confirm against
    deferral.DeferredCacher). best_height_cached is raised to the largest of
    its old value, the queried block's height, and the height of the current
    previous_block, so the result is never positive.
    '''
    queried_height = height_cacher.call_now(block_hash, 0)
    tip_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
    highest_seen = max(best_height_cached.value, queried_height, tip_height)
    best_height_cached.set(highest_seen)
    return queried_height - best_height_cached.value
179 get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
181 def set_real_work2():
182 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
184 t = dict(pre_current_work.value)
185 t['best_share_hash'] = best
186 t['mm_chains'] = pre_merged_work.value
190 for peer2, share_hash in desired:
191 if share_hash not in tracker.tails: # was received in the time tracker.think was running
193 last_request_time, count = requested.get(share_hash, (None, 0))
194 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
196 potential_peers = set()
197 for head in tracker.tails[share_hash]:
198 potential_peers.update(peer_heads.get(head, set()))
199 potential_peers = [peer for peer in potential_peers if peer.connected2]
200 if count == 0 and peer2 is not None and peer2.connected2:
203 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
207 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
211 stops=list(set(tracker.heads) | set(
212 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
215 requested[share_hash] = t, count + 1
216 pre_current_work.changed.watch(lambda _: set_real_work2())
217 pre_merged_work.changed.watch(lambda _: set_real_work2())
223 @defer.inlineCallbacks
224 def set_merged_work(merged_url, merged_userpass):
225 merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
227 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
228 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
229 hash=int(auxblock['hash'], 16),
230 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
231 merged_proxy=merged_proxy,
233 yield deferral.sleep(1)
234 for merged_url, merged_userpass in merged_urls:
235 set_merged_work(merged_url, merged_userpass)
237 @pre_merged_work.changed.watch
238 def _(new_merged_work):
239 print 'Got new merged mining work!'
241 # setup p2p logic and join p2pool network
243 class Node(p2p.Node):
244 def handle_shares(self, shares, peer):
246 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
250 if share.hash in tracker.shares:
251 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
256 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
260 if shares and peer is not None:
261 peer_heads.setdefault(shares[0].hash, set()).add(peer)
267 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
269 def handle_share_hashes(self, hashes, peer):
272 for share_hash in hashes:
273 if share_hash in tracker.shares:
275 last_request_time, count = requested.get(share_hash, (None, 0))
276 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
278 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
279 get_hashes.append(share_hash)
280 requested[share_hash] = t, count + 1
282 if hashes and peer is not None:
283 peer_heads.setdefault(hashes[0], set()).add(peer)
285 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
287 def handle_get_shares(self, hashes, parents, stops, peer):
288 parents = min(parents, 1000//len(hashes))
291 for share_hash in hashes:
292 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
293 if share.hash in stops:
296 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
297 peer.sendShares(shares)
299 @tracker.verified.added.watch
301 if share.pow_hash <= share.header['bits'].target:
302 if factory.conn.value is not None:
303 factory.conn.value.send_block(block=share.as_block(tracker))
305 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
307 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
309 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
311 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
313 @defer.inlineCallbacks
316 ip, port = x.split(':')
317 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
319 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
322 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
324 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
326 print >>sys.stderr, "error reading addrs"
327 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
330 if addr not in addrs:
331 addrs[addr] = (0, time.time(), time.time())
335 connect_addrs = set()
336 for addr_df in map(parse, args.p2pool_nodes):
338 connect_addrs.add((yield addr_df))
343 best_share_hash_func=lambda: current_work.value['best_share_hash'],
344 port=args.p2pool_port,
347 connect_addrs=connect_addrs,
351 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
353 # send share when the chain changes to their chain
354 def work_changed(new_work):
355 #print 'Work changed:', new_work
357 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
358 if share.hash in shared_share_hashes:
360 shared_share_hashes.add(share.hash)
363 for peer in p2p_node.peers.itervalues():
364 peer.sendShares([share for share in shares if share.peer is not peer])
366 current_work.changed.watch(work_changed)
369 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
371 if share.hash in tracker.verified.shares:
372 ss.add_verified_hash(share.hash)
373 task.LoopingCall(save_shares).start(60)
378 @defer.inlineCallbacks
382 is_lan, lan_ip = yield ipdiscover.get_local_ip()
384 pm = yield portmapper.get_port_mapper()
385 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
386 except defer.TimeoutError:
390 log.err(None, "UPnP error:")
391 yield deferral.sleep(random.expovariate(1/120))
396 # start listening for workers with a JSON-RPC server
398 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
400 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
401 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
402 vip_pass = f.read().strip('\r\n')
404 vip_pass = '%016x' % (random.randrange(2**64),)
405 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
407 print ' Worker password:', vip_pass, '(only required for generating graphs)'
411 removed_unstales_var = variable.Variable((0, 0, 0))
412 removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    '''Remember our own shares when they are dropped from the verified tracker.

    get_stale_counts() adds these counters to what the tracker delta still
    reports, so shares of ours that were pruned keep being counted.

    removed_unstales_var holds a 3-tuple:
        [0] number of our removed shares,
        [1] how many of those carried stale_info 253,
        [2] how many of those carried stale_info 254.
    removed_doa_unstales_var holds the number of our removed
    dead-on-arrival shares.
    '''
    # Only shares for which is_child_of(share, best share) holds are counted.
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
        removed_unstales_var.set((
            removed_unstales_var.value[0] + 1,
            removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
            removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
        ))
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # The counter is named removed_doa_unstales_var; the previous spelling
        # without the _var suffix raised NameError here.
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)

    total is the number of shares this instance produced. "In chain" figures
    are the tracker delta up to the current best share plus the counters kept
    for shares already removed from the verified tracker.
    '''
    delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
    removed_count, removed_orphan_announces, removed_dead_announces = removed_unstales_var.value

    total = len(my_share_hashes)
    in_chain = delta.my_count + removed_count
    doa_in_chain = delta.my_doa_count + removed_doa_unstales_var.value

    not_in_chain = total - in_chain
    doa_not_in_chain = len(my_doa_share_hashes) - doa_in_chain

    recorded_in_chain = (
        delta.my_orphan_announce_count + removed_orphan_announces,
        delta.my_dead_announce_count + removed_dead_announces,
    )
    # shares missing from the chain that were not dead on arrival count as orphans
    return (not_in_chain - doa_not_in_chain, doa_not_in_chain), total, recorded_in_chain
441 local_rate_monitor = math.RateMonitor(10*60)
443 class WorkerBridge(worker_interface.WorkerBridge):
445 worker_interface.WorkerBridge.__init__(self)
446 self.new_work_event = current_work.changed
447 self.recent_shares_ts_work = []
449 def preprocess_request(self, request):
450 user = request.getUser() if request.getUser() is not None else ''
451 pubkey_hash = my_pubkey_hash
452 max_target = 2**256 - 1
454 user, min_diff_str = user.rsplit('/', 1)
456 max_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
460 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
463 if random.uniform(0, 100) < args.worker_fee:
464 pubkey_hash = my_pubkey_hash
465 return pubkey_hash, max_target
467 def get_work(self, pubkey_hash, max_target):
468 if len(p2p_node.peers) == 0 and net.PERSIST:
469 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
470 if current_work.value['best_share_hash'] is None and net.PERSIST:
471 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
472 if time.time() > current_work2.value['last_update'] + 60:
473 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
475 if current_work.value['mm_chains']:
476 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
477 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
478 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
479 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
483 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
488 new = time.time() > net.SWITCH_TIME
491 share_info, generate_tx = p2pool_data.new_generate_transaction(
494 previous_share_hash=current_work.value['best_share_hash'],
495 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
496 nonce=random.randrange(2**32),
497 pubkey_hash=pubkey_hash,
498 subsidy=current_work2.value['subsidy'],
499 donation=math.perfect_round(65535*args.donation_percentage/100),
500 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
501 253 if orphans > orphans_recorded_in_chain else
502 254 if doas > doas_recorded_in_chain else
504 )(*get_stale_counts()),
506 block_target=current_work.value['bits'].target,
507 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
508 desired_target=max_target,
512 share_info, generate_tx = p2pool_data.generate_transaction(
515 previous_share_hash=current_work.value['best_share_hash'],
516 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
517 nonce=struct.pack('<Q', random.randrange(2**64)),
518 new_script=bitcoin_data.pubkey_hash_to_script2(pubkey_hash),
519 subsidy=current_work2.value['subsidy'],
520 donation=math.perfect_round(65535*args.donation_percentage/100),
521 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
522 253 if orphans > orphans_recorded_in_chain else
523 254 if doas > doas_recorded_in_chain else
525 )(*get_stale_counts()),
527 block_target=current_work.value['bits'].target,
528 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
532 target = net.PARENT.SANE_MAX_TARGET
533 if len(self.recent_shares_ts_work) == 50:
534 hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
535 target = min(target, 2**256//(hash_rate * 5))
536 target = max(target, share_info['bits'].target)
537 for aux_work in current_work.value['mm_chains'].itervalues():
538 target = max(target, aux_work['target'])
540 transactions = [generate_tx] + list(current_work2.value['transactions'])
541 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
542 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
544 getwork_time = time.time()
545 merkle_branch = current_work2.value['merkle_branch']
547 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
548 bitcoin_data.target_to_difficulty(target),
549 bitcoin_data.target_to_difficulty(share_info['bits'].target),
550 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
551 len(current_work2.value['transactions']),
554 ba = bitcoin_getwork.BlockAttempt(
555 version=current_work.value['version'],
556 previous_block=current_work.value['previous_block'],
557 merkle_root=merkle_root,
558 timestamp=current_work2.value['time'],
559 bits=current_work.value['bits'],
563 received_header_hashes = set()
565 def got_response(header, request):
566 assert header['merkle_root'] == merkle_root
568 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
569 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
570 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
573 if pow_hash <= header['bits'].target or p2pool.DEBUG:
574 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
576 if factory.conn.value is None:
577 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
578 raise deferral.RetrySilentlyException()
579 factory.conn.value.send_block(block=dict(header=header, txs=transactions))
581 if pow_hash <= header['bits'].target:
583 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
585 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
587 log.err(None, 'Error while processing potential block:')
589 for aux_work, index, hashes in mm_later:
591 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
592 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
593 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
594 bitcoin_data.aux_pow_type.pack(dict(
597 block_hash=header_hash,
598 merkle_branch=merkle_branch,
601 merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
603 parent_block_header=header,
608 if result != (pow_hash <= aux_work['target']):
609 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
611 print 'Merged block submittal result: %s' % (result,)
614 log.err(err, 'Error submitting merged block:')
616 log.err(None, 'Error while processing merged mining POW:')
618 if pow_hash <= share_info['bits'].target:
620 min_header = dict(header);del min_header['merkle_root']
621 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
622 share = p2pool_data.NewShare(net, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
624 share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
625 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
627 p2pool_data.format_hash(share.hash),
628 p2pool_data.format_hash(share.previous_hash),
629 time.time() - getwork_time,
630 ' DEAD ON ARRIVAL' if not on_time else '',
632 my_share_hashes.add(share.hash)
634 my_doa_share_hashes.add(share.hash)
638 tracker.verified.add(share)
642 if pow_hash <= header['bits'].target or p2pool.DEBUG:
643 for peer in p2p_node.peers.itervalues():
644 peer.sendShares([share])
645 shared_share_hashes.add(share.hash)
647 log.err(None, 'Error forwarding block solution:')
649 if pow_hash <= target and header_hash not in received_header_hashes:
650 reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
651 if request.getPassword() == vip_pass:
652 reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
653 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
654 while len(self.recent_shares_ts_work) > 50:
655 self.recent_shares_ts_work.pop(0)
656 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
658 if header_hash in received_header_hashes:
659 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
660 received_header_hashes.add(header_hash)
662 if pow_hash > target:
663 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
664 print ' Hash: %56x' % (pow_hash,)
665 print ' Target: %56x' % (target,)
669 return ba, got_response
671 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
673 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks)
674 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
676 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
678 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
685 @defer.inlineCallbacks
688 flag = factory.new_block.get_deferred()
690 yield set_real_work1()
693 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
698 print 'Started successfully!'
702 if hasattr(signal, 'SIGALRM'):
703 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
704 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
706 signal.siginterrupt(signal.SIGALRM, False)
707 task.LoopingCall(signal.alarm, 30).start(1)
709 if args.irc_announce:
710 from twisted.words.protocols import irc
711 class IRCClient(irc.IRCClient):
712 nickname = 'p2pool%02i' % (random.randrange(100),)
713 channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
714 def lineReceived(self, line):
716 irc.IRCClient.lineReceived(self, line)
718 irc.IRCClient.signedOn(self)
719 self.factory.resetDelay()
720 self.join(self.channel)
721 self.watch_id = tracker.verified.added.watch(self._new_share)
722 self.announced_hashes = set()
723 self.delayed_messages = {}
def privmsg(self, user, channel, message):
    '''Cancel our pending announcement if the same text appears in our channel.

    Messages on other channels, and messages we have no pending call for,
    are ignored. user is unused.
    '''
    if channel != self.channel:
        return
    if message not in self.delayed_messages:
        return
    pending_call = self.delayed_messages.pop(message)
    pending_call.cancel()
def _new_share(self, share):
    '''Schedule an IRC announcement for a share that also solves a block.

    Nothing happens unless the share's pow_hash meets the header target, its
    header hash has not been announced by this client, and its timestamp is
    within ten minutes of local time. The announcement is sent after a random
    exponential delay and the pending call is kept in delayed_messages keyed
    by the message text.
    '''
    if not share.pow_hash <= share.header['bits'].target:
        return
    if share.header_hash in self.announced_hashes:
        return
    if not abs(share.timestamp - time.time()) < 10*60:
        return

    self.announced_hashes.add(share.header_hash)
    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (
        net.NAME.upper(),
        bitcoin_data.script2_to_address(share.new_script, net.PARENT),
        net.PARENT.BLOCK_EXPLORER_URL_PREFIX,
        share.header_hash,
    )

    def announce():
        # say it, then forget the bookkeeping entry for this message
        self.say(self.channel, message)
        self.delayed_messages.pop(message)

    self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), announce)
732 def connectionLost(self, reason):
733 tracker.verified.added.unwatch(self.watch_id)
734 print 'IRC connection lost:', reason.getErrorMessage()
735 class IRCClientFactory(protocol.ReconnectingClientFactory):
737 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
739 @defer.inlineCallbacks
744 yield deferral.sleep(3)
746 if time.time() > current_work2.value['last_update'] + 60:
747 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
749 height = tracker.get_height(current_work.value['best_share_hash'])
750 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
752 len(tracker.verified.shares),
755 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
756 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
758 datums, dt = local_rate_monitor.get_datums_in_last()
759 my_att_s = sum(datum['work']/dt for datum in datums)
760 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
761 math.format(int(my_att_s)),
763 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
764 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
768 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
769 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
770 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
772 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
773 shares, stale_orphan_shares, stale_doa_shares,
774 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
775 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
776 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
778 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
779 math.format(int(real_att_s)),
781 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
784 if this_str != last_str or time.time() > last_time + 15:
787 last_time = time.time()
792 log.err(None, 'Fatal error:')
796 class FixedArgumentParser(argparse.ArgumentParser):
797 def _read_args_from_files(self, arg_strings):
798 # expand arguments referencing files
800 for arg_string in arg_strings:
802 # for regular arguments, just add them back into the list
803 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
804 new_arg_strings.append(arg_string)
806 # replace arguments referencing files with the file content
809 args_file = open(arg_string[1:])
812 for arg_line in args_file.read().splitlines():
813 for arg in self.convert_arg_line_to_args(arg_line):
814 arg_strings.append(arg)
815 arg_strings = self._read_args_from_files(arg_strings)
816 new_arg_strings.extend(arg_strings)
820 err = sys.exc_info()[1]
823 # return the modified argument list
824 return new_arg_strings
def convert_arg_line_to_args(self, arg_line):
    '''Split one line of an argument file into individual arguments.

    The line is split on runs of whitespace, so a single line may carry
    several arguments and blank lines contribute nothing.

    str.split() with no separator never produces empty or whitespace-only
    items, so no additional filtering is needed.
    '''
    return arg_line.split()
830 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
# Command-line parser. Arguments starting with '@' name a file whose contents
# are expanded into further arguments (fromfile_prefix_chars).
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
# Boolean switches all use action='store_true', which is the same as
# store_const with const=True, so every flag in this section reads alike.
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_true', default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_true', default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
# --merged may be given several times; each use appends one URL.
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
862 p2pool_group = parser.add_argument_group('p2pool interface')
863 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
864 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
865 type=int, action='store', default=None, dest='p2pool_port')
866 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
867 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
868 type=str, action='append', default=[], dest='p2pool_nodes')
869 parser.add_argument('--disable-upnp',
870 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
871 action='store_false', default=True, dest='upnp')
873 worker_group = parser.add_argument_group('worker interface')
874 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
875 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
876 type=str, action='store', default=None, dest='worker_endpoint')
877 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
878 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
879 type=float, action='store', default=0, dest='worker_fee')
881 bitcoind_group = parser.add_argument_group('bitcoind interface')
882 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
883 help='connect to this address (default: 127.0.0.1)',
884 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
885 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
886 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
887 type=int, action='store', default=None, dest='bitcoind_rpc_port')
888 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
889 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
890 type=int, action='store', default=None, dest='bitcoind_p2p_port')
892 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
893 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
894 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
896 args = parser.parse_args()
# Pick the network definition; --testnet selects the '<name>_testnet' entry.
net_name = args.net_name
if args.testnet:
    net_name += '_testnet'
net = networks.nets[net_name]

# Data lives in <base>/<net_name>, where <base> is --datadir or, when that is
# not given, a 'data' directory next to the script named in sys.argv[0].
if args.datadir is None:
    datadir_base = os.path.join(os.path.dirname(sys.argv[0]), 'data')
else:
    datadir_base = args.datadir
datadir_path = os.path.join(datadir_base, net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

# The positional values are "[username] password": left-pad with None so a
# single value is taken as the password and no value leaves both unset.
if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
userpass = [None, None] + args.bitcoind_rpc_userpass
args.bitcoind_rpc_username = userpass[-2]
args.bitcoind_rpc_password = userpass[-1]
# No RPC password was supplied on the command line: fall back to the parent
# network's configuration file for the credentials and for any port that was
# not given explicitly.
912 if args.bitcoind_rpc_password is None:
# Without a CONF_FILE_FUNC on the parent network there is no file to consult.
913 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
914 parser.error('This network has no configuration file function. Manually enter your RPC password.')
# CONF_FILE_FUNC() supplies the path that is checked and opened below.
915 conf_path = net.PARENT.CONF_FILE_FUNC()
916 if not os.path.exists(conf_path):
# The error text suggests a file body including a freshly generated rpcpassword.
# NOTE(review): the suggested password comes from the `random` module, which is
# not a cryptographically secure generator -- confirm that is acceptable here.
917 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
918 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
921 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
922 with open(conf_path, 'rb') as f:
923 cp = ConfigParser.RawConfigParser()
# A '[x]' section header is prepended to the file content before parsing, and
# every option is then looked up in section 'x'.
924 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
# Each entry is (option name in the file, attribute on args, converter).
# An attribute is only filled from the file while it is still None, so values
# given on the command line take precedence.
925 for conf_name, var_name, var_type in [
926 ('rpcuser', 'bitcoind_rpc_username', str),
927 ('rpcpassword', 'bitcoind_rpc_password', str),
928 ('rpcport', 'bitcoind_rpc_port', int),
929 ('port', 'bitcoind_p2p_port', int),
931 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
932 setattr(args, var_name, var_type(cp.get('x', conf_name)))
def _default_if_unset(option, fallback):
    """Assign fallback() to args.<option> when that option is still None.

    fallback is a callable so that the default is only looked up when it is
    actually needed.
    """
    if getattr(args, option) is None:
        setattr(args, option, fallback())


# Anything still unset at this point gets its built-in default.
_default_if_unset('bitcoind_rpc_username', lambda: '')
_default_if_unset('bitcoind_rpc_port', lambda: net.PARENT.RPC_PORT)
_default_if_unset('bitcoind_p2p_port', lambda: net.PARENT.P2P_PORT)
_default_if_unset('p2pool_port', lambda: net.P2P_PORT)
# Resolve -w/--worker-port into the (address, port) pair stored in
# worker_endpoint. Accepted forms:
#   (not given)  -> ('', net.WORKER_PORT)
#   'PORT'       -> ('', PORT)
#   'ADDR:PORT'  -> (ADDR, PORT), split at the last ':'
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
else:
    # rpartition splits at the last ':'; when there is none, the address part
    # comes back empty and the whole value is treated as the port.
    worker_addr, worker_sep, worker_port = args.worker_endpoint.rpartition(':')
    try:
        worker_endpoint = worker_addr, int(worker_port)
    except ValueError:
        # A non-numeric port is a usage error; report it the same way other
        # bad arguments are reported instead of dying with a raw traceback.
        parser.error('error parsing worker port: ' + repr(args.worker_endpoint))
# Derive args.pubkey_hash from the payout address, when one was given, using
# bitcoin_data.address_to_pubkey_hash with the parent network's parameters.
# A failure is reported through parser.error; otherwise pubkey_hash is None.
# NOTE(review): the try/except/else lines around these statements are not
# visible in this view -- presumably the parser.error call is the exception
# handler and the None assignment is the "no address given" branch; confirm.
954 if args.address is not None:
956 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
958 parser.error('error parsing address: ' + repr(e))
960 args.pubkey_hash = None
def separate_url(url):
    """Split the credentials out of a merged-mining URL.

    Returns a pair (url_without_credentials, userpass), where userpass is
    everything before the last '@' of the URL's network location. A URL whose
    network location contains no '@' is reported through parser.error.
    """
    parts = urlparse.urlsplit(url)
    # Split on the last '@' so that an '@' inside the password is preserved.
    userpass, at_sign, bare_netloc = parts.netloc.rpartition('@')
    if not at_sign:
        parser.error('merged url netloc must contain an "@"')
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=bare_netloc))
    return stripped_url, userpass
# Turn every --merged URL into a (url_without_credentials, userpass) pair.
968 merged_urls = map(separate_url, args.merged_urls)
# Default log location: a file named 'log' inside the per-network data directory.
970 if args.logfile is None:
971 args.logfile = os.path.join(datadir_path, 'log')
# Redirect stdout and stderr through a shared pipe built from the project's
# logging helpers. The statement order matters: the pipe is built around the
# *original* sys.stderr before sys.stderr itself is replaced.
# NOTE(review): judging by the helper names, output is timestamped and sent to
# both the old stderr and the log file, with stderr lines prefixed by '> ' --
# verify against util.logging.
973 logfile = logging.LogFile(args.logfile)
974 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
975 sys.stdout = logging.AbortPipe(pipe)
# Twisted's default log observer is pointed at the same replacement stderr.
976 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# The handler is only installed when the signal module defines SIGUSR1.
977 if hasattr(signal, "SIGUSR1"):
# The handler prints one message before and one after the log file is reopened.
# NOTE(review): the reopen call itself is not visible in this view -- presumably
# it sits between the two prints; confirm.
978 def sigusr1(signum, frame):
979 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
981 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
982 signal.signal(signal.SIGUSR1, sigusr1)
# Independently of the signal, logfile.reopen is called on a 5-second interval.
983 task.LoopingCall(logfile.reopen).start(5)
# main() is started with the resolved settings once the reactor is running.
985 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)