1 from __future__ import division
17 from twisted.internet import iocpreactor
22 print 'Using IOCP reactor!'
23 from twisted.internet import defer, reactor, protocol, task
24 from twisted.web import server
25 from twisted.python import log
26 from nattraverso import portmapper, ipdiscover
28 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
29 from bitcoin import worker_interface, height_tracker
30 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
31 from . import p2p, networks, web
32 import p2pool, p2pool.data as p2pool_data
34 @deferral.retry('Error getting work from bitcoind:', 3)
35 @defer.inlineCallbacks
36 def getwork(bitcoind):
38 work = yield bitcoind.rpc_getmemorypool()
39 except jsonrpc.Error, e:
40 if e.code == -32601: # Method not found
41 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
42 raise deferral.RetrySilentlyException()
44 packed_transactions = [x.decode('hex') for x in work['transactions']]
45 defer.returnValue(dict(
46 version=work['version'],
47 previous_block_hash=int(work['previousblockhash'], 16),
48 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
49 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
50 subsidy=work['coinbasevalue'],
52 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
53 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
56 @defer.inlineCallbacks
57 def main(args, net, datadir_path, merged_urls, worker_endpoint):
59 print 'p2pool (version %s)' % (p2pool.__version__,)
62 # connect to bitcoind over JSON-RPC and do initial getmemorypool
63 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
64 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
65 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
@deferral.retry('Error while checking Bitcoin connection:', 1)
@defer.inlineCallbacks
def check():
    '''Verify that the configured bitcoind is the right daemon and new enough.

    Delivers (through the Deferred) the work dict obtained from
    getwork(bitcoind). If either net.PARENT.RPC_CHECK or net.VERSION_CHECK
    rejects the daemon, a hint is printed to stderr and
    deferral.RetrySilentlyException is raised so the retry decorator tries
    again.
    '''
    # Yield the *result* of calling RPC_CHECK. Yielding the function object
    # and then calling it would test the call's return value directly
    # (presumably a Deferred, which is always truthy), so the check could
    # never fail.
    if not (yield net.PARENT.RPC_CHECK(bitcoind)):
        print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
        raise deferral.RetrySilentlyException()
    temp_work = yield getwork(bitcoind)
    if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
        print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
        raise deferral.RetrySilentlyException()
    defer.returnValue(temp_work)
77 temp_work = yield check()
79 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
82 # connect to bitcoind over bitcoin-p2p
83 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
84 factory = bitcoin_p2p.ClientFactory(net.PARENT)
85 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
86 yield factory.getProtocol() # waits until handshake is successful
90 print 'Determining payout address...'
91 if args.pubkey_hash is None:
92 address_path = os.path.join(datadir_path, 'cached_payout_address')
94 if os.path.exists(address_path):
95 with open(address_path, 'rb') as f:
96 address = f.read().strip('\r\n')
97 print ' Loaded cached address: %s...' % (address,)
101 if address is not None:
102 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
103 if not res['isvalid'] or not res['ismine']:
104 print ' Cached address is either invalid or not controlled by local bitcoind!'
108 print ' Getting payout address from bitcoind...'
109 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
111 with open(address_path, 'wb') as f:
114 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
116 my_pubkey_hash = args.pubkey_hash
117 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
120 my_share_hashes = set()
121 my_doa_share_hashes = set()
123 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
124 shared_share_hashes = set()
125 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
126 known_verified = set()
127 print "Loading shares..."
128 for i, (mode, contents) in enumerate(ss.get_shares()):
130 if contents.hash in tracker.shares:
132 shared_share_hashes.add(contents.hash)
133 contents.time_seen = 0
134 tracker.add(contents)
135 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136 print " %i" % (len(tracker.shares),)
137 elif mode == 'verified_hash':
138 known_verified.add(contents)
140 raise AssertionError()
141 print " ...inserting %i verified shares..." % (len(known_verified),)
142 for h in known_verified:
143 if h not in tracker.shares:
144 ss.forget_verified_share(h)
146 tracker.verified.add(tracker.shares[h])
147 print " ...done loading %i shares!" % (len(tracker.shares),)
149 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
150 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
151 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
153 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155 pre_current_work = variable.Variable(None)
156 pre_merged_work = variable.Variable({})
157 # information affecting work that should trigger a long-polling update
158 current_work = variable.Variable(None)
159 # information affecting work that should not trigger a long-polling update
160 current_work2 = variable.Variable(None)
162 requested = expiring_dict.ExpiringDict(300)
164 print 'Initializing work...'
@defer.inlineCallbacks
def set_real_work1():
    '''Poll bitcoind for work and publish it into the work variables.

    current_work2 receives the fields that should not trigger a long-polling
    update, plus bookkeeping timestamps; pre_current_work receives the
    fields that identify the block being built on.
    '''
    template = yield getwork(bitcoind)
    
    # second set first because everything hooks on the first
    current_work2.set({
        'time': template['time'],
        'transactions': template['transactions'],
        'merkle_link': template['merkle_link'],
        'subsidy': template['subsidy'],
        'clock_offset': time.time() - template['time'],
        'last_update': time.time(),
    })
    pre_current_work.set({
        'version': template['version'],
        'previous_block': template['previous_block_hash'],
        'bits': template['bits'],
        'coinbaseflags': template['coinbaseflags'],
    })
182 yield set_real_work1()
184 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
186 def set_real_work2():
187 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
189 t = dict(pre_current_work.value)
190 t['best_share_hash'] = best
191 t['mm_chains'] = pre_merged_work.value
195 for peer2, share_hash in desired:
196 if share_hash not in tracker.tails: # was received in the time tracker.think was running
198 last_request_time, count = requested.get(share_hash, (None, 0))
199 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
201 potential_peers = set()
202 for head in tracker.tails[share_hash]:
203 potential_peers.update(peer_heads.get(head, set()))
204 potential_peers = [peer for peer in potential_peers if peer.connected2]
205 if count == 0 and peer2 is not None and peer2.connected2:
208 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
212 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
216 stops=list(set(tracker.heads) | set(
217 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
220 requested[share_hash] = t, count + 1
221 pre_current_work.changed.watch(lambda _: set_real_work2())
222 pre_merged_work.changed.watch(lambda _: set_real_work2())
228 @defer.inlineCallbacks
229 def set_merged_work(merged_url, merged_userpass):
230 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
232 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
233 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
234 hash=int(auxblock['hash'], 16),
235 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
236 merged_proxy=merged_proxy,
238 yield deferral.sleep(1)
239 for merged_url, merged_userpass in merged_urls:
240 set_merged_work(merged_url, merged_userpass)
242 @pre_merged_work.changed.watch
243 def _(new_merged_work):
244 print 'Got new merged mining work!'
246 # setup p2p logic and join p2pool network
248 class Node(p2p.Node):
249 def handle_shares(self, shares, peer):
251 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
255 if share.hash in tracker.shares:
256 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
261 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
265 if shares and peer is not None:
266 peer_heads.setdefault(shares[0].hash, set()).add(peer)
272 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
274 def handle_share_hashes(self, hashes, peer):
277 for share_hash in hashes:
278 if share_hash in tracker.shares:
280 last_request_time, count = requested.get(share_hash, (None, 0))
281 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
283 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
284 get_hashes.append(share_hash)
285 requested[share_hash] = t, count + 1
287 if hashes and peer is not None:
288 peer_heads.setdefault(hashes[0], set()).add(peer)
290 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
292 def handle_get_shares(self, hashes, parents, stops, peer):
293 parents = min(parents, 1000//len(hashes))
296 for share_hash in hashes:
297 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
298 if share.hash in stops:
301 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    '''Send a block to bitcoind over the bitcoin-p2p connection.

    block is a dict with at least a 'header' entry. If there is currently no
    p2p connection (factory.conn.value is None), the block's explorer URL is
    printed to stderr and deferral.RetrySilentlyException is raised so the
    retry decorator tries again.
    '''
    if factory.conn.value is None:
        # No header_hash is in scope here, so derive it from the block itself.
        # %064x keeps leading zeros, matching the other explorer URLs printed
        # by this file.
        header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
        raise deferral.RetrySilentlyException()
    factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    '''Submit a block to bitcoind over JSON-RPC and report surprising results.

    The packed, hex-encoded block is passed to rpc_getmemorypool. The outcome
    is compared with whether the header's proof-of-work actually meets the
    header's own target; a mismatch is printed to stderr. With ignore_failure
    set, an unexpected failure is not reported (an unexpected success always
    is).
    '''
    success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
    unexpected_failure = not success and success_expected and not ignore_failure
    unexpected_success = success and not success_expected
    if unexpected_failure or unexpected_success:
        # report the values computed above; no other result names exist here
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
def submit_block(block, ignore_failure):
    '''Hand a block to bitcoind through both channels.

    The block is first sent over the p2p connection and then submitted over
    JSON-RPC; ignore_failure is forwarded to the JSON-RPC submission only.
    Neither call's result is waited on here.
    '''
    submit_block_p2p(block)
    submit_block_rpc(block, ignore_failure)
323 @tracker.verified.added.watch
325 if share.pow_hash <= share.header['bits'].target:
326 submit_block(share.as_block(tracker), ignore_failure=True)
328 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
330 if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
331 broadcast_share(share.hash)
333 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
@defer.inlineCallbacks
def parse(x):
    '''Resolve a 'host' or 'host:port' string to an (address, port) tuple.

    The host part is resolved with reactor.resolve; when no port is given,
    net.P2P_PORT is used.
    '''
    if ':' in x:
        host, port_text = x.split(':')
    else:
        host, port_text = x, None
    resolved = yield reactor.resolve(host)
    port = net.P2P_PORT if port_text is None else int(port_text)
    defer.returnValue((resolved, port))
344 if os.path.exists(os.path.join(datadir_path, 'addrs')):
346 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
347 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
349 print >>sys.stderr, 'error parsing addrs'
350 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
352 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
354 print >>sys.stderr, "error reading addrs.txt"
355 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
358 if addr not in addrs:
359 addrs[addr] = (0, time.time(), time.time())
363 connect_addrs = set()
364 for addr_df in map(parse, args.p2pool_nodes):
366 connect_addrs.add((yield addr_df))
371 best_share_hash_func=lambda: current_work.value['best_share_hash'],
372 port=args.p2pool_port,
375 connect_addrs=connect_addrs,
376 max_incoming_conns=args.p2pool_conns,
381 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
382 f.write(json.dumps(p2p_node.addr_store.items()))
383 task.LoopingCall(save_addrs).start(60)
385 def broadcast_share(share_hash):
387 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
388 if share.hash in shared_share_hashes:
390 shared_share_hashes.add(share.hash)
393 for peer in p2p_node.peers.itervalues():
394 peer.sendShares([share for share in shares if share.peer is not peer])
396 # send share when the chain changes to their chain
397 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
400 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
402 if share.hash in tracker.verified.shares:
403 ss.add_verified_hash(share.hash)
404 task.LoopingCall(save_shares).start(60)
410 @defer.inlineCallbacks
414 is_lan, lan_ip = yield ipdiscover.get_local_ip()
416 pm = yield portmapper.get_port_mapper()
417 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
418 except defer.TimeoutError:
422 log.err(None, 'UPnP error:')
423 yield deferral.sleep(random.expovariate(1/120))
426 # start listening for workers with a JSON-RPC server
428 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
432 removed_unstales_var = variable.Variable((0, 0, 0))
433 removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    '''Keep stale statistics correct when our own shares leave the tracker.

    When a removed verified share is one of ours and lies on the current best
    chain, its contribution is added to removed_unstales_var as a
    (count, orphan announcements, dead announcements) tuple. Removed
    dead-on-arrival shares of ours on the best chain are counted in
    removed_doa_unstales_var.
    '''
    best_share_hash = current_work.value['best_share_hash']
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, best_share_hash):
        stale_info = share.share_data['stale_info']
        assert stale_info in [0, 253, 254] # we made these shares in this instance
        removed_unstales_var.set((
            removed_unstales_var.value[0] + 1,
            removed_unstales_var.value[1] + (1 if stale_info == 253 else 0),
            removed_unstales_var.value[2] + (1 if stale_info == 254 else 0),
        ))
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, best_share_hash):
        # the counter is removed_doa_unstales_var; no removed_doa_unstales exists
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)

    The in-chain figures combine the delta along the verified chain ending at
    the current best share with the counts accumulated for shares already
    removed from the tracker.
    '''
    total = len(my_share_hashes)
    total_doa = len(my_doa_share_hashes)
    delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
    removed_count, removed_orphan_announces, removed_dead_announces = removed_unstales_var.value
    
    in_chain = delta.my_count + removed_count
    doa_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
    recorded = (
        delta.my_orphan_announce_count + removed_orphan_announces,
        delta.my_dead_announce_count + removed_dead_announces,
    )
    
    not_in_chain = total - in_chain
    doa_not_in_chain = total_doa - doa_in_chain
    
    # whatever is missing from the chain and was not dead on arrival is an orphan
    stale = (not_in_chain - doa_not_in_chain, doa_not_in_chain)
    return stale, total, recorded
462 pseudoshare_received = variable.Event()
463 share_received = variable.Event()
464 local_rate_monitor = math.RateMonitor(10*60)
466 class WorkerBridge(worker_interface.WorkerBridge):
468 worker_interface.WorkerBridge.__init__(self)
469 self.new_work_event = current_work.changed
470 self.recent_shares_ts_work = []
472 def get_user_details(self, request):
473 user = request.getUser() if request.getUser() is not None else ''
475 desired_pseudoshare_target = None
477 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
479 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
483 desired_share_target = 2**256 - 1
485 user, min_diff_str = user.rsplit('/', 1)
487 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
491 if random.uniform(0, 100) < args.worker_fee:
492 pubkey_hash = my_pubkey_hash
495 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
497 pubkey_hash = my_pubkey_hash
499 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
def preprocess_request(self, request):
    '''Return (pubkey_hash, desired_share_target, desired_pseudoshare_target).

    These are the values get_user_details() derives from the request, with
    the leading user name dropped.
    '''
    details = self.get_user_details(request)
    _user, pubkey_hash, share_target, pseudoshare_target = details
    return pubkey_hash, share_target, pseudoshare_target
505 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
506 if len(p2p_node.peers) == 0 and net.PERSIST:
507 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
508 if current_work.value['best_share_hash'] is None and net.PERSIST:
509 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
510 if time.time() > current_work2.value['last_update'] + 60:
511 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
513 if current_work.value['mm_chains']:
514 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
515 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
516 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
517 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
521 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
527 share_info, generate_tx = p2pool_data.Share.generate_transaction(
530 previous_share_hash=current_work.value['best_share_hash'],
531 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
532 nonce=random.randrange(2**32),
533 pubkey_hash=pubkey_hash,
534 subsidy=current_work2.value['subsidy'],
535 donation=math.perfect_round(65535*args.donation_percentage/100),
536 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
537 253 if orphans > orphans_recorded_in_chain else
538 254 if doas > doas_recorded_in_chain else
540 )(*get_stale_counts()),
543 block_target=current_work.value['bits'].target,
544 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
545 desired_target=desired_share_target,
546 ref_merkle_link=dict(branch=[], index=0),
550 target = net.PARENT.SANE_MAX_TARGET
551 if desired_pseudoshare_target is None:
552 if len(self.recent_shares_ts_work) == 50:
553 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
554 target = min(target, 2**256//hash_rate)
556 target = min(target, desired_pseudoshare_target)
557 target = max(target, share_info['bits'].target)
558 for aux_work in current_work.value['mm_chains'].itervalues():
559 target = max(target, aux_work['target'])
561 transactions = [generate_tx] + list(current_work2.value['transactions'])
562 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
563 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
565 getwork_time = time.time()
566 merkle_link = current_work2.value['merkle_link']
568 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
569 bitcoin_data.target_to_difficulty(target),
570 bitcoin_data.target_to_difficulty(share_info['bits'].target),
571 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
572 len(current_work2.value['transactions']),
575 ba = bitcoin_getwork.BlockAttempt(
576 version=current_work.value['version'],
577 previous_block=current_work.value['previous_block'],
578 merkle_root=merkle_root,
579 timestamp=current_work2.value['time'],
580 bits=current_work.value['bits'],
584 received_header_hashes = set()
586 def got_response(header, request):
587 user, _, _, _ = self.get_user_details(request)
588 assert header['merkle_root'] == merkle_root
590 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
591 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
592 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
595 if pow_hash <= header['bits'].target or p2pool.DEBUG:
596 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
597 if pow_hash <= header['bits'].target:
599 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
602 log.err(None, 'Error while processing potential block:')
604 for aux_work, index, hashes in mm_later:
606 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
607 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
608 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
609 bitcoin_data.aux_pow_type.pack(dict(
612 block_hash=header_hash,
613 merkle_link=merkle_link,
615 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
616 parent_block_header=header,
621 if result != (pow_hash <= aux_work['target']):
622 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
624 print 'Merged block submittal result: %s' % (result,)
627 log.err(err, 'Error submitting merged block:')
629 log.err(None, 'Error while processing merged mining POW:')
631 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
632 min_header = dict(header);del min_header['merkle_root']
633 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
634 share = p2pool_data.Share(net, None, dict(
635 min_header=min_header, share_info=share_info, hash_link=hash_link,
636 ref_merkle_link=dict(branch=[], index=0),
637 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
639 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
641 p2pool_data.format_hash(share.hash),
642 p2pool_data.format_hash(share.previous_hash),
643 time.time() - getwork_time,
644 ' DEAD ON ARRIVAL' if not on_time else '',
646 my_share_hashes.add(share.hash)
648 my_doa_share_hashes.add(share.hash)
652 tracker.verified.add(share)
656 if pow_hash <= header['bits'].target or p2pool.DEBUG:
657 for peer in p2p_node.peers.itervalues():
658 peer.sendShares([share])
659 shared_share_hashes.add(share.hash)
661 log.err(None, 'Error forwarding block solution:')
663 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
665 if pow_hash > target:
666 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
667 print ' Hash: %56x' % (pow_hash,)
668 print ' Target: %56x' % (target,)
669 elif header_hash in received_header_hashes:
670 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
672 received_header_hashes.add(header_hash)
674 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
675 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
676 while len(self.recent_shares_ts_work) > 50:
677 self.recent_shares_ts_work.pop(0)
678 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
682 return ba, got_response
684 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
686 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
687 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
689 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
691 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
698 @defer.inlineCallbacks
701 flag = factory.new_block.get_deferred()
703 yield set_real_work1()
706 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
711 print 'Started successfully!'
712 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
713 if args.donation_percentage > 0.51:
714 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
715 elif args.donation_percentage < 0.49:
716 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
718 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
719 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
723 if hasattr(signal, 'SIGALRM'):
724 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
725 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
727 signal.siginterrupt(signal.SIGALRM, False)
728 task.LoopingCall(signal.alarm, 30).start(1)
730 if args.irc_announce:
731 from twisted.words.protocols import irc
732 class IRCClient(irc.IRCClient):
733 nickname = 'p2pool%02i' % (random.randrange(100),)
734 channel = net.ANNOUNCE_CHANNEL
735 def lineReceived(self, line):
738 irc.IRCClient.lineReceived(self, line)
740 irc.IRCClient.signedOn(self)
741 self.factory.resetDelay()
742 self.join(self.channel)
743 @defer.inlineCallbacks
744 def new_share(share):
745 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
746 yield deferral.sleep(random.expovariate(1/60))
747 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
748 if message not in self.recent_messages:
749 self.say(self.channel, message)
750 self._remember_message(message)
751 self.watch_id = tracker.verified.added.watch(new_share)
752 self.recent_messages = []
753 def _remember_message(self, message):
754 self.recent_messages.append(message)
755 while len(self.recent_messages) > 100:
756 self.recent_messages.pop(0)
757 def privmsg(self, user, channel, message):
758 if channel == self.channel:
759 self._remember_message(message)
760 def connectionLost(self, reason):
761 tracker.verified.added.unwatch(self.watch_id)
762 print 'IRC connection lost:', reason.getErrorMessage()
763 class IRCClientFactory(protocol.ReconnectingClientFactory):
765 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
767 @defer.inlineCallbacks
772 yield deferral.sleep(3)
774 if time.time() > current_work2.value['last_update'] + 60:
775 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
777 height = tracker.get_height(current_work.value['best_share_hash'])
778 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
780 len(tracker.verified.shares),
783 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
784 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
786 datums, dt = local_rate_monitor.get_datums_in_last()
787 my_att_s = sum(datum['work']/dt for datum in datums)
788 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
789 math.format(int(my_att_s)),
791 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
792 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
796 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
797 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
798 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
800 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
801 shares, stale_orphan_shares, stale_doa_shares,
802 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
803 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
804 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
806 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
807 math.format(int(real_att_s)),
809 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
812 desired_version_counts = p2pool_data.get_desired_version_counts(tracker, current_work.value['best_share_hash'], min(720, height))
813 majority_desired_version = max(desired_version_counts, key=lambda k: desired_version_counts[k])
814 if majority_desired_version not in [0, 1]:
815 print >>sys.stderr, '#'*40
816 print >>sys.stderr, '>>> WARNING: A MAJORITY OF SHARES CONTAIN A VOTE FOR AN UNSUPPORTED SHARE IMPLEMENTATION! (v%i with %i%% support)' % (
817 majority_desired_version, 100*desired_version_counts[majority_desired_version]/sum(desired_version_counts.itervalues()))
818 print >>sys.stderr, '>>> An upgrade is likely necessary. Check http://p2pool.forre.st/ for more information.'
819 print >>sys.stderr, '#'*40
821 if this_str != last_str or time.time() > last_time + 15:
824 last_time = time.time()
830 log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    '''ArgumentParser with more forgiving expansion of @file arguments.

    Empty argument strings are passed through untouched instead of being
    indexed, and each line of an argument file may hold several
    whitespace-separated arguments.
    '''
    
    def _read_args_from_files(self, arg_strings):
        '''Return arg_strings with every prefixed file reference expanded.

        A file reference is replaced by the arguments found in that file;
        files may reference further files. An unreadable file is reported
        through self.error().
        '''
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
                continue
            
            # replace arguments referencing files with the file content
            try:
                # read everything up front so the file is closed before
                # nested file references are expanded
                with open(arg_string[1:]) as args_file:
                    arg_lines = args_file.read().splitlines()
            except IOError:
                err = sys.exc_info()[1]
                self.error(str(err))
            else:
                file_args = []
                for arg_line in arg_lines:
                    file_args.extend(self.convert_arg_line_to_args(arg_line))
                new_arg_strings.extend(self._read_args_from_files(file_args))
        
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        '''Split one line of an argument file into whitespace-separated arguments.'''
        # str.split() with no separator never yields empty or blank items
        return arg_line.split()
# Networks selectable with --net: every entry of networks.nets whose name does
# not contain '_testnet' (the testnet variant is chosen with --testnet instead).
realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# fromfile_prefix_chars='@' makes arguments of the form @FILE expand to the
# arguments stored in FILE.
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')

# Options controlling p2pool's own peer-to-peer interface.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# Registered on the group (not the bare parser) so --help lists it together
# with the other P2P-port options; parsing is unaffected.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')

# Options controlling the RPC interface that miners connect to.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# Options describing how to reach bitcoind. Ports default to None here so that
# later code can tell "not given" apart from an explicit value.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# Positional arguments: zero, one or two values (nargs='*'); the count is
# validated after parsing.
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()
# Resolve the network definition: --testnet selects the '<name>_testnet' entry.
net_name = args.net_name
if args.testnet:
    net_name += '_testnet'
net = networks.nets[net_name]

# Per-network data directory: <datadir>/<net_name>, where <datadir> defaults to
# a 'data' directory next to the script that was launched (sys.argv[0]).
if args.datadir is None:
    datadir_root = os.path.join(os.path.dirname(sys.argv[0]), 'data')
else:
    datadir_root = args.datadir
datadir_path = os.path.join(datadir_root, net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)
# The positional arguments are "[[USERNAME] PASSWORD]". Left-padding with None
# and keeping the last two entries means a single value is taken as the
# password and no values leave both unset.
if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

if args.bitcoind_rpc_password is None:
    # No password on the command line: fall back to the parent coin's
    # configuration file, if the network definition knows where it lives.
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        # The suggested password is a secret, so it is drawn from the OS
        # entropy source rather than the default (predictable) generator.
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.SystemRandom().randrange(2**128)))
    with open(conf_path, 'rb') as f:
        conf_text = f.read()
    # The file is parsed under a synthetic '[x]' section header, which
    # RawConfigParser needs because the file text itself is not given one here.
    cp = ConfigParser.RawConfigParser()
    cp.readfp(StringIO.StringIO('[x]\r\n' + conf_text))
    # (option name in the file, attribute on args, converter)
    for conf_name, var_name, var_type in [
        ('rpcuser', 'bitcoind_rpc_username', str),
        ('rpcpassword', 'bitcoind_rpc_password', str),
        ('rpcport', 'bitcoind_rpc_port', int),
        ('port', 'bitcoind_p2p_port', int),
    ]:
        # Values given on the command line take precedence over the file.
        if getattr(args, var_name) is None and cp.has_option('x', conf_name):
            setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
# Fill in everything still unset after the command line (and, possibly, the
# configuration file) have been consulted.
if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

# worker_endpoint is an (interface address, port) pair. An empty address is
# used when none is given, which the --worker-port help text describes as
# "all interfaces".
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
else:
    # rpartition covers both accepted forms: 'PORT' yields an empty address,
    # 'ADDR:PORT' splits at the last colon.
    worker_addr, _sep, worker_port = args.worker_endpoint.rpartition(':')
    try:
        worker_endpoint = worker_addr, int(worker_port)
    except ValueError:
        # Report a malformed port as a usage error instead of a traceback.
        parser.error('error parsing worker port: ' + repr(worker_port))
# Convert the optional payout address to its pubkey hash. args.pubkey_hash is
# left as None when no address was given (the --address help text says one is
# then requested from bitcoind).
if args.address is not None:
    try:
        args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception as e:
        # Any failure to parse the address is reported as a usage error.
        parser.error('error parsing address: ' + repr(e))
else:
    args.pubkey_hash = None
def separate_url(url):
    """Split the credentials out of a merged-mining URL.

    Returns a ``(url_without_credentials, userpass)`` pair, where ``userpass``
    is everything before the last '@' of the URL's network location. A URL
    whose network location has no '@' is reported through ``parser.error``.
    """
    parts = urlparse.urlsplit(url)
    userpass, at_sign, bare_netloc = parts.netloc.rpartition('@')
    if not at_sign:
        parser.error('merged url netloc must contain an "@"')
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=bare_netloc))
    return stripped_url, userpass
# One (url, userpass) pair per --merged option given on the command line.
merged_urls = map(separate_url, args.merged_urls)
# Default log location: a file named 'log' inside the per-network data directory.
if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Route stdout and stderr through one timestamping pipe that tees to the
# original stderr and to the log file; stderr output gets a '> ' prefix.
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 does not exist on every platform, hence the hasattr check.
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        """Signal handler: reopen the log file on SIGUSR1."""
        print('Caught SIGUSR1, closing %r...' % (args.logfile,))
        # Actually perform the reopen that the surrounding messages announce.
        logfile.reopen()
        print('...and reopened %r after catching SIGUSR1.' % (args.logfile,))
    signal.signal(signal.SIGUSR1, sigusr1)
# Independently of the signal, reopen the log file every 5 seconds
# (presumably so that externally rotated or removed log files are picked up).
task.LoopingCall(logfile.reopen).start(5)

# Schedule main() to be called with the prepared settings once the reactor is running.
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)