1 from __future__ import division
15 if '--iocp' in sys.argv:
16 from twisted.internet import iocpreactor
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging, pack
26 from . import p2p, networks, web
27 import p2pool, p2pool.data as p2pool_data
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
33 work = yield bitcoind.rpc_getmemorypool()
34 except jsonrpc.Error, e:
35 if e.code == -32601: # Method not found
36 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37 raise deferral.RetrySilentlyException()
39 packed_transactions = [x.decode('hex') for x in work['transactions']]
40 defer.returnValue(dict(
41 version=work['version'],
42 previous_block_hash=int(work['previousblockhash'], 16),
43 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
44 merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45 subsidy=work['coinbasevalue'],
47 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
48 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
class WorkerBridge(worker_interface.WorkerBridge):
    def __init__(self, lp_signal, my_pubkey_hash, net, donation_percentage, current_work, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, compute_work, shared_share_hashes):
        """Bridge between the miner-facing worker interface and the p2pool node.

        Stores references to the node's shared state (current_work, tracker,
        share-hash sets) and sets up counters/events used for stale-rate and
        local hash-rate accounting.
        """
        worker_interface.WorkerBridge.__init__(self)
        self.new_work_event = lp_signal
        self.recent_shares_ts_work = [] # [(timestamp, attempts)] of recent pseudoshares, for hash-rate estimation
        
        self.my_pubkey_hash = my_pubkey_hash
        self.net = net # was missing from this copy; self.net is read by get_user_details and get_work
        self.donation_percentage = donation_percentage
        self.current_work = current_work
        self.tracker = tracker
        self.my_share_hashes = my_share_hashes
        self.my_doa_share_hashes = my_doa_share_hashes
        self.worker_fee = worker_fee
        self.p2p_node = p2p_node
        self.submit_block = submit_block
        self.compute_work = compute_work
        self.shared_share_hashes = shared_share_hashes
        
        self.pseudoshare_received = variable.Event()
        self.share_received = variable.Event()
        self.local_rate_monitor = math.RateMonitor(10*60) # project util.math, not stdlib
        
        # Counters for our shares that the tracker has pruned, so
        # get_stale_counts stays correct after old shares are forgotten.
        self.removed_unstales_var = variable.Variable((0, 0, 0)) # (total, orphans, doas)
        self.removed_doa_unstales_var = variable.Variable(0)
        
        @tracker.verified.removed.watch
        def _(share):
            # Only count shares we produced in this process that were on the best chain.
            if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
                self.removed_unstales_var.set((
                    self.removed_unstales_var.value[0] + 1,
                    self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
                    self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
                ))
            if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.current_work.value['best_share_hash']):
                self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1)
89 def get_stale_counts(self):
90 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
91 my_shares = len(self.my_share_hashes)
92 my_doa_shares = len(self.my_doa_share_hashes)
93 delta = self.tracker.verified.get_delta_to_last(self.current_work.value['best_share_hash'])
94 my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0]
95 my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value
96 orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1]
97 doas_recorded_in_chain = delta.my_dead_announce_count + self.removed_unstales_var.value[2]
99 my_shares_not_in_chain = my_shares - my_shares_in_chain
100 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
102 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
104 def get_user_details(self, request):
105 user = request.getUser() if request.getUser() is not None else ''
107 desired_pseudoshare_target = None
109 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
111 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
115 desired_share_target = 2**256 - 1
117 user, min_diff_str = user.rsplit('/', 1)
119 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
123 if random.uniform(0, 100) < self.worker_fee:
124 pubkey_hash = self.my_pubkey_hash
127 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT)
129 pubkey_hash = self.my_pubkey_hash
131 return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
133 def preprocess_request(self, request):
134 user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
135 return pubkey_hash, desired_share_target, desired_pseudoshare_target
137 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
138 if len(self.p2p_node.peers) == 0 and self.net.PERSIST:
139 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
140 if self.current_work.value['best_share_hash'] is None and self.net.PERSIST:
141 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
142 if time.time() > self.current_work.value['last_update'] + 60:
143 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
145 if self.current_work.value['mm_chains']:
146 tree, size = bitcoin_data.make_auxpow_tree(self.current_work.value['mm_chains'])
147 mm_hashes = [self.current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
148 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
149 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
153 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in self.current_work.value['mm_chains'].iteritems()]
159 share_info, generate_tx = p2pool_data.Share.generate_transaction(
160 tracker=self.tracker,
162 previous_share_hash=self.current_work.value['best_share_hash'],
163 coinbase=(mm_data + self.current_work.value['coinbaseflags'])[:100],
164 nonce=random.randrange(2**32),
165 pubkey_hash=pubkey_hash,
166 subsidy=self.current_work.value['subsidy'],
167 donation=math.perfect_round(65535*self.donation_percentage/100),
168 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
169 'orphan' if orphans > orphans_recorded_in_chain else
170 'doa' if doas > doas_recorded_in_chain else
172 )(*self.get_stale_counts()),
175 block_target=self.current_work.value['bits'].target,
176 desired_timestamp=int(time.time() - self.current_work.value['clock_offset']),
177 desired_target=desired_share_target,
178 ref_merkle_link=dict(branch=[], index=0),
182 if desired_pseudoshare_target is None:
184 if len(self.recent_shares_ts_work) == 50:
185 hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
187 target = min(target, int(2**256/hash_rate))
189 target = desired_pseudoshare_target
190 target = max(target, share_info['bits'].target)
191 for aux_work in self.current_work.value['mm_chains'].itervalues():
192 target = max(target, aux_work['target'])
193 target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE)
195 transactions = [generate_tx] + list(self.current_work.value['transactions'])
196 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
197 merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), self.current_work.value['merkle_link'])
199 getwork_time = time.time()
200 merkle_link = self.current_work.value['merkle_link']
202 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
203 bitcoin_data.target_to_difficulty(target),
204 bitcoin_data.target_to_difficulty(share_info['bits'].target),
205 self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL,
206 len(self.current_work.value['transactions']),
209 bits = self.current_work.value['bits']
210 previous_block = self.current_work.value['previous_block']
211 ba = bitcoin_getwork.BlockAttempt(
212 version=self.current_work.value['version'],
213 previous_block=self.current_work.value['previous_block'],
214 merkle_root=merkle_root,
215 timestamp=self.current_work.value['time'],
216 bits=self.current_work.value['bits'],
220 received_header_hashes = set()
222 def got_response(header, request):
223 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
224 pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
226 if pow_hash <= header['bits'].target or p2pool.DEBUG:
227 self.submit_block(dict(header=header, txs=transactions), ignore_failure=False)
228 if pow_hash <= header['bits'].target:
230 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
233 log.err(None, 'Error while processing potential block:')
235 user, _, _, _ = self.get_user_details(request)
236 assert header['merkle_root'] == merkle_root
237 assert header['previous_block'] == previous_block
238 assert header['bits'] == bits
240 on_time = self.current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
242 for aux_work, index, hashes in mm_later:
244 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
245 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
246 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
247 bitcoin_data.aux_pow_type.pack(dict(
250 block_hash=header_hash,
251 merkle_link=merkle_link,
253 merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
254 parent_block_header=header,
259 if result != (pow_hash <= aux_work['target']):
260 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
262 print 'Merged block submittal result: %s' % (result,)
265 log.err(err, 'Error submitting merged block:')
267 log.err(None, 'Error while processing merged mining POW:')
269 if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
270 min_header = dict(header);del min_header['merkle_root']
271 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.Share.gentx_before_refhash)
272 share = p2pool_data.Share(self.net, None, dict(
273 min_header=min_header, share_info=share_info, hash_link=hash_link,
274 ref_merkle_link=dict(branch=[], index=0),
275 ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
277 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
279 p2pool_data.format_hash(share.hash),
280 p2pool_data.format_hash(share.previous_hash),
281 time.time() - getwork_time,
282 ' DEAD ON ARRIVAL' if not on_time else '',
284 self.my_share_hashes.add(share.hash)
286 self.my_doa_share_hashes.add(share.hash)
288 self.tracker.add(share)
290 self.tracker.verified.add(share)
294 if pow_hash <= header['bits'].target or p2pool.DEBUG:
295 for peer in self.p2p_node.peers.itervalues():
296 peer.sendShares([share])
297 self.shared_share_hashes.add(share.hash)
299 log.err(None, 'Error forwarding block solution:')
301 self.share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
303 if pow_hash > target:
304 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
305 print ' Hash: %56x' % (pow_hash,)
306 print ' Target: %56x' % (target,)
307 elif header_hash in received_header_hashes:
308 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
310 received_header_hashes.add(header_hash)
312 self.pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
313 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
314 while len(self.recent_shares_ts_work) > 50:
315 self.recent_shares_ts_work.pop(0)
316 self.local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
320 return ba, got_response
322 @defer.inlineCallbacks
323 def main(args, net, datadir_path, merged_urls, worker_endpoint):
325 print 'p2pool (version %s)' % (p2pool.__version__,)
328 # connect to bitcoind over JSON-RPC and do initial getmemorypool
329 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
330 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
331 bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
332 @deferral.retry('Error while checking Bitcoin connection:', 1)
333 @defer.inlineCallbacks
335 if not (yield net.PARENT.RPC_CHECK)(bitcoind):
336 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
337 raise deferral.RetrySilentlyException()
338 temp_work = yield getwork(bitcoind)
339 if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
340 print >>sys.stderr, ' Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
341 raise deferral.RetrySilentlyException()
342 defer.returnValue(temp_work)
343 temp_work = yield check()
345 block_height_var = variable.Variable(None)
346 @defer.inlineCallbacks
348 block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
350 task.LoopingCall(poll_height).start(60*60)
353 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
354 print ' Current block height: %i' % (block_height_var.value,)
357 # connect to bitcoind over bitcoin-p2p
358 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
359 factory = bitcoin_p2p.ClientFactory(net.PARENT)
360 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
361 yield factory.getProtocol() # waits until handshake is successful
365 print 'Determining payout address...'
366 if args.pubkey_hash is None:
367 address_path = os.path.join(datadir_path, 'cached_payout_address')
369 if os.path.exists(address_path):
370 with open(address_path, 'rb') as f:
371 address = f.read().strip('\r\n')
372 print ' Loaded cached address: %s...' % (address,)
376 if address is not None:
377 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
378 if not res['isvalid'] or not res['ismine']:
379 print ' Cached address is either invalid or not controlled by local bitcoind!'
383 print ' Getting payout address from bitcoind...'
384 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
386 with open(address_path, 'wb') as f:
389 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
391 my_pubkey_hash = args.pubkey_hash
392 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
395 my_share_hashes = set()
396 my_doa_share_hashes = set()
398 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
399 shared_share_hashes = set()
400 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
401 known_verified = set()
402 print "Loading shares..."
403 for i, (mode, contents) in enumerate(ss.get_shares()):
405 if contents.hash in tracker.shares:
407 shared_share_hashes.add(contents.hash)
408 contents.time_seen = 0
409 tracker.add(contents)
410 if len(tracker.shares) % 1000 == 0 and tracker.shares:
411 print " %i" % (len(tracker.shares),)
412 elif mode == 'verified_hash':
413 known_verified.add(contents)
415 raise AssertionError()
416 print " ...inserting %i verified shares..." % (len(known_verified),)
417 for h in known_verified:
418 if h not in tracker.shares:
419 ss.forget_verified_share(h)
421 tracker.verified.add(tracker.shares[h])
422 print " ...done loading %i shares!" % (len(tracker.shares),)
424 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
425 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
426 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
428 print 'Initializing work...'
433 bitcoind_work = variable.Variable(None)
435 @defer.inlineCallbacks
437 work = yield getwork(bitcoind)
438 bitcoind_work.set(dict(
439 version=work['version'],
440 previous_block=work['previous_block_hash'],
442 coinbaseflags=work['coinbaseflags'],
444 transactions=work['transactions'],
445 merkle_link=work['merkle_link'],
446 subsidy=work['subsidy'],
447 clock_offset=time.time() - work['time'],
448 last_update=time.time(),
450 yield poll_bitcoind()
452 @defer.inlineCallbacks
455 flag = factory.new_block.get_deferred()
457 yield poll_bitcoind()
460 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
465 best_block_header = variable.Variable(None)
466 def handle_header(new_header):
467 # check that header matches current target
468 if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
470 bitcoind_best_block = bitcoind_work.value['previous_block']
471 if (best_block_header.value is None
473 new_header['previous_block'] == bitcoind_best_block and
474 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
475 ) # new is child of current and previous is current
477 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
478 best_block_header.value['previous_block'] != bitcoind_best_block
479 )): # new is current and previous is not a child of current
480 best_block_header.set(new_header)
481 @defer.inlineCallbacks
483 handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
484 bitcoind_work.changed.watch(lambda _: poll_header())
489 merged_work = variable.Variable({})
491 @defer.inlineCallbacks
492 def set_merged_work(merged_url, merged_userpass):
493 merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
495 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
496 merged_work.set(dict(merged_work.value, **{auxblock['chainid']: dict(
497 hash=int(auxblock['hash'], 16),
498 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
499 merged_proxy=merged_proxy,
501 yield deferral.sleep(1)
502 for merged_url, merged_userpass in merged_urls:
503 set_merged_work(merged_url, merged_userpass)
505 @merged_work.changed.watch
506 def _(new_merged_work):
507 print 'Got new merged mining work!'
511 current_work = variable.Variable(None)
513 get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
514 requested = expiring_dict.ExpiringDict(300)
515 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
517 best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
519 t = dict(bitcoind_work.value)
521 if (best_block_header.value is not None and
522 best_block_header.value['previous_block'] == t['previous_block'] and
523 net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(best_block_header.value)) <= t['bits'].target):
524 print 'Skipping from block %x to block %x!' % (best_block_header.value['previous_block'],
525 bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)))
527 version=best_block_header.value['version'],
528 previous_block=bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)),
529 bits=best_block_header.value['bits'], # not always true
531 time=best_block_header.value['timestamp'] + 600, # better way?
533 merkle_link=bitcoin_data.calculate_merkle_link([None], 0),
534 subsidy=net.PARENT.SUBSIDY_FUNC(block_height_var.value),
535 clock_offset=current_work.value['clock_offset'],
536 last_update=current_work.value['last_update'],
539 t['best_share_hash'] = best
540 t['mm_chains'] = merged_work.value
544 for peer2, share_hash in desired:
545 if share_hash not in tracker.tails: # was received in the time tracker.think was running
547 last_request_time, count = requested.get(share_hash, (None, 0))
548 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
550 potential_peers = set()
551 for head in tracker.tails[share_hash]:
552 potential_peers.update(peer_heads.get(head, set()))
553 potential_peers = [peer for peer in potential_peers if peer.connected2]
554 if count == 0 and peer2 is not None and peer2.connected2:
557 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
561 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
565 stops=list(set(tracker.heads) | set(
566 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
569 requested[share_hash] = t, count + 1
570 bitcoind_work.changed.watch(lambda _: compute_work())
571 merged_work.changed.watch(lambda _: compute_work())
572 best_block_header.changed.watch(lambda _: compute_work())
577 lp_signal = variable.Event()
579 @current_work.transitioned.watch
580 def _(before, after):
581 # trigger LP if version/previous_block/bits changed or transactions changed from nothing
582 if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits', 'best_share_hash', 'mm_chains']) or (not before['transactions'] and after['transactions']):
589 # setup p2p logic and join p2pool network
591 class Node(p2p.Node):
592 def handle_shares(self, shares, peer):
594 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
598 if share.hash in tracker.shares:
599 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
604 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
608 if shares and peer is not None:
609 peer_heads.setdefault(shares[0].hash, set()).add(peer)
615 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
617 def handle_share_hashes(self, hashes, peer):
620 for share_hash in hashes:
621 if share_hash in tracker.shares:
623 last_request_time, count = requested.get(share_hash, (None, 0))
624 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
626 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
627 get_hashes.append(share_hash)
628 requested[share_hash] = t, count + 1
630 if hashes and peer is not None:
631 peer_heads.setdefault(hashes[0], set()).add(peer)
633 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
635 def handle_get_shares(self, hashes, parents, stops, peer):
636 parents = min(parents, 1000//len(hashes))
639 for share_hash in hashes:
640 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
641 if share.hash in stops:
644 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
647 def handle_bestblock(self, header, peer):
648 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
649 raise p2p.PeerMisbehavingError('received block header fails PoW test')
650 handle_header(header)
652 @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
653 def submit_block_p2p(block):
654 if factory.conn.value is None:
655 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
656 raise deferral.RetrySilentlyException()
657 factory.conn.value.send_block(block=block)
659 @deferral.retry('Error submitting block: (will retry)', 10, 10)
660 @defer.inlineCallbacks
661 def submit_block_rpc(block, ignore_failure):
662 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
663 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
664 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
665 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
667 def submit_block(block, ignore_failure):
668 submit_block_p2p(block)
669 submit_block_rpc(block, ignore_failure)
671 @tracker.verified.added.watch
673 if share.pow_hash <= share.header['bits'].target:
674 submit_block(share.as_block(tracker), ignore_failure=True)
676 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
679 if (get_height_rel_highest(share.header['previous_block']) > -5 or
680 current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
681 broadcast_share(share.hash)
683 reactor.callLater(5, spread) # so get_height_rel_highest can update
685 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
687 @defer.inlineCallbacks
690 ip, port = x.split(':')
691 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
693 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
696 if os.path.exists(os.path.join(datadir_path, 'addrs')):
698 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
699 addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
701 print >>sys.stderr, 'error parsing addrs'
702 elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
704 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
706 print >>sys.stderr, "error reading addrs.txt"
707 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
710 if addr not in addrs:
711 addrs[addr] = (0, time.time(), time.time())
715 connect_addrs = set()
716 for addr_df in map(parse, args.p2pool_nodes):
718 connect_addrs.add((yield addr_df))
723 best_share_hash_func=lambda: current_work.value['best_share_hash'],
724 port=args.p2pool_port,
727 connect_addrs=connect_addrs,
728 max_incoming_conns=args.p2pool_conns,
733 with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
734 f.write(json.dumps(p2p_node.addr_store.items()))
735 task.LoopingCall(save_addrs).start(60)
737 @best_block_header.changed.watch
739 for peer in p2p_node.peers.itervalues():
740 peer.send_bestblock(header=header)
742 def broadcast_share(share_hash):
744 for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
745 if share.hash in shared_share_hashes:
747 shared_share_hashes.add(share.hash)
750 for peer in p2p_node.peers.itervalues():
751 peer.sendShares([share for share in shares if share.peer is not peer])
753 # send share when the chain changes to their chain
754 current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
757 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
759 if share.hash in tracker.verified.shares:
760 ss.add_verified_hash(share.hash)
761 task.LoopingCall(save_shares).start(60)
767 @defer.inlineCallbacks
771 is_lan, lan_ip = yield ipdiscover.get_local_ip()
773 pm = yield portmapper.get_port_mapper()
774 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
775 except defer.TimeoutError:
779 log.err(None, 'UPnP error:')
780 yield deferral.sleep(random.expovariate(1/120))
783 # start listening for workers with a JSON-RPC server
785 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
787 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work.value['subsidy'], net)
789 wb = WorkerBridge(lp_signal, my_pubkey_hash, net, args.donation_percentage, current_work, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, compute_work, shared_share_hashes)
790 web_root = web.get_web_root(tracker, current_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received)
791 worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
793 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
795 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
803 print 'Started successfully!'
804 print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
805 if args.donation_percentage > 0.51:
806 print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
807 elif args.donation_percentage < 0.49:
808 print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
810 print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
811 print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
815 if hasattr(signal, 'SIGALRM'):
816 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
817 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
819 signal.siginterrupt(signal.SIGALRM, False)
820 task.LoopingCall(signal.alarm, 30).start(1)
# Optional IRC announcements: when --irc-announce is given, connect to
# freenode and shout into the network's announce channel whenever this node
# sees a verified share that is also a full block.
if args.irc_announce:
    from twisted.words.protocols import irc
    class IRCClient(irc.IRCClient):
        # Random two-digit suffix so multiple p2pool nodes don't collide on
        # the same nick.
        nickname = 'p2pool%02i' % (random.randrange(100),)
        channel = net.ANNOUNCE_CHANNEL
        def lineReceived(self, line):
            # NOTE(review): lines between this def and the superclass call
            # appear elided from this excerpt.
            irc.IRCClient.lineReceived(self, line)
        # NOTE(review): the `def signedOn(self):` header appears elided here;
        # the following statements read as that method's body.
            irc.IRCClient.signedOn(self)
            self.factory.resetDelay()
            self.join(self.channel)
            @defer.inlineCallbacks
            def new_share(share):
                # Only announce shares that are actual blocks (pow_hash meets
                # the parent-network target) and are fresh (timestamp within
                # ten minutes of now).
                if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                    # Random exponential delay (mean 60s, true division via
                    # __future__ import) so several nodes that all saw the
                    # block don't announce simultaneously.
                    yield deferral.sleep(random.expovariate(1/60))
                    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    # Suppress duplicates already seen on the channel.
                    if message not in self.recent_messages:
                        self.say(self.channel, message)
                        self._remember_message(message)
            self.watch_id = tracker.verified.added.watch(new_share)
            self.recent_messages = []
        def _remember_message(self, message):
            # Keep a bounded window (100 entries) of recent channel messages
            # for duplicate suppression.
            self.recent_messages.append(message)
            while len(self.recent_messages) > 100:
                self.recent_messages.pop(0)
        def privmsg(self, user, channel, message):
            # Remember what other nodes said so we don't repeat their
            # announcements.
            if channel == self.channel:
                self._remember_message(message)
        def connectionLost(self, reason):
            # Stop watching for new shares once the IRC connection is gone.
            tracker.verified.added.unwatch(self.watch_id)
            print 'IRC connection lost:', reason.getErrorMessage()
    class IRCClientFactory(protocol.ReconnectingClientFactory):
        # NOTE(review): the factory body (presumably `protocol = IRCClient`)
        # appears elided from this excerpt.
    reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
# Periodic console status reporter.
# NOTE(review): this excerpt elides the decorated function's `def` line, its
# `while True:`/`try:` scaffolding, and several argument lines inside the
# format calls below -- the code is a fragment; confirm against the full file.
@defer.inlineCallbacks
        yield deferral.sleep(3)
        # Warn loudly if bitcoind work hasn't been refreshed for over a minute.
        if time.time() > current_work.value['last_update'] + 60:
            print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work.value['last_update']),)
        height = tracker.get_height(current_work.value['best_share_hash'])
        # Chain/peer summary line; FD counts only shown in debug mode.
        this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
            len(tracker.verified.shares),
            sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
        # Local hashrate estimated from the worker bridge's recent datums.
        # NOTE(review): assumes dt is nonzero when datums is nonempty -- the
        # guard, if any, is not visible in this excerpt.
        datums, dt = wb.local_rate_monitor.get_datums_in_last()
        my_att_s = sum(datum['work']/dt for datum in datums)
        this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
            math.format(int(my_att_s)),
            math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
            math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
        # Share/efficiency/payout line, computed over roughly the last hour
        # (60*60 // SHARE_PERIOD shares, capped by available chain height).
        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
        # Divide by (1 - stale_prop) to recover the true pool rate from the
        # observed (stale-reduced) share rate.
        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
            shares, stale_orphan_shares, stale_doa_shares,
            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
            math.format(int(real_att_s)),
            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
        # Surface any share-chain warnings prominently on stderr.
        for warning in p2pool_data.get_warnings(tracker, current_work, net):
            print >>sys.stderr, '#'*40
            print >>sys.stderr, '>>> Warning: ' + warning
            print >>sys.stderr, '#'*40
        # Only print when the status changed, or at least every 15 seconds.
        # NOTE(review): the print and `last_str = this_str` lines appear
        # elided from this excerpt.
        if this_str != last_str or time.time() > last_time + 15:
            last_time = time.time()
        # NOTE(review): the `except:` line for this handler appears elided.
        log.err(None, 'Fatal error:')
# Networks whose name lacks '_testnet' are the "real" ones; used below to
# build the default-port lists shown in --help text.
realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

# fromfile_prefix_chars='@' lets users keep options in a file: p2pool @opts.txt
parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
# --merged may be given multiple times (action='append'), one URL per
# merged-mined chain.
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--iocp',
    help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
    action='store_true', default=False, dest='iocp')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
parser.add_argument('--no-bugreport',
    help='disable submitting caught exceptions to the author',
    action='store_true', default=False, dest='no_bugreport')
# Options controlling the p2pool P2P interface (node-to-node networking).
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
# action='append': -n may be repeated to bootstrap from several nodes.
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
# Consistency fix: --disable-upnp was registered on the bare parser while
# every other option in this section belongs to p2pool_group, so it was
# missing from the "p2pool interface" section of --help.  Argument groups
# only affect help layout, so parsing behavior is unchanged.
p2pool_group.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')
# Options controlling the miner-facing (getwork/stratum-style RPC) interface.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')
# Options for connecting to the local bitcoind (JSON-RPC and raw P2P).
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

# Positional arguments: zero to two values (username then password); the
# two-or-fewer constraint is enforced after parsing, below.
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()
# Resolve the concrete network object (e.g. 'bitcoin' or 'bitcoin_testnet').
net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

# Per-network data directory; defaults to <script dir>/data/<net_name>.
datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
# Pad with Nones so 0/1/2 positional values map to (None, None) /
# (None, password) / (username, password) respectively.
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No RPC password on the command line: fall back to reading bitcoin.conf.
if args.bitcoind_rpc_password is None:
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        # Suggest a random 128-bit rpcpassword so users don't pick weak ones.
        # NOTE(review): intermediate suggested-config lines of this message
        # appear elided from this excerpt.
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        # bitcoin.conf has no section headers; prepend a dummy '[x]' section
        # so RawConfigParser will accept it.
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # Copy conf values into args, but only where the command line left
        # the value unset (command line wins over bitcoin.conf).
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        # NOTE(review): the `]:` line closing this for-statement appears
        # elided from this excerpt.
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

# Username may legitimately be empty; password may not (checked above).
if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''
# Fill in any ports still unset with the network's defaults.
if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

# Normalize --worker-port into an (interface_addr, port) tuple; an empty
# address string means "listen on all interfaces".
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
# NOTE(review): the `else:` line appears elided from this excerpt; the two
# lines below read as its body (ADDR:PORT form, rsplit so IPv6-ish addresses
# containing colons keep everything before the last colon as the address).
    addr, port = args.worker_endpoint.rsplit(':', 1)
    worker_endpoint = addr, int(port)
# Validate -a/--address now, converting it to a pubkey hash; None means the
# payout address will be requested from bitcoind later.
if args.address is not None:
# NOTE(review): the `try:` line appears elided from this excerpt.
    args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
# NOTE(review): the `else:` line appears elided from this excerpt.
    args.pubkey_hash = None
def separate_url(url):
    # Split a merged-mining URL of the form scheme://user:pass@host:port/...
    # into (url_without_credentials, 'user:pass').  Aborts via parser.error
    # if the URL carries no credentials at all.
    split_result = urlparse.urlsplit(url)
    netloc = split_result.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # rsplit: only the last '@' separates credentials from the host part.
    credentials, host_part = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(split_result._replace(netloc=host_part))
    return stripped_url, credentials
# Strip credentials out of every --merged URL: list of (url, userpass) pairs.
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Note: `logging` here is p2pool's util.logging (imported at the top of the
# file), not the stdlib module.  Everything written to stdout/stderr is
# timestamped and duplicated to both the console and the logfile.
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# SIGUSR1 reopens the logfile, so external log rotation works.
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        # NOTE(review): the logfile.reopen() call appears elided from this
        # excerpt.
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
# Also reopen every 5 seconds as a belt-and-braces fallback.
task.LoopingCall(logfile.reopen).start(5)
# Twisted log observer that phones home caught errors (rate-limited to one
# report per 5 seconds) unless --no-bugreport was given.
# NOTE(review): several lines of this class (the __init__ header, early
# `return`s, and the HTTP-POST call opener) appear elided from this excerpt.
class ErrorReporter(object):
        # Timestamp of the last report sent, for rate limiting.
        self.last_sent = None

    def emit(self, eventDict):
        # Only error events are reported.
        if not eventDict["isError"]:

        # Rate limit: at most one report every 5 seconds.
        if self.last_sent is not None and time.time() < self.last_sent + 5:
        self.last_sent = time.time()

        # Prefer the full traceback when a Failure is attached; otherwise
        # join the plain log message parts.
        if 'failure' in eventDict:
            text = ((eventDict.get('why') or 'Unhandled Error')
                + '\n' + eventDict['failure'].getTraceback())
            text = " ".join([str(m) for m in eventDict["message"]]) + "\n"

        from twisted.web import client
            url='http://u.forre.st/p2pool_error.cgi',
            postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
        # Best-effort: ignore success and failure of the report itself.
        ).addBoth(lambda x: None)
if not args.no_bugreport:
    log.addObserver(ErrorReporter().emit)

# Hand off to main() once the reactor is running.
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)