from __future__ import division

import argparse
import base64
import os
import random
import signal
import sys
import time
import traceback
import urlparse
import ConfigParser
import StringIO

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
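# Illustrative sketch of the getmemorypool response fields consumed above
# (all values invented for illustration; the exact field set depends on the
# bitcoind version):
#
# {
#     "version": 1,
#     "previousblockhash": "0000000000000a3290f20e75860d505c...",
#     "transactions": ["01000000...", ...],   # hex, excluding the generation tx
#     "coinbasevalue": 5000000000,            # subsidy + fees, in satoshis
#     "time": 1327361340,
#     "bits": "1d00ffff",                     # may also arrive as an integer
#     "coinbaseaux": {"flags": "062f503253482f"},
# }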
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK)(bitcoind):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
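        # These watchers keep the on-disk ShareStore in sync with the
        # in-memory tracker: when a share (or its verified mark) is pruned
        # from the tracker, the corresponding record is forgotten on disk,
        # and shared_share_hashes is trimmed so it can't grow without bound.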
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
        print '    ...success!'
        print
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
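        # Each --merged URL gets its own polling loop above, hitting
        # getauxblock about once a second. A hypothetical invocation
        # (credentials made up):
        #
        #   python run_p2pool.py --merged http://ncuser:ncpass@127.0.0.1:10332/
        #
        # Results are keyed by the aux chain's 'chainid', so several aux
        # chains can be merged-mined simultaneously.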
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    new_count += 1
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                if new_count:
                    set_real_work2()
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
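        # Block solutions arrive from two directions: verified shares from
        # peers (the watcher above) and submissions from local workers
        # (got_response below). Both funnel through submit_block, which warns
        # whenever bitcoind's accept/reject answer disagrees with what the
        # local proof-of-work check predicts.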
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
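        # addrs.txt persistence format: one repr()'d (key, value) pair per
        # line, eval()'d back in on startup. Based on the bootstrap default
        # above, a line might look like (values invented for illustration):
        #
        #   (('10.0.0.5', 9333), (0, 1327361340.0, 1327364323.0))
        #
        # where the first element of the value tuple appears to be a services
        # field and the two floats are first-seen/last-seen timestamps.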
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, 'UPnP error:')
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        # setup worker logic
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
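        # Worked example (numbers invented): with 100 local shares of which
        # 90 are currently in the chain, and 4 dead-on-arrival shares not in
        # the chain, the first element is ((10 - 4), 4) = (6, 4): 6 presumed
        # orphaned and 4 dead, out of 100 total.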
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except:
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
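            # Worker usernames are parsed as ADDRESS[/SHARE_DIFF][+PSEUDO_DIFF].
            # Illustrative example: a miner logging in as '1SomeAddr.../1000+50'
            # mines to its own address, requests a minimum share difficulty of
            # 1000, and asks for difficulty-50 pseudoshares for hashrate
            # estimation; an unparseable address falls back to this node's
            # payout address (and a worker_fee-weighted coin flip may send the
            # share there anyway).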
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                def predict_timestamp():
                    desired_timestamp = int(time.time() - current_work2.value['clock_offset'])
                    previous_share = tracker.shares[current_work.value['best_share_hash']] if current_work.value['best_share_hash'] is not None else None
                    return math.clip(desired_timestamp, (
                        (previous_share.timestamp + net.SHARE_PERIOD) - (net.SHARE_PERIOD - 1), # = previous_share.timestamp + 1
                        (previous_share.timestamp + net.SHARE_PERIOD) + (net.SHARE_PERIOD - 1),
                    )) if previous_share is not None else desired_timestamp
                new = predict_timestamp() >= net.SWITCH_TIME
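                # The clip above keeps a new share's timestamp within
                # previous_share.timestamp +/- (SHARE_PERIOD - 1) around the
                # expected next-share time; e.g. with SHARE_PERIOD = 10 and a
                # previous timestamp of 1000, the result is forced into
                # [1001, 1019] no matter how far off the local clock is.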
                if new:
                    share_info, generate_tx = p2pool_data.NewShare.generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=random.randrange(2**32),
                            pubkey_hash=pubkey_hash,
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        desired_target=desired_share_target,
                        ref_merkle_link=dict(branch=[], index=0),
                        net=net,
                    )
                else:
                    share_info, generate_tx = p2pool_data.Share.generate_transaction(
                        tracker=tracker,
                        share_data=dict(
                            previous_share_hash=current_work.value['best_share_hash'],
                            coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                            nonce=random.randrange(2**31),
                            pubkey_hash=pubkey_hash,
                            subsidy=current_work2.value['subsidy'],
                            donation=math.perfect_round(65535*args.donation_percentage/100),
                            stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                                253 if orphans > orphans_recorded_in_chain else
                                254 if doas > doas_recorded_in_chain else
                                0
                            )(*get_stale_counts()),
                        ),
                        block_target=current_work.value['bits'].target,
                        desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                        desired_target=desired_share_target,
                        net=net,
                    )
                
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                print
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result, aux_work=aux_work): # bind aux_work now; the callback may fire after the loop has advanced
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target:
                        min_header = dict(header); del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        if new:
                            share = p2pool_data.NewShare(net, None, dict(
                                min_header=min_header, share_info=share_info, hash_link=hash_link,
                                ref_merkle_link=dict(branch=[], index=0),
                            ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        else:
                            share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser(), request.getPassword() == vip_pass)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
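        # WorkerBridge.get_work hands the worker interface a (BlockAttempt,
        # got_response) pair: 'ba' describes the block template to attempt,
        # and got_response checks each submitted header against the block,
        # merged-mining, share, and pseudoshare targets, forwarding it to
        # every layer whose target the hash actually meets.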
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
            pass
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/static to view graphs and statistics.' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
        print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
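        # Watchdog pattern: the LoopingCall re-arms a 30-second SIGALRM every
        # second, so the alarm only ever fires (dumping a stack trace) if the
        # reactor stops servicing timers for a full 30 seconds.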
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                        
                        desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
                        majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
                        if majority_desired_version not in [0, 1]:
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
                                majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
                            print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            else:
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        return [arg for arg in arg_line.split() if arg.strip()]
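# FixedArgumentParser mirrors argparse's @file handling but allows several
# arguments per line. A hypothetical config used as 'run_p2pool.py @p2pool.conf'
# could contain:
#
#   --net bitcoin --give-author 1.0
#   --max-conns 20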
def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments is allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
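    # The three cases above, illustrated: omitting -w listens on all
    # interfaces at the network's default worker port; '-w 9332' listens on
    # all interfaces at port 9332; '-w 127.0.0.1:9332' binds only to
    # 127.0.0.1.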
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
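    # separate_url splits the credentials out of each --merged URL; e.g.
    # (made-up credentials) 'http://ncuser:ncpass@127.0.0.1:10332/' becomes
    # ('http://127.0.0.1:10332/', 'ncuser:ncpass'), the (merged_url,
    # merged_userpass) pairs consumed by set_merged_work in main().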
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()