1 from __future__ import division
17 from twisted.internet import iocpreactor
22 print 'Using IOCP reactor!'
23 from twisted.internet import defer, reactor, protocol, task
24 from twisted.web import server
25 from twisted.python import log
26 from nattraverso import portmapper, ipdiscover
28 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
29 from bitcoin import worker_interface, height_tracker
30 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
31 from . import p2p, networks, web
32 import p2pool, p2pool.data as p2pool_data
34 @deferral.retry('Error getting work from bitcoind:', 3)
35 @defer.inlineCallbacks
36 def getwork(bitcoind):
38 work = yield bitcoind.rpc_getmemorypool()
39 except jsonrpc.Error, e:
40 if e.code == -32601: # Method not found
41 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
42 raise deferral.RetrySilentlyException()
44 packed_transactions = [x.decode('hex') for x in work['transactions']]
45 defer.returnValue(dict(
46 version=work['version'],
47 previous_block_hash=int(work['previousblockhash'], 16),
48 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
49 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
50 subsidy=work['coinbasevalue'],
52 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
53 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
56 @defer.inlineCallbacks
57 def main(args, net, datadir_path, merged_urls, worker_endpoint):
59 print 'p2pool (version %s)' % (p2pool.__version__,)
62 # connect to bitcoind over JSON-RPC and do initial getmemorypool
63 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
64 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
65 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
66 @deferral.retry('Error while checking Bitcoin connection:', 1)
67 @defer.inlineCallbacks
69 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
70 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
71 raise deferral.RetrySilentlyException()
72 temp_work = yield getwork(bitcoind)
73 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
74 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
75 raise deferral.RetrySilentlyException()
76 defer.returnValue(temp_work)
77 temp_work = yield check()
79 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
82 # connect to bitcoind over bitcoin-p2p
83 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
84 factory = bitcoin_p2p.ClientFactory(net.PARENT)
85 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
86 yield factory.getProtocol() # waits until handshake is successful
90 print 'Determining payout address...'
91 if args.pubkey_hash is None:
92 address_path = os.path.join(datadir_path, 'cached_payout_address')
94 if os.path.exists(address_path):
95 with open(address_path, 'rb') as f:
96 address = f.read().strip('\r\n')
97 print ' Loaded cached address: %s...' % (address,)
101 if address is not None:
102 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
103 if not res['isvalid'] or not res['ismine']:
104 print ' Cached address is either invalid or not controlled by local bitcoind!'
108 print ' Getting payout address from bitcoind...'
109 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
111 with open(address_path, 'wb') as f:
114 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
116 my_pubkey_hash = args.pubkey_hash
117 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
120 my_share_hashes = set()
121 my_doa_share_hashes = set()
123 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
124 shared_share_hashes = set()
125 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
126 known_verified = set()
127 print "Loading shares..."
128 for i, (mode, contents) in enumerate(ss.get_shares()):
130 if contents.hash in tracker.shares:
132 shared_share_hashes.add(contents.hash)
133 contents.time_seen = 0
134 tracker.add(contents)
135 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136 print " %i" % (len(tracker.shares),)
137 elif mode == 'verified_hash':
138 known_verified.add(contents)
140 raise AssertionError()
141 print " ...inserting %i verified shares..." % (len(known_verified),)
142 for h in known_verified:
143 if h not in tracker.shares:
144 ss.forget_verified_share(h)
146 tracker.verified.add(tracker.shares[h])
147 print " ...done loading %i shares!" % (len(tracker.shares),)
149 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
150 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
151 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
153 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155 pre_current_work = variable.Variable(None)
156 pre_merged_work = variable.Variable({})
157 # information affecting work that should trigger a long-polling update
158 current_work = variable.Variable(None)
159 # information affecting work that should not trigger a long-polling update
160 current_work2 = variable.Variable(None)
162 requested = expiring_dict.ExpiringDict(300)
164 print 'Initializing work...'
165 @defer.inlineCallbacks
166 def set_real_work1():
167 work = yield getwork(bitcoind)
168 current_work2.set(dict(
170 transactions=work['transactions'],
171 merkle_link=work['merkle_link'],
172 subsidy=work['subsidy'],
173 clock_offset=time.time() - work['time'],
174 last_update=time.time(),
175 )) # second set first because everything hooks on the first
176 pre_current_work.set(dict(
177 version=work['version'],
178 previous_block=work['previous_block_hash'],
180 coinbaseflags=work['coinbaseflags'],
182 yield set_real_work1()
184 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
186 def set_real_work2():
187 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
189 t = dict(pre_current_work.value)
190 t['best_share_hash'] = best
191 t['mm_chains'] = pre_merged_work.value
195 for peer2, share_hash in desired:
196 if share_hash not in tracker.tails: # was received in the time tracker.think was running
198 last_request_time, count = requested.get(share_hash, (None, 0))
199 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
201 potential_peers = set()
202 for head in tracker.tails[share_hash]:
203 potential_peers.update(peer_heads.get(head, set()))
204 potential_peers = [peer for peer in potential_peers if peer.connected2]
205 if count == 0 and peer2 is not None and peer2.connected2:
208 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
212 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
216 stops=list(set(tracker.heads) | set(
217 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
220 requested[share_hash] = t, count + 1
221 pre_current_work.changed.watch(lambda _: set_real_work2())
222 pre_merged_work.changed.watch(lambda _: set_real_work2())
228 @defer.inlineCallbacks
229 def set_merged_work(merged_url, merged_userpass):
230 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
232 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
233 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
234 hash=int(auxblock['hash'], 16),
235 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
236 merged_proxy=merged_proxy,
238 yield deferral.sleep(1)
239 for merged_url, merged_userpass in merged_urls:
240 set_merged_work(merged_url, merged_userpass)
242 @pre_merged_work.changed.watch
243 def _(new_merged_work):
244 print 'Got new merged mining work!'
246 # setup p2p logic and join p2pool network
248 class Node(p2p.Node):
249 def handle_shares(self, shares, peer):
251 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
255 if share.hash in tracker.shares:
256 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
261 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
265 if shares and peer is not None:
266 peer_heads.setdefault(shares[0].hash, set()).add(peer)
272 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
274 def handle_share_hashes(self, hashes, peer):
277 for share_hash in hashes:
278 if share_hash in tracker.shares:
280 last_request_time, count = requested.get(share_hash, (None, 0))
281 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
283 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
284 get_hashes.append(share_hash)
285 requested[share_hash] = t, count + 1
287 if hashes and peer is not None:
288 peer_heads.setdefault(hashes[0], set()).add(peer)
290 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
292 def handle_get_shares(self, hashes, parents, stops, peer):
293 parents = min(parents, 1000//len(hashes))
296 for share_hash in hashes:
297 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
298 if share.hash in stops:
301 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
304 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
305 def submit_block_p2p(block):
306 if factory.conn.value is None:
307 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
308 raise deferral.RetrySilentlyException()
309 factory.conn.value.send_block(block=block)
311 @deferral.retry('Error submitting block: (will retry)', 10, 10)
312 @defer.inlineCallbacks
313 def submit_block_rpc(block, ignore_failure):
314 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
315 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
316 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
317 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
319 def submit_block(block, ignore_failure):
320 submit_block_p2p(block)
321 submit_block_rpc(block, ignore_failure)
323 @tracker.verified.added.watch
325 if share.pow_hash <= share.header['bits'].target:
326 submit_block(share.as_block(tracker), ignore_failure=True)
328 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
331 if (get_height_rel_highest(share.header['previous_block']) > -5 or
332 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
333 broadcast_share(share.hash)
335 reactor.callLater(5, spread) # so get_height_rel_highest can update
337 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
339 @defer.inlineCallbacks
342 ip, port = x.split(':')
343 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
345 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
348 if os.path.exists(os.path.join(datadir_path, 'addrs')):
350 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
351 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
353 print >>sys.stderr, 'error parsing addrs'
354 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
356 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
358 print >>sys.stderr, "error reading addrs.txt"
359 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
362 if addr not in addrs:
363 addrs[addr] = (0, time.time(), time.time())
367 connect_addrs = set()
368 for addr_df in map(parse, args.p2pool_nodes):
370 connect_addrs.add((yield addr_df))
375 best_share_hash_func=lambda: current_work.value['best_share_hash'],
376 port=args.p2pool_port,
379 connect_addrs=connect_addrs,
380 max_incoming_conns=args.p2pool_conns,
385 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
386 f.write(json.dumps(p2p_node.addr_store.items()))
387 task.LoopingCall(save_addrs).start(60)
389 def broadcast_share(share_hash):
391 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
392 if share.hash in shared_share_hashes:
394 shared_share_hashes.add(share.hash)
397 for peer in p2p_node.peers.itervalues():
398 peer.sendShares([share for share in shares if share.peer is not peer])
400 # send share when the chain changes to their chain
401 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
404 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
406 if share.hash in tracker.verified.shares:
407 ss.add_verified_hash(share.hash)
408 task.LoopingCall(save_shares).start(60)
414 @defer.inlineCallbacks
418 is_lan, lan_ip = yield ipdiscover.get_local_ip()
420 pm = yield portmapper.get_port_mapper()
421 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
422 except defer.TimeoutError:
426 log.err(None, 'UPnP error:')
427 yield deferral.sleep(random.expovariate(1/120))
430 # start listening for workers with a JSON-RPC server
432 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
436 removed_unstales_var = variable.Variable((0, 0, 0))
437 removed_doa_unstales_var = variable.Variable(0)
438 @tracker.verified.removed.watch
440 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
441 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
442 removed_unstales_var.set((
443 removed_unstales_var.value[0] + 1,
444 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
445 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
447 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
448 removed_doa_unstales.set(removed_doa_unstales.value + 1)
450 def get_stale_counts():
451 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
452 my_shares = len(my_share_hashes)
453 my_doa_shares = len(my_doa_share_hashes)
454 delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
455 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
456 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
457 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
458 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
460 my_shares_not_in_chain = my_shares - my_shares_in_chain
461 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
463 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
466 pseudoshare_received = variable.Event()
467 share_received = variable.Event()
468 local_rate_monitor = math.RateMonitor(10*60)
470 class WorkerBridge(worker_interface.WorkerBridge):
472 worker_interface.WorkerBridge.__init__(self)
473 self.new_work_event = current_work.changed
474 self.recent_shares_ts_work = []
476 def get_user_details(self, request):
477 user = request.getUser() if request.getUser() is not None else ''
479 desired_pseudoshare_target = None
481 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
483 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
487 desired_share_target = 2**256 - 1
489 user, min_diff_str = user.rsplit('/', 1)
491 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
495 if random.uniform(0, 100) < args.worker_fee:
496 pubkey_hash = my_pubkey_hash
499 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
501 pubkey_hash = my_pubkey_hash
503 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
505 def preprocess_request(self, request):
506 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
507 return pubkey_hash, desired_share_target, desired_pseudoshare_target
509 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
510 if len(p2p_node.peers) == 0 and net.PERSIST:
511 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
512 if current_work.value['best_share_hash'] is None and net.PERSIST:
513 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
514 if time.time() > current_work2.value['last_update'] + 60:
515 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
517 if current_work.value['mm_chains']:
518 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
519 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
520 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
521 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
525 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
530 share_type = p2pool_data.NewShare
531 if current_work.value['best_share_hash'] is not None:
532 previous_share = tracker.shares[current_work.value['best_share_hash']]
533 if isinstance(previous_share, p2pool_data.Share):
534 # Share -> NewShare only valid if 85% of hashes in [net.CHAIN_LENGTH*9//10, net.CHAIN_LENGTH] for new version
535 if tracker.get_height(previous_share.hash) < net.CHAIN_LENGTH:
536 share_type = p2pool_data.Share
538 counts = p2pool_data.get_desired_version_counts(tracker,
539 tracker.get_nth_parent_hash(previous_share.hash, net.CHAIN_LENGTH*9//10), net.CHAIN_LENGTH//10)
540 if counts.get(2, 0) < sum(counts.itervalues())*95//100:
541 share_type = p2pool_data.Share
544 share_info, generate_tx = share_type.generate_transaction(
547 previous_share_hash=current_work.value['best_share_hash'],
548 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
549 nonce=random.randrange(2**32),
550 pubkey_hash=pubkey_hash,
551 subsidy=current_work2.value['subsidy'],
552 donation=math.perfect_round(65535*args.donation_percentage/100),
553 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
554 253 if orphans > orphans_recorded_in_chain else
555 254 if doas > doas_recorded_in_chain else
557 )(*get_stale_counts()),
560 block_target=current_work.value['bits'].target,
561 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
562 desired_target=desired_share_target,
563 ref_merkle_link=dict(branch=[], index=0),
567 if desired_pseudoshare_target is None:
568 if len(self.recent_shares_ts_work) == 50:
569 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
570 target = min(2**256-1, int(4*2**256/hash_rate))
574 target = desired_pseudoshare_target
575 target = max(target, share_info['bits'].target)
576 for aux_work in current_work.value['mm_chains'].itervalues():
577 target = max(target, aux_work['target'])
578 target = min(target, net.PARENT.SANE_MAX_TARGET)
580 transactions = [generate_tx] + list(current_work2.value['transactions'])
581 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
582 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
584 getwork_time = time.time()
585 merkle_link = current_work2.value['merkle_link']
587 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
588 bitcoin_data.target_to_difficulty(target),
589 bitcoin_data.target_to_difficulty(share_info['bits'].target),
590 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
591 len(current_work2.value['transactions']),
594 ba = bitcoin_getwork.BlockAttempt(
595 version=current_work.value['version'],
596 previous_block=current_work.value['previous_block'],
597 merkle_root=merkle_root,
598 timestamp=current_work2.value['time'],
599 bits=current_work.value['bits'],
603 received_header_hashes = set()
605 def got_response(header, request):
606 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
607 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
609 if pow_hash <= header['bits'].target or p2pool.DEBUG:
610 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
611 if pow_hash <= header['bits'].target:
613 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
616 log.err(None, 'Error while processing potential block:')
618 user, _, _, _ = self.get_user_details(request)
619 assert header['merkle_root'] == merkle_root
621 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
623 for aux_work, index, hashes in mm_later:
625 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
626 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
627 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
628 bitcoin_data.aux_pow_type.pack(dict(
631 block_hash=header_hash,
632 merkle_link=merkle_link,
634 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
635 parent_block_header=header,
640 if result != (pow_hash <= aux_work['target']):
641 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
643 print 'Merged block submittal result: %s' % (result,)
646 log.err(err, 'Error submitting merged block:')
648 log.err(None, 'Error while processing merged mining POW:')
650 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
651 min_header = dict(header);del min_header['merkle_root']
652 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], share_type.gentx_before_refhash)
653 share = share_type(net, None, dict(
654 min_header=min_header, share_info=share_info, hash_link=hash_link,
655 ref_merkle_link=dict(branch=[], index=0),
656 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
658 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
660 p2pool_data.format_hash(share.hash),
661 p2pool_data.format_hash(share.previous_hash),
662 time.time() - getwork_time,
663 ' DEAD ON ARRIVAL' if not on_time else '',
665 my_share_hashes.add(share.hash)
667 my_doa_share_hashes.add(share.hash)
671 tracker.verified.add(share)
675 if pow_hash <= header['bits'].target or p2pool.DEBUG:
676 for peer in p2p_node.peers.itervalues():
677 peer.sendShares([share])
678 shared_share_hashes.add(share.hash)
680 log.err(None, 'Error forwarding block solution:')
682 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
684 if pow_hash > target:
685 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
686 print ' Hash: %56x' % (pow_hash,)
687 print ' Target: %56x' % (target,)
688 elif header_hash in received_header_hashes:
689 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
691 received_header_hashes.add(header_hash)
693 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
694 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
695 while len(self.recent_shares_ts_work) > 50:
696 self.recent_shares_ts_work.pop(0)
697 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
701 return ba, got_response
703 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
705 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
706 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
708 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
710 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
717 @defer.inlineCallbacks
720 flag = factory.new_block.get_deferred()
722 yield set_real_work1()
725 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
730 print 'Started successfully!'
731 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
732 if args.donation_percentage > 0.51:
733 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
734 elif args.donation_percentage < 0.49:
735 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
737 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
738 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
742 if hasattr(signal, 'SIGALRM'):
743 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
744 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
746 signal.siginterrupt(signal.SIGALRM, False)
747 task.LoopingCall(signal.alarm, 30).start(1)
749 if args.irc_announce:
750 from twisted.words.protocols import irc
751 class IRCClient(irc.IRCClient):
752 nickname = 'p2pool%02i' % (random.randrange(100),)
753 channel = net.ANNOUNCE_CHANNEL
754 def lineReceived(self, line):
757 irc.IRCClient.lineReceived(self, line)
759 irc.IRCClient.signedOn(self)
760 self.factory.resetDelay()
761 self.join(self.channel)
762 @defer.inlineCallbacks
763 def new_share(share):
764 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
765 yield deferral.sleep(random.expovariate(1/60))
766 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
767 if message not in self.recent_messages:
768 self.say(self.channel, message)
769 self._remember_message(message)
770 self.watch_id = tracker.verified.added.watch(new_share)
771 self.recent_messages = []
772 def _remember_message(self, message):
773 self.recent_messages.append(message)
774 while len(self.recent_messages) > 100:
775 self.recent_messages.pop(0)
776 def privmsg(self, user, channel, message):
777 if channel == self.channel:
778 self._remember_message(message)
779 def connectionLost(self, reason):
780 tracker.verified.added.unwatch(self.watch_id)
781 print 'IRC connection lost:', reason.getErrorMessage()
782 class IRCClientFactory(protocol.ReconnectingClientFactory):
784 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
786 @defer.inlineCallbacks
791 yield deferral.sleep(3)
793 if time.time() > current_work2.value['last_update'] + 60:
794 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
796 height = tracker.get_height(current_work.value['best_share_hash'])
797 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
799 len(tracker.verified.shares),
802 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
803 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
805 datums, dt = local_rate_monitor.get_datums_in_last()
806 my_att_s = sum(datum['work']/dt for datum in datums)
807 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
808 math.format(int(my_att_s)),
810 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
811 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
815 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
816 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
817 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
819 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
820 shares, stale_orphan_shares, stale_doa_shares,
821 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
822 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
823 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
825 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
826 math.format(int(real_att_s)),
828 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
831 for warning in p2pool_data.get_warnings(tracker, current_work, net):
832 print >>sys.stderr, '#'*40
833 print >>sys.stderr, '>>> Warning: ' + warning
834 print >>sys.stderr, '#'*40
836 if this_str != last_str or time.time() > last_time + 15:
839 last_time = time.time()
845 log.err(None, 'Fatal error:')
848 class FixedArgumentParser(argparse.ArgumentParser):
849 def _read_args_from_files(self, arg_strings):
850 # expand arguments referencing files
852 for arg_string in arg_strings:
854 # for regular arguments, just add them back into the list
855 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
856 new_arg_strings.append(arg_string)
858 # replace arguments referencing files with the file content
861 args_file = open(arg_string[1:])
864 for arg_line in args_file.read().splitlines():
865 for arg in self.convert_arg_line_to_args(arg_line):
866 arg_strings.append(arg)
867 arg_strings = self._read_args_from_files(arg_strings)
868 new_arg_strings.extend(arg_strings)
872 err = sys.exc_info()[1]
875 # return the modified argument list
876 return new_arg_strings
878 def convert_arg_line_to_args(self, arg_line):
879 return [arg for arg in arg_line.split() if arg.strip()]
882 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
884 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
885 parser.add_argument('--version', action='version', version=p2pool.__version__)
886 parser.add_argument('--net',
887 help='use specified network (default: bitcoin)',
888 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
889 parser.add_argument('--testnet',
890 help='''use the network's testnet''',
891 action='store_const', const=True, default=False, dest='testnet')
892 parser.add_argument('--debug',
893 help='enable debugging mode',
894 action='store_const', const=True, default=False, dest='debug')
895 parser.add_argument('-a', '--address',
896 help='generate payouts to this address (default: <address requested from bitcoind>)',
897 type=str, action='store', default=None, dest='address')
898 parser.add_argument('--datadir',
899 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
900 type=str, action='store', default=None, dest='datadir')
901 parser.add_argument('--logfile',
902 help='''log to this file (default: data/<NET>/log)''',
903 type=str, action='store', default=None, dest='logfile')
904 parser.add_argument('--merged',
905 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
906 type=str, action='append', default=[], dest='merged_urls')
907 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
908 help='donate this percentage of work towards the development of p2pool (default: 0.5)',
909 type=float, action='store', default=0.5, dest='donation_percentage')
910 parser.add_argument('--irc-announce',
911 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
912 action='store_true', default=False, dest='irc_announce')
913 parser.add_argument('--no-bugreport',
914 help='disable submitting caught exceptions to the author',
915 action='store_true', default=False, dest='no_bugreport')
917 p2pool_group = parser.add_argument_group('p2pool interface')
918 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
919 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
920 type=int, action='store', default=None, dest='p2pool_port')
921 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
922 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
923 type=str, action='append', default=[], dest='p2pool_nodes')
924 parser.add_argument('--disable-upnp',
925 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
926 action='store_false', default=True, dest='upnp')
927 p2pool_group.add_argument('--max-conns', metavar='CONNS',
928 help='maximum incoming connections (default: 40)',
929 type=int, action='store', default=40, dest='p2pool_conns')
931 worker_group = parser.add_argument_group('worker interface')
932 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
933 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
934 type=str, action='store', default=None, dest='worker_endpoint')
935 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
936 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
937 type=float, action='store', default=0, dest='worker_fee')
939 bitcoind_group = parser.add_argument_group('bitcoind interface')
940 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
941 help='connect to this address (default: 127.0.0.1)',
942 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
943 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
944 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
945 type=int, action='store', default=None, dest='bitcoind_rpc_port')
946 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
947 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
948 type=int, action='store', default=None, dest='bitcoind_p2p_port')
950 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
951 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
952 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
954 args = parser.parse_args()
959 net_name = args.net_name + ('_testnet' if args.testnet else '')
960 net = networks.nets[net_name]
962 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
963 if not os.path.exists(datadir_path):
964 os.makedirs(datadir_path)
966 if len(args.bitcoind_rpc_userpass) > 2:
967 parser.error('a maximum of two arguments are allowed')
968 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
970 if args.bitcoind_rpc_password is None:
971 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
972 parser.error('This network has no configuration file function. Manually enter your RPC password.')
973 conf_path = net.PARENT.CONF_FILE_FUNC()
974 if not os.path.exists(conf_path):
975 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
976 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
979 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
980 with open(conf_path, 'rb') as f:
981 cp = ConfigParser.RawConfigParser()
982 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
983 for conf_name, var_name, var_type in [
984 ('rpcuser', 'bitcoind_rpc_username', str),
985 ('rpcpassword', 'bitcoind_rpc_password', str),
986 ('rpcport', 'bitcoind_rpc_port', int),
987 ('port', 'bitcoind_p2p_port', int),
989 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
990 setattr(args, var_name, var_type(cp.get('x', conf_name)))
991 if args.bitcoind_rpc_password is None:
992 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
994 if args.bitcoind_rpc_username is None:
995 args.bitcoind_rpc_username = ''
997 if args.bitcoind_rpc_port is None:
998 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1000 if args.bitcoind_p2p_port is None:
1001 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1003 if args.p2pool_port is None:
1004 args.p2pool_port = net.P2P_PORT
1006 if args.worker_endpoint is None:
1007 worker_endpoint = '', net.WORKER_PORT
1008 elif ':' not in args.worker_endpoint:
1009 worker_endpoint = '', int(args.worker_endpoint)
1011 addr, port = args.worker_endpoint.rsplit(':', 1)
1012 worker_endpoint = addr, int(port)
1014 if args.address is not None:
1016 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1017 except Exception, e:
1018 parser.error('error parsing address: ' + repr(e))
1020 args.pubkey_hash = None
1022 def separate_url(url):
1023 s = urlparse.urlsplit(url)
1024 if '@' not in s.netloc:
1025 parser.error('merged url netloc must contain an "@"')
1026 userpass, new_netloc = s.netloc.rsplit('@', 1)
1027 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
1028 merged_urls = map(separate_url, args.merged_urls)
1030 if args.logfile is None:
1031 args.logfile = os.path.join(datadir_path, 'log')
1033 logfile = logging.LogFile(args.logfile)
1034 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1035 sys.stdout = logging.AbortPipe(pipe)
1036 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1037 if hasattr(signal, "SIGUSR1"):
1038 def sigusr1(signum, frame):
1039 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1041 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1042 signal.signal(signal.SIGUSR1, sigusr1)
1043 task.LoopingCall(logfile.reopen).start(5)
1045 class ErrorReporter(object):
1047 self.last_sent = None
1049 def emit(self, eventDict):
1050 if not eventDict["isError"]:
1053 if self.last_sent is not None and time.time() < self.last_sent + 5:
1055 self.last_sent = time.time()
1057 if 'failure' in eventDict:
1058 text = ((eventDict.get('why') or 'Unhandled Error')
1059 + '\n' + eventDict['failure'].getTraceback())
1061 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
1063 from twisted.web import client
1065 url='http://u.forre.st/p2pool_error.cgi',
1067 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
1069 ).addBoth(lambda x: None)
1070 if not args.no_bugreport:
1071 log.addObserver(ErrorReporter().emit)
1073 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)