1 from __future__ import division
16 from twisted.internet import iocpreactor
21 print 'Using IOCP reactor!'
22 from twisted.internet import defer, reactor, protocol, task
23 from twisted.web import server
24 from twisted.python import log
25 from nattraverso import portmapper, ipdiscover
27 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
28 from bitcoin import worker_interface, height_tracker
29 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
30 from . import p2p, networks, web
31 import p2pool, p2pool.data as p2pool_data
33 @deferral.retry('Error getting work from bitcoind:', 3)
34 @defer.inlineCallbacks
35 def getwork(bitcoind):
37 work = yield bitcoind.rpc_getmemorypool()
38 except jsonrpc.Error, e:
39 if e.code == -32601: # Method not found
40 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
41 raise deferral.RetrySilentlyException()
43 packed_transactions = [x.decode('hex') for x in work['transactions']]
44 defer.returnValue(dict(
45 version=work['version'],
46 previous_block_hash=int(work['previousblockhash'], 16),
47 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
48 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
49 subsidy=work['coinbasevalue'],
51 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
52 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
55 @defer.inlineCallbacks
56 def main(args, net, datadir_path, merged_urls, worker_endpoint):
58 print 'p2pool (version %s)' % (p2pool.__version__,)
61 # connect to bitcoind over JSON-RPC and do initial getmemorypool
62 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
63 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
64 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
65 @deferral.retry('Error while checking Bitcoin connection:', 1)
66 @defer.inlineCallbacks
68 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
69 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
70 raise deferral.RetrySilentlyException()
71 temp_work = yield getwork(bitcoind)
72 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
73 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
74 raise deferral.RetrySilentlyException()
75 defer.returnValue(temp_work)
76 temp_work = yield check()
78 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
81 # connect to bitcoind over bitcoin-p2p
82 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
83 factory = bitcoin_p2p.ClientFactory(net.PARENT)
84 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
85 yield factory.getProtocol() # waits until handshake is successful
89 print 'Determining payout address...'
90 if args.pubkey_hash is None:
91 address_path = os.path.join(datadir_path, 'cached_payout_address')
93 if os.path.exists(address_path):
94 with open(address_path, 'rb') as f:
95 address = f.read().strip('\r\n')
96 print ' Loaded cached address: %s...' % (address,)
100 if address is not None:
101 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
102 if not res['isvalid'] or not res['ismine']:
103 print ' Cached address is either invalid or not controlled by local bitcoind!'
107 print ' Getting payout address from bitcoind...'
108 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
110 with open(address_path, 'wb') as f:
113 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
115 my_pubkey_hash = args.pubkey_hash
116 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
119 my_share_hashes = set()
120 my_doa_share_hashes = set()
122 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
123 shared_share_hashes = set()
124 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
125 known_verified = set()
126 print "Loading shares..."
127 for i, (mode, contents) in enumerate(ss.get_shares()):
129 if contents.hash in tracker.shares:
131 shared_share_hashes.add(contents.hash)
132 contents.time_seen = 0
133 tracker.add(contents)
134 if len(tracker.shares) % 1000 == 0 and tracker.shares:
135 print " %i" % (len(tracker.shares),)
136 elif mode == 'verified_hash':
137 known_verified.add(contents)
139 raise AssertionError()
140 print " ...inserting %i verified shares..." % (len(known_verified),)
141 for h in known_verified:
142 if h not in tracker.shares:
143 ss.forget_verified_share(h)
145 tracker.verified.add(tracker.shares[h])
146 print " ...done loading %i shares!" % (len(tracker.shares),)
148 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
149 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
150 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
152 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
154 pre_current_work = variable.Variable(None)
155 pre_merged_work = variable.Variable({})
156 # information affecting work that should trigger a long-polling update
157 current_work = variable.Variable(None)
158 # information affecting work that should not trigger a long-polling update
159 current_work2 = variable.Variable(None)
161 requested = expiring_dict.ExpiringDict(300)
163 print 'Initializing work...'
164 @defer.inlineCallbacks
165 def set_real_work1():
166 work = yield getwork(bitcoind)
167 current_work2.set(dict(
169 transactions=work['transactions'],
170 merkle_link=work['merkle_link'],
171 subsidy=work['subsidy'],
172 clock_offset=time.time() - work['time'],
173 last_update=time.time(),
174 )) # second set first because everything hooks on the first
175 pre_current_work.set(dict(
176 version=work['version'],
177 previous_block=work['previous_block_hash'],
179 coinbaseflags=work['coinbaseflags'],
181 yield set_real_work1()
183 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
185 def set_real_work2():
186 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
188 t = dict(pre_current_work.value)
189 t['best_share_hash'] = best
190 t['mm_chains'] = pre_merged_work.value
194 for peer2, share_hash in desired:
195 if share_hash not in tracker.tails: # was received in the time tracker.think was running
197 last_request_time, count = requested.get(share_hash, (None, 0))
198 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
200 potential_peers = set()
201 for head in tracker.tails[share_hash]:
202 potential_peers.update(peer_heads.get(head, set()))
203 potential_peers = [peer for peer in potential_peers if peer.connected2]
204 if count == 0 and peer2 is not None and peer2.connected2:
207 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
211 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
215 stops=list(set(tracker.heads) | set(
216 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
219 requested[share_hash] = t, count + 1
220 pre_current_work.changed.watch(lambda _: set_real_work2())
221 pre_merged_work.changed.watch(lambda _: set_real_work2())
227 @defer.inlineCallbacks
228 def set_merged_work(merged_url, merged_userpass):
229 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
231 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
232 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
233 hash=int(auxblock['hash'], 16),
234 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
235 merged_proxy=merged_proxy,
237 yield deferral.sleep(1)
238 for merged_url, merged_userpass in merged_urls:
239 set_merged_work(merged_url, merged_userpass)
241 @pre_merged_work.changed.watch
242 def _(new_merged_work):
243 print 'Got new merged mining work!'
245 # setup p2p logic and join p2pool network
247 class Node(p2p.Node):
248 def handle_shares(self, shares, peer):
250 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
254 if share.hash in tracker.shares:
255 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
260 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
264 if shares and peer is not None:
265 peer_heads.setdefault(shares[0].hash, set()).add(peer)
271 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
273 def handle_share_hashes(self, hashes, peer):
276 for share_hash in hashes:
277 if share_hash in tracker.shares:
279 last_request_time, count = requested.get(share_hash, (None, 0))
280 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
282 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
283 get_hashes.append(share_hash)
284 requested[share_hash] = t, count + 1
286 if hashes and peer is not None:
287 peer_heads.setdefault(hashes[0], set()).add(peer)
289 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
291 def handle_get_shares(self, hashes, parents, stops, peer):
292 parents = min(parents, 1000//len(hashes))
295 for share_hash in hashes:
296 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
297 if share.hash in stops:
300 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
303 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
304 def submit_block_p2p(block):
305 if factory.conn.value is None:
306 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
307 raise deferral.RetrySilentlyException()
308 factory.conn.value.send_block(block=block)
310 @deferral.retry('Error submitting block: (will retry)', 10, 10)
311 @defer.inlineCallbacks
312 def submit_block_rpc(block, ignore_failure):
313 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
314 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
315 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
316 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
318 def submit_block(block, ignore_failure):
319 submit_block_p2p(block)
320 submit_block_rpc(block, ignore_failure)
322 @tracker.verified.added.watch
324 if share.pow_hash <= share.header['bits'].target:
325 submit_block(share.as_block(tracker), ignore_failure=True)
327 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
329 if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
330 broadcast_share(share.hash)
332 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
334 @defer.inlineCallbacks
337 ip, port = x.split(':')
338 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
340 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
343 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
345 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
347 print >>sys.stderr, "error reading addrs"
348 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
351 if addr not in addrs:
352 addrs[addr] = (0, time.time(), time.time())
356 connect_addrs = set()
357 for addr_df in map(parse, args.p2pool_nodes):
359 connect_addrs.add((yield addr_df))
364 best_share_hash_func=lambda: current_work.value['best_share_hash'],
365 port=args.p2pool_port,
368 connect_addrs=connect_addrs,
369 max_incoming_conns=args.p2pool_conns,
373 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
375 def broadcast_share(share_hash):
377 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
378 if share.hash in shared_share_hashes:
380 shared_share_hashes.add(share.hash)
383 for peer in p2p_node.peers.itervalues():
384 peer.sendShares([share for share in shares if share.peer is not peer])
386 # send share when the chain changes to their chain
387 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
390 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
392 if share.hash in tracker.verified.shares:
393 ss.add_verified_hash(share.hash)
394 task.LoopingCall(save_shares).start(60)
400 @defer.inlineCallbacks
404 is_lan, lan_ip = yield ipdiscover.get_local_ip()
406 pm = yield portmapper.get_port_mapper()
407 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
408 except defer.TimeoutError:
412 log.err(None, 'UPnP error:')
413 yield deferral.sleep(random.expovariate(1/120))
416 # start listening for workers with a JSON-RPC server
418 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
422 removed_unstales_var = variable.Variable((0, 0, 0))
423 removed_doa_unstales_var = variable.Variable(0)
424 @tracker.verified.removed.watch
426 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
427 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
428 removed_unstales_var.set((
429 removed_unstales_var.value[0] + 1,
430 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
431 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
433 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
434 removed_doa_unstales.set(removed_doa_unstales.value + 1)
436 def get_stale_counts():
437 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
438 my_shares = len(my_share_hashes)
439 my_doa_shares = len(my_doa_share_hashes)
440 delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
441 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
442 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
443 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
444 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
446 my_shares_not_in_chain = my_shares - my_shares_in_chain
447 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
449 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
452 pseudoshare_received = variable.Event()
453 share_received = variable.Event()
454 local_rate_monitor = math.RateMonitor(10*60)
456 class WorkerBridge(worker_interface.WorkerBridge):
458 worker_interface.WorkerBridge.__init__(self)
459 self.new_work_event = current_work.changed
460 self.recent_shares_ts_work = []
462 def get_user_details(self, request):
463 user = request.getUser() if request.getUser() is not None else ''
465 desired_pseudoshare_target = None
467 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
469 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
473 desired_share_target = 2**256 - 1
475 user, min_diff_str = user.rsplit('/', 1)
477 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
481 if random.uniform(0, 100) < args.worker_fee:
482 pubkey_hash = my_pubkey_hash
485 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
487 pubkey_hash = my_pubkey_hash
489 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
491 def preprocess_request(self, request):
492 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
493 return pubkey_hash, desired_share_target, desired_pseudoshare_target
495 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
496 if len(p2p_node.peers) == 0 and net.PERSIST:
497 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
498 if current_work.value['best_share_hash'] is None and net.PERSIST:
499 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
500 if time.time() > current_work2.value['last_update'] + 60:
501 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
503 if current_work.value['mm_chains']:
504 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
505 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
506 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
507 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
511 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
517 share_info, generate_tx = p2pool_data.Share.generate_transaction(
520 previous_share_hash=current_work.value['best_share_hash'],
521 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
522 nonce=random.randrange(2**32),
523 pubkey_hash=pubkey_hash,
524 subsidy=current_work2.value['subsidy'],
525 donation=math.perfect_round(65535*args.donation_percentage/100),
526 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
527 253 if orphans > orphans_recorded_in_chain else
528 254 if doas > doas_recorded_in_chain else
530 )(*get_stale_counts()),
533 block_target=current_work.value['bits'].target,
534 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
535 desired_target=desired_share_target,
536 ref_merkle_link=dict(branch=[], index=0),
540 target = net.PARENT.SANE_MAX_TARGET
541 if desired_pseudoshare_target is None:
542 if len(self.recent_shares_ts_work) == 50:
543 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
544 target = min(target, 2**256//hash_rate)
546 target = min(target, desired_pseudoshare_target)
547 target = max(target, share_info['bits'].target)
548 for aux_work in current_work.value['mm_chains'].itervalues():
549 target = max(target, aux_work['target'])
551 transactions = [generate_tx] + list(current_work2.value['transactions'])
552 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
553 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
555 getwork_time = time.time()
556 merkle_link = current_work2.value['merkle_link']
558 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
559 bitcoin_data.target_to_difficulty(target),
560 bitcoin_data.target_to_difficulty(share_info['bits'].target),
561 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
562 len(current_work2.value['transactions']),
565 ba = bitcoin_getwork.BlockAttempt(
566 version=current_work.value['version'],
567 previous_block=current_work.value['previous_block'],
568 merkle_root=merkle_root,
569 timestamp=current_work2.value['time'],
570 bits=current_work.value['bits'],
574 received_header_hashes = set()
576 def got_response(header, request):
577 user, _, _, _ = self.get_user_details(request)
578 assert header['merkle_root'] == merkle_root
580 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
581 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
582 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
585 if pow_hash <= header['bits'].target or p2pool.DEBUG:
586 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
587 if pow_hash <= header['bits'].target:
589 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
592 log.err(None, 'Error while processing potential block:')
594 for aux_work, index, hashes in mm_later:
596 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
597 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
598 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
599 bitcoin_data.aux_pow_type.pack(dict(
602 block_hash=header_hash,
603 merkle_link=merkle_link,
605 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
606 parent_block_header=header,
611 if result != (pow_hash <= aux_work['target']):
612 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
614 print 'Merged block submittal result: %s' % (result,)
617 log.err(err, 'Error submitting merged block:')
619 log.err(None, 'Error while processing merged mining POW:')
621 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
622 min_header = dict(header);del min_header['merkle_root']
623 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
624 share = p2pool_data.Share(net, None, dict(
625 min_header=min_header, share_info=share_info, hash_link=hash_link,
626 ref_merkle_link=dict(branch=[], index=0),
627 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
629 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
631 p2pool_data.format_hash(share.hash),
632 p2pool_data.format_hash(share.previous_hash),
633 time.time() - getwork_time,
634 ' DEAD ON ARRIVAL' if not on_time else '',
636 my_share_hashes.add(share.hash)
638 my_doa_share_hashes.add(share.hash)
642 tracker.verified.add(share)
646 if pow_hash <= header['bits'].target or p2pool.DEBUG:
647 for peer in p2p_node.peers.itervalues():
648 peer.sendShares([share])
649 shared_share_hashes.add(share.hash)
651 log.err(None, 'Error forwarding block solution:')
653 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
655 if pow_hash > target:
656 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
657 print ' Hash: %56x' % (pow_hash,)
658 print ' Target: %56x' % (target,)
659 elif header_hash in received_header_hashes:
660 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
662 received_header_hashes.add(header_hash)
664 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
665 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
666 while len(self.recent_shares_ts_work) > 50:
667 self.recent_shares_ts_work.pop(0)
668 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
672 return ba, got_response
674 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
676 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
677 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
679 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
681 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
688 @defer.inlineCallbacks
691 flag = factory.new_block.get_deferred()
693 yield set_real_work1()
696 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
701 print 'Started successfully!'
702 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
703 if args.donation_percentage > 0.51:
704 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
705 elif args.donation_percentage < 0.49:
706 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
708 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
709 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
713 if hasattr(signal, 'SIGALRM'):
714 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
715 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
717 signal.siginterrupt(signal.SIGALRM, False)
718 task.LoopingCall(signal.alarm, 30).start(1)
720 if args.irc_announce:
721 from twisted.words.protocols import irc
722 class IRCClient(irc.IRCClient):
723 nickname = 'p2pool%02i' % (random.randrange(100),)
724 channel = net.ANNOUNCE_CHANNEL
725 def lineReceived(self, line):
728 irc.IRCClient.lineReceived(self, line)
730 irc.IRCClient.signedOn(self)
731 self.factory.resetDelay()
732 self.join(self.channel)
733 @defer.inlineCallbacks
734 def new_share(share):
735 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
736 yield deferral.sleep(random.expovariate(1/60))
737 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
738 if message not in self.recent_messages:
739 self.say(self.channel, message)
740 self._remember_message(message)
741 self.watch_id = tracker.verified.added.watch(new_share)
742 self.recent_messages = []
743 def _remember_message(self, message):
744 self.recent_messages.append(message)
745 while len(self.recent_messages) > 100:
746 self.recent_messages.pop(0)
747 def privmsg(self, user, channel, message):
748 if channel == self.channel:
749 self._remember_message(message)
750 def connectionLost(self, reason):
751 tracker.verified.added.unwatch(self.watch_id)
752 print 'IRC connection lost:', reason.getErrorMessage()
753 class IRCClientFactory(protocol.ReconnectingClientFactory):
755 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
757 @defer.inlineCallbacks
762 yield deferral.sleep(3)
764 if time.time() > current_work2.value['last_update'] + 60:
765 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
767 height = tracker.get_height(current_work.value['best_share_hash'])
768 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
770 len(tracker.verified.shares),
773 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
774 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
776 datums, dt = local_rate_monitor.get_datums_in_last()
777 my_att_s = sum(datum['work']/dt for datum in datums)
778 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
779 math.format(int(my_att_s)),
781 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
782 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
786 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
787 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
788 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
790 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
791 shares, stale_orphan_shares, stale_doa_shares,
792 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
793 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
794 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
796 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
797 math.format(int(real_att_s)),
799 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
802 desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
803 majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
804 if majority_desired_version not in [0, 1]:
805 print >>sys.stderr, '#'*40
806 print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
807 majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
808 print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
809 print >>sys.stderr, '#'*40
811 if this_str != last_str or time.time() > last_time + 15:
814 last_time = time.time()
820 log.err(None, 'Fatal error:')
823 class FixedArgumentParser(argparse.ArgumentParser):
824 def _read_args_from_files(self, arg_strings):
825 # expand arguments referencing files
827 for arg_string in arg_strings:
829 # for regular arguments, just add them back into the list
830 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
831 new_arg_strings.append(arg_string)
833 # replace arguments referencing files with the file content
836 args_file = open(arg_string[1:])
839 for arg_line in args_file.read().splitlines():
840 for arg in self.convert_arg_line_to_args(arg_line):
841 arg_strings.append(arg)
842 arg_strings = self._read_args_from_files(arg_strings)
843 new_arg_strings.extend(arg_strings)
847 err = sys.exc_info()[1]
850 # return the modified argument list
851 return new_arg_strings
853 def convert_arg_line_to_args(self, arg_line):
854 return [arg for arg in arg_line.split() if arg.strip()]
857 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
859 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
860 parser.add_argument('--version', action='version', version=p2pool.__version__)
861 parser.add_argument('--net',
862 help='use specified network (default: bitcoin)',
863 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
864 parser.add_argument('--testnet',
865 help='''use the network's testnet''',
866 action='store_const', const=True, default=False, dest='testnet')
867 parser.add_argument('--debug',
868 help='enable debugging mode',
869 action='store_const', const=True, default=False, dest='debug')
870 parser.add_argument('-a', '--address',
871 help='generate payouts to this address (default: <address requested from bitcoind>)',
872 type=str, action='store', default=None, dest='address')
873 parser.add_argument('--datadir',
874 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
875 type=str, action='store', default=None, dest='datadir')
876 parser.add_argument('--logfile',
877 help='''log to this file (default: data/<NET>/log)''',
878 type=str, action='store', default=None, dest='logfile')
879 parser.add_argument('--merged',
880 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
881 type=str, action='append', default=[], dest='merged_urls')
882 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
883 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
884 type=float, action='store', default=0.5, dest='donation_percentage')
885 parser.add_argument('--irc-announce',
886 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
887 action='store_true', default=False, dest='irc_announce')
889 p2pool_group = parser.add_argument_group('p2pool interface')
890 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
891 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
892 type=int, action='store', default=None, dest='p2pool_port')
893 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
894 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
895 type=str, action='append', default=[], dest='p2pool_nodes')
896 parser.add_argument('--disable-upnp',
897 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
898 action='store_false', default=True, dest='upnp')
899 p2pool_group.add_argument('--max-conns', metavar='CONNS',
900 help='maximum incoming connections (default: 40)',
901 type=int, action='store', default=40, dest='p2pool_conns')
903 worker_group = parser.add_argument_group('worker interface')
904 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
905 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
906 type=str, action='store', default=None, dest='worker_endpoint')
907 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
908 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
909 type=float, action='store', default=0, dest='worker_fee')
911 bitcoind_group = parser.add_argument_group('bitcoind interface')
912 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
913 help='connect to this address (default: 127.0.0.1)',
914 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
915 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
916 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
917 type=int, action='store', default=None, dest='bitcoind_rpc_port')
918 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
919 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
920 type=int, action='store', default=None, dest='bitcoind_p2p_port')
922 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
923 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
924 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
926 args = parser.parse_args()
931 net_name = args.net_name + ('_testnet' if args.testnet else '')
932 net = networks.nets[net_name]
934 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
935 if not os.path.exists(datadir_path):
936 os.makedirs(datadir_path)
938 if len(args.bitcoind_rpc_userpass) > 2:
939 parser.error('a maximum of two arguments are allowed')
940 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
942 if args.bitcoind_rpc_password is None:
943 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
944 parser.error('This network has no configuration file function. Manually enter your RPC password.')
945 conf_path = net.PARENT.CONF_FILE_FUNC()
946 if not os.path.exists(conf_path):
947 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
948 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
951 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
952 with open(conf_path, 'rb') as f:
953 cp = ConfigParser.RawConfigParser()
954 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
955 for conf_name, var_name, var_type in [
956 ('rpcuser', 'bitcoind_rpc_username', str),
957 ('rpcpassword', 'bitcoind_rpc_password', str),
958 ('rpcport', 'bitcoind_rpc_port', int),
959 ('port', 'bitcoind_p2p_port', int),
961 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
962 setattr(args, var_name, var_type(cp.get('x', conf_name)))
963 if args.bitcoind_rpc_password is None:
964 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
966 if args.bitcoind_rpc_username is None:
967 args.bitcoind_rpc_username = ''
969 if args.bitcoind_rpc_port is None:
970 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
972 if args.bitcoind_p2p_port is None:
973 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
975 if args.p2pool_port is None:
976 args.p2pool_port = net.P2P_PORT
978 if args.worker_endpoint is None:
979 worker_endpoint = '', net.WORKER_PORT
980 elif ':' not in args.worker_endpoint:
981 worker_endpoint = '', int(args.worker_endpoint)
983 addr, port = args.worker_endpoint.rsplit(':', 1)
984 worker_endpoint = addr, int(port)
986 if args.address is not None:
988 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
990 parser.error('error parsing address: ' + repr(e))
992 args.pubkey_hash = None
994 def separate_url(url):
995 s = urlparse.urlsplit(url)
996 if '@' not in s.netloc:
997 parser.error('merged url netloc must contain an "@"')
998 userpass, new_netloc = s.netloc.rsplit('@', 1)
999 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
1000 merged_urls = map(separate_url, args.merged_urls)
1002 if args.logfile is None:
1003 args.logfile = os.path.join(datadir_path, 'log')
1005 logfile = logging.LogFile(args.logfile)
1006 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1007 sys.stdout = logging.AbortPipe(pipe)
1008 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1009 if hasattr(signal, "SIGUSR1"):
1010 def sigusr1(signum, frame):
1011 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1013 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1014 signal.signal(signal.SIGUSR1, sigusr1)
1015 task.LoopingCall(logfile.reopen).start(5)
1017 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)