1 from __future__ import division
15 if '--iocp' in sys.argv:
16 from twisted.internet import iocpreactor
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging, pack
26 from . import p2p, networks, web
27 import p2pool, p2pool.data as p2pool_data
# getwork(bitcoind): ask bitcoind for a block template through its
# rpc_getmemorypool() JSON-RPC method and hand back a normalized dict via
# defer.returnValue (the function is a defer.inlineCallbacks generator).
# The whole call is wrapped in deferral.retry('Error getting work from bitcoind:', 3);
# the meaning of the 3 is defined in util.deferral -- presumably a retry delay, TODO confirm.
# NOTE(review): the embedded listing numbers skip 32, 38, 46 and 49, so a few
# lines of this function (e.g. the `try:` that pairs with the `except` below and
# the closing of the dict call) are not shown in this listing.
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
33 work = yield bitcoind.rpc_getmemorypool()
34 except jsonrpc.Error, e:
# JSON-RPC error -32601 is reported to the user as a too-old bitcoind, then
# RetrySilentlyException is raised so the retry wrapper handles it.
35 if e.code == -32601: # Method not found
36 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37 raise deferral.RetrySilentlyException()
# Transactions arrive hex-encoded; decode them once and reuse the raw bytes for
# both unpacking and hashing below.
39 packed_transactions = [x.decode('hex') for x in work['transactions']]
40 defer.returnValue(dict(
41 version=work['version'],
# 'previousblockhash' is a hex string; it is converted to an integer here.
42 previous_block_hash=int(work['previousblockhash'], 16),
43 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
# The leading 0 stands in for the hash at index 0, ahead of the hashes of the
# transactions returned by bitcoind (see the original inline remark).
44 merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
45 subsidy=work['coinbasevalue'],
# 'bits' is accepted in two forms: a hex string (decoded and byte-reversed before
# unpacking) or any other value, which is wrapped directly in FloatingInteger.
47 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
# Coinbase flags: use 'coinbaseflags' when present, otherwise the concatenation of
# the decoded 'coinbaseaux' values, otherwise the empty string.
48 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
51 @defer.inlineCallbacks
52 def main(args, net, datadir_path, merged_urls, worker_endpoint):
54 print 'p2pool (version %s)' % (p2pool.__version__,)
57 # connect to bitcoind over JSON-RPC and do initial getmemorypool
58 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
59 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
60 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
61 @deferral.retry('Error while checking Bitcoin connection:', 1)
62 @defer.inlineCallbacks
64 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
65 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
66 raise deferral.RetrySilentlyException()
67 temp_work = yield getwork(bitcoind)
68 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
69 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
70 raise deferral.RetrySilentlyException()
71 defer.returnValue(temp_work)
72 temp_work = yield check()
74 block_height_var = variable.Variable(None)
75 @defer.inlineCallbacks
77 block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
79 task.LoopingCall(poll_height).start(60*60)
82 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
83 print ' Current block height: %i' % (block_height_var.value,)
86 # connect to bitcoind over bitcoin-p2p
87 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
88 factory = bitcoin_p2p.ClientFactory(net.PARENT)
89 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
90 yield factory.getProtocol() # waits until handshake is successful
94 print 'Determining payout address...'
95 if args.pubkey_hash is None:
96 address_path = os.path.join(datadir_path, 'cached_payout_address')
98 if os.path.exists(address_path):
99 with open(address_path, 'rb') as f:
100 address = f.read().strip('\r\n')
101 print ' Loaded cached address: %s...' % (address,)
105 if address is not None:
106 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
107 if not res['isvalid'] or not res['ismine']:
108 print ' Cached address is either invalid or not controlled by local bitcoind!'
112 print ' Getting payout address from bitcoind...'
113 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
115 with open(address_path, 'wb') as f:
118 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
120 my_pubkey_hash = args.pubkey_hash
121 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
124 my_share_hashes = set()
125 my_doa_share_hashes = set()
127 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
128 shared_share_hashes = set()
129 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
130 known_verified = set()
131 print "Loading shares..."
132 for i, (mode, contents) in enumerate(ss.get_shares()):
134 if contents.hash in tracker.shares:
136 shared_share_hashes.add(contents.hash)
137 contents.time_seen = 0
138 tracker.add(contents)
139 if len(tracker.shares) % 1000 == 0 and tracker.shares:
140 print " %i" % (len(tracker.shares),)
141 elif mode == 'verified_hash':
142 known_verified.add(contents)
144 raise AssertionError()
145 print " ...inserting %i verified shares..." % (len(known_verified),)
146 for h in known_verified:
147 if h not in tracker.shares:
148 ss.forget_verified_share(h)
150 tracker.verified.add(tracker.shares[h])
151 print " ...done loading %i shares!" % (len(tracker.shares),)
153 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
154 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
155 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
157 print 'Initializing work...'
162 bitcoind_work = variable.Variable(None)
164 @defer.inlineCallbacks
166 work = yield getwork(bitcoind)
167 bitcoind_work.set(dict(
168 version=work['version'],
169 previous_block=work['previous_block_hash'],
171 coinbaseflags=work['coinbaseflags'],
173 transactions=work['transactions'],
174 merkle_link=work['merkle_link'],
175 subsidy=work['subsidy'],
176 clock_offset=time.time() - work['time'],
177 last_update=time.time(),
179 yield poll_bitcoind()
181 @defer.inlineCallbacks
184 flag = factory.new_block.get_deferred()
186 yield poll_bitcoind()
189 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
194 best_block_header = variable.Variable(None)
# handle_header(new_header): decide whether new_header should become the value of
# best_block_header. Reads bitcoind_work.value and best_block_header.value from
# the enclosing scope; the only visible write is best_block_header.set(new_header).
# NOTE(review): listing numbers 198, 201 and 205 are not shown, so the action taken
# when the target check fails and the operators joining the three parts of the
# big condition are not visible here -- confirm against the full file.
195 def handle_header(new_header):
196 # check that header matches current target
# The header's POW_FUNC value is compared with the target of the 'bits' that
# bitcoind most recently reported.
197 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
199 bitcoind_best_block = bitcoind_work.value['previous_block']
# Visible cases: no header stored yet; new_header builds on bitcoind's best block
# while the stored header *is* that block; new_header *is* bitcoind's best block
# while the stored header does not build on it.
200 if (best_block_header.value is None
202 new_header['previous_block'] == bitcoind_best_block and
203 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
204 ) # new is child of current and previous is current
206 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
207 best_block_header.value['previous_block'] != bitcoind_best_block
208 )): # new is current and previous is not a child of current
209 best_block_header.set(new_header)
210 @defer.inlineCallbacks
212 handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
213 bitcoind_work.changed.watch(lambda _: poll_header())
218 merged_work = variable.Variable({})
# set_merged_work(merged_url, merged_userpass): fetch aux-chain work from one merged
# mining daemon and publish it into merged_work (a Variable in the enclosing scope).
#   merged_url      -- JSON-RPC URL passed to jsonrpc.Proxy
#   merged_userpass -- credentials string; base64-encoded into a 'Basic' Authorization header
# NOTE(review): listing numbers 223 and 229 are not shown; the trailing
# deferral.sleep(1) suggests the body repeats in a loop -- confirm against the full file.
220 @defer.inlineCallbacks
221 def set_merged_work(merged_url, merged_userpass):
222 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
# rpc_getauxblock is called through the deferral.retry wrapper.
224 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
# dict(old, **{...}) builds a new dict from the current value plus an entry keyed by
# the aux chain id, so the previous merged_work value is not mutated in place.
225 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
# 'hash' is parsed from hex text; 'target' is hex-decoded and unpacked as a 256-bit integer.
226 hash=int(auxblock['hash'], 16),
227 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
# The proxy is stored with the work so a solution can later be submitted to the same daemon.
228 merged_proxy=merged_proxy,
230 yield deferral.sleep(1)
231 for merged_url, merged_userpass in merged_urls:
232 set_merged_work(merged_url, merged_userpass)
# Watcher registered on merged_work.changed: prints a notice for every new
# merged_work value. The new value itself (new_merged_work) is not inspected.
234 @merged_work.changed.watch
235 def _(new_merged_work):
236 print 'Got new merged mining work!'
240 current_work = variable.Variable(None)
242 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
243 requested = expiring_dict.ExpiringDict(300)
244 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
246 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
248 t = dict(bitcoind_work.value)
250 if (best_block_header.value is not None and
251 best_block_header.value['previous_block'] == t['previous_block'] and
252 net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
253 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
254 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
256 version=best_block_header.value['version'],
257 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
258 bits=best_block_header.value['bits'], # not always true
260 time=best_block_header.value['timestamp'] + 600, # better way?
262 merkle_link=bitcoin_data.calculate_merkle_link([0], 0),
263 subsidy=net.PARENT.SUBSIDY_FUNC(block_height_var.value),
264 clock_offset=current_work.value['clock_offset'],
265 last_update=current_work.value['last_update'],
268 t['best_share_hash'] = best
269 t['mm_chains'] = merged_work.value
273 for peer2, share_hash in desired:
274 if share_hash not in tracker.tails: # was received in the time tracker.think was running
276 last_request_time, count = requested.get(share_hash, (None, 0))
277 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
279 potential_peers = set()
280 for head in tracker.tails[share_hash]:
281 potential_peers.update(peer_heads.get(head, set()))
282 potential_peers = [peer for peer in potential_peers if peer.connected2]
283 if count == 0 and peer2 is not None and peer2.connected2:
286 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
290 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
294 stops=list(set(tracker.heads) | set(
295 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
298 requested[share_hash] = t, count + 1
299 bitcoind_work.changed.watch(lambda _: compute_work())
300 merged_work.changed.watch(lambda _: compute_work())
301 best_block_header.changed.watch(lambda _: compute_work())
306 lp_signal = variable.Event()
# Watcher registered on current_work.transitioned; receives the previous and the
# new work dicts.
# NOTE(review): the statement executed when the condition holds (listing number 312)
# is not shown; presumably it fires lp_signal -- confirm against the full file.
308 @current_work.transitioned.watch
309 def _(before, after):
310 # trigger LP if version/previous_block/bits changed or transactions changed from nothing
# Besides the three fields named above, the check also covers 'best_share_hash'
# and 'mm_chains'. The second clause matches a change from no transactions to some.
311 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
318 # setup p2p logic and join p2pool network
# Node: p2p.Node subclass whose handlers connect incoming peer messages to the
# share tracker and request bookkeeping of the enclosing scope (tracker, requested,
# peer_heads, handle_header).
# NOTE(review): many listing numbers inside this class are not shown (e.g. 322,
# 324-326, 329-332, 334-336, 339-343, 347-348, 351, 354, 358, 361, 366-367,
# 371-372, 374-375), so loop headers, early exits and some initialisations
# (new_count, t, get_hashes, shares) are not visible here.
320 class Node(p2p.Node):
# handle_shares: process a batch of shares received from peer (peer may be None).
321 def handle_shares(self, shares, peer):
323 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
# Shares whose hash is already in tracker.shares are detected here; what is done
# with them is on lines not shown.
327 if share.hash in tracker.shares:
328 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
333 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
# Record that this peer knows the first share of the batch.
337 if shares and peer is not None:
338 peer_heads.setdefault(shares[0].hash, set()).add(peer)
344 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
# handle_share_hashes: a peer announced share hashes; collect the ones worth
# requesting and ask that peer for them.
346 def handle_share_hashes(self, hashes, peer):
349 for share_hash in hashes:
350 if share_hash in tracker.shares:
# requested maps share hash -> (time of last request, number of requests so far).
352 last_request_time, count = requested.get(share_hash, (None, 0))
# True while t lies inside a window after the last request whose upper bound
# grows as 10 * 1.5**count; the action taken is on a line not shown.
353 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
355 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
356 get_hashes.append(share_hash)
357 requested[share_hash] = t, count + 1
# Record that this peer knows the first announced hash.
359 if hashes and peer is not None:
360 peer_heads.setdefault(hashes[0], set()).add(peer)
362 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
# handle_get_shares: a peer asked for the given hashes plus up to `parents`
# ancestors of each, stopping at any hash listed in `stops`.
364 def handle_get_shares(self, hashes, parents, stops, peer):
# Caps the number of parents so that roughly 1000 shares are walked in total.
# NOTE(review): 1000//len(hashes) raises ZeroDivisionError for an empty `hashes`
# unless that case is handled before this call -- verify against callers.
365 parents = min(parents, 1000//len(hashes))
368 for share_hash in hashes:
# Never walk further back than the chain height known to the tracker.
369 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
370 if share.hash in stops:
373 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
# handle_bestblock: a peer announced a block header; reject it if its PoW value
# exceeds the target encoded in its own 'bits', otherwise pass it to handle_header.
376 def handle_bestblock(self, header, peer):
377 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
378 raise p2p.PeerMisbehavingError('received block header fails PoW test')
379 handle_header(header)
# submit_block_p2p(block): send a block to bitcoind over the bitcoin P2P connection
# held in factory.conn.value. Wrapped in deferral.retry(..., 10, 10); the meaning of
# the two numbers is defined in util.deferral (not shown here).
381 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
382 def submit_block_p2p(block):
# Without a live connection, report the block (explorer URL prefix followed by the
# header hash in hex) on stderr and raise RetrySilentlyException for the retry wrapper.
383 if factory.conn.value is None:
384 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
385 raise deferral.RetrySilentlyException()
386 factory.conn.value.send_block(block=block)
# submit_block_rpc(block, ignore_failure): submit a block to bitcoind over JSON-RPC
# by passing the hex-encoded packed block to rpc_getmemorypool, then compare the
# reported result with what the proof of work implies.
#   ignore_failure -- when true, an unexpected failure is not reported
388 @deferral.retry('Error submitting block: (will retry)', 10, 10)
389 @defer.inlineCallbacks
390 def submit_block_rpc(block, ignore_failure):
391 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
# Success is expected only when the header's PoW value meets the target encoded in
# the header's own 'bits'.
392 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
# Report a mismatch: an unexpected failure (unless ignore_failure) or an unexpected success.
393 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
394 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
# submit_block(block, ignore_failure): submit the block through both channels, P2P
# and JSON-RPC. Neither call's return value is used or waited on here;
# ignore_failure is forwarded to the RPC path only.
396 def submit_block(block, ignore_failure):
397 submit_block_p2p(block)
398 submit_block_rpc(block, ignore_failure)
400 @tracker.verified.added.watch
402 if share.pow_hash <= share.header['bits'].target:
403 submit_block(share.as_block(tracker), ignore_failure=True)
405 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
408 if (get_height_rel_highest(share.header['previous_block']) > -5 or
409 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
410 broadcast_share(share.hash)
412 reactor.callLater(5, spread) # so get_height_rel_highest can update
414 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
416 @defer.inlineCallbacks
419 ip, port = x.split(':')
420 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
422 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
425 if os.path.exists(os.path.join(datadir_path, 'addrs')):
427 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
428 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
430 print >>sys.stderr, 'error parsing addrs'
431 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
433 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
435 print >>sys.stderr, "error reading addrs.txt"
436 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
439 if addr not in addrs:
440 addrs[addr] = (0, time.time(), time.time())
444 connect_addrs = set()
445 for addr_df in map(parse, args.p2pool_nodes):
447 connect_addrs.add((yield addr_df))
452 best_share_hash_func=lambda: current_work.value['best_share_hash'],
453 port=args.p2pool_port,
456 connect_addrs=connect_addrs,
457 max_incoming_conns=args.p2pool_conns,
462 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
463 f.write(json.dumps(p2p_node.addr_store.items()))
464 task.LoopingCall(save_addrs).start(60)
466 @best_block_header.changed.watch
468 for peer in p2p_node.peers.itervalues():
469 peer.send_bestblock(header=header)
# broadcast_share(share_hash): send recent shares of the chain ending at share_hash
# to every connected peer, tracking already-sent hashes in shared_share_hashes.
# NOTE(review): listing numbers 472, 475, 477 and 478 are not shown, so the creation
# and filling of `shares` and the action for already-shared hashes are not visible.
471 def broadcast_share(share_hash):
# Walk at most 5 shares back from share_hash, limited by the known chain height.
473 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
474 if share.hash in shared_share_hashes:
476 shared_share_hashes.add(share.hash)
# Each peer receives only the shares that did not originate from that same peer.
479 for peer in p2p_node.peers.itervalues():
480 peer.sendShares([share for share in shares if share.peer is not peer])
482 # send share when the chain changes to their chain
483 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
486 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
488 if share.hash in tracker.verified.shares:
489 ss.add_verified_hash(share.hash)
490 task.LoopingCall(save_shares).start(60)
496 @defer.inlineCallbacks
500 is_lan, lan_ip = yield ipdiscover.get_local_ip()
502 pm = yield portmapper.get_port_mapper()
503 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
504 except defer.TimeoutError:
508 log.err(None, 'UPnP error:')
509 yield deferral.sleep(random.expovariate(1/120))
512 # start listening for workers with a JSON-RPC server
514 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
518 removed_unstales_var = variable.Variable((0, 0, 0))
519 removed_doa_unstales_var = variable.Variable(0)
520 @tracker.verified.removed.watch
522 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
523 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
524 removed_unstales_var.set((
525 removed_unstales_var.value[0] + 1,
526 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
527 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
529 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
530 removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
# get_stale_counts(): summarise how many locally produced shares ended up outside
# the current best chain. Reads my_share_hashes, my_doa_share_hashes,
# tracker.verified, current_work and the removed_*_var counters from the enclosing scope.
532 def get_stale_counts():
533 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
534 my_shares = len(my_share_hashes)
535 my_doa_shares = len(my_doa_share_hashes)
# The delta object aggregates counters over the verified chain ending at the current best share.
536 delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
# The removed_* counters are added to each in-chain figure so that shares already
# dropped from the tracker are still counted.
537 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
538 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
539 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
540 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
542 my_shares_not_in_chain = my_shares - my_shares_in_chain
543 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
# Orphans are the not-in-chain shares that were not dead on arrival.
545 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
548 pseudoshare_received = variable.Event()
549 share_received = variable.Event()
550 local_rate_monitor = math.RateMonitor(10*60)
# WorkerBridge: worker_interface.WorkerBridge subclass that builds work for miners
# from current_work and processes the headers they send back (block submission,
# merged-mining submission, share creation, pseudoshare accounting).
# NOTE(review): many listing numbers inside this class are not shown (e.g. 553, 560,
# 562, 564, 566-568, 570, 572, 574-576, 579-580, 582, 604-606, 608-612, 614-615,
# 625, 627-628, 633-635, 637, 640, 642, 671-673, 679, 683, 685-686, 688, 697,
# 702-703, 706, 709-712, 715, 717-718, 720, 737, 739, 741-743, 745-747, 752, 763),
# so the `__init__` header, several `try`/`except`/`else` lines and a few
# assignments are not visible; comments below describe only the visible lines.
552 class WorkerBridge(worker_interface.WorkerBridge):
554 worker_interface.WorkerBridge.__init__(self)
# new_work_event is pointed at the enclosing scope's lp_signal event.
555 self.new_work_event = lp_signal
# Recent (timestamp, work) pairs of accepted pseudoshares; trimmed to 50 entries in got_response.
556 self.recent_shares_ts_work = []
# get_user_details: parse the worker's username into
# (user, pubkey_hash, desired_share_target, desired_pseudoshare_target).
558 def get_user_details(self, request):
559 user = request.getUser() if request.getUser() is not None else ''
561 desired_pseudoshare_target = None
# A '+<difficulty>' suffix on the username requests a pseudoshare difficulty.
563 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
565 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
# Default share target is the largest 256-bit value.
569 desired_share_target = 2**256 - 1
# A '/<difficulty>' suffix on the username requests a share difficulty.
571 user, min_diff_str = user.rsplit('/', 1)
573 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
# With probability args.worker_fee percent the work pays this node's own key;
# otherwise the username is interpreted as a payout address.
577 if random.uniform(0, 100) < args.worker_fee:
578 pubkey_hash = my_pubkey_hash
581 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
583 pubkey_hash = my_pubkey_hash
585 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
# preprocess_request: same as get_user_details but without the user name.
587 def preprocess_request(self, request):
588 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
589 return pubkey_hash, desired_share_target, desired_pseudoshare_target
# get_work: build a block attempt for a worker and return it together with the
# got_response callback that handles the solved header.
591 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
# Refuse to hand out work when there are no peers, no best share yet (both only
# when net.PERSIST is set), or bitcoind work is more than 60 seconds old.
592 if len(p2p_node.peers) == 0 and net.PERSIST:
593 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
594 if current_work.value['best_share_hash'] is None and net.PERSIST:
595 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
596 if time.time() > current_work.value['last_update'] + 60:
597 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
# Merged mining: arrange the aux chains in a tree, take one hash per slot (0 for
# empty slots) and build the coinbase marker from the merkle root of those hashes.
599 if current_work.value['mm_chains']:
600 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
601 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
602 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
603 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
# Remember, per aux chain, its work, its slot index and the full hash list for
# building the aux proof in got_response.
607 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
613 share_info, generate_tx = p2pool_data.Share.generate_transaction(
616 previous_share_hash=current_work.value['best_share_hash'],
# Coinbase data is truncated to 100 bytes.
617 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
618 nonce=random.randrange(2**32),
619 pubkey_hash=pubkey_hash,
620 subsidy=current_work.value['subsidy'],
# Donation percentage scaled to the 0..65535 range.
621 donation=math.perfect_round(65535*args.donation_percentage/100),
# Announce 'orphan' or 'doa' when more of our shares went stale in that way than the
# chain has recorded so far (Python 2 tuple-parameter lambda applied to get_stale_counts()).
622 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
623 'orphan' if orphans > orphans_recorded_in_chain else
624 'doa' if doas > doas_recorded_in_chain else
626 )(*get_stale_counts()),
629 block_target=current_work.value['bits'].target,
# Timestamp corrected by the clock offset recorded with the current work.
630 desired_timestamp=int(time.time() - current_work.value['clock_offset']),
631 desired_target=desired_share_target,
632 ref_merkle_link=dict(branch=[], index=0),
# Pseudoshare target selection: without an explicit request it is derived from the
# recent local hash rate, otherwise the requested target is used.
636 if desired_pseudoshare_target is None:
# With a full 50-entry window: work of entries 1..49 divided by the time between
# the first and the last entry.
# NOTE(review): a zero hash_rate would make the division below fail unless a guard
# exists on a line not shown here -- confirm.
638 if len(self.recent_shares_ts_work) == 50:
639 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
641 target = min(target, int(2**256/hash_rate))
643 target = desired_pseudoshare_target
# Raise the target to at least the share target and every merged chain's target,
# then clip it to the parent network's sane range.
644 target = max(target, share_info['bits'].target)
645 for aux_work in current_work.value['mm_chains'].itervalues():
646 target = max(target, aux_work['target'])
647 target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
# Snapshot everything got_response needs, since current_work may change before the
# miner answers.
649 transactions = [generate_tx] + list(current_work.value['transactions'])
650 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
651 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work.value['merkle_link'])
653 getwork_time = time.time()
654 merkle_link = current_work.value['merkle_link']
656 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
657 bitcoin_data.target_to_difficulty(target),
658 bitcoin_data.target_to_difficulty(share_info['bits'].target),
659 current_work.value['subsidy']*1e-8, net.PARENT.SYMBOL,
660 len(current_work.value['transactions']),
663 bits = current_work.value['bits']
664 previous_block = current_work.value['previous_block']
665 ba = bitcoin_getwork.BlockAttempt(
666 version=current_work.value['version'],
667 previous_block=current_work.value['previous_block'],
668 merkle_root=merkle_root,
669 timestamp=current_work.value['time'],
670 bits=current_work.value['bits'],
# Header hashes already submitted for this piece of work, used to detect duplicates.
674 received_header_hashes = set()
# got_response: called with the header a miner produced for the work above.
676 def got_response(header, request):
677 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
678 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
# 1) Full block: submit to bitcoind when the PoW meets the block target (always in DEBUG mode).
680 if pow_hash <= header['bits'].target or p2pool.DEBUG:
681 submit_block(dict(header=header, txs=transactions), ignore_failure=False)
682 if pow_hash <= header['bits'].target:
684 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
687 log.err(None, 'Error while processing potential block:')
# Sanity checks: the returned header must match the work that was handed out.
689 user, _, _, _ = self.get_user_details(request)
690 assert header['merkle_root'] == merkle_root
691 assert header['previous_block'] == previous_block
692 assert header['bits'] == bits
# A share is "on time" when the best share has not changed since the work was built.
694 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
# 2) Merged mining: submit an aux proof to every aux chain whose target is met.
696 for aux_work, index, hashes in mm_later:
698 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
699 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
700 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
701 bitcoin_data.aux_pow_type.pack(dict(
704 block_hash=header_hash,
705 merkle_link=merkle_link,
707 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
708 parent_block_header=header,
# The daemon's answer is compared with whether the PoW actually met the aux target.
713 if result != (pow_hash <= aux_work['target']):
714 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
716 print 'Merged block submittal result: %s' % (result,)
719 log.err(err, 'Error submitting merged block:')
721 log.err(None, 'Error while processing merged mining POW:')
# 3) p2pool share: created when the PoW meets the share target and this header has
# not been seen before.
723 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
# min_header is a copy of the header without its merkle_root.
724 min_header = dict(header);del min_header['merkle_root']
725 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
# The other transactions are attached only when the share is also a full block.
726 share = p2pool_data.Share(net, None, dict(
727 min_header=min_header, share_info=share_info, hash_link=hash_link,
728 ref_merkle_link=dict(branch=[], index=0),
729 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
731 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
733 p2pool_data.format_hash(share.hash),
734 p2pool_data.format_hash(share.previous_hash),
735 time.time() - getwork_time,
736 ' DEAD ON ARRIVAL' if not on_time else '',
# Track our own shares (and, separately, the dead-on-arrival ones) for get_stale_counts.
738 my_share_hashes.add(share.hash)
740 my_doa_share_hashes.add(share.hash)
744 tracker.verified.add(share)
# A share that is also a block is pushed to all peers immediately and marked as shared.
748 if pow_hash <= header['bits'].target or p2pool.DEBUG:
749 for peer in p2p_node.peers.itervalues():
750 peer.sendShares([share])
751 shared_share_hashes.add(share.hash)
753 log.err(None, 'Error forwarding block solution:')
755 share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
# 4) Pseudoshare accounting: report hashes above the target and duplicates;
# the remaining lines record the submission for local statistics.
757 if pow_hash > target:
758 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
759 print ' Hash: %56x' % (pow_hash,)
760 print ' Target: %56x' % (target,)
761 elif header_hash in received_header_hashes:
762 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
764 received_header_hashes.add(header_hash)
766 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
# Keep only the 50 most recent (timestamp, work) samples for the hash-rate estimate in get_work.
767 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
768 while len(self.recent_shares_ts_work) > 50:
769 self.recent_shares_ts_work.pop(0)
770 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
774 return ba, got_response
776 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
778 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
779 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
781 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
783 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
791 print 'Started successfully!'
792 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
793 if args.donation_percentage > 0.51:
794 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
795 elif args.donation_percentage < 0.49:
796 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
798 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
799 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
803 if hasattr(signal, 'SIGALRM'):
804 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
805 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
807 signal.siginterrupt(signal.SIGALRM, False)
808 task.LoopingCall(signal.alarm, 30).start(1)
810 if args.irc_announce:
811 from twisted.words.protocols import irc
# IRCClient: irc.IRCClient subclass that joins net.ANNOUNCE_CHANNEL and announces
# blocks found by the pool, skipping messages already seen in the channel.
# NOTE(review): listing numbers 816, 817 and 819 are not shown, so part of
# lineReceived and the header of the sign-on handler are not visible here.
812 class IRCClient(irc.IRCClient):
# Nickname gets a random two-digit suffix, chosen once when the class body executes.
813 nickname = 'p2pool%02i' % (random.randrange(100),)
814 channel = net.ANNOUNCE_CHANNEL
815 def lineReceived(self, line):
818 irc.IRCClient.lineReceived(self, line)
# After signing on: reset the factory's reconnect delay and join the channel.
820 irc.IRCClient.signedOn(self)
821 self.factory.resetDelay()
822 self.join(self.channel)
# new_share: watcher for newly verified shares. Only shares that are full blocks
# (PoW meets the header target) with a timestamp within 10 minutes of now are announced.
823 @defer.inlineCallbacks
824 def new_share(share):
825 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
# Random exponential delay (mean 60 s; 1/60 is true division because the file
# imports division from __future__). Presumably this lets another node's
# announcement arrive first -- see the recent_messages check below.
826 yield deferral.sleep(random.expovariate(1/60))
827 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
# Skip the announcement if the identical message was already seen or sent.
828 if message not in self.recent_messages:
829 self.say(self.channel, message)
830 self._remember_message(message)
# Keep the watch id so the watcher can be removed in connectionLost.
831 self.watch_id = tracker.verified.added.watch(new_share)
832 self.recent_messages = []
# _remember_message: append to recent_messages, keeping at most the last 100.
833 def _remember_message(self, message):
834 self.recent_messages.append(message)
835 while len(self.recent_messages) > 100:
836 self.recent_messages.pop(0)
# privmsg: remember every message seen in our channel (used for de-duplication above).
837 def privmsg(self, user, channel, message):
838 if channel == self.channel:
839 self._remember_message(message)
# connectionLost: stop watching for new shares and log the reason.
840 def connectionLost(self, reason):
841 tracker.verified.added.unwatch(self.watch_id)
842 print 'IRC connection lost:', reason.getErrorMessage()
843 class IRCClientFactory(protocol.ReconnectingClientFactory):
845 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
847 @defer.inlineCallbacks
852 yield deferral.sleep(3)
854 if time.time() > current_work.value['last_update'] + 60:
855 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
857 height = tracker.get_height(current_work.value['best_share_hash'])
858 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
860 len(tracker.verified.shares),
863 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
864 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
866 datums, dt = local_rate_monitor.get_datums_in_last()
867 my_att_s = sum(datum['work']/dt for datum in datums)
868 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
869 math.format(int(my_att_s)),
871 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
872 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
876 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
877 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
878 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
880 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
881 shares, stale_orphan_shares, stale_doa_shares,
882 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
883 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
884 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
886 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
887 math.format(int(real_att_s)),
889 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
892 for warning in p2pool_data.get_warnings(tracker, current_work, net):
893 print >>sys.stderr, '#'*40
894 print >>sys.stderr, '>>> Warning: ' + warning
895 print >>sys.stderr, '#'*40
897 if this_str != last_str or time.time() > last_time + 15:
900 last_time = time.time()
906 log.err(None, 'Fatal error:')
# Networks selectable with --net: every entry of networks.nets whose name does
# not contain '_testnet' (the testnet variant is requested with --testnet).
realnets = dict(
    (name, net)
    for name, net in networks.nets.iteritems()
    if '_testnet' not in name
)

# General options.  fromfile_prefix_chars='@' is the argparse setting that lets
# arguments be read from a file.
# NOTE(review): FixedArgumentParser is project code -- confirm it honours it.
parser = fixargparse.FixedArgumentParser(
    description='p2pool (version %s)' % (p2pool.__version__,),
    fromfile_prefix_chars='@',
)
parser.add_argument('--version', version=p2pool.__version__, action='version')
parser.add_argument(
    '--net',
    dest='net_name', action='store', default='bitcoin', choices=sorted(realnets),
    help='use specified network (default: bitcoin)',
)
parser.add_argument(
    '--testnet',
    dest='testnet', action='store_const', const=True, default=False,
    help='''use the network's testnet''',
)
parser.add_argument(
    '--debug',
    dest='debug', action='store_const', const=True, default=False,
    help='enable debugging mode',
)
parser.add_argument(
    '-a', '--address',
    dest='address', action='store', type=str, default=None,
    help='generate payouts to this address (default: <address requested from bitcoind>)',
)
parser.add_argument(
    '--datadir',
    dest='datadir', action='store', type=str, default=None,
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
)
parser.add_argument(
    '--logfile',
    dest='logfile', action='store', type=str, default=None,
    help='''log to this file (default: data/<NET>/log)''',
)
# May be given several times; each URL is appended to merged_urls.
parser.add_argument(
    '--merged',
    dest='merged_urls', action='append', type=str, default=[],
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
)
parser.add_argument(
    '--give-author', metavar='DONATION_PERCENTAGE',
    dest='donation_percentage', action='store', type=float, default=0.5,
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
)
parser.add_argument(
    '--iocp',
    dest='iocp', action='store_true', default=False,
    help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
)
parser.add_argument(
    '--irc-announce',
    dest='irc_announce', action='store_true', default=False,
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
)
parser.add_argument(
    '--no-bugreport',
    dest='no_bugreport', action='store_true', default=False,
    help='disable submitting caught exceptions to the author',
)
# Options for p2pool's own peer-to-peer interface.  Argument groups only affect
# how --help is laid out; parsing is the same as for options added directly to
# the parser.
p2pool_group = parser.add_argument_group('p2pool interface')
# No static default: None is stored when the option is not given.  The help
# text lists each network's P2P port.
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
# Repeatable; every ADDR[:PORT] given is appended to p2pool_nodes.
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# Registered on p2pool_group (previously on the top-level parser) so that it is
# listed with the other P2P options in --help.  Stores False into args.upnp
# when the flag is present; the default stays True.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')
# Options for the interface that miners connect to.
worker_group = parser.add_argument_group('worker interface')
# Kept as a string: the value may be either PORT or ADDR:PORT.  None is stored
# when the option is not given.
worker_group.add_argument(
    '-w', '--worker-port', metavar='PORT or ADDR:PORT',
    dest='worker_endpoint', action='store', type=str, default=None,
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
)
worker_group.add_argument(
    '-f', '--fee', metavar='FEE_PERCENTAGE',
    dest='worker_fee', action='store', type=float, default=0,
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
)
# Options describing how to reach bitcoind.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument(
    '--bitcoind-address', metavar='BITCOIND_ADDRESS',
    dest='bitcoind_address', action='store', type=str, default='127.0.0.1',
    help='connect to this address (default: 127.0.0.1)',
)
# Both ports default to None when not given on the command line.
bitcoind_group.add_argument(
    '--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    dest='bitcoind_rpc_port', action='store', type=int, default=None,
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
)
bitcoind_group.add_argument(
    '--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    dest='bitcoind_p2p_port', action='store', type=int, default=None,
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
)

# Positional arguments: zero or more values collected into a list.
bitcoind_group.add_argument(
    metavar='BITCOIND_RPCUSERPASS',
    dest='bitcoind_rpc_userpass', action='store', type=str, nargs='*', default=[],
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
)
984 args = parser.parse_args()
989 net_name = args.net_name + ('_testnet' if args.testnet else '')
990 net = networks.nets[net_name]
992 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
993 if not os.path.exists(datadir_path):
994 os.makedirs(datadir_path)
996 if len(args.bitcoind_rpc_userpass) > 2:
997 parser.error('a maximum of two arguments are allowed')
998 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
1000 if args.bitcoind_rpc_password is None:
1001 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
1002 parser.error('This network has no configuration file function. Manually enter your RPC password.')
1003 conf_path = net.PARENT.CONF_FILE_FUNC()
1004 if not os.path.exists(conf_path):
1005 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
1006 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
1009 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
1010 with open(conf_path, 'rb') as f:
1011 cp = ConfigParser.RawConfigParser()
1012 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
1013 for conf_name, var_name, var_type in [
1014 ('rpcuser', 'bitcoind_rpc_username', str),
1015 ('rpcpassword', 'bitcoind_rpc_password', str),
1016 ('rpcport', 'bitcoind_rpc_port', int),
1017 ('port', 'bitcoind_p2p_port', int),
1019 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1020 setattr(args, var_name, var_type(cp.get('x', conf_name)))
1021 if args.bitcoind_rpc_password is None:
1022 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
1024 if args.bitcoind_rpc_username is None:
1025 args.bitcoind_rpc_username = ''
1027 if args.bitcoind_rpc_port is None:
1028 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1030 if args.bitcoind_p2p_port is None:
1031 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1033 if args.p2pool_port is None:
1034 args.p2pool_port = net.P2P_PORT
1036 if args.worker_endpoint is None:
1037 worker_endpoint = '', net.WORKER_PORT
1038 elif ':' not in args.worker_endpoint:
1039 worker_endpoint = '', int(args.worker_endpoint)
1041 addr, port = args.worker_endpoint.rsplit(':', 1)
1042 worker_endpoint = addr, int(port)
1044 if args.address is not None:
1046 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1047 except Exception, e:
1048 parser.error('error parsing address: ' + repr(e))
1050 args.pubkey_hash = None
def separate_url(url):
    """Split the credentials out of a merged-mining URL.

    Returns a (url_without_credentials, userpass) tuple, where userpass is
    everything in the network location before its last "@".  A URL whose
    network location contains no "@" is reported through parser.error.
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split on the LAST "@" so credentials may themselves contain one.
    userpass, bare_netloc = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=bare_netloc))
    return stripped_url, userpass
1058 merged_urls = map(separate_url, args.merged_urls)
1060 if args.logfile is None:
1061 args.logfile = os.path.join(datadir_path, 'log')
1063 logfile = logging.LogFile(args.logfile)
1064 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1065 sys.stdout = logging.AbortPipe(pipe)
1066 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1067 if hasattr(signal, "SIGUSR1"):
1068 def sigusr1(signum, frame):
1069 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1071 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1072 signal.signal(signal.SIGUSR1, sigusr1)
1073 task.LoopingCall(logfile.reopen).start(5)
1075 class ErrorReporter(object):
1077 self.last_sent = None
1079 def emit(self, eventDict):
1080 if not eventDict["isError"]:
1083 if self.last_sent is not None and time.time() < self.last_sent + 5:
1085 self.last_sent = time.time()
1087 if 'failure' in eventDict:
1088 text = ((eventDict.get('why') or 'Unhandled Error')
1089 + '\n' + eventDict['failure'].getTraceback())
1091 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
1093 from twisted.web import client
1095 url='http://u.forre.st/p2pool_error.cgi',
1097 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
1099 ).addBoth(lambda x: None)
1100 if not args.no_bugreport:
1101 log.addObserver(ErrorReporter().emit)
1103 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)