1 from __future__ import division
15 if '--iocp' in sys.argv:
16 from twisted.internet import iocpreactor
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging, pack
26 from . import p2p, networks, web
27 import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind via the 'getmemorypool' RPC.

    Returns (through defer.returnValue) a dict of normalized template fields:
    version, previous_block_hash (as int), transactions (unpacked tx objects),
    merkle_link, subsidy, bits (FloatingInteger) and coinbaseflags (raw bytes).

    NOTE(review): this excerpt is missing interleaved source lines — e.g. the
    `try:` that pairs with the `except` below and the closing `))` of the
    returnValue call. Code tokens are preserved exactly as given.
    """
    work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            # getmemorypool unsupported -> bitcoind is too old to work with;
            # RetrySilentlyException makes deferral.retry retry without a traceback.
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    # Transactions arrive hex-encoded; decode them to raw bytes once and reuse.
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),  # hex string -> int
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        # [None] stands in for the (not-yet-built) generation tx at index 0.
        merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        # 'bits' may arrive as a hex string (little-endian on the wire, hence
        # the [::-1] reversal) or already as an int, depending on bitcoind.
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
class WorkerBridge(worker_interface.WorkerBridge):
    """Bridge between the miner-facing worker interface and p2pool's share chain.

    Hands out bitcoin_getwork.BlockAttempt work units to miners (get_work) and
    processes their solved headers (got_response): submitting full blocks and
    merged-mining proofs upstream, and adding qualifying shares to the tracker.

    NOTE(review): this excerpt is missing interleaved source lines throughout —
    several statements below lack their opening `try:`/`if`/`else:`/`def`
    lines and some call parentheses are unbalanced. Code tokens are preserved
    exactly as given; reconstructed indentation is best-effort.
    """
    def __init__(self, lp_signal, my_pubkey_hash, net, donation_percentage, current_work, merged_work, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes):
        worker_interface.WorkerBridge.__init__(self)
        self.new_work_event = lp_signal  # long-poll signal: fired when miners should fetch new work
        self.recent_shares_ts_work = []  # [(timestamp, attempts)] of recent pseudoshares, used for local hashrate estimation
        
        self.my_pubkey_hash = my_pubkey_hash
        # NOTE(review): self.net is read below (e.g. self.net.PARENT) but its
        # assignment is not visible in this excerpt — presumably `self.net = net`; confirm.
        self.donation_percentage = donation_percentage
        self.current_work = current_work
        self.merged_work = merged_work
        self.best_share_var = best_share_var
        self.tracker = tracker
        self.my_share_hashes = my_share_hashes
        self.my_doa_share_hashes = my_doa_share_hashes
        self.worker_fee = worker_fee
        self.p2p_node = p2p_node
        self.submit_block = submit_block
        self.set_best_share = set_best_share
        self.shared_share_hashes = shared_share_hashes
        
        self.pseudoshare_received = variable.Event()
        self.share_received = variable.Event()
        self.local_rate_monitor = math.RateMonitor(10*60)  # 10-minute sliding window
        
        # Counters for our shares removed from the verified tracker:
        # a (total, orphan, doa) triple plus a separate doa-share count.
        self.removed_unstales_var = variable.Variable((0, 0, 0))
        self.removed_doa_unstales_var = variable.Variable(0)
        
        @tracker.verified.removed.watch
        # NOTE(review): the watcher function header (`def _(share):`) is
        # missing from this excerpt; the body below updates the removal counters.
            if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value):
                assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
                self.removed_unstales_var.set((
                    self.removed_unstales_var.value[0] + 1,
                    self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
                    self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
            if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value):
                self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
    
    def get_stale_counts(self):
        '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
        my_shares = len(self.my_share_hashes)
        my_doa_shares = len(self.my_doa_share_hashes)
        # Our counts over the verified chain up to the current best share,
        # plus whatever was already removed from the tracker (see __init__ watcher).
        delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value)
        my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0]
        my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value
        orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1]
        doas_recorded_in_chain = delta.my_dead_announce_count + self.removed_unstales_var.value[2]
        
        # Shares we made that never ended up in the chain, split orphan vs doa.
        my_shares_not_in_chain = my_shares - my_shares_in_chain
        my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
        
        return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
    
    def get_user_details(self, request):
        # Parse the HTTP username into (user, pubkey_hash, desired_share_target,
        # desired_pseudoshare_target). The username may carry suffixes:
        # 'addr+DIFF' sets a pseudoshare difficulty, 'addr/DIFF' a minimum
        # share difficulty (both parsed right-to-left via rsplit).
        user = request.getUser() if request.getUser() is not None else ''
        
        desired_pseudoshare_target = None
        # NOTE(review): the guard/try lines around the '+' parsing are missing
        # from this excerpt; presumably guarded by `if '+' in user:` plus a
        # try/except around the float conversion — confirm against full source.
            user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
        
        desired_share_target = 2**256 - 1  # default: effectively no minimum share difficulty
        # NOTE(review): guard/try lines around the '/' parsing are likewise missing.
            user, min_diff_str = user.rsplit('/', 1)
                desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
        
        # worker_fee percent of requests are credited to our own payout address.
        if random.uniform(0, 100) < self.worker_fee:
            pubkey_hash = self.my_pubkey_hash
            # NOTE(review): else/try/except lines are missing here; the second
            # assignment looks like the fallback when the worker name is not a
            # valid address — confirm.
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
                pubkey_hash = self.my_pubkey_hash
        
        return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
    
    def preprocess_request(self, request):
        # Worker-interface hook: reduce the HTTP request to the argument tuple
        # that get_work accepts.
        user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
        return pubkey_hash, desired_share_target, desired_pseudoshare_target
    
    def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
        # Refuse to hand out work when we are clearly not in a usable state.
        if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
            raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
        if self.best_share_var.value is None and self.net.PERSIST:
            raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
        if time.time() > self.current_work.value['last_update'] + 60:
            raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
        
        # Merged mining: commit to the aux-work merkle root inside the coinbase
        # ('\xfa\xbemm' is the merged-mining magic prefix).
        if self.merged_work.value:
            tree, size = bitcoin_data.make_auxpow_tree(self.merged_work.value)
            mm_hashes = [self.merged_work.value.get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
            mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                merkle_root=bitcoin_data.merkle_hash(mm_hashes),
            # (aux_work, its index among mm_hashes, all hashes) retained for
            # building the merkle proof when a solution comes back.
            mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.merged_work.value.iteritems()]
        
        # Build this node's candidate share and its generation transaction.
        # NOTE(review): several keyword lines of this call (e.g. the
        # share_data=dict(...) wrapper and closing parens) are missing here.
        share_info, generate_tx = p2pool_data.Share.generate_transaction(
            tracker=self.tracker,
                previous_share_hash=self.best_share_var.value,
                coinbase=(mm_data + self.current_work.value['coinbaseflags'])[:100],  # coinbase script is truncated to 100 bytes
                nonce=random.randrange(2**32),
                pubkey_hash=pubkey_hash,
                subsidy=self.current_work.value['subsidy'],
                donation=math.perfect_round(65535*self.donation_percentage/100),  # donation fraction encoded in 16 bits
                # Mark the share stale if we currently have more unaccounted
                # orphans/doas than the chain has recorded for us.
                stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                    'orphan' if orphans > orphans_recorded_in_chain else
                    'doa' if doas > doas_recorded_in_chain else
                )(*self.get_stale_counts()),
            block_target=self.current_work.value['bits'].target,
            desired_timestamp=int(time.time() - self.current_work.value['clock_offset']),
            desired_target=desired_share_target,
            ref_merkle_link=dict(branch=[], index=0),
        
        # Pick the pseudoshare target shown to the miner.
        if desired_pseudoshare_target is None:
            # Estimate local hashrate from the last 50 pseudoshares and cap the
            # target accordingly. NOTE(review): the initial `target = ...`
            # assignment for this branch is missing from the excerpt.
            if len(self.recent_shares_ts_work) == 50:
                hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                    target = min(target, int(2**256/hash_rate))
            target = desired_pseudoshare_target
        # The pseudoshare target may never be easier than the share target or
        # any merged-mining target, and must stay within the network's sane range.
        target = max(target, share_info['bits'].target)
        for aux_work in self.merged_work.value.itervalues():
            target = max(target, aux_work['target'])
        target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
        
        transactions = [generate_tx] + list(self.current_work.value['transactions'])
        packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
        merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), self.current_work.value['merkle_link'])
        
        getwork_time = time.time()
        merkle_link = self.current_work.value['merkle_link']
        
        print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
            bitcoin_data.target_to_difficulty(target),
            bitcoin_data.target_to_difficulty(share_info['bits'].target),
            self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
            len(self.current_work.value['transactions']),
        
        # Snapshot the work fields so got_response can validate submissions
        # against the exact work unit they were generated for.
        bits = self.current_work.value['bits']
        previous_block = self.current_work.value['previous_block']
        ba = bitcoin_getwork.BlockAttempt(
            version=self.current_work.value['version'],
            previous_block=self.current_work.value['previous_block'],
            merkle_root=merkle_root,
            timestamp=self.current_work.value['time'],
            bits=self.current_work.value['bits'],
        
        received_header_hashes = set()  # guards against double-counting duplicate submissions
        
        def got_response(header, request):
            # Handle one solved header submitted by a miner for this work unit.
            header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
            pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
            # 1) Full bitcoin block found? Submit it upstream.
            # NOTE(review): the try:/except around this section is partially missing.
                if pow_hash <= header['bits'].target or p2pool.DEBUG:
                    self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                    if pow_hash <= header['bits'].target:
                        print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                log.err(None, 'Error while processing potential block:')
            
            user, _, _, _ = self.get_user_details(request)
            # Sanity: the submitted header must belong to the work we handed out.
            assert header['merkle_root'] == merkle_root
            assert header['previous_block'] == previous_block
            assert header['bits'] == bits
            
            # On time only if the share-chain tip hasn't moved since this work was made.
            on_time = self.best_share_var.value == share_info['share_data']['previous_share_hash']
            
            # 2) Merged-mining: did it satisfy any aux chain's target?
            # NOTE(review): try:/callback `def _(result):`/errback lines are
            # missing from this loop body in the excerpt.
            for aux_work, index, hashes in mm_later:
                    if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                        df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                            pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                            bitcoin_data.aux_pow_type.pack(dict(
                                    block_hash=header_hash,
                                    merkle_link=merkle_link,
                                merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                parent_block_header=header,
                            if result != (pow_hash <= aux_work['target']):
                                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                print 'Merged block submittal result: %s' % (result,)
                            log.err(err, 'Error submitting merged block:')
                    log.err(None, 'Error while processing merged mining POW:')
            
            # 3) Did it qualify as a p2pool share (and is not a duplicate)?
            if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                min_header = dict(header);del min_header['merkle_root']
                hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
                # Full transactions are attached only when the share is also a block.
                share = p2pool_data.Share(self.net, None, dict(
                    min_header=min_header, share_info=share_info, hash_link=hash_link,
                    ref_merkle_link=dict(branch=[], index=0),
                ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                
                print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                    p2pool_data.format_hash(share.hash),
                    p2pool_data.format_hash(share.previous_hash),
                    time.time() - getwork_time,
                    ' DEAD ON ARRIVAL' if not on_time else '',
                
                self.my_share_hashes.add(share.hash)
                # NOTE(review): the guard before this add (presumably
                # `if not on_time:`) is missing from the excerpt.
                    self.my_doa_share_hashes.add(share.hash)
                
                self.tracker.add(share)
                    self.tracker.verified.add(share)
                self.set_best_share()
                
                # If it is also a full block, push the share to peers right away.
                # NOTE(review): the enclosing try: and except: lines are missing.
                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
                        for peer in self.p2p_node.peers.itervalues():
                            peer.sendShares([share])
                        self.shared_share_hashes.add(share.hash)
                    log.err(None, 'Error forwarding block solution:')
                
                self.share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
            
            # 4) Account the pseudoshare, or complain about bad/duplicate submissions.
            if pow_hash > target:
                print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                print ' Hash: %56x' % (pow_hash,)
                print ' Target: %56x' % (target,)
            elif header_hash in received_header_hashes:
                print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
            # NOTE(review): the `else:` opening this accounting branch is missing.
                received_header_hashes.add(header_hash)
                
                self.pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                while len(self.recent_shares_ts_work) > 50:
                    self.recent_shares_ts_work.pop(0)
                self.local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
        
        return ba, got_response
324 @defer.inlineCallbacks
325 def main(args, net, datadir_path, merged_urls, worker_endpoint):
327 print 'p2pool (version %s)' % (p2pool.__version__,)
330 # connect to bitcoind over JSON-RPC and do initial getmemorypool
331 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
332 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
333 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
334 @deferral.retry('Error while checking Bitcoin connection:', 1)
335 @defer.inlineCallbacks
337 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
338 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
339 raise deferral.RetrySilentlyException()
340 temp_work = yield getwork(bitcoind)
341 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
342 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
343 raise deferral.RetrySilentlyException()
344 defer.returnValue(temp_work)
345 temp_work = yield check()
347 block_height_var = variable.Variable(None)
348 @defer.inlineCallbacks
350 block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
352 task.LoopingCall(poll_height).start(60*60)
355 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
356 print ' Current block height: %i' % (block_height_var.value,)
359 # connect to bitcoind over bitcoin-p2p
360 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
361 factory = bitcoin_p2p.ClientFactory(net.PARENT)
362 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
363 yield factory.getProtocol() # waits until handshake is successful
367 print 'Determining payout address...'
368 if args.pubkey_hash is None:
369 address_path = os.path.join(datadir_path, 'cached_payout_address')
371 if os.path.exists(address_path):
372 with open(address_path, 'rb') as f:
373 address = f.read().strip('\r\n')
374 print ' Loaded cached address: %s...' % (address,)
378 if address is not None:
379 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
380 if not res['isvalid'] or not res['ismine']:
381 print ' Cached address is either invalid or not controlled by local bitcoind!'
385 print ' Getting payout address from bitcoind...'
386 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
388 with open(address_path, 'wb') as f:
391 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
393 my_pubkey_hash = args.pubkey_hash
394 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
397 my_share_hashes = set()
398 my_doa_share_hashes = set()
400 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
401 shared_share_hashes = set()
402 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
403 known_verified = set()
404 print "Loading shares..."
405 for i, (mode, contents) in enumerate(ss.get_shares()):
407 if contents.hash in tracker.shares:
409 shared_share_hashes.add(contents.hash)
410 contents.time_seen = 0
411 tracker.add(contents)
412 if len(tracker.shares) % 1000 == 0 and tracker.shares:
413 print " %i" % (len(tracker.shares),)
414 elif mode == 'verified_hash':
415 known_verified.add(contents)
417 raise AssertionError()
418 print " ...inserting %i verified shares..." % (len(known_verified),)
419 for h in known_verified:
420 if h not in tracker.shares:
421 ss.forget_verified_share(h)
423 tracker.verified.add(tracker.shares[h])
424 print " ...done loading %i shares!" % (len(tracker.shares),)
426 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
427 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
428 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
430 print 'Initializing work...'
435 bitcoind_work = variable.Variable(None)
437 @defer.inlineCallbacks
439 work = yield getwork(bitcoind)
440 bitcoind_work.set(dict(
441 version=work['version'],
442 previous_block=work['previous_block_hash'],
444 coinbaseflags=work['coinbaseflags'],
446 transactions=work['transactions'],
447 merkle_link=work['merkle_link'],
448 subsidy=work['subsidy'],
449 clock_offset=time.time() - work['time'],
450 last_update=time.time(),
452 yield poll_bitcoind()
454 @defer.inlineCallbacks
457 flag = factory.new_block.get_deferred()
459 yield poll_bitcoind()
462 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
467 best_block_header = variable.Variable(None)
468 def handle_header(new_header):
469 # check that header matches current target
470 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
472 bitcoind_best_block = bitcoind_work.value['previous_block']
473 if (best_block_header.value is None
475 new_header['previous_block'] == bitcoind_best_block and
476 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
477 ) # new is child of current and previous is current
479 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
480 best_block_header.value['previous_block'] != bitcoind_best_block
481 )): # new is current and previous is not a child of current
482 best_block_header.set(new_header)
483 @defer.inlineCallbacks
485 handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
486 bitcoind_work.changed.watch(lambda _: poll_header())
491 merged_work = variable.Variable({})
493 @defer.inlineCallbacks
494 def set_merged_work(merged_url, merged_userpass):
495 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
497 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
498 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
499 hash=int(auxblock['hash'], 16),
500 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
501 merged_proxy=merged_proxy,
503 yield deferral.sleep(1)
504 for merged_url, merged_userpass in merged_urls:
505 set_merged_work(merged_url, merged_userpass)
507 @merged_work.changed.watch
508 def _(new_merged_work):
509 print 'Got new merged mining work!'
513 current_work = variable.Variable(None)
515 t = dict(bitcoind_work.value)
517 if (best_block_header.value is not None and
518 best_block_header.value['previous_block'] == t['previous_block'] and
519 net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
520 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
521 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
523 version=best_block_header.value['version'],
524 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
525 bits=best_block_header.value['bits'], # not always true
527 time=best_block_header.value['timestamp'] + 600, # better way?
529 merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
530 subsidy=net.PARENT.SUBSIDY_FUNC(block_height_var.value),
531 clock_offset=current_work.value['clock_offset'],
532 last_update=current_work.value['last_update'],
536 bitcoind_work.changed.watch(lambda _: compute_work())
537 best_block_header.changed.watch(lambda _: compute_work())
542 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
543 requested = expiring_dict.ExpiringDict(300)
544 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
546 best_share_var = variable.Variable(None)
547 def set_best_share():
548 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
550 best_share_var.set(best)
553 for peer2, share_hash in desired:
554 if share_hash not in tracker.tails: # was received in the time tracker.think was running
556 last_request_time, count = requested.get(share_hash, (None, 0))
557 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
559 potential_peers = set()
560 for head in tracker.tails[share_hash]:
561 potential_peers.update(peer_heads.get(head, set()))
562 potential_peers = [peer for peer in potential_peers if peer.connected2]
563 if count == 0 and peer2 is not None and peer2.connected2:
566 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
570 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
574 stops=list(set(tracker.heads) | set(
575 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
578 requested[share_hash] = t, count + 1
579 bitcoind_work.changed.watch(lambda _: set_best_share())
584 lp_signal = variable.Event()
586 @current_work.transitioned.watch
587 def _(before, after):
588 # trigger LP if version/previous_block/bits changed or transactions changed from nothing
589 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']) or (not before['transactions'] and after['transactions']):
591 merged_work.changed.watch(lambda _: lp_signal.happened())
592 best_share_var.changed.watch(lambda _: lp_signal.happened())
598 # setup p2p logic and join p2pool network
600 class Node(p2p.Node):
601 def handle_shares(self, shares, peer):
603 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
607 if share.hash in tracker.shares:
608 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
613 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
617 if shares and peer is not None:
618 peer_heads.setdefault(shares[0].hash, set()).add(peer)
624 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
626 def handle_share_hashes(self, hashes, peer):
629 for share_hash in hashes:
630 if share_hash in tracker.shares:
632 last_request_time, count = requested.get(share_hash, (None, 0))
633 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
635 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
636 get_hashes.append(share_hash)
637 requested[share_hash] = t, count + 1
639 if hashes and peer is not None:
640 peer_heads.setdefault(hashes[0], set()).add(peer)
642 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
644 def handle_get_shares(self, hashes, parents, stops, peer):
645 parents = min(parents, 1000//len(hashes))
648 for share_hash in hashes:
649 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
650 if share.hash in stops:
653 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
656 def handle_bestblock(self, header, peer):
657 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
658 raise p2p.PeerMisbehavingError('received block header fails PoW test')
659 handle_header(header)
661 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
662 def submit_block_p2p(block):
663 if factory.conn.value is None:
664 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
665 raise deferral.RetrySilentlyException()
666 factory.conn.value.send_block(block=block)
668 @deferral.retry('Error submitting block: (will retry)', 10, 10)
669 @defer.inlineCallbacks
670 def submit_block_rpc(block, ignore_failure):
671 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
672 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
673 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
674 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
676 def submit_block(block, ignore_failure):
677 submit_block_p2p(block)
678 submit_block_rpc(block, ignore_failure)
680 @tracker.verified.added.watch
682 if share.pow_hash <= share.header['bits'].target:
683 submit_block(share.as_block(tracker), ignore_failure=True)
685 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
688 if (get_height_rel_highest(share.header['previous_block']) > -5 or
689 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
690 broadcast_share(share.hash)
692 reactor.callLater(5, spread) # so get_height_rel_highest can update
694 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
696 @defer.inlineCallbacks
699 ip, port = x.split(':')
700 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
702 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
705 if os.path.exists(os.path.join(datadir_path, 'addrs')):
707 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
708 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
710 print >>sys.stderr, 'error parsing addrs'
711 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
713 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
715 print >>sys.stderr, "error reading addrs.txt"
716 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
719 if addr not in addrs:
720 addrs[addr] = (0, time.time(), time.time())
724 connect_addrs = set()
725 for addr_df in map(parse, args.p2pool_nodes):
727 connect_addrs.add((yield addr_df))
732 best_share_hash_func=lambda: best_share_var.value,
733 port=args.p2pool_port,
736 connect_addrs=connect_addrs,
737 max_incoming_conns=args.p2pool_conns,
742 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
743 f.write(json.dumps(p2p_node.addr_store.items()))
744 task.LoopingCall(save_addrs).start(60)
746 @best_block_header.changed.watch
748 for peer in p2p_node.peers.itervalues():
749 peer.send_bestblock(header=header)
751 def broadcast_share(share_hash):
753 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
754 if share.hash in shared_share_hashes:
756 shared_share_hashes.add(share.hash)
759 for peer in p2p_node.peers.itervalues():
760 peer.sendShares([share for share in shares if share.peer is not peer])
762 # send share when the chain changes to their chain
763 best_share_var.changed.watch(broadcast_share)
766 for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
768 if share.hash in tracker.verified.shares:
769 ss.add_verified_hash(share.hash)
770 task.LoopingCall(save_shares).start(60)
776 @defer.inlineCallbacks
780 is_lan, lan_ip = yield ipdiscover.get_local_ip()
782 pm = yield portmapper.get_port_mapper()
783 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
784 except defer.TimeoutError:
788 log.err(None, 'UPnP error:')
789 yield deferral.sleep(random.expovariate(1/120))
792 # start listening for workers with a JSON-RPC server
794 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
796 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, current_work.value['bits'].target, current_work.value['subsidy'], net)
798 wb = WorkerBridge(lp_signal, my_pubkey_hash, net, args.donation_percentage, current_work, merged_work, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes)
799 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
800 worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
802 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
804 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
812 print 'Started successfully!'
813 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
814 if args.donation_percentage > 0.51:
815 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
816 elif args.donation_percentage < 0.49:
817 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
819 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
820 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
824 if hasattr(signal, 'SIGALRM'):
825 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
826 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
828 signal.siginterrupt(signal.SIGALRM, False)
829 task.LoopingCall(signal.alarm, 30).start(1)
# Optional IRC announcer (--irc-announce): connect to freenode and announce
# any verified share that also qualifies as a full block on net.ANNOUNCE_CHANNEL.
if args.irc_announce:
    from twisted.words.protocols import irc
    class IRCClient(irc.IRCClient):
        # Random two-digit suffix to avoid nickname collisions between nodes.
        nickname = 'p2pool%02i' % (random.randrange(100),)
        channel = net.ANNOUNCE_CHANNEL
        def lineReceived(self, line):
            # NOTE(review): lines elided here (presumably a p2pool.DEBUG
            # raw-line print — TODO confirm against full source).
            irc.IRCClient.lineReceived(self, line)
        # NOTE(review): the `def signedOn(self):` header appears to be elided
        # from this chunk; the following lines read as its body.
            irc.IRCClient.signedOn(self)
            self.factory.resetDelay()
            self.join(self.channel)
            @defer.inlineCallbacks
            def new_share(share):
                # Only announce shares whose PoW meets the full block target,
                # and only if the share is recent (within 10 minutes of now).
                if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                    # Random delay (mean ~60s; 1/60 is a float thanks to the
                    # __future__ division import) so many nodes seeing the
                    # same block don't all announce simultaneously.
                    yield deferral.sleep(random.expovariate(1/60))
                    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    # Skip if we (or someone in the channel) already said it.
                    if message not in self.recent_messages:
                        self.say(self.channel, message)
                        self._remember_message(message)
            self.watch_id = tracker.verified.added.watch(new_share)
            self.recent_messages = []
        def _remember_message(self, message):
            # Keep a bounded (100-entry) window of recent messages for dedup.
            self.recent_messages.append(message)
            while len(self.recent_messages) > 100:
                self.recent_messages.pop(0)
        def privmsg(self, user, channel, message):
            # Remember other users' channel messages too, so we don't repeat
            # an announcement someone else already made.
            if channel == self.channel:
                self._remember_message(message)
        def connectionLost(self, reason):
            # Stop watching for new shares once the connection drops; the
            # reconnecting factory will build a fresh client.
            tracker.verified.added.unwatch(self.watch_id)
            print 'IRC connection lost:', reason.getErrorMessage()
    class IRCClientFactory(protocol.ReconnectingClientFactory):
        # NOTE(review): the factory body (presumably `protocol = IRCClient`)
        # is elided from this chunk.
    reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
# Periodic console status reporter. Heavily elided in this chunk: the
# decorated function's `def` header, its loop/try scaffolding, and several
# closing-paren and assignment lines are missing; indentation below is
# reconstructed and should be checked against the full source.
@defer.inlineCallbacks
# NOTE(review): function header and loop setup (last_str/last_time, a
# `while True:` and `try:`) appear to be elided here.
    # Poll every 3 seconds.
    yield deferral.sleep(3)
    # Warn loudly if bitcoind hasn't produced new work for over a minute.
    if time.time() > current_work.value['last_update'] + 60:
        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
    # Share-chain and peer summary line.
    height = tracker.get_height(best_share_var.value)
    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
        # NOTE(review): argument lines elided (height and total-peer count, presumably).
        len(tracker.verified.shares),
        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
    # Local hashrate over the recent monitoring window; work/dt is attempts/sec.
    datums, dt = wb.local_rate_monitor.get_datums_in_last()
    my_att_s = sum(datum['work']/dt for datum in datums)
    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
        math.format(int(my_att_s)),
        # 95% binomial confidence interval on the dead-on-arrival rate.
        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
        # Expected seconds per share = difficulty-as-attempts / local attempts/sec.
        math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
    # Stale-share statistics and estimated pool-wide rate corrected for staleness.
    (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
    stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
    real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
    this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
        shares, stale_orphan_shares, stale_doa_shares,
        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
        # Efficiency: our stale rate relative to the pool-wide stale proportion.
        math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
        get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
    this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
        math.format(int(real_att_s)),
        math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
    # Print any share-chain warnings prominently.
    for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
        print >>sys.stderr, '#'*40
        print >>sys.stderr, '>>> Warning: ' + warning
        print >>sys.stderr, '#'*40
    # Only re-print the status when it changed or every 15 seconds.
    if this_str != last_str or time.time() > last_time + 15:
        last_time = time.time()
# NOTE(review): intervening lines elided; this appears to be the enclosing
# function's top-level exception handler.
        log.err(None, 'Fatal error:')
# All production networks: every networks.nets entry whose name does not end
# in '_testnet'. Used to build per-network default-port help strings below.
realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# Command-line interface. FixedArgumentParser supports @file argument files.
parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--iocp',
    help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
    action='store_true', default=False, dest='iocp')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
parser.add_argument('--no-bugreport',
    help='disable submitting caught exceptions to the author',
    action='store_true', default=False, dest='no_bugreport')

# Options specific to p2pool's own peer-to-peer network.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# --disable-upnp is a p2pool-interface option like its neighbors; register it
# on p2pool_group (previously parser.add_argument) so it appears under the
# 'p2pool interface' section of --help instead of the generic options group.
# Parsing behavior (args.upnp, store_false, default True) is unchanged.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')

# Options for the miner-facing worker (getwork/stratum-style RPC) interface.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# Options describing how to reach the local bitcoind.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')
# NOTE(review): a line appears to be elided here (orig line 1000).
# Positional: zero, one, or two tokens -> RPC username and password.
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
args = parser.parse_args()

# Resolve the network object; testnet variants are registered under
# '<name>_testnet' in networks.nets.
net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

# Per-network data directory, defaulting to <script dir>/data/<net_name>.
datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
# Pad on the left so one token means password-only (username defaults to None,
# later '') and zero tokens means both come from bitcoin.conf.
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

# No password given on the command line: read credentials/ports from the
# parent coin's configuration file.
if args.bitcoind_rpc_password is None:
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            # NOTE(review): lines elided here (orig 1028-1029, presumably the
            # server=1/rpcuser lines of the suggested config).
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        # bitcoin.conf has no section headers; prepend a dummy '[x]' section.
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # Only fill in values the user did not supply explicitly.
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        # NOTE(review): the list-closing `]:` line appears to be elided here.
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

# Fall back to per-network defaults for anything still unset.
if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

# Worker endpoint: accept nothing (all interfaces, default port), a bare
# port, or ADDR:PORT (rsplit so IPv6-ish addresses keep their colons).
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
# NOTE(review): the `else:` for this branch appears to be elided (orig 1061).
    addr, port = args.worker_endpoint.rsplit(':', 1)
    worker_endpoint = addr, int(port)

# Validate the payout address early so a typo fails at startup.
if args.address is not None:
    # NOTE(review): the `try:` for this handler appears to be elided (orig 1066).
    args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
# NOTE(review): the `else:` for this branch appears to be elided (orig 1070);
# pubkey_hash None means "ask bitcoind for an address" per the --address help.
    args.pubkey_hash = None
def separate_url(url):
    # Split a merged-mining URL of the form scheme://user:pass@host:port/...
    # into (url with the credentials removed, 'user:pass').
    split = urlparse.urlsplit(url)
    if '@' not in split.netloc:
        parser.error('merged url netloc must contain an "@"')
    userpass, bare_netloc = split.netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(split._replace(netloc=bare_netloc))
    return stripped_url, userpass
# Pre-split every --merged URL into (url, userpass) pairs.
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Tee all stdout/stderr output through a timestamping pipe into both the
# console and the log file; Twisted's log observer is pointed at the same
# stderr wrapper so everything ends up in one place.
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 reopens the log file (standard logrotate hook) where supported.
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        # NOTE(review): the actual `logfile.reopen()` call appears to be
        # elided from this chunk (orig 1091).
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
# Also reopen the log every 5 seconds, so deletion/rotation is picked up
# even without a signal.
task.LoopingCall(logfile.reopen).start(5)
# Twisted log observer that phones home caught errors (rate-limited to one
# report per 5 seconds) unless --no-bugreport was given.
class ErrorReporter(object):
    # NOTE(review): `def __init__(self):` appears to be elided here (orig 1097).
        self.last_sent = None
    def emit(self, eventDict):
        # Only error events are reported.
        if not eventDict["isError"]:
        # NOTE(review): the `return` body of this guard appears to be elided.
        # Rate limit: at most one report every 5 seconds.
        if self.last_sent is not None and time.time() < self.last_sent + 5:
        self.last_sent = time.time()
        # Prefer the full failure traceback when available, else the message.
        if 'failure' in eventDict:
            text = ((eventDict.get('why') or 'Unhandled Error')
                + '\n' + eventDict['failure'].getTraceback())
        # NOTE(review): the `else:` for this branch appears to be elided.
            text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
        from twisted.web import client
        # NOTE(review): the `client.getPage(` opening line appears to be elided.
            url='http://u.forre.st/p2pool_error.cgi',
            postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
        ).addBoth(lambda x: None)  # fire-and-forget: ignore success and failure
if not args.no_bugreport:
    log.addObserver(ErrorReporter().emit)

# Enter the reactor; main() does the real startup once the loop is running.
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)