1 from __future__ import division
14 from twisted.internet import defer, reactor, protocol, task
15 from twisted.web import server
16 from twisted.python import log
17 from nattraverso import portmapper, ipdiscover
19 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
20 from bitcoin import worker_interface, height_tracker
21 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
22 from . import p2p, networks, web
23 import p2pool, p2pool.data as p2pool_data
25 @deferral.retry('Error getting work from bitcoind:', 3)
26 @defer.inlineCallbacks
27 def getwork(bitcoind):
29 work = yield bitcoind.rpc_getmemorypool()
30 except jsonrpc.Error, e:
31 if e.code == -32601: # Method not found
32 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
33 raise deferral.RetrySilentlyException()
35 packed_transactions = [x.decode('hex') for x in work['transactions']]
36 defer.returnValue(dict(
37 version=work['version'],
38 previous_block_hash=int(work['previousblockhash'], 16),
39 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
40 merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
41 subsidy=work['coinbasevalue'],
43 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
44 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
47 @defer.inlineCallbacks
48 def main(args, net, datadir_path, merged_urls, worker_endpoint):
50 print 'p2pool (version %s)' % (p2pool.__version__,)
53 # connect to bitcoind over JSON-RPC and do initial getmemorypool
54 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
55 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
56 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
57 good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
59 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
61 temp_work = yield getwork(bitcoind)
63 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
66 # connect to bitcoind over bitcoin-p2p
67 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
68 factory = bitcoin_p2p.ClientFactory(net.PARENT)
69 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
70 yield factory.getProtocol() # waits until handshake is successful
74 print 'Determining payout address...'
75 if args.pubkey_hash is None:
76 address_path = os.path.join(datadir_path, 'cached_payout_address')
78 if os.path.exists(address_path):
79 with open(address_path, 'rb') as f:
80 address = f.read().strip('\r\n')
81 print ' Loaded cached address: %s...' % (address,)
85 if address is not None:
86 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
87 if not res['isvalid'] or not res['ismine']:
88 print ' Cached address is either invalid or not controlled by local bitcoind!'
92 print ' Getting payout address from bitcoind...'
93 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
95 with open(address_path, 'wb') as f:
98 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
100 my_pubkey_hash = args.pubkey_hash
101 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
104 my_share_hashes = set()
105 my_doa_share_hashes = set()
107 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
108 shared_share_hashes = set()
109 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
110 known_verified = set()
112 print "Loading shares..."
113 for i, (mode, contents) in enumerate(ss.get_shares()):
115 if contents.hash in tracker.shares:
117 shared_share_hashes.add(contents.hash)
118 contents.time_seen = 0
119 tracker.add(contents)
120 if len(tracker.shares) % 1000 == 0 and tracker.shares:
121 print " %i" % (len(tracker.shares),)
122 elif mode == 'verified_hash':
123 known_verified.add(contents)
125 raise AssertionError()
126 print " ...inserting %i verified shares..." % (len(known_verified),)
127 for h in known_verified:
128 if h not in tracker.shares:
129 ss.forget_verified_share(h)
131 tracker.verified.add(tracker.shares[h])
132 print " ...done loading %i shares!" % (len(tracker.shares),)
134 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
135 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
136 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
138 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
140 pre_current_work = variable.Variable(None)
141 pre_merged_work = variable.Variable({})
142 # information affecting work that should trigger a long-polling update
143 current_work = variable.Variable(None)
144 # information affecting work that should not trigger a long-polling update
145 current_work2 = variable.Variable(None)
147 requested = expiring_dict.ExpiringDict(300)
149 print 'Initializing work...'
150 @defer.inlineCallbacks
151 def set_real_work1():
152 work = yield getwork(bitcoind)
153 current_work2.set(dict(
155 transactions=work['transactions'],
156 merkle_branch=work['merkle_branch'],
157 subsidy=work['subsidy'],
158 clock_offset=time.time() - work['time'],
159 last_update=time.time(),
160 )) # second set first because everything hooks on the first
161 pre_current_work.set(dict(
162 version=work['version'],
163 previous_block=work['previous_block_hash'],
165 coinbaseflags=work['coinbaseflags'],
167 yield set_real_work1()
169 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, pre_current_work, net)
171 def set_real_work2():
172 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
174 t = dict(pre_current_work.value)
175 t['best_share_hash'] = best
176 t['mm_chains'] = pre_merged_work.value
180 for peer2, share_hash in desired:
181 if share_hash not in tracker.tails: # was received in the time tracker.think was running
183 last_request_time, count = requested.get(share_hash, (None, 0))
184 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
186 potential_peers = set()
187 for head in tracker.tails[share_hash]:
188 potential_peers.update(peer_heads.get(head, set()))
189 potential_peers = [peer for peer in potential_peers if peer.connected2]
190 if count == 0 and peer2 is not None and peer2.connected2:
193 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
197 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
201 stops=list(set(tracker.heads) | set(
202 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
205 requested[share_hash] = t, count + 1
206 pre_current_work.changed.watch(lambda _: set_real_work2())
207 pre_merged_work.changed.watch(lambda _: set_real_work2())
213 @defer.inlineCallbacks
214 def set_merged_work(merged_url, merged_userpass):
215 merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
217 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
218 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
219 hash=int(auxblock['hash'], 16),
220 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
221 merged_proxy=merged_proxy,
223 yield deferral.sleep(1)
224 for merged_url, merged_userpass in merged_urls:
225 set_merged_work(merged_url, merged_userpass)
227 @pre_merged_work.changed.watch
228 def _(new_merged_work):
229 print 'Got new merged mining work!'
231 # setup p2p logic and join p2pool network
233 class Node(p2p.Node):
234 def handle_shares(self, shares, peer):
236 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
240 if share.hash in tracker.shares:
241 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
246 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
250 if shares and peer is not None:
251 peer_heads.setdefault(shares[0].hash, set()).add(peer)
257 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
259 def handle_share_hashes(self, hashes, peer):
262 for share_hash in hashes:
263 if share_hash in tracker.shares:
265 last_request_time, count = requested.get(share_hash, (None, 0))
266 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
268 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
269 get_hashes.append(share_hash)
270 requested[share_hash] = t, count + 1
272 if hashes and peer is not None:
273 peer_heads.setdefault(hashes[0], set()).add(peer)
275 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
277 def handle_get_shares(self, hashes, parents, stops, peer):
278 parents = min(parents, 1000//len(hashes))
281 for share_hash in hashes:
282 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
283 if share.hash in stops:
286 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
287 peer.sendShares(shares)
289 @deferral.retry('Error submitting block: (will retry)', 10, 10)
290 @defer.inlineCallbacks
291 def submit_block(block, ignore_failure):
292 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
293 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
294 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
295 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
297 @tracker.verified.added.watch
299 if share.pow_hash <= share.header['bits'].target:
300 submit_block(share.as_block(tracker), ignore_failure=True)
302 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
304 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
306 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
308 @defer.inlineCallbacks
311 ip, port = x.split(':')
312 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
314 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
317 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
319 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
321 print >>sys.stderr, "error reading addrs"
322 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
325 if addr not in addrs:
326 addrs[addr] = (0, time.time(), time.time())
330 connect_addrs = set()
331 for addr_df in map(parse, args.p2pool_nodes):
333 connect_addrs.add((yield addr_df))
338 best_share_hash_func=lambda: current_work.value['best_share_hash'],
339 port=args.p2pool_port,
342 connect_addrs=connect_addrs,
346 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
348 # send share when the chain changes to their chain
349 def work_changed(new_work):
350 #print 'Work changed:', new_work
352 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
353 if share.hash in shared_share_hashes:
355 shared_share_hashes.add(share.hash)
358 for peer in p2p_node.peers.itervalues():
359 peer.sendShares([share for share in shares if share.peer is not peer])
361 current_work.changed.watch(work_changed)
364 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
366 if share.hash in tracker.verified.shares:
367 ss.add_verified_hash(share.hash)
368 task.LoopingCall(save_shares).start(60)
374 @defer.inlineCallbacks
378 is_lan, lan_ip = yield ipdiscover.get_local_ip()
380 pm = yield portmapper.get_port_mapper()
381 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
382 except defer.TimeoutError:
386 log.err(None, 'UPnP error:')
387 yield deferral.sleep(random.expovariate(1/120))
390 # start listening for workers with a JSON-RPC server
392 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
394 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
395 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
396 vip_pass = f.read().strip('\r\n')
398 vip_pass = '%016x' % (random.randrange(2**64),)
399 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
401 print ' Worker password:', vip_pass, '(only required for generating graphs)'
405 removed_unstales_var = variable.Variable((0, 0, 0))
406 removed_doa_unstales_var = variable.Variable(0)
407 @tracker.verified.removed.watch
409 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
410 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
411 removed_unstales_var.set((
412 removed_unstales_var.value[0] + 1,
413 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
414 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
416 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
417 removed_doa_unstales.set(removed_doa_unstales.value + 1)
419 def get_stale_counts():
420 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
421 my_shares = len(my_share_hashes)
422 my_doa_shares = len(my_doa_share_hashes)
423 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
424 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
425 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
426 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
427 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
429 my_shares_not_in_chain = my_shares - my_shares_in_chain
430 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
432 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
435 pseudoshare_received = variable.Event()
436 local_rate_monitor = math.RateMonitor(10*60)
438 class WorkerBridge(worker_interface.WorkerBridge):
440 worker_interface.WorkerBridge.__init__(self)
441 self.new_work_event = current_work.changed
442 self.recent_shares_ts_work = []
444 def preprocess_request(self, request):
445 user = request.getUser() if request.getUser() is not None else ''
447 desired_pseudoshare_target = None
449 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
451 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
455 desired_share_target = 2**256 - 1
457 user, min_diff_str = user.rsplit('/', 1)
459 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
463 if random.uniform(0, 100) < args.worker_fee:
464 pubkey_hash = my_pubkey_hash
467 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
469 pubkey_hash = my_pubkey_hash
471 return pubkey_hash, desired_share_target, desired_pseudoshare_target
473 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
474 if len(p2p_node.peers) == 0 and net.PERSIST:
475 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
476 if current_work.value['best_share_hash'] is None and net.PERSIST:
477 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
478 if time.time() > current_work2.value['last_update'] + 60:
479 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
481 if current_work.value['mm_chains']:
482 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
483 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
484 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
485 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
489 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
494 share_info, generate_tx = p2pool_data.generate_transaction(
497 previous_share_hash=current_work.value['best_share_hash'],
498 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
499 nonce=random.randrange(2**32),
500 pubkey_hash=pubkey_hash,
501 subsidy=current_work2.value['subsidy'],
502 donation=math.perfect_round(65535*args.donation_percentage/100),
503 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
504 253 if orphans > orphans_recorded_in_chain else
505 254 if doas > doas_recorded_in_chain else
507 )(*get_stale_counts()),
509 block_target=current_work.value['bits'].target,
510 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
511 desired_target=desired_share_target,
515 target = net.PARENT.SANE_MAX_TARGET
516 if desired_pseudoshare_target is None:
517 if len(self.recent_shares_ts_work) == 50:
518 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
519 target = min(target, 2**256//hash_rate)
521 target = min(target, desired_pseudoshare_target)
522 target = max(target, share_info['bits'].target)
523 for aux_work in current_work.value['mm_chains'].itervalues():
524 target = max(target, aux_work['target'])
526 transactions = [generate_tx] + list(current_work2.value['transactions'])
527 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
528 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
530 getwork_time = time.time()
531 merkle_branch = current_work2.value['merkle_branch']
533 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
534 bitcoin_data.target_to_difficulty(target),
535 bitcoin_data.target_to_difficulty(share_info['bits'].target),
536 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
537 len(current_work2.value['transactions']),
540 ba = bitcoin_getwork.BlockAttempt(
541 version=current_work.value['version'],
542 previous_block=current_work.value['previous_block'],
543 merkle_root=merkle_root,
544 timestamp=current_work2.value['time'],
545 bits=current_work.value['bits'],
549 received_header_hashes = set()
551 def got_response(header, request):
552 assert header['merkle_root'] == merkle_root
554 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
555 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
556 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
559 if pow_hash <= header['bits'].target or p2pool.DEBUG:
560 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
561 if pow_hash <= header['bits'].target:
563 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
565 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
567 log.err(None, 'Error while processing potential block:')
569 for aux_work, index, hashes in mm_later:
571 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
572 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
573 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
574 bitcoin_data.aux_pow_type.pack(dict(
577 block_hash=header_hash,
578 merkle_branch=merkle_branch,
581 merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
583 parent_block_header=header,
588 if result != (pow_hash <= aux_work['target']):
589 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
591 print 'Merged block submittal result: %s' % (result,)
594 log.err(err, 'Error submitting merged block:')
596 log.err(None, 'Error while processing merged mining POW:')
598 if pow_hash <= share_info['bits'].target:
599 min_header = dict(header);del min_header['merkle_root']
600 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
601 share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
603 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
605 p2pool_data.format_hash(share.hash),
606 p2pool_data.format_hash(share.previous_hash),
607 time.time() - getwork_time,
608 ' DEAD ON ARRIVAL' if not on_time else '',
610 my_share_hashes.add(share.hash)
612 my_doa_share_hashes.add(share.hash)
616 tracker.verified.add(share)
620 if pow_hash <= header['bits'].target or p2pool.DEBUG:
621 for peer in p2p_node.peers.itervalues():
622 peer.sendShares([share])
623 shared_share_hashes.add(share.hash)
625 log.err(None, 'Error forwarding block solution:')
627 if pow_hash > target:
628 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
629 print ' Hash: %56x' % (pow_hash,)
630 print ' Target: %56x' % (target,)
631 elif header_hash in received_header_hashes:
632 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
634 received_header_hashes.add(header_hash)
636 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
637 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
638 while len(self.recent_shares_ts_work) > 50:
639 self.recent_shares_ts_work.pop(0)
640 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
644 return ba, got_response
646 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
648 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
649 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
651 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
653 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
660 @defer.inlineCallbacks
663 flag = factory.new_block.get_deferred()
665 yield set_real_work1()
668 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
673 print 'Started successfully!'
677 if hasattr(signal, 'SIGALRM'):
678 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
679 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
681 signal.siginterrupt(signal.SIGALRM, False)
682 task.LoopingCall(signal.alarm, 30).start(1)
684 if args.irc_announce:
685 from twisted.words.protocols import irc
686 class IRCClient(irc.IRCClient):
687 nickname = 'p2pool%02i' % (random.randrange(100),)
688 channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
689 def lineReceived(self, line):
691 irc.IRCClient.lineReceived(self, line)
693 irc.IRCClient.signedOn(self)
694 self.factory.resetDelay()
695 self.join(self.channel)
696 self.watch_id = tracker.verified.added.watch(self._new_share)
697 self.announced_hashes = set()
698 self.delayed_messages = {}
699 def privmsg(self, user, channel, message):
700 if channel == self.channel and message in self.delayed_messages:
701 self.delayed_messages.pop(message).cancel()
702 def _new_share(self, share):
703 if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
704 self.announced_hashes.add(share.header_hash)
705 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
706 self.delayed_messages[message] = reactor.callLater(random.expovariate(1/60), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
707 def connectionLost(self, reason):
708 tracker.verified.added.unwatch(self.watch_id)
709 print 'IRC connection lost:', reason.getErrorMessage()
710 class IRCClientFactory(protocol.ReconnectingClientFactory):
712 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
714 @defer.inlineCallbacks
719 yield deferral.sleep(3)
721 if time.time() > current_work2.value['last_update'] + 60:
722 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
724 height = tracker.get_height(current_work.value['best_share_hash'])
725 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
727 len(tracker.verified.shares),
730 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
731 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
733 datums, dt = local_rate_monitor.get_datums_in_last()
734 my_att_s = sum(datum['work']/dt for datum in datums)
735 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
736 math.format(int(my_att_s)),
738 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
739 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
743 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
744 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
745 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
747 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
748 shares, stale_orphan_shares, stale_doa_shares,
749 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
750 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
751 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
753 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
754 math.format(int(real_att_s)),
756 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
759 if this_str != last_str or time.time() > last_time + 15:
762 last_time = time.time()
767 log.err(None, 'Fatal error:')
771 class FixedArgumentParser(argparse.ArgumentParser):
772 def _read_args_from_files(self, arg_strings):
773 # expand arguments referencing files
775 for arg_string in arg_strings:
777 # for regular arguments, just add them back into the list
778 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
779 new_arg_strings.append(arg_string)
781 # replace arguments referencing files with the file content
784 args_file = open(arg_string[1:])
787 for arg_line in args_file.read().splitlines():
788 for arg in self.convert_arg_line_to_args(arg_line):
789 arg_strings.append(arg)
790 arg_strings = self._read_args_from_files(arg_strings)
791 new_arg_strings.extend(arg_strings)
795 err = sys.exc_info()[1]
798 # return the modified argument list
799 return new_arg_strings
801 def convert_arg_line_to_args(self, arg_line):
802 return [arg for arg in arg_line.split() if arg.strip()]
805 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
807 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
808 parser.add_argument('--version', action='version', version=p2pool.__version__)
809 parser.add_argument('--net',
810 help='use specified network (default: bitcoin)',
811 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
812 parser.add_argument('--testnet',
813 help='''use the network's testnet''',
814 action='store_const', const=True, default=False, dest='testnet')
815 parser.add_argument('--debug',
816 help='enable debugging mode',
817 action='store_const', const=True, default=False, dest='debug')
818 parser.add_argument('-a', '--address',
819 help='generate payouts to this address (default: <address requested from bitcoind>)',
820 type=str, action='store', default=None, dest='address')
821 parser.add_argument('--datadir',
822 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
823 type=str, action='store', default=None, dest='datadir')
824 parser.add_argument('--logfile',
825 help='''log to this file (default: data/<NET>/log)''',
826 type=str, action='store', default=None, dest='logfile')
827 parser.add_argument('--merged',
828 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
829 type=str, action='append', default=[], dest='merged_urls')
830 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
831 help='donate this percentage of work to author of p2pool (default: 0.5)',
832 type=float, action='store', default=0.5, dest='donation_percentage')
833 parser.add_argument('--irc-announce',
834 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
835 action='store_true', default=False, dest='irc_announce')
837 p2pool_group = parser.add_argument_group('p2pool interface')
838 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
839 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
840 type=int, action='store', default=None, dest='p2pool_port')
841 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
842 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
843 type=str, action='append', default=[], dest='p2pool_nodes')
844 parser.add_argument('--disable-upnp',
845 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
846 action='store_false', default=True, dest='upnp')
848 worker_group = parser.add_argument_group('worker interface')
849 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
850 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
851 type=str, action='store', default=None, dest='worker_endpoint')
852 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
853 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
854 type=float, action='store', default=0, dest='worker_fee')
856 bitcoind_group = parser.add_argument_group('bitcoind interface')
857 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
858 help='connect to this address (default: 127.0.0.1)',
859 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
860 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
861 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
862 type=int, action='store', default=None, dest='bitcoind_rpc_port')
863 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
864 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
865 type=int, action='store', default=None, dest='bitcoind_p2p_port')
867 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
868 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
869 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
871 args = parser.parse_args()
876 net_name = args.net_name + ('_testnet' if args.testnet else '')
877 net = networks.nets[net_name]
879 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
880 if not os.path.exists(datadir_path):
881 os.makedirs(datadir_path)
883 if len(args.bitcoind_rpc_userpass) > 2:
884 parser.error('a maximum of two arguments are allowed')
885 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
887 if args.bitcoind_rpc_password is None:
888 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
889 parser.error('This network has no configuration file function. Manually enter your RPC password.')
890 conf_path = net.PARENT.CONF_FILE_FUNC()
891 if not os.path.exists(conf_path):
892 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
893 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
896 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
897 with open(conf_path, 'rb') as f:
898 cp = ConfigParser.RawConfigParser()
899 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
900 for conf_name, var_name, var_type in [
901 ('rpcuser', 'bitcoind_rpc_username', str),
902 ('rpcpassword', 'bitcoind_rpc_password', str),
903 ('rpcport', 'bitcoind_rpc_port', int),
904 ('port', 'bitcoind_p2p_port', int),
906 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
907 setattr(args, var_name, var_type(cp.get('x', conf_name)))
908 if args.bitcoind_rpc_password is None:
909 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
911 if args.bitcoind_rpc_username is None:
912 args.bitcoind_rpc_username = ''
914 if args.bitcoind_rpc_port is None:
915 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
917 if args.bitcoind_p2p_port is None:
918 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
920 if args.p2pool_port is None:
921 args.p2pool_port = net.P2P_PORT
923 if args.worker_endpoint is None:
924 worker_endpoint = '', net.WORKER_PORT
925 elif ':' not in args.worker_endpoint:
926 worker_endpoint = '', int(args.worker_endpoint)
928 addr, port = args.worker_endpoint.rsplit(':', 1)
929 worker_endpoint = addr, int(port)
931 if args.address is not None:
933 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
935 parser.error('error parsing address: ' + repr(e))
937 args.pubkey_hash = None
939 def separate_url(url):
940 s = urlparse.urlsplit(url)
941 if '@' not in s.netloc:
942 parser.error('merged url netloc must contain an "@"')
943 userpass, new_netloc = s.netloc.rsplit('@', 1)
944 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
945 merged_urls = map(separate_url, args.merged_urls)
947 if args.logfile is None:
948 args.logfile = os.path.join(datadir_path, 'log')
950 logfile = logging.LogFile(args.logfile)
951 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
952 sys.stdout = logging.AbortPipe(pipe)
953 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
954 if hasattr(signal, "SIGUSR1"):
955 def sigusr1(signum, frame):
956 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
958 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
959 signal.signal(signal.SIGUSR1, sigusr1)
960 task.LoopingCall(logfile.reopen).start(5)
962 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)