1 from __future__ import division
15 if '--iocp' in sys.argv:
16 from twisted.internet import iocpreactor
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging, pack
26 from . import p2p, networks, web
27 import p2pool, p2pool.data as p2pool_data
# Ask bitcoind for a block template with the getmemorypool RPC and hand back
# (through defer.returnValue) a dict with the keys version,
# previous_block_hash, transactions, merkle_link, subsidy, bits and
# coinbaseflags.
# NOTE(review): several short lines of this function (for example the statement
# that opens the block the `except` below belongs to, and the closing brackets
# of the dict) are not visible in this view of the file.
# NOTE(review): the second argument to deferral.retry is presumably the delay
# between attempts -- confirm against util.deferral.
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
33 work = yield bitcoind.rpc_getmemorypool()
34 except jsonrpc.Error, e:
# JSON-RPC code -32601 means bitcoind does not know getmemorypool at all:
# tell the operator to upgrade and raise RetrySilentlyException.
35 if e.code == -32601: # Method not found
36 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37 raise deferral.RetrySilentlyException()
# Transactions arrive hex-encoded; keep the raw bytes so they can be both
# unpacked and hashed below.
39 packed_transactions = [x.decode('hex') for x in work['transactions']]
40 defer.returnValue(dict(
41 version=work['version'],
42 previous_block_hash=int(work['previousblockhash'], 16),
43 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
# Merkle link for index 0, computed with a None placeholder in front of the
# transaction hashes; get_work later checks the generate transaction's hash
# against this link.
44 merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45 subsidy=work['coinbasevalue'],
# 'bits' is accepted either as a hex string (decoded, byte-reversed, then
# unpacked) or as a value wrapped directly in FloatingInteger.
47 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
# Prefer 'coinbaseflags'; otherwise concatenate the decoded 'coinbaseaux'
# values; otherwise use an empty string.
48 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
51 class WorkerBridge(worker_interface.WorkerBridge):
# Store the collaborators passed in by the caller and set up the bookkeeping
# used for stale-share statistics.
# NOTE(review): some short lines of this method (at least one attribute
# assignment and the `def` line of the watcher function below) are not visible
# in this view of the file.
52 def __init__(self, lp_signal, my_pubkey_hash, net, donation_percentage, current_work, merged_work, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes):
53 worker_interface.WorkerBridge.__init__(self)
54 self.new_work_event = lp_signal
# (timestamp, work) pairs appended by got_response in get_work; trimmed there
# to the 50 most recent entries.
55 self.recent_shares_ts_work = []
57 self.my_pubkey_hash = my_pubkey_hash
59 self.donation_percentage = donation_percentage
60 self.current_work = current_work
61 self.merged_work = merged_work
62 self.best_share_var = best_share_var
63 self.tracker = tracker
64 self.my_share_hashes = my_share_hashes
65 self.my_doa_share_hashes = my_doa_share_hashes
66 self.worker_fee = worker_fee
67 self.p2p_node = p2p_node
68 self.submit_block = submit_block
69 self.set_best_share = set_best_share
70 self.shared_share_hashes = shared_share_hashes
# Events fired from got_response in get_work.
72 self.pseudoshare_received = variable.Event()
73 self.share_received = variable.Event()
74 self.local_rate_monitor = math.RateMonitor(10*60)
# Counters for our own shares that were removed from tracker.verified:
# removed_unstales_var holds (count, of which 'orphan', of which 'doa');
# removed_doa_unstales_var holds a single count. get_stale_counts adds them
# to the figures it reads from the tracker.
76 self.removed_unstales_var = variable.Variable((0, 0, 0))
77 self.removed_doa_unstales_var = variable.Variable(0)
# Watcher on tracker.verified.removed (its `def` line is not visible here).
# For a removed share that is ours and for which
# tracker.is_child_of(share.hash, best share) holds, bump the counters above.
79 @tracker.verified.removed.watch
81 if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value):
82 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
83 self.removed_unstales_var.set((
84 self.removed_unstales_var.value[0] + 1,
85 self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
86 self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
88 if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value):
89 self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
def get_stale_counts(self):
    '''Summarize how many of this node's shares did not make it into the chain.

    Returns a 3-tuple:
      (orphans, doas) -- own shares missing from the chain, split into
          those not marked dead-on-arrival and those that were
      total -- number of shares this node has produced
      (orphans_recorded_in_chain, doas_recorded_in_chain) -- stale
          announcements counted in the chain, including removed shares
    '''
    total_mine = len(self.my_share_hashes)
    total_doa = len(self.my_doa_share_hashes)
    chain_delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value)
    removed = self.removed_unstales_var.value

    # Shares already removed from the verified tracker are no longer part of
    # the delta, so the separately kept counters are added back in.
    in_chain = chain_delta.my_count + removed[0]
    doa_in_chain = chain_delta.my_doa_count + self.removed_doa_unstales_var.value
    recorded = (
        chain_delta.my_orphan_announce_count + removed[1],
        chain_delta.my_dead_announce_count + removed[2],
    )

    missing = total_mine - in_chain
    doa_missing = total_doa - doa_in_chain
    return (missing - doa_missing, doa_missing), total_mine, recorded
# Derive the mining parameters encoded in the HTTP user name of `request`.
# Returns (user, pubkey_hash, desired_share_target, desired_pseudoshare_target).
# NOTE(review): the conditional and exception-handling lines that guard the
# statements below are not visible in this view of the file, so the exact
# conditions under which each assignment runs cannot be confirmed from here.
106 def get_user_details(self, request):
107 user = request.getUser() if request.getUser() is not None else ''
# Optional "+<difficulty>" suffix: requested pseudoshare difficulty, converted
# to a target. None means no explicit request.
109 desired_pseudoshare_target = None
111 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
113 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
# Optional "/<difficulty>" suffix: requested share difficulty, converted to a
# target. The default is the largest 256-bit value.
117 desired_share_target = 2**256 - 1
119 user, min_diff_str = user.rsplit('/', 1)
121 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
# With probability worker_fee percent the node's own pubkey hash is used.
125 if random.uniform(0, 100) < self.worker_fee:
126 pubkey_hash = self.my_pubkey_hash
# Otherwise the remaining user name is interpreted as a payout address.
129 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
# NOTE(review): presumably the fallback when the user name is not a valid
# address -- the guarding line is not visible; confirm.
131 pubkey_hash = self.my_pubkey_hash
133 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
def preprocess_request(self, request):
    '''Resolve the per-request mining parameters for `request`.

    Delegates to get_user_details and drops the user name, returning
    (pubkey_hash, desired_share_target, desired_pseudoshare_target).
    '''
    details = self.get_user_details(request)
    _user, payout_hash, share_target, pseudoshare_target = details
    return payout_hash, share_target, pseudoshare_target
# Build one unit of work for a miner.
# Returns (ba, got_response): `ba` is a bitcoin_getwork.BlockAttempt describing
# the header to hash, and `got_response(header, request)` is the callback that
# processes a header submitted for that work.
# Raises jsonrpc.Error(-12345, ...) when work cannot be handed out.
# NOTE(review): many short lines of this method (else-branches, try/except
# lines, closing brackets, some assignments and returns) are not visible in
# this view of the file; the comments below describe only the visible lines.
139 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
# Refuse to hand out work without peers or without a best share (both only
# when net.PERSIST is set), or when current_work is more than 60 seconds old.
140 if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
141 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
142 if self.best_share_var.value is None and self.net.PERSIST:
143 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
144 if time.time() > self.current_work.value['last_update'] + 60:
145 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
# Merged mining: build the aux-PoW tree over the merged chains and the
# coinbase data that carries its merkle root. mm_later remembers, per chain,
# (aux_work, index of its hash in mm_hashes, mm_hashes) for got_response.
147 if self.merged_work.value:
148 tree, size = bitcoin_data.make_auxpow_tree(self.merged_work.value)
149 mm_hashes = [self.merged_work.value.get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
150 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
151 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
155 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.merged_work.value.iteritems()]
# Build the share description and the generate transaction on top of the
# current best share. The coinbase is cut to at most 100 bytes, and stale_info
# reports 'orphan' or 'doa' when more of our shares are stale than the chain
# has recorded so far.
161 share_info, generate_tx = p2pool_data.Share.generate_transaction(
162 tracker=self.tracker,
164 previous_share_hash=self.best_share_var.value,
165 coinbase=(mm_data + self.current_work.value['coinbaseflags'])[:100],
166 nonce=random.randrange(2**32),
167 pubkey_hash=pubkey_hash,
168 subsidy=self.current_work.value['subsidy'],
169 donation=math.perfect_round(65535*self.donation_percentage/100),
170 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
171 'orphan' if orphans > orphans_recorded_in_chain else
172 'doa' if doas > doas_recorded_in_chain else
174 )(*self.get_stale_counts()),
177 block_target=self.current_work.value['bits'].target,
178 desired_timestamp=int(time.time() - self.current_work.value['clock_offset']),
179 desired_target=desired_share_target,
180 ref_merkle_link=dict(branch=[], index=0),
# Choose the pseudoshare target. Without an explicit request it is derived
# from the local hash rate once 50 samples exist: work of all samples after
# the first, divided by the time between the first and last sample.
# NOTE(review): the line giving `target` its starting value is not visible.
184 if desired_pseudoshare_target is None:
186 if len(self.recent_shares_ts_work) == 50:
187 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
189 target = min(target, int(2**256/hash_rate))
191 target = desired_pseudoshare_target
# A hash passes when it is <= the target, so a larger target is easier: the
# pseudoshare target is never made harder than the share target or any merged
# chain's target, and is finally clipped to the parent network's sane range.
192 target = max(target, share_info['bits'].target)
193 for aux_work in self.merged_work.value.itervalues():
194 target = max(target, aux_work['target'])
195 target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
# Full transaction list for a potential block, and the merkle root obtained by
# combining the generate transaction's hash with bitcoind's merkle link.
197 transactions = [generate_tx] + list(self.current_work.value['transactions'])
198 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
199 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), self.current_work.value['merkle_link'])
# Values captured now so got_response can compare against / reuse them even if
# current_work changes in the meantime.
201 getwork_time = time.time()
202 merkle_link = self.current_work.value['merkle_link']
204 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
205 bitcoin_data.target_to_difficulty(target),
206 bitcoin_data.target_to_difficulty(share_info['bits'].target),
207 self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
208 len(self.current_work.value['transactions']),
211 bits = self.current_work.value['bits']
212 previous_block = self.current_work.value['previous_block']
213 ba = bitcoin_getwork.BlockAttempt(
214 version=self.current_work.value['version'],
215 previous_block=self.current_work.value['previous_block'],
216 merkle_root=merkle_root,
217 timestamp=self.current_work.value['time'],
218 bits=self.current_work.value['bits'],
# Header hashes already submitted for this unit of work, used to detect
# duplicate submissions.
222 received_header_hashes = set()
# Callback for a header submitted by a miner for this unit of work.
224 def got_response(header, request):
225 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
226 pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
# 1) Parent-chain block: submit when the PoW hash meets the header's own
#    target (or always when p2pool.DEBUG is set).
228 if pow_hash <= header['bits'].target or p2pool.DEBUG:
229 self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
230 if pow_hash <= header['bits'].target:
232 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
235 log.err(None, 'Error while processing potential block:')
# The submitted header must belong to the work handed out above.
237 user, _, _, _ = self.get_user_details(request)
238 assert header['merkle_root'] == merkle_root
239 assert header['previous_block'] == previous_block
240 assert header['bits'] == bits
# A result is "on time" only if the best share has not changed since the work
# was generated.
242 on_time = self.best_share_var.value == share_info['share_data']['previous_share_hash']
# 2) Merged-mining chains: submit an aux proof of work to every chain whose
#    target is met (or always in DEBUG), with retries.
244 for aux_work, index, hashes in mm_later:
246 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
247 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
248 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
249 bitcoin_data.aux_pow_type.pack(dict(
252 block_hash=header_hash,
253 merkle_link=merkle_link,
255 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
256 parent_block_header=header,
261 if result != (pow_hash <= aux_work['target']):
262 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
264 print 'Merged block submittal result: %s' % (result,)
267 log.err(err, 'Error submitting merged block:')
269 log.err(None, 'Error while processing merged mining POW:')
# 3) p2pool share: only when the share target is met and this header has not
#    been submitted before. The other transactions are attached only if the
#    header is also a full block solution.
271 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
272 min_header = dict(header);del min_header['merkle_root']
273 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
274 share = p2pool_data.Share(self.net, None, dict(
275 min_header=min_header, share_info=share_info, hash_link=hash_link,
276 ref_merkle_link=dict(branch=[], index=0),
277 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
279 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
281 p2pool_data.format_hash(share.hash),
282 p2pool_data.format_hash(share.previous_hash),
283 time.time() - getwork_time,
284 ' DEAD ON ARRIVAL' if not on_time else '',
# Record the share as ours.
# NOTE(review): the DOA set is presumably updated only when the share was not
# on time -- the guarding line is not visible; confirm.
286 self.my_share_hashes.add(share.hash)
288 self.my_doa_share_hashes.add(share.hash)
290 self.tracker.add(share)
292 self.tracker.verified.add(share)
293 self.set_best_share()
# If the share is also a block solution (or in DEBUG), push it to every peer
# right away and remember that it has been shared.
296 if pow_hash <= header['bits'].target or p2pool.DEBUG:
297 for peer in self.p2p_node.peers.itervalues():
298 peer.sendShares([share])
299 self.shared_share_hashes.add(share.hash)
301 log.err(None, 'Error forwarding block solution:')
303 self.share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
# 4) Pseudoshare accounting: report hashes above the pseudoshare target and
#    duplicate submissions; the remaining lines record the header hash and
#    update the local rate statistics.
305 if pow_hash > target:
306 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
307 print ' Hash: %56x' % (pow_hash,)
308 print ' Target: %56x' % (target,)
309 elif header_hash in received_header_hashes:
310 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
312 received_header_hashes.add(header_hash)
314 self.pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
# Keep only the 50 most recent (timestamp, work) samples for the hash-rate
# estimate used when choosing the pseudoshare target.
315 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
316 while len(self.recent_shares_ts_work) > 50:
317 self.recent_shares_ts_work.pop(0)
318 self.local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
322 return ba, got_response
324 @defer.inlineCallbacks
325 def main(args, net, datadir_path, merged_urls, worker_endpoint):
327 print 'p2pool (version %s)' % (p2pool.__version__,)
330 # connect to bitcoind over JSON-RPC and do initial getmemorypool
331 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
332 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
333 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
334 @deferral.retry('Error while checking Bitcoin connection:', 1)
335 @defer.inlineCallbacks
337 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
338 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
339 raise deferral.RetrySilentlyException()
340 temp_work = yield getwork(bitcoind)
341 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
342 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
343 raise deferral.RetrySilentlyException()
344 defer.returnValue(temp_work)
345 temp_work = yield check()
347 block_height_var = variable.Variable(None)
348 @defer.inlineCallbacks
350 block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
352 task.LoopingCall(poll_height).start(60*60)
355 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
356 print ' Current block height: %i' % (block_height_var.value,)
359 # connect to bitcoind over bitcoin-p2p
360 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
361 factory = bitcoin_p2p.ClientFactory(net.PARENT)
362 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
363 yield factory.getProtocol() # waits until handshake is successful
367 print 'Determining payout address...'
368 if args.pubkey_hash is None:
369 address_path = os.path.join(datadir_path, 'cached_payout_address')
371 if os.path.exists(address_path):
372 with open(address_path, 'rb') as f:
373 address = f.read().strip('\r\n')
374 print ' Loaded cached address: %s...' % (address,)
378 if address is not None:
379 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
380 if not res['isvalid'] or not res['ismine']:
381 print ' Cached address is either invalid or not controlled by local bitcoind!'
385 print ' Getting payout address from bitcoind...'
386 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
388 with open(address_path, 'wb') as f:
391 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
393 my_pubkey_hash = args.pubkey_hash
394 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
397 my_share_hashes = set()
398 my_doa_share_hashes = set()
400 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
401 shared_share_hashes = set()
402 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
403 known_verified = set()
404 print "Loading shares..."
405 for i, (mode, contents) in enumerate(ss.get_shares()):
407 if contents.hash in tracker.shares:
409 shared_share_hashes.add(contents.hash)
410 contents.time_seen = 0
411 tracker.add(contents)
412 if len(tracker.shares) % 1000 == 0 and tracker.shares:
413 print " %i" % (len(tracker.shares),)
414 elif mode == 'verified_hash':
415 known_verified.add(contents)
417 raise AssertionError()
418 print " ...inserting %i verified shares..." % (len(known_verified),)
419 for h in known_verified:
420 if h not in tracker.shares:
421 ss.forget_verified_share(h)
423 tracker.verified.add(tracker.shares[h])
424 print " ...done loading %i shares!" % (len(tracker.shares),)
426 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
427 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
428 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
430 print 'Initializing work...'
435 bitcoind_work = variable.Variable(None)
437 @defer.inlineCallbacks
439 work = yield getwork(bitcoind)
440 bitcoind_work.set(dict(
441 version=work['version'],
442 previous_block=work['previous_block_hash'],
444 coinbaseflags=work['coinbaseflags'],
446 transactions=work['transactions'],
447 merkle_link=work['merkle_link'],
448 subsidy=work['subsidy'],
449 clock_offset=time.time() - work['time'],
450 last_update=time.time(),
452 yield poll_bitcoind()
454 @defer.inlineCallbacks
457 flag = factory.new_block.get_deferred()
459 yield poll_bitcoind()
462 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
467 best_block_header = variable.Variable(None)
468 def handle_header(new_header):
469 # check that header matches current target
470 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
472 bitcoind_best_block = bitcoind_work.value['previous_block']
473 if (best_block_header.value is None
475 new_header['previous_block'] == bitcoind_best_block and
476 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
477 ) # new is child of current and previous is current
479 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
480 best_block_header.value['previous_block'] != bitcoind_best_block
481 )): # new is current and previous is not a child of current
482 best_block_header.set(new_header)
483 @defer.inlineCallbacks
485 handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
486 bitcoind_work.changed.watch(lambda _: poll_header())
491 merged_work = variable.Variable({})
493 @defer.inlineCallbacks
494 def set_merged_work(merged_url, merged_userpass):
495 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
497 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
498 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
499 hash=int(auxblock['hash'], 16),
500 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
501 merged_proxy=merged_proxy,
503 yield deferral.sleep(1)
504 for merged_url, merged_userpass in merged_urls:
505 set_merged_work(merged_url, merged_userpass)
507 @merged_work.changed.watch
508 def _(new_merged_work):
509 print 'Got new merged mining work!'
513 current_work = variable.Variable(None)
515 t = dict(bitcoind_work.value)
517 if (best_block_header.value is not None and
518 best_block_header.value['previous_block'] == t['previous_block'] and
519 net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
520 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
521 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
523 version=best_block_header.value['version'],
524 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
525 bits=best_block_header.value['bits'], # not always true
527 time=best_block_header.value['timestamp'] + 600, # better way?
529 merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
530 subsidy=net.PARENT.SUBSIDY_FUNC(block_height_var.value),
531 clock_offset=current_work.value['clock_offset'],
532 last_update=current_work.value['last_update'],
536 bitcoind_work.changed.watch(lambda _: compute_work())
537 best_block_header.changed.watch(lambda _: compute_work())
542 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
543 requested = expiring_dict.ExpiringDict(300)
544 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
546 best_share_var = variable.Variable(None)
547 def set_best_share():
548 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
550 best_share_var.set(best)
553 for peer2, share_hash in desired:
554 if share_hash not in tracker.tails: # was received in the time tracker.think was running
556 last_request_time, count = requested.get(share_hash, (None, 0))
557 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
559 potential_peers = set()
560 for head in tracker.tails[share_hash]:
561 potential_peers.update(peer_heads.get(head, set()))
562 potential_peers = [peer for peer in potential_peers if peer.connected2]
563 if count == 0 and peer2 is not None and peer2.connected2:
566 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
570 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
574 stops=list(set(tracker.heads) | set(
575 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
578 requested[share_hash] = t, count + 1
579 bitcoind_work.changed.watch(lambda _: set_best_share())
584 lp_signal = variable.Event()
586 @current_work.transitioned.watch
587 def _(before, after):
588 # trigger LP if version/previous_block/bits changed or transactions changed from nothing
589 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']) or (not before['transactions'] and after['transactions']):
591 merged_work.changed.watch(lambda _: lp_signal.happened())
592 best_share_var.changed.watch(lambda _: lp_signal.happened())
598 # setup p2p logic and join p2pool network
600 class Node(p2p.Node):
601 def handle_shares(self, shares, peer):
603 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
607 if share.hash in tracker.shares:
608 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
613 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
617 if shares and peer is not None:
618 peer_heads.setdefault(shares[0].hash, set()).add(peer)
624 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
626 def handle_share_hashes(self, hashes, peer):
629 for share_hash in hashes:
630 if share_hash in tracker.shares:
632 last_request_time, count = requested.get(share_hash, (None, 0))
633 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
635 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
636 get_hashes.append(share_hash)
637 requested[share_hash] = t, count + 1
639 if hashes and peer is not None:
640 peer_heads.setdefault(hashes[0], set()).add(peer)
642 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
644 def handle_get_shares(self, hashes, parents, stops, peer):
645 parents = min(parents, 1000//len(hashes))
648 for share_hash in hashes:
649 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
650 if share.hash in stops:
653 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
656 def handle_bestblock(self, header, peer):
657 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
658 raise p2p.PeerMisbehavingError('received block header fails PoW test')
659 handle_header(header)
661 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
662 def submit_block_p2p(block):
663 if factory.conn.value is None:
664 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
665 raise deferral.RetrySilentlyException()
666 factory.conn.value.send_block(block=block)
668 @deferral.retry('Error submitting block: (will retry)', 10, 10)
669 @defer.inlineCallbacks
670 def submit_block_rpc(block, ignore_failure):
671 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
672 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
673 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
674 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
676 def submit_block(block, ignore_failure):
677 submit_block_p2p(block)
678 submit_block_rpc(block, ignore_failure)
680 @tracker.verified.added.watch
682 if share.pow_hash <= share.header['bits'].target:
683 submit_block(share.as_block(tracker), ignore_failure=True)
685 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
688 if (get_height_rel_highest(share.header['previous_block']) > -5 or
689 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
690 broadcast_share(share.hash)
692 reactor.callLater(5, spread) # so get_height_rel_highest can update
694 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
696 @defer.inlineCallbacks
699 ip, port = x.split(':')
700 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
702 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
705 if os.path.exists(os.path.join(datadir_path, 'addrs')):
707 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
708 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
710 print >>sys.stderr, 'error parsing addrs'
711 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
714 if addr not in addrs:
715 addrs[addr] = (0, time.time(), time.time())
719 connect_addrs = set()
720 for addr_df in map(parse, args.p2pool_nodes):
722 connect_addrs.add((yield addr_df))
727 best_share_hash_func=lambda: best_share_var.value,
728 port=args.p2pool_port,
731 connect_addrs=connect_addrs,
732 max_incoming_conns=args.p2pool_conns,
737 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
738 f.write(json.dumps(p2p_node.addr_store.items()))
739 task.LoopingCall(save_addrs).start(60)
741 @best_block_header.changed.watch
743 for peer in p2p_node.peers.itervalues():
744 peer.send_bestblock(header=header)
746 def broadcast_share(share_hash):
748 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
749 if share.hash in shared_share_hashes:
751 shared_share_hashes.add(share.hash)
754 for peer in p2p_node.peers.itervalues():
755 peer.sendShares([share for share in shares if share.peer is not peer])
757 # send share when the chain changes to their chain
758 best_share_var.changed.watch(broadcast_share)
761 for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
763 if share.hash in tracker.verified.shares:
764 ss.add_verified_hash(share.hash)
765 task.LoopingCall(save_shares).start(60)
771 @defer.inlineCallbacks
775 is_lan, lan_ip = yield ipdiscover.get_local_ip()
777 pm = yield portmapper.get_port_mapper()
778 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
779 except defer.TimeoutError:
783 log.err(None, 'UPnP error:')
784 yield deferral.sleep(random.expovariate(1/120))
787 # start listening for workers with a JSON-RPC server
789 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
791 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, current_work.value['bits'].target, current_work.value['subsidy'], net)
793 wb = WorkerBridge(lp_signal, my_pubkey_hash, net, args.donation_percentage, current_work, merged_work, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes)
794 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
795 worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
797 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
799 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
807 print 'Started successfully!'
808 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
809 if args.donation_percentage > 0.51:
810 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
811 elif args.donation_percentage < 0.49:
812 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
814 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
815 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
819 if hasattr(signal, 'SIGALRM'):
820 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
821 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
823 signal.siginterrupt(signal.SIGALRM, False)
824 task.LoopingCall(signal.alarm, 30).start(1)
826 if args.irc_announce:
827 from twisted.words.protocols import irc
828 class IRCClient(irc.IRCClient):
829 nickname = 'p2pool%02i' % (random.randrange(100),)
830 channel = net.ANNOUNCE_CHANNEL
831 def lineReceived(self, line):
834 irc.IRCClient.lineReceived(self, line)
836 irc.IRCClient.signedOn(self)
837 self.factory.resetDelay()
838 self.join(self.channel)
839 @defer.inlineCallbacks
840 def new_share(share):
841 if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
842 yield deferral.sleep(random.expovariate(1/60))
843 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
844 if message not in self.recent_messages:
845 self.say(self.channel, message)
846 self._remember_message(message)
847 self.watch_id = tracker.verified.added.watch(new_share)
848 self.recent_messages = []
849 def _remember_message(self, message):
850 self.recent_messages.append(message)
851 while len(self.recent_messages) > 100:
852 self.recent_messages.pop(0)
853 def privmsg(self, user, channel, message):
854 if channel == self.channel:
855 self._remember_message(message)
856 def connectionLost(self, reason):
857 tracker.verified.added.unwatch(self.watch_id)
858 print 'IRC connection lost:', reason.getErrorMessage()
859 class IRCClientFactory(protocol.ReconnectingClientFactory):
861 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
863 @defer.inlineCallbacks
868 yield deferral.sleep(3)
870 if time.time() > current_work.value['last_update'] + 60:
871 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
873 height = tracker.get_height(best_share_var.value)
874 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
876 len(tracker.verified.shares),
879 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
880 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
882 datums, dt = wb.local_rate_monitor.get_datums_in_last()
883 my_att_s = sum(datum['work']/dt for datum in datums)
884 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
885 math.format(int(my_att_s)),
887 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
888 math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
892 (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
893 stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
894 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
896 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
897 shares, stale_orphan_shares, stale_doa_shares,
898 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
899 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
900 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
902 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
903 math.format(int(real_att_s)),
905 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
908 for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
909 print >>sys.stderr, '#'*40
910 print >>sys.stderr, '>>> Warning: ' + warning
911 print >>sys.stderr, '#'*40
913 if this_str != last_str or time.time() > last_time + 15:
916 last_time = time.time()
922 log.err(None, 'Fatal error:')
# Command-line interface definition.  Options whose default is None are
# placeholders for a per-network default (the help texts list those defaults
# for every known network).

# --net offers only the main networks; the '_testnet' variants are selected
# through the separate --testnet flag.
realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# fromfile_prefix_chars='@' lets an argument of the form @FILE pull further
# arguments from that file (standard argparse behaviour).
parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')

# --- general options -------------------------------------------------------
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
# Plain boolean flags use store_true, like the other flags in this section.
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_true', default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_true', default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
# May be given several times; each occurrence appends one URL.
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--iocp',
    help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
    action='store_true', default=False, dest='iocp')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
parser.add_argument('--no-bugreport',
    help='disable submitting caught exceptions to the author',
    action='store_true', default=False, dest='no_bugreport')

# --- p2pool (peer-to-peer) interface ----------------------------------------
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# Registered on the p2pool group (not the top-level parser) so that --help
# lists it next to the P2P port option it affects.  The flag name, dest and
# parsing behaviour are unchanged: args.upnp is True unless the flag is given.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')

# --- worker (miner-facing) interface -----------------------------------------
worker_group = parser.add_argument_group('worker interface')
# Kept as a string because it may be either PORT or ADDR:PORT.
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

# --- bitcoind interface --------------------------------------------------------
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# Positional arguments: zero, one or two values (nargs='*' itself accepts any
# number; the count is not limited here).
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()
# Resolve the parsed options into concrete settings: network, data directory,
# bitcoind RPC credentials and ports, worker endpoint and payout address.
# NOTE(review): this listing has gaps in its numbering inside this span
# (1023-1024, 1034, 1056, 1061, 1065), so a few lines are not shown here.

# --testnet selects the '<name>_testnet' entry of networks.nets.
1005 net_name = args.net_name + ('_testnet' if args.testnet else '')
1006 net = networks.nets[net_name]
# Data directory is <datadir>/<net_name>; without --datadir the base is a
# 'data' directory next to the launched script (dirname of sys.argv[0]).
1008 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
1009 if not os.path.exists(datadir_path):
1010 os.makedirs(datadir_path)
# Positional RPC credentials: at most two values are accepted.
1012 if len(args.bitcoind_rpc_userpass) > 2:
1013 parser.error('a maximum of two arguments are allowed')
# Left-padding with None and taking the last two items right-aligns the values:
# [] -> (None, None), [p] -> (None, p), [u, p] -> (u, p).
1014 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No password given on the command line: fall back to the parent network's
# configuration file, located via net.PARENT.CONF_FILE_FUNC().
1016 if args.bitcoind_rpc_password is None:
1017 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
1018 parser.error('This network has no configuration file function. Manually enter your RPC password.')
1019 conf_path = net.PARENT.CONF_FILE_FUNC()
1020 if not os.path.exists(conf_path):
# The message suggests a configuration containing a freshly generated random
# password (random.randrange(2**128) rendered as hex).
# NOTE(review): lines 1023-1024 of this multi-part string are not shown in this
# listing, so the full suggested file text is not visible here.
1021 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
1022 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
1025 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
1026 with open(conf_path, 'rb') as f:
1027 cp = ConfigParser.RawConfigParser()
# A dummy '[x]' section header is prepended before parsing, and all options are
# then looked up in section 'x' (presumably because the conf file has no
# section header of its own - TODO confirm).
1028 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
# (conf file key, attribute on args, converter).  A value from the file is used
# only when the corresponding option is still None, i.e. was not supplied on
# the command line.
# NOTE(review): line 1034 (the end of this list / loop header) is not shown.
1029 for conf_name, var_name, var_type in [
1030 ('rpcuser', 'bitcoind_rpc_username', str),
1031 ('rpcpassword', 'bitcoind_rpc_password', str),
1032 ('rpcport', 'bitcoind_rpc_port', int),
1033 ('port', 'bitcoind_p2p_port', int),
1035 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1036 setattr(args, var_name, var_type(cp.get('x', conf_name)))
# The conf file was read but held no rpcpassword: give up with a usage error.
1037 if args.bitcoind_rpc_password is None:
1038 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
# Remaining unset values fall back to an empty username and the network's
# default ports.
1040 if args.bitcoind_rpc_username is None:
1041 args.bitcoind_rpc_username = ''
1043 if args.bitcoind_rpc_port is None:
1044 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1046 if args.bitcoind_p2p_port is None:
1047 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1049 if args.p2pool_port is None:
1050 args.p2pool_port = net.P2P_PORT
# worker_endpoint becomes an (address, port) tuple; an empty address is used
# when only a port (or nothing) was given.
1052 if args.worker_endpoint is None:
1053 worker_endpoint = '', net.WORKER_PORT
1054 elif ':' not in args.worker_endpoint:
1055 worker_endpoint = '', int(args.worker_endpoint)
# ADDR:PORT form: split on the last ':' only.
# NOTE(review): line 1056 is not shown - presumably the `else:` introducing
# this branch.
1057 addr, port = args.worker_endpoint.rsplit(':', 1)
1058 worker_endpoint = addr, int(port)
# Convert the payout address to a pubkey hash for the selected parent network;
# any conversion failure is reported as a usage error.
# NOTE(review): lines 1061 and 1065 are not shown - presumably the `try:` and
# the `else:` that leaves pubkey_hash as None when no address was given.
1060 if args.address is not None:
1062 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1063 except Exception, e:
1064 parser.error('error parsing address: ' + repr(e))
1066 args.pubkey_hash = None
def separate_url(url):
    """Split the credentials out of a merged-mining URL.

    url -- URL whose network location has the form CREDENTIALS@HOST, e.g.
           http://ncuser:ncpass@127.0.0.1:10332/

    Returns a (url_without_credentials, credentials) tuple, where credentials
    is everything before the last '@' of the network location.  A URL without
    an '@' in its network location is reported through parser.error().
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split on the last '@' only, so an '@' inside the credentials is kept.
    credentials, host = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=host))
    return stripped_url, credentials
# Split every --merged URL into a (url without credentials, credentials) pair.
1074 merged_urls = map(separate_url, args.merged_urls)
# Log file defaults to 'log' inside the per-network data directory.
1076 if args.logfile is None:
1077 args.logfile = os.path.join(datadir_path, 'log')
# Note: `logging` here is the project's util.logging module (see the imports at
# the top of the file), not the standard-library logging package.
1079 logfile = logging.LogFile(args.logfile)
# Going by the class names, output is timestamped and then sent both to the
# original stderr (through an encoding-safe wrapper) and to the log file -
# TODO confirm against util.logging.
1080 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
# From here on, print output and stderr output both go through `pipe`; stderr
# (including Twisted's default observer) is additionally wrapped in a
# PrefixPipe with '> ' (presumably a per-line prefix).
1081 sys.stdout = logging.AbortPipe(pipe)
1082 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 handler, installed only where the platform defines that signal.
1083 if hasattr(signal, "SIGUSR1"):
1084 def sigusr1(signum, frame):
1085 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
# NOTE(review): line 1086 is not shown in this listing; given the two messages,
# the log file is presumably reopened between them - confirm.
1087 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1088 signal.signal(signal.SIGUSR1, sigusr1)
# Independently of the signal, call logfile.reopen every 5 seconds.
1089 task.LoopingCall(logfile.reopen).start(5)
# Twisted log observer (its emit method is what gets registered with
# log.addObserver) that turns error events into a text report for the URL below.
# NOTE(review): this listing has gaps in its numbering inside this class (1092,
# 1097, 1100, 1106, 1110, 1112, 1114); the notes below mark where lines are not
# shown - confirm details against the original file.
1091 class ErrorReporter(object):
# NOTE(review): line 1092 is not shown - presumably the `def __init__(self):`
# header this assignment belongs to.
# Time of the last report, or None if nothing has been reported yet.
1093 self.last_sent = None
# Receives one log event dict per call.
1095 def emit(self, eventDict):
# Events not flagged as errors are of no interest.
# NOTE(review): line 1097 is not shown - presumably an early `return`.
1096 if not eventDict["isError"]:
# Rate limit: an event arriving less than 5 seconds after the previous report
# takes this branch.
# NOTE(review): line 1100 is not shown - presumably an early `return`.
1099 if self.last_sent is not None and time.time() < self.last_sent + 5:
1101 self.last_sent = time.time()
# Report text: for events carrying a failure, the 'why' text (or
# 'Unhandled Error' when that is missing/empty) followed by the traceback...
1103 if 'failure' in eventDict:
1104 text = ((eventDict.get('why') or 'Unhandled Error')
1105 + '\n' + eventDict['failure'].getTraceback())
# ...otherwise the event's message parts joined with spaces.
# NOTE(review): line 1106 is not shown - presumably the `else:` for this branch.
1107 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
# Imported here rather than at the top of the file.
1109 from twisted.web import client
# NOTE(review): lines 1110, 1112 and 1114 are not shown; the surviving keyword
# arguments and the trailing .addBoth suggest a Deferred-returning HTTP request
# made through twisted.web.client - confirm the call and its other arguments.
1111 url='http://u.forre.st/p2pool_error.cgi',
# Payload: p2pool version, a space, the network name, a newline, then the text.
1113 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
# Discard both the result and any failure, so a failed report does not itself
# surface as another unhandled error.
1115 ).addBoth(lambda x: None)
# Error reporting is on by default; the --no-bugreport flag disables it.
1116 if not args.no_bugreport:
1117 log.addObserver(ErrorReporter().emit)
# Schedule main(...) to be called once the reactor is running, passing it the
# parsed options together with the values derived from them above.
1119 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)