1 from __future__ import division
15 if '--iocp' in sys.argv:
16 from twisted.internet import iocpreactor
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging, pack
26 from . import p2p, networks, web
27 import p2pool, p2pool.data as p2pool_data
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
33 work = yield bitcoind.rpc_getmemorypool()
34 except jsonrpc.Error, e:
35 if e.code == -32601: # Method not found
36 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37 raise deferral.RetrySilentlyException()
39 packed_transactions = [x.decode('hex') for x in work['transactions']]
40 defer.returnValue(dict(
41 version=work['version'],
42 previous_block_hash=int(work['previousblockhash'], 16),
43 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
44 merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45 subsidy=work['coinbasevalue'],
47 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
48 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
51 class WorkerBridge(worker_interface.WorkerBridge):
52 def __init__(self, my_pubkey_hash, net, donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var):
53 worker_interface.WorkerBridge.__init__(self)
54 self.recent_shares_ts_work = []
56 self.my_pubkey_hash = my_pubkey_hash
58 self.donation_percentage = donation_percentage
59 self.bitcoind_work = bitcoind_work
60 self.best_block_header = best_block_header
61 self.best_share_var = best_share_var
62 self.tracker = tracker
63 self.my_share_hashes = my_share_hashes
64 self.my_doa_share_hashes = my_doa_share_hashes
65 self.worker_fee = worker_fee
66 self.p2p_node = p2p_node
67 self.submit_block = submit_block
68 self.set_best_share = set_best_share
69 self.shared_share_hashes = shared_share_hashes
70 self.block_height_var = block_height_var
72 self.pseudoshare_received = variable.Event()
73 self.share_received = variable.Event()
74 self.local_rate_monitor = math.RateMonitor(10*60)
76 self.removed_unstales_var = variable.Variable((0, 0, 0))
77 self.removed_doa_unstales_var = variable.Variable(0)
79 @tracker.verified.removed.watch
81 if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value):
82 assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
83 self.removed_unstales_var.set((
84 self.removed_unstales_var.value[0] + 1,
85 self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
86 self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
88 if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value):
89 self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
93 self.merged_work = variable.Variable({})
95 @defer.inlineCallbacks
96 def set_merged_work(merged_url, merged_userpass):
97 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
99 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
100 self.merged_work.set(dict(self.merged_work.value, **{auxblock['chainid']: dict(
101 hash=int(auxblock['hash'], 16),
102 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
103 merged_proxy=merged_proxy,
105 yield deferral.sleep(1)
106 for merged_url, merged_userpass in merged_urls:
107 set_merged_work(merged_url, merged_userpass)
109 @self.merged_work.changed.watch
110 def _(new_merged_work):
111 print 'Got new merged mining work!'
115 self.current_work = variable.Variable(None)
117 t = dict(self.bitcoind_work.value)
119 bb = self.best_block_header.value
120 if bb is not None and bb['previous_block'] == t['previous_block'] and net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(bb)) <= t['bits'].target:
121 print 'Skipping from block %x to block %x!' % (bb['previous_block'],
122 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb)))
124 version=bb['version'],
125 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb)),
126 bits=bb['bits'], # not always true
128 time=bb['timestamp'] + 600, # better way?
130 merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
131 subsidy=net.PARENT.SUBSIDY_FUNC(self.block_height_var.value),
132 clock_offset=self.current_work.value['clock_offset'],
133 last_update=self.current_work.value['last_update'],
136 self.current_work.set(t)
137 self.bitcoind_work.changed.watch(lambda _: compute_work())
138 self.best_block_header.changed.watch(lambda _: compute_work())
141 self.new_work_event = variable.Event()
@self.current_work.transitioned.watch
def _(before, after):
    # trigger LP when version/previous_block/bits differ between the two work
    # dicts, or when transactions went from nothing to something
    header_fields_changed = any(before[field] != after[field] for field in ['version', 'previous_block', 'bits'])
    if not header_fields_changed:
        # nothing but the transaction list can still justify a notification
        if before['transactions'] or not after['transactions']:
            return
    self.new_work_event.happened()
147 self.merged_work.changed.watch(lambda _: self.new_work_event.happened())
148 self.best_share_var.changed.watch(lambda _: self.new_work_event.happened())
def get_stale_counts(self):
    '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)

    Counts for shares still in the verified chain come from the tracker's
    delta to the best share; the removed_*_var counters are added on top of
    them.
    '''
    total_mine = len(self.my_share_hashes)
    total_mine_doa = len(self.my_doa_share_hashes)
    chain_delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value)
    removed_counts = self.removed_unstales_var.value

    # shares of ours that made it into the chain
    mine_in_chain = chain_delta.my_count + removed_counts[0]
    mine_doa_in_chain = chain_delta.my_doa_count + self.removed_doa_unstales_var.value

    # stale announcements recorded in the chain
    recorded = (
        chain_delta.my_orphan_announce_count + removed_counts[1],
        chain_delta.my_dead_announce_count + removed_counts[2],
    )

    # whatever is ours but not in the chain is stale
    missing = total_mine - mine_in_chain
    missing_doa = total_mine_doa - mine_doa_in_chain

    return (missing - missing_doa, missing_doa), total_mine, recorded
165 def get_user_details(self, request):
166 user = request.getUser() if request.getUser() is not None else ''
168 desired_pseudoshare_target = None
170 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
172 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
176 desired_share_target = 2**256 - 1
178 user, min_diff_str = user.rsplit('/', 1)
180 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
184 if random.uniform(0, 100) < self.worker_fee:
185 pubkey_hash = self.my_pubkey_hash
188 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
190 pubkey_hash = self.my_pubkey_hash
192 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
def preprocess_request(self, request):
    '''Return (pubkey_hash, desired_share_target, desired_pseudoshare_target) for request.

    These are the values get_user_details() yields for the request, with
    the leading user name left out.
    '''
    _user, payout_hash, share_target, pseudoshare_target = self.get_user_details(request)
    return payout_hash, share_target, pseudoshare_target
198 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
199 if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
200 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
201 if self.best_share_var.value is None and self.net.PERSIST:
202 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
203 if time.time() > self.current_work.value['last_update'] + 60:
204 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
206 if self.merged_work.value:
207 tree, size = bitcoin_data.make_auxpow_tree(self.merged_work.value)
208 mm_hashes = [self.merged_work.value.get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
209 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
210 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
214 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.merged_work.value.iteritems()]
220 share_info, generate_tx = p2pool_data.Share.generate_transaction(
221 tracker=self.tracker,
223 previous_share_hash=self.best_share_var.value,
224 coinbase=(mm_data + self.current_work.value['coinbaseflags'])[:100],
225 nonce=random.randrange(2**32),
226 pubkey_hash=pubkey_hash,
227 subsidy=self.current_work.value['subsidy'],
228 donation=math.perfect_round(65535*self.donation_percentage/100),
229 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
230 'orphan' if orphans > orphans_recorded_in_chain else
231 'doa' if doas > doas_recorded_in_chain else
233 )(*self.get_stale_counts()),
236 block_target=self.current_work.value['bits'].target,
237 desired_timestamp=int(time.time() - self.current_work.value['clock_offset']),
238 desired_target=desired_share_target,
239 ref_merkle_link=dict(branch=[], index=0),
243 if desired_pseudoshare_target is None:
245 if len(self.recent_shares_ts_work) == 50:
246 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
248 target = min(target, int(2**256/hash_rate))
250 target = desired_pseudoshare_target
251 target = max(target, share_info['bits'].target)
252 for aux_work in self.merged_work.value.itervalues():
253 target = max(target, aux_work['target'])
254 target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
256 transactions = [generate_tx] + list(self.current_work.value['transactions'])
257 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
258 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), self.current_work.value['merkle_link'])
260 getwork_time = time.time()
261 merkle_link = self.current_work.value['merkle_link']
263 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
264 bitcoin_data.target_to_difficulty(target),
265 bitcoin_data.target_to_difficulty(share_info['bits'].target),
266 self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
267 len(self.current_work.value['transactions']),
270 bits = self.current_work.value['bits']
271 previous_block = self.current_work.value['previous_block']
272 ba = bitcoin_getwork.BlockAttempt(
273 version=self.current_work.value['version'],
274 previous_block=self.current_work.value['previous_block'],
275 merkle_root=merkle_root,
276 timestamp=self.current_work.value['time'],
277 bits=self.current_work.value['bits'],
281 received_header_hashes = set()
283 def got_response(header, request):
284 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
285 pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
287 if pow_hash <= header['bits'].target or p2pool.DEBUG:
288 self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
289 if pow_hash <= header['bits'].target:
291 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
294 log.err(None, 'Error while processing potential block:')
296 user, _, _, _ = self.get_user_details(request)
297 assert header['merkle_root'] == merkle_root
298 assert header['previous_block'] == previous_block
299 assert header['bits'] == bits
301 on_time = self.best_share_var.value == share_info['share_data']['previous_share_hash']
303 for aux_work, index, hashes in mm_later:
305 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
306 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
307 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
308 bitcoin_data.aux_pow_type.pack(dict(
311 block_hash=header_hash,
312 merkle_link=merkle_link,
314 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
315 parent_block_header=header,
320 if result != (pow_hash <= aux_work['target']):
321 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
323 print 'Merged block submittal result: %s' % (result,)
326 log.err(err, 'Error submitting merged block:')
328 log.err(None, 'Error while processing merged mining POW:')
330 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
331 min_header = dict(header);del min_header['merkle_root']
332 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
333 share = p2pool_data.Share(self.net, None, dict(
334 min_header=min_header, share_info=share_info, hash_link=hash_link,
335 ref_merkle_link=dict(branch=[], index=0),
336 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
338 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
340 p2pool_data.format_hash(share.hash),
341 p2pool_data.format_hash(share.previous_hash),
342 time.time() - getwork_time,
343 ' DEAD ON ARRIVAL' if not on_time else '',
345 self.my_share_hashes.add(share.hash)
347 self.my_doa_share_hashes.add(share.hash)
349 self.tracker.add(share)
351 self.tracker.verified.add(share)
352 self.set_best_share()
355 if pow_hash <= header['bits'].target or p2pool.DEBUG:
356 for peer in self.p2p_node.peers.itervalues():
357 peer.sendShares([share])
358 self.shared_share_hashes.add(share.hash)
360 log.err(None, 'Error forwarding block solution:')
362 self.share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
364 if pow_hash > target:
365 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
366 print ' Hash: %56x' % (pow_hash,)
367 print ' Target: %56x' % (target,)
368 elif header_hash in received_header_hashes:
369 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
371 received_header_hashes.add(header_hash)
373 self.pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
374 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
375 while len(self.recent_shares_ts_work) > 50:
376 self.recent_shares_ts_work.pop(0)
377 self.local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
381 return ba, got_response
383 @defer.inlineCallbacks
384 def main(args, net, datadir_path, merged_urls, worker_endpoint):
386 print 'p2pool (version %s)' % (p2pool.__version__,)
389 # connect to bitcoind over JSON-RPC and do initial getmemorypool
390 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
391 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
392 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
393 @deferral.retry('Error while checking Bitcoin connection:', 1)
394 @defer.inlineCallbacks
396 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
397 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
398 raise deferral.RetrySilentlyException()
399 temp_work = yield getwork(bitcoind)
400 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
401 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
402 raise deferral.RetrySilentlyException()
403 defer.returnValue(temp_work)
404 temp_work = yield check()
406 block_height_var = variable.Variable(None)
407 @defer.inlineCallbacks
409 block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
411 task.LoopingCall(poll_height).start(60*60)
414 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
415 print ' Current block height: %i' % (block_height_var.value,)
418 # connect to bitcoind over bitcoin-p2p
419 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
420 factory = bitcoin_p2p.ClientFactory(net.PARENT)
421 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
422 yield factory.getProtocol() # waits until handshake is successful
426 print 'Determining payout address...'
427 if args.pubkey_hash is None:
428 address_path = os.path.join(datadir_path, 'cached_payout_address')
430 if os.path.exists(address_path):
431 with open(address_path, 'rb') as f:
432 address = f.read().strip('\r\n')
433 print ' Loaded cached address: %s...' % (address,)
437 if address is not None:
438 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
439 if not res['isvalid'] or not res['ismine']:
440 print ' Cached address is either invalid or not controlled by local bitcoind!'
444 print ' Getting payout address from bitcoind...'
445 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
447 with open(address_path, 'wb') as f:
450 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
452 my_pubkey_hash = args.pubkey_hash
453 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
456 my_share_hashes = set()
457 my_doa_share_hashes = set()
459 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
460 shared_share_hashes = set()
461 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
462 known_verified = set()
463 print "Loading shares..."
464 for i, (mode, contents) in enumerate(ss.get_shares()):
466 if contents.hash in tracker.shares:
468 shared_share_hashes.add(contents.hash)
469 contents.time_seen = 0
470 tracker.add(contents)
471 if len(tracker.shares) % 1000 == 0 and tracker.shares:
472 print " %i" % (len(tracker.shares),)
473 elif mode == 'verified_hash':
474 known_verified.add(contents)
476 raise AssertionError()
477 print " ...inserting %i verified shares..." % (len(known_verified),)
478 for h in known_verified:
479 if h not in tracker.shares:
480 ss.forget_verified_share(h)
482 tracker.verified.add(tracker.shares[h])
483 print " ...done loading %i shares!" % (len(tracker.shares),)
485 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
486 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
487 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
489 print 'Initializing work...'
494 bitcoind_work = variable.Variable(None)
496 @defer.inlineCallbacks
498 work = yield getwork(bitcoind)
499 bitcoind_work.set(dict(
500 version=work['version'],
501 previous_block=work['previous_block_hash'],
503 coinbaseflags=work['coinbaseflags'],
505 transactions=work['transactions'],
506 merkle_link=work['merkle_link'],
507 subsidy=work['subsidy'],
508 clock_offset=time.time() - work['time'],
509 last_update=time.time(),
511 yield poll_bitcoind()
513 @defer.inlineCallbacks
516 flag = factory.new_block.get_deferred()
518 yield poll_bitcoind()
521 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
526 best_block_header = variable.Variable(None)
527 def handle_header(new_header):
528 # check that header matches current target
529 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
531 bitcoind_best_block = bitcoind_work.value['previous_block']
532 if (best_block_header.value is None
534 new_header['previous_block'] == bitcoind_best_block and
535 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
536 ) # new is child of current and previous is current
538 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
539 best_block_header.value['previous_block'] != bitcoind_best_block
540 )): # new is current and previous is not a child of current
541 best_block_header.set(new_header)
542 @defer.inlineCallbacks
544 handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
545 bitcoind_work.changed.watch(lambda _: poll_header())
550 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
551 requested = expiring_dict.ExpiringDict(300)
552 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
554 best_share_var = variable.Variable(None)
555 def set_best_share():
556 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
558 best_share_var.set(best)
561 for peer2, share_hash in desired:
562 if share_hash not in tracker.tails: # was received in the time tracker.think was running
564 last_request_time, count = requested.get(share_hash, (None, 0))
565 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
567 potential_peers = set()
568 for head in tracker.tails[share_hash]:
569 potential_peers.update(peer_heads.get(head, set()))
570 potential_peers = [peer for peer in potential_peers if peer.connected2]
571 if count == 0 and peer2 is not None and peer2.connected2:
574 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
578 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
582 stops=list(set(tracker.heads) | set(
583 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
586 requested[share_hash] = t, count + 1
587 bitcoind_work.changed.watch(lambda _: set_best_share())
594 # setup p2p logic and join p2pool network
596 class Node(p2p.Node):
597 def handle_shares(self, shares, peer):
599 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
603 if share.hash in tracker.shares:
604 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
609 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
613 if shares and peer is not None:
614 peer_heads.setdefault(shares[0].hash, set()).add(peer)
620 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
622 def handle_share_hashes(self, hashes, peer):
625 for share_hash in hashes:
626 if share_hash in tracker.shares:
628 last_request_time, count = requested.get(share_hash, (None, 0))
629 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
631 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
632 get_hashes.append(share_hash)
633 requested[share_hash] = t, count + 1
635 if hashes and peer is not None:
636 peer_heads.setdefault(hashes[0], set()).add(peer)
638 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
640 def handle_get_shares(self, hashes, parents, stops, peer):
641 parents = min(parents, 1000//len(hashes))
644 for share_hash in hashes:
645 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
646 if share.hash in stops:
649 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
def handle_bestblock(self, header, peer):
    '''Check a block header received from a peer and pass it to handle_header.

    Raises p2p.PeerMisbehavingError when the header's proof-of-work value
    is above the target given by the header's own 'bits'.
    '''
    packed_header = bitcoin_data.block_header_type.pack(header)
    pow_value = net.PARENT.POW_FUNC(packed_header)
    if pow_value > header['bits'].target:
        raise p2p.PeerMisbehavingError('received block header fails PoW test')
    handle_header(header)
@deferral.retry('Error submitting primary block: (will retry)', 10, 10)
def submit_block_p2p(block):
    '''Send block to bitcoind over the P2P connection held by factory.

    When there is currently no connection, the block's explorer link is
    written to stderr and deferral.RetrySilentlyException is raised so the
    surrounding deferral.retry wrapper can try again.
    '''
    if factory.conn.value is None:
        header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))
        # %064x keeps the hash's leading zeros, matching how block links are
        # formatted elsewhere in this file; a space-padded %32x would drop
        # them and yield an unusable explorer URL.
        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
        raise deferral.RetrySilentlyException()
    factory.conn.value.send_block(block=block)
@deferral.retry('Error submitting block: (will retry)', 10, 10)
@defer.inlineCallbacks
def submit_block_rpc(block, ignore_failure):
    '''Submit block to bitcoind through the getmemorypool RPC call.

    The result is compared with what the header's proof-of-work predicts,
    and a line is written to stderr when the two disagree. A rejection of
    a block that was expected to succeed is not reported when
    ignore_failure is true.
    '''
    hex_block = bitcoin_data.block_type.pack(block).encode('hex')
    accepted = yield bitcoind.rpc_getmemorypool(hex_block)

    header = block['header']
    should_be_accepted = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) <= header['bits'].target

    if accepted:
        # accepted although the proof-of-work says it should not have been
        mismatch = not should_be_accepted
    else:
        # rejected although it should have been fine (unless caller opted out)
        mismatch = should_be_accepted and not ignore_failure

    if mismatch:
        print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (accepted, should_be_accepted)
def submit_block(block, ignore_failure):
    '''Hand block to bitcoind through both submission paths.

    The P2P path is tried first, then the RPC path; ignore_failure is only
    forwarded to the RPC path.
    '''
    # P2P first, then RPC -- same order for every block
    submit_block_p2p(block)
    submit_block_rpc(block, ignore_failure)
676 @tracker.verified.added.watch
678 if share.pow_hash <= share.header['bits'].target:
679 submit_block(share.as_block(tracker), ignore_failure=True)
681 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
684 if (get_height_rel_highest(share.header['previous_block']) > -5 or
685 bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
686 broadcast_share(share.hash)
688 reactor.callLater(5, spread) # so get_height_rel_highest can update
690 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
692 @defer.inlineCallbacks
695 ip, port = x.split(':')
696 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
698 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
701 if os.path.exists(os.path.join(datadir_path, 'addrs')):
703 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
704 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
706 print >>sys.stderr, 'error parsing addrs'
707 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
710 if addr not in addrs:
711 addrs[addr] = (0, time.time(), time.time())
715 connect_addrs = set()
716 for addr_df in map(parse, args.p2pool_nodes):
718 connect_addrs.add((yield addr_df))
723 best_share_hash_func=lambda: best_share_var.value,
724 port=args.p2pool_port,
727 connect_addrs=connect_addrs,
728 max_incoming_conns=args.p2pool_conns,
733 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
734 f.write(json.dumps(p2p_node.addr_store.items()))
735 task.LoopingCall(save_addrs).start(60)
737 @best_block_header.changed.watch
739 for peer in p2p_node.peers.itervalues():
740 peer.send_bestblock(header=header)
742 def broadcast_share(share_hash):
744 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
745 if share.hash in shared_share_hashes:
747 shared_share_hashes.add(share.hash)
750 for peer in p2p_node.peers.itervalues():
751 peer.sendShares([share for share in shares if share.peer is not peer])
753 # send share when the chain changes to their chain
754 best_share_var.changed.watch(broadcast_share)
757 for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
759 if share.hash in tracker.verified.shares:
760 ss.add_verified_hash(share.hash)
761 task.LoopingCall(save_shares).start(60)
767 @defer.inlineCallbacks
771 is_lan, lan_ip = yield ipdiscover.get_local_ip()
773 pm = yield portmapper.get_port_mapper()
774 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
775 except defer.TimeoutError:
779 log.err(None, 'UPnP error:')
780 yield deferral.sleep(random.expovariate(1/120))
783 # start listening for workers with a JSON-RPC server
785 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
787 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
789 wb = WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var)
790 web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
791 worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
793 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
795 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
803 print 'Started successfully!'
804 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
805 if args.donation_percentage > 0.51:
806 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
807 elif args.donation_percentage < 0.49:
808 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
810 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
811 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
815 if hasattr(signal, 'SIGALRM'):
816 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
817 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
819 signal.siginterrupt(signal.SIGALRM, False)
820 task.LoopingCall(signal.alarm, 30).start(1)
822 if args.irc_announce:
823 from twisted.words.protocols import irc
824 class IRCClient(irc.IRCClient):
825 nickname = 'p2pool%02i' % (random.randrange(100),)
826 channel = net.ANNOUNCE_CHANNEL
827 def lineReceived(self, line):
830 irc.IRCClient.lineReceived(self, line)
832 irc.IRCClient.signedOn(self)
833 self.factory.resetDelay()
834 self.join(self.channel)
@defer.inlineCallbacks
def new_share(share):
    # Announce only shares whose proof-of-work also satisfies the target in
    # their header 'bits', and whose timestamp is within ten minutes of now.
    if not (share.pow_hash <= share.header['bits'].target):
        return
    if not (abs(share.timestamp - time.time()) < 10*60):
        return

    # wait a random, exponentially distributed time before speaking
    yield deferral.sleep(random.expovariate(1/60))

    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (
        net.NAME.upper(),
        bitcoin_data.script2_to_address(share.new_script, net.PARENT),
        net.PARENT.BLOCK_EXPLORER_URL_PREFIX,
        share.header_hash,
    )
    # stay quiet if this exact message is already in the recent history
    if message in self.recent_messages:
        return
    self.say(self.channel, message)
    self._remember_message(message)
843 self.watch_id = tracker.verified.added.watch(new_share)
844 self.recent_messages = []
845 def _remember_message(self, message):
846 self.recent_messages.append(message)
847 while len(self.recent_messages) > 100:
848 self.recent_messages.pop(0)
def privmsg(self, user, channel, message):
    '''Record message in the recent-message history when it was sent to self.channel.'''
    if not (channel == self.channel):
        # traffic for any other target is not tracked
        return
    self._remember_message(message)
def connectionLost(self, reason):
    """Stop watching for new shares and report why the IRC link dropped.

    reason -- failure object supplied by Twisted; only its
              getErrorMessage() method is used here.

    self.watch_id is assigned by the sign-on handling code, so a
    connection that is lost before sign-on completes has no watch to
    remove. Guarding on the attribute avoids an AttributeError in that
    case while still unregistering the watcher whenever one exists.
    """
    if hasattr(self, 'watch_id'):
        tracker.verified.added.unwatch(self.watch_id)
    # Single-argument print(...) emits the same text as the two-item print
    # statement and is valid under both statement and function semantics.
    print('IRC connection lost: %s' % (reason.getErrorMessage(),))
855 class IRCClientFactory(protocol.ReconnectingClientFactory):
857 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
859 @defer.inlineCallbacks
864 yield deferral.sleep(3)
866 if time.time() > bitcoind_work.value['last_update'] + 60:
867 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - bitcoind_work.value['last_update']),)
869 height = tracker.get_height(best_share_var.value)
870 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
872 len(tracker.verified.shares),
875 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
876 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
878 datums, dt = wb.local_rate_monitor.get_datums_in_last()
879 my_att_s = sum(datum['work']/dt for datum in datums)
880 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
881 math.format(int(my_att_s)),
883 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
884 math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
888 (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
889 stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
890 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
892 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
893 shares, stale_orphan_shares, stale_doa_shares,
894 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
895 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
896 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
898 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
899 math.format(int(real_att_s)),
901 math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
904 for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
905 print >>sys.stderr, '#'*40
906 print >>sys.stderr, '>>> Warning: ' + warning
907 print >>sys.stderr, '#'*40
909 if this_str != last_str or time.time() > last_time + 15:
912 last_time = time.time()
918 log.err(None, 'Fatal error:')
# Networks offered as choices for --net: every entry of networks.nets whose
# name does not contain '_testnet'.
realnets = dict(
    (key, value)
    for key, value in networks.nets.iteritems()
    if '_testnet' not in key
)

# NOTE(review): FixedArgumentParser is a project class; presumably it follows
# argparse.ArgumentParser, where fromfile_prefix_chars='@' makes '@file'
# arguments expand to the options listed in that file - confirm.
parser = fixargparse.FixedArgumentParser(
    fromfile_prefix_chars='@',
    description='p2pool (version %s)' % (p2pool.__version__,),
)

# --- general options -------------------------------------------------------
parser.add_argument(
    '--version',
    action='version',
    version=p2pool.__version__,
)
parser.add_argument(
    '--net',
    dest='net_name',
    action='store',
    choices=sorted(realnets),
    default='bitcoin',
    help='use specified network (default: bitcoin)',
)
parser.add_argument(
    '--testnet',
    dest='testnet',
    action='store_true',
    default=False,
    help='''use the network's testnet''',
)
parser.add_argument(
    '--debug',
    dest='debug',
    action='store_true',
    default=False,
    help='enable debugging mode',
)
parser.add_argument(
    '-a', '--address',
    dest='address',
    action='store',
    type=str,
    default=None,
    help='generate payouts to this address (default: <address requested from bitcoind>)',
)
parser.add_argument(
    '--datadir',
    dest='datadir',
    action='store',
    type=str,
    default=None,
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
)
parser.add_argument(
    '--logfile',
    dest='logfile',
    action='store',
    type=str,
    default=None,
    help='''log to this file (default: data/<NET>/log)''',
)
# May be given several times; each URL is appended to args.merged_urls.
parser.add_argument(
    '--merged',
    dest='merged_urls',
    action='append',
    type=str,
    default=[],
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
)
parser.add_argument(
    '--give-author',
    dest='donation_percentage',
    action='store',
    type=float,
    default=0.5,
    metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
)
parser.add_argument(
    '--iocp',
    dest='iocp',
    action='store_true',
    default=False,
    help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
)
parser.add_argument(
    '--irc-announce',
    dest='irc_announce',
    action='store_true',
    default=False,
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
)
parser.add_argument(
    '--no-bugreport',
    dest='no_bugreport',
    action='store_true',
    default=False,
    help='disable submitting caught exceptions to the author',
)
# Options controlling p2pool's own peer-to-peer interface. Everything in this
# section is registered on p2pool_group so that --help lists the options
# together under the 'p2pool interface' heading.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    # The help text enumerates the default P2P port of every selectable network.
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
# Repeatable: each occurrence is appended to args.p2pool_nodes.
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# Belongs to this group like its neighbours (it concerns the P2P port);
# registering it on the top-level parser would list it outside the section in
# --help. Parsing and the resulting args.upnp value are unaffected.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')
# Options for the interface that miners connect to.
worker_group = parser.add_argument_group('worker interface')

# Default worker port of every selectable network, for the help text below.
default_worker_ports = ', '.join(
    '%s:%i' % (name, net.WORKER_PORT)
    for name, net in sorted(realnets.items())
)
worker_group.add_argument(
    '-w', '--worker-port',
    dest='worker_endpoint',
    action='store',
    type=str,
    default=None,
    metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % (default_worker_ports,),
)
worker_group.add_argument(
    '-f', '--fee',
    dest='worker_fee',
    action='store',
    type=float,
    default=0,
    metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
)
# Options describing how to reach the bitcoind instance.
bitcoind_group = parser.add_argument_group('bitcoind interface')

# Per-network default ports of the parent coin daemon, for the help texts.
default_rpc_ports = ', '.join(
    '%s:%i' % (name, net.PARENT.RPC_PORT)
    for name, net in sorted(realnets.items())
)
default_p2p_ports = ', '.join(
    '%s:%i' % (name, net.PARENT.P2P_PORT)
    for name, net in sorted(realnets.items())
)

bitcoind_group.add_argument(
    '--bitcoind-address',
    dest='bitcoind_address',
    action='store',
    type=str,
    default='127.0.0.1',
    metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
)
bitcoind_group.add_argument(
    '--bitcoind-rpc-port',
    dest='bitcoind_rpc_port',
    action='store',
    type=int,
    default=None,
    metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % (default_rpc_ports,),
)
bitcoind_group.add_argument(
    '--bitcoind-p2p-port',
    dest='bitcoind_p2p_port',
    action='store',
    type=int,
    default=None,
    metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % (default_p2p_ports,),
)
# Positional (no option strings): zero or more values collected into a list.
bitcoind_group.add_argument(
    dest='bitcoind_rpc_userpass',
    action='store',
    type=str,
    nargs='*',
    default=[],
    metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
)
996 args = parser.parse_args()
1001 net_name = args.net_name + ('_testnet' if args.testnet else '')
1002 net = networks.nets[net_name]
1004 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
1005 if not os.path.exists(datadir_path):
1006 os.makedirs(datadir_path)
1008 if len(args.bitcoind_rpc_userpass) > 2:
1009 parser.error('a maximum of two arguments are allowed')
1010 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
1012 if args.bitcoind_rpc_password is None:
1013 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
1014 parser.error('This network has no configuration file function. Manually enter your RPC password.')
1015 conf_path = net.PARENT.CONF_FILE_FUNC()
1016 if not os.path.exists(conf_path):
1017 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
1018 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
1021 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
1022 with open(conf_path, 'rb') as f:
1023 cp = ConfigParser.RawConfigParser()
1024 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
1025 for conf_name, var_name, var_type in [
1026 ('rpcuser', 'bitcoind_rpc_username', str),
1027 ('rpcpassword', 'bitcoind_rpc_password', str),
1028 ('rpcport', 'bitcoind_rpc_port', int),
1029 ('port', 'bitcoind_p2p_port', int),
1031 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
1032 setattr(args, var_name, var_type(cp.get('x', conf_name)))
1033 if args.bitcoind_rpc_password is None:
1034 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
1036 if args.bitcoind_rpc_username is None:
1037 args.bitcoind_rpc_username = ''
1039 if args.bitcoind_rpc_port is None:
1040 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
1042 if args.bitcoind_p2p_port is None:
1043 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
1045 if args.p2pool_port is None:
1046 args.p2pool_port = net.P2P_PORT
1048 if args.worker_endpoint is None:
1049 worker_endpoint = '', net.WORKER_PORT
1050 elif ':' not in args.worker_endpoint:
1051 worker_endpoint = '', int(args.worker_endpoint)
1053 addr, port = args.worker_endpoint.rsplit(':', 1)
1054 worker_endpoint = addr, int(port)
1056 if args.address is not None:
1058 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
1059 except Exception, e:
1060 parser.error('error parsing address: ' + repr(e))
1062 args.pubkey_hash = None
def separate_url(url):
    """Split the credentials off a merged-mining URL.

    url -- URL whose network location has the form 'userpass@host[:port]'.

    Returns a 2-tuple (url_without_credentials, userpass), where userpass is
    everything before the last '@' of the network location.

    When the network location contains no '@', the problem is reported
    through parser.error().
    NOTE(review): presumably parser.error() terminates the program as
    argparse's does - confirm for FixedArgumentParser.
    """
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # Split on the LAST '@' so that credentials may themselves contain '@'.
    credentials, host = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=host))
    return stripped_url, credentials
1070 merged_urls = map(separate_url, args.merged_urls)
1072 if args.logfile is None:
1073 args.logfile = os.path.join(datadir_path, 'log')
1075 logfile = logging.LogFile(args.logfile)
1076 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
1077 sys.stdout = logging.AbortPipe(pipe)
1078 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
1079 if hasattr(signal, "SIGUSR1"):
1080 def sigusr1(signum, frame):
1081 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
1083 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
1084 signal.signal(signal.SIGUSR1, sigusr1)
1085 task.LoopingCall(logfile.reopen).start(5)
1087 class ErrorReporter(object):
1089 self.last_sent = None
1091 def emit(self, eventDict):
1092 if not eventDict["isError"]:
1095 if self.last_sent is not None and time.time() < self.last_sent + 5:
1097 self.last_sent = time.time()
1099 if 'failure' in eventDict:
1100 text = ((eventDict.get('why') or 'Unhandled Error')
1101 + '\n' + eventDict['failure'].getTraceback())
1103 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
1105 from twisted.web import client
1107 url='http://u.forre.st/p2pool_error.cgi',
1109 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
1111 ).addBoth(lambda x: None)
1112 if not args.no_bugreport:
1113 log.addObserver(ErrorReporter().emit)
1115 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)