1 from __future__ import division
17 from twisted.internet import iocpreactor
22 print 'Using IOCP reactor!'
23 from twisted.internet import defer, reactor, protocol, task
24 from twisted.web import server
25 from twisted.python import log
26 from nattraverso import portmapper, ipdiscover
28 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
29 from bitcoin import worker_interface, height_tracker
30 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
31 from . import p2p, networks, web
32 import p2pool, p2pool.data as p2pool_data
34 @deferral.retry('Error getting work from bitcoind:', 3)
35 @defer.inlineCallbacks
36 def getwork(bitcoind):
38 work = yield bitcoind.rpc_getmemorypool()
39 except jsonrpc.Error, e:
40 if e.code == -32601: # Method not found
41 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
42 raise deferral.RetrySilentlyException()
44 packed_transactions = [x.decode('hex') for x in work['transactions']]
45 defer.returnValue(dict(
46 version=work['version'],
47 previous_block_hash=int(work['previousblockhash'], 16),
48 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
49 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
50 subsidy=work['coinbasevalue'],
52 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
53 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
56 @defer.inlineCallbacks
57 def main(args, net, datadir_path, merged_urls, worker_endpoint):
59 print 'p2pool (version %s)' % (p2pool.__version__,)
62 # connect to bitcoind over JSON-RPC and do initial getmemorypool
63 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
64 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
65 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
66 @deferral.retry('Error while checking Bitcoin connection:', 1)
67 @defer.inlineCallbacks
69 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
70 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
71 raise deferral.RetrySilentlyException()
72 temp_work = yield getwork(bitcoind)
73 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
74 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
75 raise deferral.RetrySilentlyException()
76 defer.returnValue(temp_work)
77 temp_work = yield check()
79 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
82 # connect to bitcoind over bitcoin-p2p
83 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
84 factory = bitcoin_p2p.ClientFactory(net.PARENT)
85 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
86 yield factory.getProtocol() # waits until handshake is successful
90 print 'Determining payout address...'
91 if args.pubkey_hash is None:
92 address_path = os.path.join(datadir_path, 'cached_payout_address')
94 if os.path.exists(address_path):
95 with open(address_path, 'rb') as f:
96 address = f.read().strip('\r\n')
97 print ' Loaded cached address: %s...' % (address,)
101 if address is not None:
102 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
103 if not res['isvalid'] or not res['ismine']:
104 print ' Cached address is either invalid or not controlled by local bitcoind!'
108 print ' Getting payout address from bitcoind...'
109 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
111 with open(address_path, 'wb') as f:
114 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
116 my_pubkey_hash = args.pubkey_hash
117 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
120 my_share_hashes = set()
121 my_doa_share_hashes = set()
123 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
124 shared_share_hashes = set()
125 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
126 known_verified = set()
127 print "Loading shares..."
128 for i, (mode, contents) in enumerate(ss.get_shares()):
130 if contents.hash in tracker.shares:
132 shared_share_hashes.add(contents.hash)
133 contents.time_seen = 0
134 tracker.add(contents)
135 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136 print " %i" % (len(tracker.shares),)
137 elif mode == 'verified_hash':
138 known_verified.add(contents)
140 raise AssertionError()
141 print " ...inserting %i verified shares..." % (len(known_verified),)
142 for h in known_verified:
143 if h not in tracker.shares:
144 ss.forget_verified_share(h)
146 tracker.verified.add(tracker.shares[h])
147 print " ...done loading %i shares!" % (len(tracker.shares),)
149 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
150 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
151 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
153 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155 pre_current_work = variable.Variable(None)
156 pre_merged_work = variable.Variable({})
157 # information affecting work that should trigger a long-polling update
158 current_work = variable.Variable(None)
159 # information affecting work that should not trigger a long-polling update
160 current_work2 = variable.Variable(None)
162 requested = expiring_dict.ExpiringDict(300)
164 print 'Initializing work...'
165 @defer.inlineCallbacks
166 def set_real_work1():
167 work = yield getwork(bitcoind)
168 current_work2.set(dict(
170 transactions=work['transactions'],
171 merkle_link=work['merkle_link'],
172 subsidy=work['subsidy'],
173 clock_offset=time.time() - work['time'],
174 last_update=time.time(),
175 )) # second set first because everything hooks on the first
176 pre_current_work.set(dict(
177 version=work['version'],
178 previous_block=work['previous_block_hash'],
180 coinbaseflags=work['coinbaseflags'],
182 yield set_real_work1()
184 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
186 def set_real_work2():
187 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
189 t = dict(pre_current_work.value)
190 t['best_share_hash'] = best
191 t['mm_chains'] = pre_merged_work.value
195 for peer2, share_hash in desired:
196 if share_hash not in tracker.tails: # was received in the time tracker.think was running
198 last_request_time, count = requested.get(share_hash, (None, 0))
199 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
201 potential_peers = set()
202 for head in tracker.tails[share_hash]:
203 potential_peers.update(peer_heads.get(head, set()))
204 potential_peers = [peer for peer in potential_peers if peer.connected2]
205 if count == 0 and peer2 is not None and peer2.connected2:
208 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
212 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
216 stops=list(set(tracker.heads) | set(
217 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
220 requested[share_hash] = t, count + 1
221 pre_current_work.changed.watch(lambda _: set_real_work2())
222 pre_merged_work.changed.watch(lambda _: set_real_work2())
228 @defer.inlineCallbacks
229 def set_merged_work(merged_url, merged_userpass):
230 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
232 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
233 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
234 hash=int(auxblock['hash'], 16),
235 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
236 merged_proxy=merged_proxy,
238 yield deferral.sleep(1)
239 for merged_url, merged_userpass in merged_urls:
240 set_merged_work(merged_url, merged_userpass)
242 @pre_merged_work.changed.watch
243 def _(new_merged_work):
244 print 'Got new merged mining work!'
246 # setup p2p logic and join p2pool network
248 class Node(p2p.Node):
249 def handle_shares(self, shares, peer):
251 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
255 if share.hash in tracker.shares:
256 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
261 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
265 if shares and peer is not None:
266 peer_heads.setdefault(shares[0].hash, set()).add(peer)
272 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
274 def handle_share_hashes(self, hashes, peer):
277 for share_hash in hashes:
278 if share_hash in tracker.shares:
280 last_request_time, count = requested.get(share_hash, (None, 0))
281 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
283 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
284 get_hashes.append(share_hash)
285 requested[share_hash] = t, count + 1
287 if hashes and peer is not None:
288 peer_heads.setdefault(hashes[0], set()).add(peer)
290 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
292 def handle_get_shares(self, hashes, parents, stops, peer):
293 parents = min(parents, 1000//len(hashes))
296 for share_hash in hashes:
297 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
298 if share.hash in stops:
301 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    # Push a solved block to bitcoind over the bitcoin P2P connection.
    # The decorator retries every 10s on failure; RetrySilentlyException
    # retries without spamming the log with a traceback.
    if factory.conn.value is None:
        # Bug fix: this scope has no 'header_hash' (that name only exists inside
        # got_response), so the old message raised NameError. Derive the hash
        # from the block being submitted instead.
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
        raise deferral.RetrySilentlyException()
    factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    # Submit a solved block to bitcoind over JSON-RPC: getmemorypool with a
    # hex-encoded block argument acts as a block submission and returns a
    # truthy acceptance flag.
    success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
    # Acceptance is expected exactly when the header proof-of-work meets the
    # network target.
    success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
    if (not success and success_expected and not ignore_failure) or (success and not success_expected):
        # Bug fix: was '%s' % (success, expected_result) with 'expected_result'
        # undefined, raising NameError precisely when we needed the warning.
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
def submit_block(block, ignore_failure):
    '''Hand a solved block to bitcoind through both channels: the P2P
    connection (low latency) and the JSON-RPC interface (reports acceptance).'''
    submit_block_p2p(block)
    submit_block_rpc(block, ignore_failure)
323 @tracker.verified.added.watch
325 if share.pow_hash <= share.header['bits'].target:
326 submit_block(share.as_block(tracker), ignore_failure=True)
328 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
330 if current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]:
331 broadcast_share(share.hash)
333 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
335 @defer.inlineCallbacks
338 ip, port = x.split(':')
339 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
341 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
344 if os.path.exists(os.path.join(datadir_path, 'addrs')):
346 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
347 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
349 print >>sys.stderr, 'error parsing addrs'
350 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
352 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
354 print >>sys.stderr, "error reading addrs.txt"
355 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
358 if addr not in addrs:
359 addrs[addr] = (0, time.time(), time.time())
363 connect_addrs = set()
364 for addr_df in map(parse, args.p2pool_nodes):
366 connect_addrs.add((yield addr_df))
371 best_share_hash_func=lambda: current_work.value['best_share_hash'],
372 port=args.p2pool_port,
375 connect_addrs=connect_addrs,
376 max_incoming_conns=args.p2pool_conns,
381 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
382 f.write(json.dumps(p2p_node.addr_store.items()))
383 task.LoopingCall(save_addrs).start(60)
385 def broadcast_share(share_hash):
387 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
388 if share.hash in shared_share_hashes:
390 shared_share_hashes.add(share.hash)
393 for peer in p2p_node.peers.itervalues():
394 peer.sendShares([share for share in shares if share.peer is not peer])
396 # send share when the chain changes to their chain
397 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
400 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
402 if share.hash in tracker.verified.shares:
403 ss.add_verified_hash(share.hash)
404 task.LoopingCall(save_shares).start(60)
410 @defer.inlineCallbacks
414 is_lan, lan_ip = yield ipdiscover.get_local_ip()
416 pm = yield portmapper.get_port_mapper()
417 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
418 except defer.TimeoutError:
422 log.err(None, 'UPnP error:')
423 yield deferral.sleep(random.expovariate(1/120))
426 # start listening for workers with a JSON-RPC server
428 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
432 removed_unstales_var = variable.Variable((0, 0, 0))
433 removed_doa_unstales_var = variable.Variable(0)
@tracker.verified.removed.watch
def _(share):
    # When one of our own shares falls off the end of the verified chain,
    # move its stale-accounting from the live chain delta into the
    # removed_* accumulators so get_stale_counts() stays consistent.
    if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
        removed_unstales_var.set((
            removed_unstales_var.value[0] + 1,
            removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
            removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
        ))
    if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
        # Bug fix: was 'removed_doa_unstales', an undefined name -- the
        # variable is declared as 'removed_doa_unstales_var', so this branch
        # raised NameError whenever a DOA share of ours was removed.
        removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
    # Totals of every share we produced in this run.
    total_mine = len(my_share_hashes)
    total_mine_doa = len(my_doa_share_hashes)

    # How many of ours made it into the current best chain: live chain delta
    # plus the counts accumulated from shares already pruned off the chain.
    delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
    mine_in_chain = delta.my_count + removed_unstales_var.value[0]
    mine_doa_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
    orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
    doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]

    # Anything not in the chain is stale; DOA shares are a subset of stale.
    stale = total_mine - mine_in_chain
    stale_doa = total_mine_doa - mine_doa_in_chain

    return (stale - stale_doa, stale_doa), total_mine, (orphans_recorded_in_chain, doas_recorded_in_chain)
462 pseudoshare_received = variable.Event()
463 share_received = variable.Event()
464 local_rate_monitor = math.RateMonitor(10*60)
466 class WorkerBridge(worker_interface.WorkerBridge):
468 worker_interface.WorkerBridge.__init__(self)
469 self.new_work_event = current_work.changed
470 self.recent_shares_ts_work = []
472 def get_user_details(self, request):
473 user = request.getUser() if request.getUser() is not None else ''
475 desired_pseudoshare_target = None
477 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
479 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
483 desired_share_target = 2**256 - 1
485 user, min_diff_str = user.rsplit('/', 1)
487 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
491 if random.uniform(0, 100) < args.worker_fee:
492 pubkey_hash = my_pubkey_hash
495 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
497 pubkey_hash = my_pubkey_hash
499 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
def preprocess_request(self, request):
    # The work path only needs (pubkey_hash, share_target, pseudoshare_target);
    # drop the leading user name from get_user_details' 4-tuple.
    return self.get_user_details(request)[1:]
505 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
506 if len(p2p_node.peers) == 0 and net.PERSIST:
507 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
508 if current_work.value['best_share_hash'] is None and net.PERSIST:
509 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
510 if time.time() > current_work2.value['last_update'] + 60:
511 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
513 if current_work.value['mm_chains']:
514 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
515 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
516 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
517 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
521 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
527 share_info, generate_tx = p2pool_data.Share.generate_transaction(
530 previous_share_hash=current_work.value['best_share_hash'],
531 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
532 nonce=random.randrange(2**32),
533 pubkey_hash=pubkey_hash,
534 subsidy=current_work2.value['subsidy'],
535 donation=math.perfect_round(65535*args.donation_percentage/100),
536 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
537 253 if orphans > orphans_recorded_in_chain else
538 254 if doas > doas_recorded_in_chain else
540 )(*get_stale_counts()),
543 block_target=current_work.value['bits'].target,
544 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
545 desired_target=desired_share_target,
546 ref_merkle_link=dict(branch=[], index=0),
550 if desired_pseudoshare_target is None:
551 if len(self.recent_shares_ts_work) == 50:
552 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
553 target = min(2**256-1, int(4*2**256/hash_rate))
557 target = desired_pseudoshare_target
558 target = max(target, share_info['bits'].target)
559 for aux_work in current_work.value['mm_chains'].itervalues():
560 target = max(target, aux_work['target'])
561 target = min(target, net.PARENT.SANE_MAX_TARGET)
563 transactions = [generate_tx] + list(current_work2.value['transactions'])
564 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
565 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
567 getwork_time = time.time()
568 merkle_link = current_work2.value['merkle_link']
570 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
571 bitcoin_data.target_to_difficulty(target),
572 bitcoin_data.target_to_difficulty(share_info['bits'].target),
573 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
574 len(current_work2.value['transactions']),
577 ba = bitcoin_getwork.BlockAttempt(
578 version=current_work.value['version'],
579 previous_block=current_work.value['previous_block'],
580 merkle_root=merkle_root,
581 timestamp=current_work2.value['time'],
582 bits=current_work.value['bits'],
586 received_header_hashes = set()
588 def got_response(header, request):
589 user, _, _, _ = self.get_user_details(request)
590 assert header['merkle_root'] == merkle_root
592 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
593 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
594 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
597 if pow_hash <= header['bits'].target or p2pool.DEBUG:
598 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
599 if pow_hash <= header['bits'].target:
601 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
604 log.err(None, 'Error while processing potential block:')
606 for aux_work, index, hashes in mm_later:
608 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
609 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
610 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
611 bitcoin_data.aux_pow_type.pack(dict(
614 block_hash=header_hash,
615 merkle_link=merkle_link,
617 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
618 parent_block_header=header,
623 if result != (pow_hash <= aux_work['target']):
624 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
626 print 'Merged block submittal result: %s' % (result,)
629 log.err(err, 'Error submitting merged block:')
631 log.err(None, 'Error while processing merged mining POW:')
633 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
634 min_header = dict(header);del min_header['merkle_root']
635 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
636 share = p2pool_data.Share(net, None, dict(
637 min_header=min_header, share_info=share_info, hash_link=hash_link,
638 ref_merkle_link=dict(branch=[], index=0),
639 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
641 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
643 p2pool_data.format_hash(share.hash),
644 p2pool_data.format_hash(share.previous_hash),
645 time.time() - getwork_time,
646 ' DEAD ON ARRIVAL' if not on_time else '',
648 my_share_hashes.add(share.hash)
650 my_doa_share_hashes.add(share.hash)
654 tracker.verified.add(share)
658 if pow_hash <= header['bits'].target or p2pool.DEBUG:
659 for peer in p2p_node.peers.itervalues():
660 peer.sendShares([share])
661 shared_share_hashes.add(share.hash)
663 log.err(None, 'Error forwarding block solution:')
665 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
667 if pow_hash > target:
668 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
669 print ' Hash: %56x' % (pow_hash,)
670 print ' Target: %56x' % (target,)
671 elif header_hash in received_header_hashes:
672 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
674 received_header_hashes.add(header_hash)
676 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
677 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
678 while len(self.recent_shares_ts_work) > 50:
679 self.recent_shares_ts_work.pop(0)
680 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
684 return ba, got_response
686 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
688 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
689 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
691 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
693 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
700 @defer.inlineCallbacks
703 flag = factory.new_block.get_deferred()
705 yield set_real_work1()
708 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
713 print 'Started successfully!'
714 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
715 if args.donation_percentage > 0.51:
716 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
717 elif args.donation_percentage < 0.49:
718 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
720 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
721 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
725 if hasattr(signal, 'SIGALRM'):
726 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
727 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
729 signal.siginterrupt(signal.SIGALRM, False)
730 task.LoopingCall(signal.alarm, 30).start(1)
732 if args.irc_announce:
733 from twisted.words.protocols import irc
734 class IRCClient(irc.IRCClient):
735 nickname = 'p2pool%02i' % (random.randrange(100),)
736 channel = net.ANNOUNCE_CHANNEL
737 def lineReceived(self, line):
740 irc.IRCClient.lineReceived(self, line)
742 irc.IRCClient.signedOn(self)
743 self.factory.resetDelay()
744 self.join(self.channel)
745 @defer.inlineCallbacks
746 def new_share(share):
747 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
748 yield deferral.sleep(random.expovariate(1/60))
749 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
750 if message not in self.recent_messages:
751 self.say(self.channel, message)
752 self._remember_message(message)
753 self.watch_id = tracker.verified.added.watch(new_share)
754 self.recent_messages = []
def _remember_message(self, message):
    # Keep a bounded history (most recent 100) used to suppress duplicate
    # announcements; trimming from the front preserves recency order.
    self.recent_messages.append(message)
    del self.recent_messages[:-100]
def privmsg(self, user, channel, message):
    # Record traffic seen on our channel so we avoid re-announcing a block
    # some other node already posted.
    if channel != self.channel:
        return
    self._remember_message(message)
def connectionLost(self, reason):
    # Detach the block-announcement watcher so the tracker callback doesn't
    # keep firing against a dead IRC client.
    tracker.verified.added.unwatch(self.watch_id)
    print 'IRC connection lost:', reason.getErrorMessage()
765 class IRCClientFactory(protocol.ReconnectingClientFactory):
767 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
769 @defer.inlineCallbacks
774 yield deferral.sleep(3)
776 if time.time() > current_work2.value['last_update'] + 60:
777 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
779 height = tracker.get_height(current_work.value['best_share_hash'])
780 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
782 len(tracker.verified.shares),
785 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
786 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
788 datums, dt = local_rate_monitor.get_datums_in_last()
789 my_att_s = sum(datum['work']/dt for datum in datums)
790 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
791 math.format(int(my_att_s)),
793 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
794 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
798 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
799 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
800 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
802 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
803 shares, stale_orphan_shares, stale_doa_shares,
804 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
805 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
806 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
808 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
809 math.format(int(real_att_s)),
811 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
814 for warning in p2pool_data.get_warnings(tracker, current_work, net):
815 print >>sys.stderr, '#'*40
816 print >>sys.stderr, '>>> Warning: ' + warning
817 print >>sys.stderr, '#'*40
819 if this_str != last_str or time.time() > last_time + 15:
822 last_time = time.time()
828 log.err(None, 'Fatal error:')
831 class FixedArgumentParser(argparse.ArgumentParser):
832 def _read_args_from_files(self, arg_strings):
833 # expand arguments referencing files
835 for arg_string in arg_strings:
837 # for regular arguments, just add them back into the list
838 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
839 new_arg_strings.append(arg_string)
841 # replace arguments referencing files with the file content
844 args_file = open(arg_string[1:])
847 for arg_line in args_file.read().splitlines():
848 for arg in self.convert_arg_line_to_args(arg_line):
849 arg_strings.append(arg)
850 arg_strings = self._read_args_from_files(arg_strings)
851 new_arg_strings.extend(arg_strings)
855 err = sys.exc_info()[1]
858 # return the modified argument list
859 return new_arg_strings
861 def convert_arg_line_to_args(self, arg_line):
862 return [arg for arg in arg_line.split() if arg.strip()]
# Production networks only; their testnet twins are reached via --testnet.
realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# Build the command-line parser. '@file' arguments are expanded recursively
# by FixedArgumentParser.
parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
parser.add_argument('--no-bugreport',
    help='disable submitting caught exceptions to the author',
    action='store_true', default=False, dest='no_bugreport')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# NOTE(review): added on `parser` rather than `p2pool_group`, unlike its
# neighbors, so it renders under the top-level options in --help output.
# Looks unintentional — confirm before regrouping (no behavior change either way).
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# Positional: zero, one, or two values (username [password]); validated below.
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()
# Resolve the network object; --testnet maps e.g. 'bitcoin' -> 'bitcoin_testnet'.
net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

# Per-network data directory next to the launcher script (or --datadir),
# created on first run.
datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

# BITCOIND_RPCUSERPASS takes 0-2 positionals; left-pad with None so a single
# value is treated as the password (username then defaults to '').
if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

# No password on the command line: fall back to reading bitcoin.conf.
if args.bitcoind_rpc_password is None:
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            # NOTE(review): additional message lines appear elided from this excerpt — confirm against upstream
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    # bitcoin.conf is a sectionless ini file; prepend a dummy [x] header so
    # RawConfigParser will accept it.
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # conf values only fill in options the user did NOT pass explicitly.
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
            # NOTE(review): the list close (']:') is elided from this excerpt
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

# Remaining ports default to the network's well-known values.
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

# worker_endpoint is (addr, port); bare PORT means listen on all interfaces.
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
# rsplit tolerates ':' inside ADDR (e.g. IPv6) by splitting on the last one.
addr, port = args.worker_endpoint.rsplit(':', 1)
worker_endpoint = addr, int(port)

# Parse the payout address, if one was given, into a pubkey hash.
# NOTE(review): the try:/else: framing around this appears elided from this
# excerpt — confirm against the full source.
if args.address is not None:
    args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
except Exception, e:
    parser.error('error parsing address: ' + repr(e))
args.pubkey_hash = None
def separate_url(url):
    """Split the credentials out of a merged-mining URL.

    Returns (url_without_credentials, 'user:pass'). Exits via
    parser.error() if the URL's network location carries no '@'.
    """
    parts = urlparse.urlsplit(url)
    if '@' not in parts.netloc:
        parser.error('merged url netloc must contain an "@"')
    # rsplit on the LAST '@' so credentials may themselves contain '@'.
    credentials, host_part = parts.netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=host_part))
    return stripped_url, credentials
# Each --merged URL becomes (url-sans-credentials, 'user:pass').
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Tee stdout/stderr through a timestamping pipe into the logfile; stderr
# lines additionally get a '> ' prefix so they stand out in the log.
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 (where the platform has it) reopens the logfile, e.g. after logrotate.
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        # NOTE(review): the reopen call itself appears elided from this excerpt — confirm
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
# Belt-and-braces: also reopen the logfile every 5 seconds.
task.LoopingCall(logfile.reopen).start(5)
# Twisted log observer that POSTs error events to the author's collection
# endpoint, rate-limited to one report per 5 seconds. Opt-out: --no-bugreport.
# NOTE(review): several lines of this class are elided from this excerpt
# (the `def __init__(self):` line, the early `return`s after the two guard
# conditions, the `else:` before the message branch, and the
# `client.getPage(` call line) — confirm against the full source.
class ErrorReporter(object):
        self.last_sent = None
    def emit(self, eventDict):
        # only error events are reported
        if not eventDict["isError"]:
        # rate limit: skip if a report went out within the last 5 seconds
        if self.last_sent is not None and time.time() < self.last_sent + 5:
        self.last_sent = time.time()
        # prefer the full traceback when a Failure is attached
        if 'failure' in eventDict:
            text = ((eventDict.get('why') or 'Unhandled Error')
                + '\n' + eventDict['failure'].getTraceback())
            text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
        # local import: twisted.web.client is only needed when reporting
        from twisted.web import client
            url='http://u.forre.st/p2pool_error.cgi',
            postdata=p2pool.__version__ + '\n' + text,
        ).addBoth(lambda x: None)  # swallow both success and failure of the report
if not args.no_bugreport:
    log.addObserver(ErrorReporter().emit)

# Start main() once the reactor is running.
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)