from __future__ import division

# standard-library modules used throughout this file
import argparse, base64, os, random, signal, sys, time, traceback
import urlparse, ConfigParser, StringIO

from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data
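
# getwork() fetches a block template from bitcoind via the old getmemorypool RPC and
# normalizes it: transactions are parsed, a merkle branch is precomputed for the coinbase
# slot (index 0), and 'bits' is wrapped as a FloatingInteger.  Illustrative shape of the
# reply fields this code relies on (values are made up):
#   {'version': 2, 'previousblockhash': '0000...', 'transactions': ['0100...', ...],
#    'coinbasevalue': 5000000000, 'time': 1330000000, 'bits': '1a0abbcf',
#    'coinbaseaux': {'flags': '062f503253482f'}}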
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
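
# main() is the long-running entry point: it checks the bitcoind JSON-RPC and P2P
# connections, determines a payout address, loads saved shares, starts the work-polling
# machinery, joins the p2pool P2P network, serves the miner-facing worker interface, and
# then runs housekeeping loops (UPnP, optional IRC announcements, console status).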
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print ' Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print ' Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print ' Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
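        
        # Replay the on-disk ShareStore.  It yields two record kinds: full 'share' entries,
        # which are added to the tracker, and 'verified_hash' entries marking shares that were
        # already verified in a previous run, so verification is not redone at startup.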
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print " %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print " ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
            else:
                tracker.verified.add(tracker.shares[h])
        print " ...done loading %i shares!" % (len(tracker.shares),)
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        print 'Initializing work...'
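        
        # set_real_work1() polls bitcoind and fills the pre_* variables with raw work;
        # set_real_work2() (below) merges in the share chain's state to produce current_work,
        # which is what the worker interface and long polling actually key off.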
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_branch=work['merkle_branch'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
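        
        # set_real_work2() combines the raw bitcoind work with the share chain: tracker.think()
        # picks the best verified share to build on and lists desired-but-missing shares, which
        # are then requested from peers with an exponential backoff (10 * 1.5**count seconds)
        # tracked per hash in 'requested'.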
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time() # reused below as "now" for request bookkeeping
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
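        
        # Merged mining: poll each --merged url with getauxblock about once a second and
        # publish that chain's current hash/target under its chainid in pre_merged_work,
        # which flows into current_work['mm_chains'] via set_real_work2().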
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
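        
        # submit_block() hands a solved block to bitcoind; old bitcoinds accept the hex-encoded
        # block as an argument to getmemorypool.  The result is checked against whether the
        # header actually meets the block target, so unexpected rejections get logged.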
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        recent_blocks = [] # blocks seen by this node, shown on the web interface
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        # periodically try to forward the p2pool port through the local router via UPnP
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError: pass
                except:
                    log.err(None, 'UPnP error:')
                yield deferral.sleep(random.expovariate(1/120))
        if args.upnp:
            upnp_thread()
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
            with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
                vip_pass = f.read().strip('\r\n')
        else:
            vip_pass = '%016x' % (random.randrange(2**64),)
            with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
                f.write(vip_pass)
        print ' Worker password:', vip_pass, '(only required for generating graphs)'
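        
        # Track stale shares of our own that have already been dropped from the tracker so
        # get_stale_counts() stays accurate.  stale_info encodes how a share was stale when
        # it was created: 253 = orphan, 254 = dead on arrival (doa), 0 = on time.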
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        pseudoshare_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
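        
        # WorkerBridge bridges the miner-facing getwork interface and the share chain.
        # Miner usernames are parsed as ADDRESS[/MIN_SHARE_DIFFICULTY][+PSEUDOSHARE_DIFFICULTY];
        # an unparseable address (and a random args.worker_fee percent of requests) falls
        # back to this node's payout address.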
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def preprocess_request(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except:
                        pubkey_hash = my_pubkey_hash
                
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
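                
                # If merged-mining chains are active, build an aux-pow merkle tree over their
                # block hashes and embed its root in the coinbase behind the '\xfa\xbemm'
                # merged-mining marker, so a single share can satisfy several chains at once.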
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_info, generate_tx = p2pool_data.Share.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    net=net,
                )
                target = net.PARENT.SANE_MAX_TARGET
                if desired_pseudoshare_target is None:
                    # no explicit pseudoshare difficulty requested: once 50 recent pseudoshares
                    # are recorded, aim for roughly one pseudoshare per second of local hashrate
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        target = min(target, 2**256//hash_rate)
                else:
                    target = min(target, desired_pseudoshare_target)
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
                
                getwork_time = time.time()
                merkle_branch = current_work2.value['merkle_branch']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
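                
                # got_response() runs for every pseudoshare the miner submits.  Depending on
                # how small pow_hash is, the same header may be a full bitcoin block
                # (submitted to bitcoind), a solution for merged-mined chains (submitted via
                # getauxblock), and/or a p2pool share (added to the tracker and broadcast).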
                def got_response(header, request):
                    assert header['merkle_root'] == merkle_root
                    
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                                recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    except:
                        log.err(None, 'Error while processing potential block:')
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_branch=merkle_branch,
                                            index=0,
                                        ),
                                        merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                        index=index,
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    if pow_hash <= share_info['bits'].target:
                        min_header = dict(header); del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                        share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print ' Hash: %56x' % (pow_hash,)
                        print ' Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                    
                    return on_time
                
                return ba, got_response
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        # poll bitcoind for new work immediately on a new-block notification and at least every 15 seconds
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        print 'Started successfully!'
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
                def lineReceived(self, line):
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    self.watch_id = tracker.verified.added.watch(self._new_share)
                    self.announced_hashes = set()
                    self.delayed_messages = {}
                def privmsg(self, user, channel, message):
                    if channel == self.channel and message in self.delayed_messages:
                        self.delayed_messages.pop(message).cancel()
                def _new_share(self, share):
                    if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                        self.announced_hashes.add(share.header_hash)
                        message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                        self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
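        
        # Console status loop: every ~3 s build a summary (chain length, peers, local and
        # pool hashrate, stale-rate/efficiency estimates, expected time to share/block),
        # print it when it changes or at least every 15 s, and warn if bitcoind goes silent.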
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')
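
# FixedArgumentParser works around stock argparse fromfile handling: @file arguments are
# expanded recursively, and each line of the file may contain several whitespace-separated
# arguments instead of exactly one.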
class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            
            # replace arguments referencing files with the file content
            else:
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        return [arg for arg in arg_line.split() if arg.strip()]
def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
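    
    # Example invocation (illustrative values only; run_p2pool.py is the usual entry point):
    #   python run_p2pool.py --net bitcoin -a 1YourPayoutAddress rpcuser rpcpass
    # With no positional RPC credentials, they are read from bitcoin.conf (handled below).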
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
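    
    # Anything not supplied on the command line (RPC username/password, RPC and P2P ports)
    # is filled in from the parent network's bitcoin.conf below, where one exists.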
    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
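    
    # Console output is timestamped and tee'd to both the original stderr and the log file;
    # SIGUSR1 (where available) reopens the log file so it can be rotated externally.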
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()