1 from __future__ import division
16 if '--iocp' in sys.argv:
17 from twisted.internet import iocpreactor
19 from twisted.internet import defer, reactor, protocol, task
20 from twisted.web import server
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface, height_tracker
26 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
27 from . import p2p, networks, web
28 import p2pool, p2pool.data as p2pool_data
30 @deferral.retry('Error getting work from bitcoind:', 3)
31 @defer.inlineCallbacks
32 def getwork(bitcoind):
34 work = yield bitcoind.rpc_getmemorypool()
35 except jsonrpc.Error, e:
36 if e.code == -32601: # Method not found
37 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
38 raise deferral.RetrySilentlyException()
40 packed_transactions = [x.decode('hex') for x in work['transactions']]
42 unpacked = map(bitcoin_data.tx_type.unpack, packed_transactions)
44 print (e-s)*1000, "ms"
45 defer.returnValue(dict(
46 version=work['version'],
47 previous_block_hash=int(work['previousblockhash'], 16),
48 transactions=unpacked,
49 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
50 subsidy=work['coinbasevalue'],
52 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
53 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
56 @defer.inlineCallbacks
57 def main(args, net, datadir_path, merged_urls, worker_endpoint):
59 print 'p2pool (version %s)' % (p2pool.__version__,)
62 # connect to bitcoind over JSON-RPC and do initial getmemorypool
63 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
64 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
65 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
66 @deferral.retry('Error while checking Bitcoin connection:', 1)
67 @defer.inlineCallbacks
69 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
70 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
71 raise deferral.RetrySilentlyException()
72 temp_work = yield getwork(bitcoind)
73 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
74 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
75 raise deferral.RetrySilentlyException()
76 defer.returnValue(temp_work)
77 temp_work = yield check()
79 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
82 # connect to bitcoind over bitcoin-p2p
83 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
84 factory = bitcoin_p2p.ClientFactory(net.PARENT)
85 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
86 yield factory.getProtocol() # waits until handshake is successful
90 print 'Determining payout address...'
91 if args.pubkey_hash is None:
92 address_path = os.path.join(datadir_path, 'cached_payout_address')
94 if os.path.exists(address_path):
95 with open(address_path, 'rb') as f:
96 address = f.read().strip('\r\n')
97 print ' Loaded cached address: %s...' % (address,)
101 if address is not None:
102 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
103 if not res['isvalid'] or not res['ismine']:
104 print ' Cached address is either invalid or not controlled by local bitcoind!'
108 print ' Getting payout address from bitcoind...'
109 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
111 with open(address_path, 'wb') as f:
114 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
116 my_pubkey_hash = args.pubkey_hash
117 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
120 my_share_hashes = set()
121 my_doa_share_hashes = set()
123 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
124 shared_share_hashes = set()
125 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
126 known_verified = set()
127 print "Loading shares..."
128 for i, (mode, contents) in enumerate(ss.get_shares()):
130 if contents.hash in tracker.shares:
132 shared_share_hashes.add(contents.hash)
133 contents.time_seen = 0
134 tracker.add(contents)
135 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136 print " %i" % (len(tracker.shares),)
137 elif mode == 'verified_hash':
138 known_verified.add(contents)
140 raise AssertionError()
141 print " ...inserting %i verified shares..." % (len(known_verified),)
142 for h in known_verified:
143 if h not in tracker.shares:
144 ss.forget_verified_share(h)
146 tracker.verified.add(tracker.shares[h])
147 print " ...done loading %i shares!" % (len(tracker.shares),)
149 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
150 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
151 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
153 print 'Initializing work...'
158 bitcoind_work = variable.Variable(None)
160 @defer.inlineCallbacks
162 work = yield getwork(bitcoind)
163 bitcoind_work.set(dict(
164 version=work['version'],
165 previous_block=work['previous_block_hash'],
167 coinbaseflags=work['coinbaseflags'],
169 transactions=work['transactions'],
170 merkle_link=work['merkle_link'],
171 subsidy=work['subsidy'],
172 clock_offset=time.time() - work['time'],
173 last_update=time.time(),
175 yield poll_bitcoind()
177 @defer.inlineCallbacks
180 flag = factory.new_block.get_deferred()
182 yield poll_bitcoind()
185 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
190 best_block_header = variable.Variable(None)
191 def handle_header(header):
192 # check that header matches current target
193 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) <= bitcoind_work.value['bits'].target):
195 if (best_block_header.value is None
197 header['previous_block'] == current_work.value['previous_block'] and
198 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == current_work.value['previous_block']
199 ) # new is child of current and previous is current
201 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)) == current_work.value['previous_block'] and
202 best_block_header.value['previous_block'] != current_work.value['previous_block'])
203 ): # new is current and previous is not child of current
204 best_block_header.set(header)
205 @bitcoind_work.changed.watch
206 @defer.inlineCallbacks
208 handle_header((yield factory.conn.value.get_block_header(work['previous_block'])))
209 @best_block_header.changed.watch
215 merged_work = variable.Variable({})
217 @defer.inlineCallbacks
218 def set_merged_work(merged_url, merged_userpass):
219 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
221 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
222 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
223 hash=int(auxblock['hash'], 16),
224 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
225 merged_proxy=merged_proxy,
227 yield deferral.sleep(1)
228 for merged_url, merged_userpass in merged_urls:
229 set_merged_work(merged_url, merged_userpass)
@merged_work.changed.watch
def _(new_merged_work):
    '''Log a notice each time the merged-mining work variable changes.

    The new value is not inspected; the watcher only reports that an
    update happened.
    '''
    print('Got new merged mining work!')
237 current_work = variable.Variable(None)
239 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
240 requested = expiring_dict.ExpiringDict(300)
241 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
243 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
245 t = dict(bitcoind_work.value)
247 if (best_block_header.value is not None and
248 best_block_header.value['previous_block'] == t['previous_block'] and
249 net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
250 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
251 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
253 version=best_block_header.value['version'],
254 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
255 bits=best_block_header.value['bits'], # not always true
257 time=best_block_header.value['timestamp'] + 600, # better way?
259 merkle_link=bitcoin_data.calculate_merkle_link([0], 0),
260 subsidy=5000000000, # XXX fix this
261 clock_offset=current_work.value['clock_offset'],
262 last_update=current_work.value['last_update'],
265 t['best_share_hash'] = best
266 t['mm_chains'] = merged_work.value
270 for peer2, share_hash in desired:
271 if share_hash not in tracker.tails: # was received in the time tracker.think was running
273 last_request_time, count = requested.get(share_hash, (None, 0))
274 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
276 potential_peers = set()
277 for head in tracker.tails[share_hash]:
278 potential_peers.update(peer_heads.get(head, set()))
279 potential_peers = [peer for peer in potential_peers if peer.connected2]
280 if count == 0 and peer2 is not None and peer2.connected2:
283 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
287 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
291 stops=list(set(tracker.heads) | set(
292 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
295 requested[share_hash] = t, count + 1
296 bitcoind_work.changed.watch(lambda _: compute_work())
297 merged_work.changed.watch(lambda _: compute_work())
302 lp_signal = variable.Event()
304 @current_work.transitioned.watch
305 def _(before, after):
306 # trigger LP if version/previous_block/bits changed or transactions changed from nothing
307 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
314 # setup p2p logic and join p2pool network
316 class Node(p2p.Node):
317 def handle_shares(self, shares, peer):
319 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
323 if share.hash in tracker.shares:
324 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
329 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
333 if shares and peer is not None:
334 peer_heads.setdefault(shares[0].hash, set()).add(peer)
340 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
342 def handle_share_hashes(self, hashes, peer):
345 for share_hash in hashes:
346 if share_hash in tracker.shares:
348 last_request_time, count = requested.get(share_hash, (None, 0))
349 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
351 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
352 get_hashes.append(share_hash)
353 requested[share_hash] = t, count + 1
355 if hashes and peer is not None:
356 peer_heads.setdefault(hashes[0], set()).add(peer)
358 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
360 def handle_get_shares(self, hashes, parents, stops, peer):
361 parents = min(parents, 1000//len(hashes))
364 for share_hash in hashes:
365 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
366 if share.hash in stops:
369 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
def handle_bestblock(self, header, peer):
    '''Validate a block header announced by a peer and hand it to handle_header.

    The header is packed and run through the parent network's POW_FUNC; the
    result must not exceed the target encoded in the header's own 'bits'.

    Raises p2p.PeerMisbehavingError when the proof-of-work test fails.
    '''
    packed_header = bitcoin_data.block_header_type.pack(header)
    pow_hash = net.PARENT.POW_FUNC(packed_header)
    claimed_target = header['bits'].target
    if pow_hash > claimed_target:
        raise p2p.PeerMisbehavingError('received block header fails PoW test')
    # Header passed the PoW check; let the shared header logic decide
    # whether it becomes the new best block header.
    handle_header(header)
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    '''Send a block to bitcoind over the bitcoin P2P connection.

    block is a dict with at least a 'header' entry (used for the log
    message) and is passed unchanged to the connection's send_block().

    Raises deferral.RetrySilentlyException when there is currently no P2P
    connection to bitcoind (factory.conn.value is None).
    NOTE(review): the deferral.retry wrapper presumably re-invokes this
    function after such a failure -- confirm against util.deferral.
    '''
    conn = factory.conn.value
    if conn is None:
        header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))
        # %064x zero-pads to the full 64 hex digits of a 256-bit hash, matching
        # every other block-explorer link printed in this file. The previous
        # %32x space-padded to 32 characters and dropped the leading zeros,
        # producing an unusable URL.
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
        raise deferral.RetrySilentlyException()
    conn.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    '''Submit a block to bitcoind through the getmemorypool JSON-RPC call.

    The block is packed, hex-encoded and passed as the RPC argument. The
    RPC result is then compared with whether the block header actually
    meets its own 'bits' target, and a mismatch is reported on stderr:

      * a rejected block that should have been accepted, unless
        ignore_failure is set;
      * an accepted block that should not have been.
    '''
    block_hex = bitcoin_data.block_type.pack(block).encode('hex')
    success = yield bitcoind.rpc_getmemorypool(block_hex)

    header = block['header']
    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
    success_expected = pow_hash <= header['bits'].target

    unexpected_rejection = not success and success_expected and not ignore_failure
    unexpected_acceptance = success and not success_expected
    if unexpected_rejection or unexpected_acceptance:
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
def submit_block(block, ignore_failure):
    '''Hand a block to bitcoind over both available channels.

    The block is first sent over the P2P connection and then submitted
    via JSON-RPC. ignore_failure is only forwarded to the RPC path, where
    it suppresses the warning about an unexpectedly rejected block.
    Neither call's result is waited on or returned.
    '''
    # P2P path first, then the RPC path.
    submit_block_p2p(block)
    submit_block_rpc(block, ignore_failure)
396 @tracker.verified.added.watch
398 if share.pow_hash <= share.header['bits'].target:
399 submit_block(share.as_block(tracker), ignore_failure=True)
401 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
404 if (get_height_rel_highest(share.header['previous_block']) > -5 or
405 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
406 broadcast_share(share.hash)
408 reactor.callLater(5, spread) # so get_height_rel_highest can update
410 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
412 @defer.inlineCallbacks
415 ip, port = x.split(':')
416 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
418 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
421 if os.path.exists(os.path.join(datadir_path, 'addrs')):
423 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
424 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
426 print >>sys.stderr, 'error parsing addrs'
427 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
429 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
431 print >>sys.stderr, "error reading addrs.txt"
432 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
435 if addr not in addrs:
436 addrs[addr] = (0, time.time(), time.time())
440 connect_addrs = set()
441 for addr_df in map(parse, args.p2pool_nodes):
443 connect_addrs.add((yield addr_df))
448 best_share_hash_func=lambda: current_work.value['best_share_hash'],
449 port=args.p2pool_port,
452 connect_addrs=connect_addrs,
453 max_incoming_conns=args.p2pool_conns,
458 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
459 f.write(json.dumps(p2p_node.addr_store.items()))
460 task.LoopingCall(save_addrs).start(60)
462 @best_block_header.changed.watch
464 for peer in p2p_node.peers.itervalues():
465 peer.send_bestblock(header=header)
467 def broadcast_share(share_hash):
469 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
470 if share.hash in shared_share_hashes:
472 shared_share_hashes.add(share.hash)
475 for peer in p2p_node.peers.itervalues():
476 peer.sendShares([share for share in shares if share.peer is not peer])
478 # send share when the chain changes to their chain
479 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
482 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
484 if share.hash in tracker.verified.shares:
485 ss.add_verified_hash(share.hash)
486 task.LoopingCall(save_shares).start(60)
492 @defer.inlineCallbacks
496 is_lan, lan_ip = yield ipdiscover.get_local_ip()
498 pm = yield portmapper.get_port_mapper()
499 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
500 except defer.TimeoutError:
504 log.err(None, 'UPnP error:')
505 yield deferral.sleep(random.expovariate(1/120))
508 # start listening for workers with a JSON-RPC server
510 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
514 removed_unstales_var = variable.Variable((0, 0, 0))
515 removed_doa_unstales_var = variable.Variable(0)
516 @tracker.verified.removed.watch
518 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
519 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
520 removed_unstales_var.set((
521 removed_unstales_var.value[0] + 1,
522 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
523 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
525 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
526 removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
def get_stale_counts():
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)

    total is the number of shares this instance has produced. The first
    pair counts local shares that are not part of the current best chain,
    split into dead-on-arrival shares and the remainder (orphans). The last
    pair is how many orphan/dead announcements are recorded in the chain.
    The "removed" counters are added in so that local shares which have
    already been dropped from the verified tracker still count as in-chain.
    '''
    total = len(my_share_hashes)
    total_doa = len(my_doa_share_hashes)
    chain_delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])

    # (shares, orphan announcements, doa announcements) already removed from the tracker
    removed_shares, removed_orphan_announces, removed_doa_announces = removed_unstales_var.value

    in_chain = chain_delta.my_count + removed_shares
    doa_in_chain = chain_delta.my_doa_count + removed_doa_unstales_var.value
    recorded = (
        chain_delta.my_orphan_announce_count + removed_orphan_announces,
        chain_delta.my_dead_announce_count + removed_doa_announces,
    )

    not_in_chain = total - in_chain
    doa_not_in_chain = total_doa - doa_in_chain

    return (not_in_chain - doa_not_in_chain, doa_not_in_chain), total, recorded
544 pseudoshare_received = variable.Event()
545 share_received = variable.Event()
546 local_rate_monitor = math.RateMonitor(10*60)
548 class WorkerBridge(worker_interface.WorkerBridge):
550 worker_interface.WorkerBridge.__init__(self)
551 self.new_work_event = lp_signal
552 self.recent_shares_ts_work = []
554 def get_user_details(self, request):
555 user = request.getUser() if request.getUser() is not None else ''
557 desired_pseudoshare_target = None
559 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
561 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
565 desired_share_target = 2**256 - 1
567 user, min_diff_str = user.rsplit('/', 1)
569 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
573 if random.uniform(0, 100) < args.worker_fee:
574 pubkey_hash = my_pubkey_hash
577 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
579 pubkey_hash = my_pubkey_hash
581 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
def preprocess_request(self, request):
    '''Extract the work parameters for a worker request.

    Delegates to get_user_details() and drops the user name, returning
    (pubkey_hash, desired_share_target, desired_pseudoshare_target).
    '''
    _user, payout_hash, share_target, pseudoshare_target = self.get_user_details(request)
    return payout_hash, share_target, pseudoshare_target
587 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
588 if len(p2p_node.peers) == 0 and net.PERSIST:
589 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
590 if current_work.value['best_share_hash'] is None and net.PERSIST:
591 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
592 if time.time() > current_work.value['last_update'] + 60:
593 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
595 if current_work.value['mm_chains']:
596 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
597 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
598 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
599 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
603 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
609 share_info, generate_tx = p2pool_data.Share.generate_transaction(
612 previous_share_hash=current_work.value['best_share_hash'],
613 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
614 nonce=random.randrange(2**32),
615 pubkey_hash=pubkey_hash,
616 subsidy=current_work.value['subsidy'],
617 donation=math.perfect_round(65535*args.donation_percentage/100),
618 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
619 'orphan' if orphans > orphans_recorded_in_chain else
620 'doa' if doas > doas_recorded_in_chain else
622 )(*get_stale_counts()),
625 block_target=current_work.value['bits'].target,
626 desired_timestamp=int(time.time() - current_work.value['clock_offset']),
627 desired_target=desired_share_target,
628 ref_merkle_link=dict(branch=[], index=0),
632 if desired_pseudoshare_target is None:
634 if len(self.recent_shares_ts_work) == 50:
635 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
637 target = min(target, int(2**256/hash_rate))
639 target = desired_pseudoshare_target
640 target = max(target, share_info['bits'].target)
641 for aux_work in current_work.value['mm_chains'].itervalues():
642 target = max(target, aux_work['target'])
643 target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
645 transactions = [generate_tx] + list(current_work.value['transactions'])
646 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
647 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work.value['merkle_link'])
649 getwork_time = time.time()
650 merkle_link = current_work.value['merkle_link']
652 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
653 bitcoin_data.target_to_difficulty(target),
654 bitcoin_data.target_to_difficulty(share_info['bits'].target),
655 current_work.value['subsidy']*1e-8, net.PARENT.SYMBOL,
656 len(current_work.value['transactions']),
659 bits = current_work.value['bits']
660 previous_block = current_work.value['previous_block']
661 ba = bitcoin_getwork.BlockAttempt(
662 version=current_work.value['version'],
663 previous_block=current_work.value['previous_block'],
664 merkle_root=merkle_root,
665 timestamp=current_work.value['time'],
666 bits=current_work.value['bits'],
670 received_header_hashes = set()
672 def got_response(header, request):
673 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
674 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
676 if pow_hash <= header['bits'].target or p2pool.DEBUG:
677 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
678 if pow_hash <= header['bits'].target:
680 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
683 log.err(None, 'Error while processing potential block:')
685 user, _, _, _ = self.get_user_details(request)
686 assert header['merkle_root'] == merkle_root
687 assert header['previous_block'] == previous_block
688 assert header['bits'] == bits
690 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
692 for aux_work, index, hashes in mm_later:
694 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
695 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
696 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
697 bitcoin_data.aux_pow_type.pack(dict(
700 block_hash=header_hash,
701 merkle_link=merkle_link,
703 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
704 parent_block_header=header,
709 if result != (pow_hash <= aux_work['target']):
710 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
712 print 'Merged block submittal result: %s' % (result,)
715 log.err(err, 'Error submitting merged block:')
717 log.err(None, 'Error while processing merged mining POW:')
719 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
720 min_header = dict(header);del min_header['merkle_root']
721 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
722 share = p2pool_data.Share(net, None, dict(
723 min_header=min_header, share_info=share_info, hash_link=hash_link,
724 ref_merkle_link=dict(branch=[], index=0),
725 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
727 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
729 p2pool_data.format_hash(share.hash),
730 p2pool_data.format_hash(share.previous_hash),
731 time.time() - getwork_time,
732 ' DEAD ON ARRIVAL' if not on_time else '',
734 my_share_hashes.add(share.hash)
736 my_doa_share_hashes.add(share.hash)
740 tracker.verified.add(share)
744 if pow_hash <= header['bits'].target or p2pool.DEBUG:
745 for peer in p2p_node.peers.itervalues():
746 peer.sendShares([share])
747 shared_share_hashes.add(share.hash)
749 log.err(None, 'Error forwarding block solution:')
751 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
753 if pow_hash > target:
754 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
755 print ' Hash: %56x' % (pow_hash,)
756 print ' Target: %56x' % (target,)
757 elif header_hash in received_header_hashes:
758 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
760 received_header_hashes.add(header_hash)
762 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
763 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
764 while len(self.recent_shares_ts_work) > 50:
765 self.recent_shares_ts_work.pop(0)
766 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
770 return ba, got_response
772 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
774 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
775 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
777 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
779 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
787 print 'Started successfully!'
788 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
789 if args.donation_percentage > 0.51:
790 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
791 elif args.donation_percentage < 0.49:
792 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
794 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
795 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
799 if hasattr(signal, 'SIGALRM'):
800 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
801 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
803 signal.siginterrupt(signal.SIGALRM, False)
804 task.LoopingCall(signal.alarm, 30).start(1)
806 if args.irc_announce:
807 from twisted.words.protocols import irc
808 class IRCClient(irc.IRCClient):
809 nickname = 'p2pool%02i' % (random.randrange(100),)
810 channel = net.ANNOUNCE_CHANNEL
811 def lineReceived(self, line):
814 irc.IRCClient.lineReceived(self, line)
816 irc.IRCClient.signedOn(self)
817 self.factory.resetDelay()
818 self.join(self.channel)
@defer.inlineCallbacks
def new_share(share):
    '''Announce a share on the IRC channel if it is a recently found block.

    Only shares whose proof-of-work meets the block target and whose
    timestamp is within ten minutes of the local clock are announced.
    The announcement is delayed by a random, exponentially distributed
    time and skipped if the same text is already in recent_messages.
    NOTE(review): the delay presumably lets another node announce first so
    that duplicates are suppressed -- confirm intent.
    '''
    if not (share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60):
        return
    yield deferral.sleep(random.expovariate(1/60))
    finder = bitcoin_data.script2_to_address(share.new_script, net.PARENT)
    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), finder, net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
    if message in self.recent_messages:
        return
    self.say(self.channel, message)
    self._remember_message(message)
827 self.watch_id = tracker.verified.added.watch(new_share)
828 self.recent_messages = []
def _remember_message(self, message):
    '''Append message to recent_messages, keeping only the newest 100 entries.

    The list is trimmed in place, so other references to it stay valid.
    '''
    history = self.recent_messages
    history.append(message)
    # Drop everything except the last 100 entries (no-op while shorter).
    del history[:-100]
def privmsg(self, user, channel, message):
    '''Record messages seen on the announce channel.

    Messages addressed to any other channel are ignored; the sender is
    not inspected.
    '''
    if channel != self.channel:
        return
    self._remember_message(message)
def connectionLost(self, reason):
    '''Stop announcing shares for this connection and log why it closed.

    reason is expected to provide getErrorMessage(), whose text is
    printed.
    '''
    # watch_id is only assigned during sign-on; a connection that drops
    # before sign-on completes has nothing to unwatch, and touching the
    # attribute would raise AttributeError.
    if hasattr(self, 'watch_id'):
        tracker.verified.added.unwatch(self.watch_id)
    print('IRC connection lost: %s' % (reason.getErrorMessage(),))
839 class IRCClientFactory(protocol.ReconnectingClientFactory):
841 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
# Fragment of the console status reporter.
# NOTE(review): the embedded numbering skips lines throughout this fragment (the `def`,
# the loop/try headers and several format arguments are not shown), so the comments
# below describe only the statements that are visible.
843 @defer.inlineCallbacks
# Wait on deferral.sleep(3) before building the next report.
848 yield deferral.sleep(3)
# Complain on stderr when the current work's 'last_update' timestamp is more than 60 seconds old.
850 if time.time() > current_work.value['last_update'] + 60:
851 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
# First line of the report: share-chain and peer counts, measured from the current best share.
853 height = tracker.get_height(current_work.value['best_share_hash'])
854 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
856 len(tracker.verified.shares),
859 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
# The reactor's reader/writer counts are appended only when p2pool.DEBUG is set.
860 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
# Local rate: the sum of each datum's 'work' divided by dt.
# NOTE(review): dt is presumably the time span covered by the returned datums - confirm against local_rate_monitor.
862 datums, dt = local_rate_monitor.get_datums_in_last()
863 my_att_s = sum(datum['work']/dt for datum in datums)
864 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
865 math.format(int(my_att_s)),
867 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
# '???' is shown when the local rate is zero or there is no best share, which also avoids dividing by zero.
868 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
# Pool-wide figures: the attempts-per-second value is divided by (1 - stale_prop).
872 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
873 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
874 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
876 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
877 shares, stale_orphan_shares, stale_doa_shares,
878 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
# Same binomial estimate, mapped through (1 - x)/(1 - stale_prop) to give the efficiency figure.
879 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
# Payout for this node's script, defaulting to 0 when absent, scaled by 1e-8 for display.
880 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
882 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
883 math.format(int(real_att_s)),
885 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
# Each warning from p2pool_data.get_warnings is printed to stderr between two rows of 40 '#' characters.
888 for warning in p2pool_data.get_warnings(tracker, current_work, net):
889 print >>sys.stderr, '#'*40
890 print >>sys.stderr, '>>> Warning: ' + warning
891 print >>sys.stderr, '#'*40
# Taken when the text differs from last_str or more than 15 seconds have passed since last_time.
# NOTE(review): the other statements guarded by this condition are not visible in this excerpt.
893 if this_str != last_str or time.time() > last_time + 15:
896 last_time = time.time()
# NOTE(review): the context that reaches this error log call is not visible in this excerpt.
902 log.err(None, 'Fatal error:')
# argparse.ArgumentParser subclass that defines its own expansion of arguments naming a file.
# NOTE(review): the embedded numbering skips lines inside _read_args_from_files (for example the
# initialisation of new_arg_strings and the try/except headers are not shown), so the comments
# below cover only the visible statements.
905 class FixedArgumentParser(argparse.ArgumentParser):
906 def _read_args_from_files(self, arg_strings):
907 # expand arguments referencing files
909 for arg_string in arg_strings:
911 # for regular arguments, just add them back into the list
912 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
913 new_arg_strings.append(arg_string)
915 # replace arguments referencing files with the file content
# The file name is the argument with its first (prefix) character removed.
918 args_file = open(arg_string[1:])
# Every line of the file is split into arguments by self.convert_arg_line_to_args.
921 for arg_line in args_file.read().splitlines():
922 for arg in self.convert_arg_line_to_args(arg_line):
923 arg_strings.append(arg)
# Recurse so that file references found inside the file are expanded as well.
924 arg_strings = self._read_args_from_files(arg_strings)
925 new_arg_strings.extend(arg_strings)
# sys.exc_info()[1] is the exception instance currently being handled (the handler header is not shown).
929 err = sys.exc_info()[1]
932 # return the modified argument list
933 return new_arg_strings
def convert_arg_line_to_args(self, arg_line):
    """Split one line of an argument file into its whitespace-separated arguments."""
    # split() without a separator never yields empty or all-whitespace fields,
    # so the result needs no further filtering.
    return arg_line.split()
# Networks offered through --net: every entry of networks.nets whose name lacks '_testnet'.
realnets = dict((net_id, network) for net_id, network in networks.nets.iteritems() if '_testnet' not in net_id)

# Top-level parser; arguments starting with '@' name a file to read further arguments from.
parser = FixedArgumentParser(
    description='p2pool (version %s)' % (p2pool.__version__,),
    fromfile_prefix_chars='@',
)
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument(
    '--net', dest='net_name', action='store',
    choices=sorted(realnets), default='bitcoin',
    help='use specified network (default: bitcoin)',
)
# store_true is argparse's shorthand for store_const with const=True.
parser.add_argument(
    '--testnet', dest='testnet', action='store_true', default=False,
    help='''use the network's testnet''',
)
parser.add_argument(
    '--debug', dest='debug', action='store_true', default=False,
    help='enable debugging mode',
)
parser.add_argument(
    '-a', '--address', dest='address', type=str, action='store', default=None,
    help='generate payouts to this address (default: <address requested from bitcoind>)',
)
parser.add_argument(
    '--datadir', dest='datadir', type=str, action='store', default=None,
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
)
parser.add_argument(
    '--logfile', dest='logfile', type=str, action='store', default=None,
    help='''log to this file (default: data/<NET>/log)''',
)
# May be given several times; each URL is appended to args.merged_urls.
parser.add_argument(
    '--merged', dest='merged_urls', type=str, action='append', default=[],
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
)
parser.add_argument(
    '--give-author', metavar='DONATION_PERCENTAGE',
    dest='donation_percentage', type=float, action='store', default=0.5,
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
)
parser.add_argument(
    '--iocp', dest='iocp', action='store_true', default=False,
    help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
)
parser.add_argument(
    '--irc-announce', dest='irc_announce', action='store_true', default=False,
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
)
parser.add_argument(
    '--no-bugreport', dest='no_bugreport', action='store_true', default=False,
    help='disable submitting caught exceptions to the author',
)
# Options for p2pool's own peer-to-peer interface. Every option here is registered on
# p2pool_group so that --help lists them together under the 'p2pool interface' heading.
p2pool_group = parser.add_argument_group('p2pool interface')
# The help text lists each real network's P2P_PORT. The default stays None here so that
# code after parsing can tell whether the option was supplied.
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
# May be given several times; each value is appended to args.p2pool_nodes.
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# --disable-upnp stores False into args.upnp (default True). It concerns the P2P port, so it
# is registered on this group and no longer on the parser's general option list. Only the
# grouping shown by --help changes; the option name and dest are the same as before.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')
# Options for the interface miners connect to.
worker_group = parser.add_argument_group('worker interface')
# Per-network default worker ports, shown in the --worker-port help text.
worker_port_defaults = ', '.join('%s:%i' % (net_id, network.WORKER_PORT) for net_id, network in sorted(realnets.items()))
worker_group.add_argument(
    '-w', '--worker-port', metavar='PORT or ADDR:PORT',
    dest='worker_endpoint', type=str, action='store', default=None,
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % worker_port_defaults,
)
worker_group.add_argument(
    '-f', '--fee', metavar='FEE_PERCENTAGE',
    dest='worker_fee', type=float, action='store', default=0,
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
)

# Options describing how to reach bitcoind.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument(
    '--bitcoind-address', metavar='BITCOIND_ADDRESS',
    dest='bitcoind_address', type=str, action='store', default='127.0.0.1',
    help='connect to this address (default: 127.0.0.1)',
)
# Per-network default RPC ports, shown in the help text; the option itself defaults to None.
rpc_port_defaults = ', '.join('%s:%i' % (net_id, network.PARENT.RPC_PORT) for net_id, network in sorted(realnets.items()))
bitcoind_group.add_argument(
    '--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    dest='bitcoind_rpc_port', type=int, action='store', default=None,
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % rpc_port_defaults,
)
# Per-network default P2P ports, handled the same way as the RPC ports.
p2p_port_defaults = ', '.join('%s:%i' % (net_id, network.PARENT.P2P_PORT) for net_id, network in sorted(realnets.items()))
bitcoind_group.add_argument(
    '--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    dest='bitcoind_p2p_port', type=int, action='store', default=None,
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % p2p_port_defaults,
)

# Positional arguments: zero or more values collected into a list.
bitcoind_group.add_argument(
    metavar='BITCOIND_RPCUSERPASS',
    dest='bitcoind_rpc_userpass', type=str, action='store', default=[], nargs='*',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
)
# Parse the command line, then fill in every setting the user left unspecified.
# NOTE(review): the embedded numbering skips lines in this section (some `else:`/`try:` headers,
# part of an error message and a list terminator are not shown); comments cover visible lines only.
1014 args = parser.parse_args()
# The network key gets a '_testnet' suffix when --testnet was given.
1019 net_name = args.net_name + ('_testnet' if args.testnet else '')
1020 net = networks.nets[net_name]
# Data directory: --datadir if given, otherwise 'data' beside the script (sys.argv[0]);
# in both cases a per-network subdirectory is used and created when missing.
1022 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
1023 if not os.path.exists(datadir_path):
1024 os.makedirs(datadir_path)
# At most two positional values are accepted. Left-padding with None and taking the last two
# means a single value becomes the password and the username stays None.
1026 if len(args.bitcoind_rpc_userpass) > 2:
1027 parser.error('a maximum of two arguments are allowed')
1028 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No password on the command line: fall back to the parent network's configuration file.
1030 if args.bitcoind_rpc_password is None:
1031 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
1032 parser.error('This network has no configuration file function. Manually enter your RPC password.')
1033 conf_path = net.PARENT.CONF_FILE_FUNC()
1034 if not os.path.exists(conf_path):
# The error text suggests a configuration file containing a random 128-bit hexadecimal rpcpassword.
1035 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
1036 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
1039 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
# A dummy '[x]' section header is prepended so RawConfigParser can read the file; options are then looked up under 'x'.
1040 with open(conf_path, 'rb') as f:
1041 cp = ConfigParser.RawConfigParser()
1042 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
# (option name in the file, attribute on args, conversion applied to the value)
1043 for conf_name, var_name, var_type in [
1044 ('rpcuser', 'bitcoind_rpc_username', str),
1045 ('rpcpassword', 'bitcoind_rpc_password', str),
1046 ('rpcport', 'bitcoind_rpc_port', int),
1047 ('port', 'bitcoind_p2p_port', int),
# A value from the file is used only when the corresponding args attribute is still None.
1049 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1050 setattr(args, var_name, var_type(cp.get('x', conf_name)))
1051 if args.bitcoind_rpc_password is None:
1052 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
# Remaining defaults: empty username and the network's standard ports.
1054 if args.bitcoind_rpc_username is None:
1055 args.bitcoind_rpc_username = ''
1057 if args.bitcoind_rpc_port is None:
1058 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1060 if args.bitcoind_p2p_port is None:
1061 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1063 if args.p2pool_port is None:
1064 args.p2pool_port = net.P2P_PORT
# worker_endpoint becomes an (address, port) pair; the address is '' when only a port (or nothing) was given.
1066 if args.worker_endpoint is None:
1067 worker_endpoint = '', net.WORKER_PORT
1068 elif ':' not in args.worker_endpoint:
1069 worker_endpoint = '', int(args.worker_endpoint)
# Split on the last ':' only, so the address part may itself contain colons.
1071 addr, port = args.worker_endpoint.rsplit(':', 1)
1072 worker_endpoint = addr, int(port)
# A payout address given with -a/--address is converted to a pubkey hash; a failure is reported through parser.error.
1074 if args.address is not None:
1076 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1077 except Exception, e:
1078 parser.error('error parsing address: ' + repr(e))
# NOTE(review): presumably the branch taken when no address was supplied - its `else:` header is not visible here.
1080 args.pubkey_hash = None
def separate_url(url):
    """Return (url with the part before the last '@' of its netloc removed, that removed part).

    Calls parser.error when the netloc contains no '@'.
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split on the last '@' only, so the leading part may itself contain '@'.
    userpass, bare_netloc = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=bare_netloc))
    return stripped_url, userpass
# One (url, userpass) pair per --merged option, in command-line order.
merged_urls = map(separate_url, args.merged_urls)
# Log file location defaults to 'log' inside the per-network data directory.
1090 if args.logfile is None:
1091 args.logfile = os.path.join(datadir_path, 'log')
# Replace sys.stdout and sys.stderr with pipe objects built from the project's logging helpers.
# NOTE(review): judging by the helper names, output is timestamped and copied both to the
# original stderr and to the log file, with stderr lines prefixed by '> ' - confirm in util.logging.
1093 logfile = logging.LogFile(args.logfile)
1094 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1095 sys.stdout = logging.AbortPipe(pipe)
# Twisted's DefaultObserver.stderr is pointed at the same replacement stream.
1096 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# Install a SIGUSR1 handler only where the signal module defines SIGUSR1.
# NOTE(review): a line between the handler's two prints is not visible in this excerpt.
1097 if hasattr(signal, "SIGUSR1"):
1098 def sigusr1(signum, frame):
1099 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1101 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1102 signal.signal(signal.SIGUSR1, sigusr1)
# Call logfile.reopen every 5 seconds for as long as the reactor runs.
1103 task.LoopingCall(logfile.reopen).start(5)
# Log observer whose emit() builds a text report from error events and posts it to a fixed URL.
# NOTE(review): the embedded numbering skips many lines in this class (the __init__ header, the
# early returns, an `else:` and the start of the HTTP call are not shown); comments cover visible lines only.
1105 class ErrorReporter(object):
# Time of the most recent report; None until one has been sent.
1107 self.last_sent = None
1109 def emit(self, eventDict):
# Events not flagged as errors take this branch (its body is not visible here).
1110 if not eventDict["isError"]:
# Rate limit: this branch is taken when the previous report is less than 5 seconds old.
1113 if self.last_sent is not None and time.time() < self.last_sent + 5:
1115 self.last_sent = time.time()
# Report text: the 'why' value (or 'Unhandled Error') plus the failure's traceback when a failure
# is attached; the other visible assignment joins the event's message parts with spaces.
1117 if 'failure' in eventDict:
1118 text = ((eventDict.get('why') or 'Unhandled Error')
1119 + '\n' + eventDict['failure'].getTraceback())
1121 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
# Imported at call time rather than at module import.
1123 from twisted.web import client
1125 url='http://u.forre.st/p2pool_error.cgi',
# Payload: p2pool version, network name, then the report text.
1127 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
# Both the result and any failure of the request are replaced with None.
1129 ).addBoth(lambda x: None)
# Register the error reporter as a log observer unless --no-bugreport was given.
1130 if not args.no_bugreport:
1131 log.addObserver(ErrorReporter().emit)
# Have the reactor call main with the resolved settings once it is running.
1133 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)