1 from __future__ import division
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind via getmemorypool and normalize
    it into a plain dict (parsed transactions, precomputed merkle branch,
    decoded bits/coinbaseflags).

    NOTE(review): this excerpt is missing lines -- the 'try:' that the
    'except' below belongs to, and the closing '))' of the returned dict.
    Visible tokens are preserved as-is.
    """
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            # getmemorypool is unsupported -- bitcoind is too old to mine against
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        # leading 0 is the placeholder for the generation tx we build later
        merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    """Run a p2pool node: connect to bitcoind, load the local share store,
    join the p2pool P2P network and serve work to miners.

    NOTE(review): this excerpt is missing a number of original lines
    (try/except headers, 'else:' branches, some statements and closing
    parens); visible tokens are preserved as-is and indentation is
    reconstructed -- confirm against the full file.
    """
    print 'p2pool (version %s)' % (p2pool.__version__,)

    # connect to bitcoind over JSON-RPC and do initial getmemorypool
    url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
    print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
    bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
    good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
    # NOTE(review): the 'if not good:' guard for this message is not visible in this excerpt
    print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
    temp_work = yield getwork(bitcoind)
    print ' Current block hash: %x' % (temp_work['previous_block_hash'],)

    # connect to bitcoind over bitcoin-p2p
    print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
    factory = bitcoin_p2p.ClientFactory(net.PARENT)
    reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
    yield factory.getProtocol() # waits until handshake is successful

    # decide where payouts for locally-found blocks go
    print 'Determining payout address...'
    if args.pubkey_hash is None:
        address_path = os.path.join(datadir_path, 'cached_payout_address')
        if os.path.exists(address_path):
            with open(address_path, 'rb') as f:
                address = f.read().strip('\r\n')
            print ' Loaded cached address: %s...' % (address,)
        if address is not None:
            # re-validate the cached address against the running bitcoind
            res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
            if not res['isvalid'] or not res['ismine']:
                print ' Cached address is either invalid or not controlled by local bitcoind!'
        print ' Getting payout address from bitcoind...'
        address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
        # NOTE(review): the with-block body (writing the address) is missing here
        with open(address_path, 'wb') as f:
        my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        # NOTE(review): the next line belongs to a missing 'else:' branch
        my_pubkey_hash = args.pubkey_hash
    print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)

    # share-tracking state; the two hash-sets are shared with the tracker
    my_share_hashes = set()
    my_doa_share_hashes = set()

    tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
    shared_share_hashes = set()
    ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
    known_verified = set()

    # replay the on-disk share store into the tracker
    print "Loading shares..."
    for i, (mode, contents) in enumerate(ss.get_shares()):
        if contents.hash in tracker.shares:
            shared_share_hashes.add(contents.hash)
            contents.time_seen = 0
            tracker.add(contents)
            if len(tracker.shares) % 1000 == 0 and tracker.shares:
                print " %i" % (len(tracker.shares),)
        elif mode == 'verified_hash':
            known_verified.add(contents)
            # NOTE(review): the AssertionError belongs to a missing 'else:' branch
            raise AssertionError()
    print " ...inserting %i verified shares..." % (len(known_verified),)
    for h in known_verified:
        if h not in tracker.shares:
            # drop verified-hash entries whose share data is gone
            ss.forget_verified_share(h)
            # NOTE(review): this add() belongs to a missing 'else:' branch
            tracker.verified.add(tracker.shares[h])
    print " ...done loading %i shares!" % (len(tracker.shares),)

    # keep the on-disk store and the shared-set in sync with the tracker
    tracker.removed.watch(lambda share: ss.forget_share(share.hash))
    tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
    tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))

    peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it

    pre_current_work = variable.Variable(None)
    pre_merged_work = variable.Variable({})
    # information affecting work that should trigger a long-polling update
    current_work = variable.Variable(None)
    # information affecting work that should not trigger a long-polling update
    current_work2 = variable.Variable(None)

    requested = expiring_dict.ExpiringDict(300)

    print 'Initializing work...'
    @defer.inlineCallbacks
    def set_real_work1():
        # pull a fresh block template from bitcoind and publish it
        work = yield getwork(bitcoind)
        current_work2.set(dict(
            transactions=work['transactions'],
            merkle_branch=work['merkle_branch'],
            subsidy=work['subsidy'],
            clock_offset=time.time() - work['time'],
            last_update=time.time(),
        )) # second set first because everything hooks on the first
        pre_current_work.set(dict(
            version=work['version'],
            previous_block=work['previous_block_hash'],
            coinbaseflags=work['coinbaseflags'],
    yield set_real_work1()

    # prefer getblock-based height lookups when this bitcoind offers getblock
    if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
        height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((lambda x: x['blockcount'] if 'blockcount' in x else x['height'])((yield bitcoind.rpc_getblock('%x' % (block_hash,)))))))
        best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
        def get_height_rel_highest(block_hash):
            # height of block_hash relative to the best height seen so far
            this_height = height_cacher.call_now(block_hash, 0)
            best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
            best_height_cached.set(max(best_height_cached.value, this_height, best_height))
            return this_height - best_height_cached.value
        # NOTE(review): this fallback belongs to a missing 'else:' branch
        get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest

    def set_real_work2():
        # recompute the best share and request any missing parent shares
        best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])

        t = dict(pre_current_work.value)
        t['best_share_hash'] = best
        t['mm_chains'] = pre_merged_work.value

        for peer2, share_hash in desired:
            if share_hash not in tracker.tails: # was received in the time tracker.think was running
            # back off exponentially on repeated requests for the same share
            last_request_time, count = requested.get(share_hash, (None, 0))
            if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
            potential_peers = set()
            for head in tracker.tails[share_hash]:
                potential_peers.update(peer_heads.get(head, set()))
            potential_peers = [peer for peer in potential_peers if peer.connected2]
            if count == 0 and peer2 is not None and peer2.connected2:
                peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
            print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                stops=list(set(tracker.heads) | set(
                    tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
            requested[share_hash] = t, count + 1
    pre_current_work.changed.watch(lambda _: set_real_work2())
    pre_merged_work.changed.watch(lambda _: set_real_work2())

    @defer.inlineCallbacks
    def set_merged_work(merged_url, merged_userpass):
        # poll a merged-mining daemon for aux work and publish it keyed by chainid
        merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
        auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
        pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
            hash=int(auxblock['hash'], 16),
            target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
            merged_proxy=merged_proxy,
        yield deferral.sleep(1)
    for merged_url, merged_userpass in merged_urls:
        set_merged_work(merged_url, merged_userpass)

    @pre_merged_work.changed.watch
    def _(new_merged_work):
        print 'Got new merged mining work!'

    # setup p2p logic and join p2pool network
    class Node(p2p.Node):
        def handle_shares(self, shares, peer):
            # Ingest shares received from a peer into the tracker.
            print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
            # NOTE(review): the 'for share in shares:' loop header is missing here
            if share.hash in tracker.shares:
                #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
            #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)

        def handle_share_hashes(self, hashes, peer):
            # Request any advertised share hashes we do not already have.
            for share_hash in hashes:
                if share_hash in tracker.shares:
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            if hashes and peer is not None:
                peer_heads.setdefault(hashes[0], set()).add(peer)
            peer.send_getshares(hashes=get_hashes, parents=0, stops=[])

        def handle_get_shares(self, hashes, parents, stops, peer):
            # Serve at most 1000 shares total across the requested chains.
            parents = min(parents, 1000//len(hashes))
            for share_hash in hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
299 @deferral.retry('Error submitting block: (will retry)', 10, 10)
300 @defer.inlineCallbacks
301 def submit_block(block, ignore_failure):
302 success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
303 success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
304 if (not success and success_expected and not ignore_failure) or (success and not success_expected):
305 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (result, expected_result)
    # whenever a newly-verified share also solves a bitcoin block, submit it
    @tracker.verified.added.watch
    # NOTE(review): the decorated 'def _(share):' header is missing from this excerpt
    if share.pow_hash <= share.header['bits'].target:
        submit_block(share.as_block(tracker), ignore_failure=True)
        print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
        recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))

    print 'Joining p2pool network using port %i...' % (args.p2pool_port,)

    # resolve "host[:port]" strings to (ip, port) tuples
    # NOTE(review): the 'def parse(x):' header and its branch structure are missing here
    @defer.inlineCallbacks
        ip, port = x.split(':')
        defer.returnValue(((yield reactor.resolve(ip)), int(port)))
        defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))

    # seed the address store from disk and the builtin bootstrap list
    if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
        addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
        # NOTE(review): best-effort load -- this message belongs to a missing 'except:'
        print >>sys.stderr, "error reading addrs"
    for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
        if addr not in addrs:
            addrs[addr] = (0, time.time(), time.time())

    connect_addrs = set()
    for addr_df in map(parse, args.p2pool_nodes):
        connect_addrs.add((yield addr_df))

    # NOTE(review): the 'p2p_node = Node(' call wrapping these kwargs is missing here
        best_share_hash_func=lambda: current_work.value['best_share_hash'],
        port=args.p2pool_port,
        connect_addrs=connect_addrs,

    # persist the peer address store once a minute
    task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)

    # send share when the chain changes to their chain
    def work_changed(new_work):
        #print 'Work changed:', new_work
        for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
            if share.hash in shared_share_hashes:
            shared_share_hashes.add(share.hash)
        for peer in p2p_node.peers.itervalues():
            peer.sendShares([share for share in shares if share.peer is not peer])
    current_work.changed.watch(work_changed)

    # periodically persist verified-share hashes to disk
    # NOTE(review): the 'def save_shares():' header is missing from this excerpt
        for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
            if share.hash in tracker.verified.shares:
                ss.add_verified_hash(share.hash)
    task.LoopingCall(save_shares).start(60)

    # try to forward our P2P port via UPnP, retrying with random backoff
    @defer.inlineCallbacks
        is_lan, lan_ip = yield ipdiscover.get_local_ip()
        pm = yield portmapper.get_port_mapper()
        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
        except defer.TimeoutError:
            log.err(None, 'UPnP error:')
        yield deferral.sleep(random.expovariate(1/120))

    # start listening for workers with a JSON-RPC server
    print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])

    # load (or create and persist) the "VIP" password used to tag trusted pseudoshares
    if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
        with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
            vip_pass = f.read().strip('\r\n')
        # NOTE(review): these lines belong to a missing 'else:' branch
        vip_pass = '%016x' % (random.randrange(2**64),)
        with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
    print ' Worker password:', vip_pass, '(only required for generating graphs)'

    # counters for our own shares that were removed from the verified chain:
    # (total, orphan-announced, dead-announced) and doa-only count
    removed_unstales_var = variable.Variable((0, 0, 0))
    removed_doa_unstales_var = variable.Variable(0)
417 @tracker.verified.removed.watch
419 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
420 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
421 removed_unstales_var.set((
422 removed_unstales_var.value[0] + 1,
423 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
424 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
426 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
427 removed_doa_unstales.set(removed_doa_unstales.value + 1)
429 def get_stale_counts():
430 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
431 my_shares = len(my_share_hashes)
432 my_doa_shares = len(my_doa_share_hashes)
433 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
434 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
435 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
436 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
437 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
439 my_shares_not_in_chain = my_shares - my_shares_in_chain
440 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
442 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
    pseudoshare_received = variable.Event()
    local_rate_monitor = math.RateMonitor(10*60)

    class WorkerBridge(worker_interface.WorkerBridge):
        # Bridges the miner-facing getwork interface to p2pool share logic.
            # NOTE(review): the 'def __init__(self):' header is missing from this excerpt
            worker_interface.WorkerBridge.__init__(self)
            self.new_work_event = current_work.changed
            # rolling (timestamp, work) samples used to estimate this miner's hash rate
            self.recent_shares_ts_work = []

        def preprocess_request(self, request):
            # Parse username extensions ('user+diff' pseudoshare difficulty,
            # 'user/diff' minimum share difficulty) and pick a payout pubkey hash.
            # NOTE(review): the guarding 'if'/'try' lines around the rsplit
            # calls are missing from this excerpt.
            user = request.getUser() if request.getUser() is not None else ''
            desired_pseudoshare_target = None
                user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
            desired_share_target = 2**256 - 1
                user, min_diff_str = user.rsplit('/', 1)
                desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
            # with probability worker_fee%, the pool operator takes the payout
            if random.uniform(0, 100) < args.worker_fee:
                pubkey_hash = my_pubkey_hash
                # otherwise try the username as an address, defaulting to ours
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                pubkey_hash = my_pubkey_hash
            return pubkey_hash, desired_share_target, desired_pseudoshare_target

        def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
            # refuse to hand out work when we are clearly not in a sane state
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if current_work.value['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')

            # embed merged-mining commitments into the coinbase if configured
            if current_work.value['mm_chains']:
                tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                    merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]

            # build the share + generation transaction for this request
            share_info, generate_tx = p2pool_data.generate_transaction(
                previous_share_hash=current_work.value['best_share_hash'],
                coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                nonce=random.randrange(2**32),
                pubkey_hash=pubkey_hash,
                subsidy=current_work2.value['subsidy'],
                donation=math.perfect_round(65535*args.donation_percentage/100),
                # 253 = orphan-announce, 254 = dead-announce stale_info markers
                stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                    253 if orphans > orphans_recorded_in_chain else
                    254 if doas > doas_recorded_in_chain else
                )(*get_stale_counts()),
                block_target=current_work.value['bits'].target,
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                desired_target=desired_share_target,

            # pseudoshare target: explicit request, or estimated from the
            # miner's recent hash rate; never easier than allowed, never
            # harder than the share or any aux-chain target
            target = net.PARENT.SANE_MAX_TARGET
            if desired_pseudoshare_target is None:
                if len(self.recent_shares_ts_work) == 50:
                    hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                    target = min(target, 2**256//hash_rate)
                # NOTE(review): this line belongs to a missing 'else:' branch
                target = min(target, desired_pseudoshare_target)
            target = max(target, share_info['bits'].target)
            for aux_work in current_work.value['mm_chains'].itervalues():
                target = max(target, aux_work['target'])

            transactions = [generate_tx] + list(current_work2.value['transactions'])
            packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
            merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])

            getwork_time = time.time()
            merkle_branch = current_work2.value['merkle_branch']

            print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                bitcoin_data.target_to_difficulty(target),
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                len(current_work2.value['transactions']),

            ba = bitcoin_getwork.BlockAttempt(
                version=current_work.value['version'],
                previous_block=current_work.value['previous_block'],
                merkle_root=merkle_root,
                timestamp=current_work2.value['time'],
                bits=current_work.value['bits'],

            received_header_hashes = set()

            def got_response(header, request):
                # called when a miner submits a solved header for this work unit
                assert header['merkle_root'] == merkle_root

                header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']

                # 1) did this solve a bitcoin block?
                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
                        submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                        if pow_hash <= header['bits'].target:
                            print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                            recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    log.err(None, 'Error while processing potential block:')

                # 2) did it solve any merged-mining chain?
                for aux_work, index, hashes in mm_later:
                        if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                            df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                bitcoin_data.aux_pow_type.pack(dict(
                                    block_hash=header_hash,
                                    merkle_branch=merkle_branch,
                                    merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                    parent_block_header=header,
                            if result != (pow_hash <= aux_work['target']):
                                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                print 'Merged block submittal result: %s' % (result,)
                            log.err(err, 'Error submitting merged block:')
                        log.err(None, 'Error while processing merged mining POW:')

                # 3) did it qualify as a p2pool share?
                if pow_hash <= share_info['bits'].target:
                    min_header = dict(header);del min_header['merkle_root']
                    hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                    share = p2pool_data.Share(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                    print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                        p2pool_data.format_hash(share.hash),
                        p2pool_data.format_hash(share.previous_hash),
                        time.time() - getwork_time,
                        ' DEAD ON ARRIVAL' if not on_time else '',
                    my_share_hashes.add(share.hash)
                        # NOTE(review): the 'if not on_time:' guard is missing here
                        my_doa_share_hashes.add(share.hash)
                    tracker.verified.add(share)

                    # broadcast block-solving shares immediately
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            for peer in p2p_node.peers.itervalues():
                                peer.sendShares([share])
                            shared_share_hashes.add(share.hash)
                        log.err(None, 'Error forwarding block solution:')

                # 4) account the pseudoshare (or reject it)
                if pow_hash > target:
                    print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                    print ' Hash: %56x' % (pow_hash,)
                    print ' Target: %56x' % (target,)
                elif header_hash in received_header_hashes:
                    print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    # NOTE(review): the lines below belong to a missing 'else:' branch
                    received_header_hashes.add(header_hash)

                    pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
                    self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                    while len(self.recent_shares_ts_work) > 50:
                        self.recent_shares_ts_work.pop(0)
                    local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))

            return ba, got_response

    get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)

    web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
    worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)

    deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])

    # signal readiness to wrapper scripts (with-block body missing from excerpt)
    with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:

    # poll bitcoind for new work, waking early when a new block arrives
    @defer.inlineCallbacks
        flag = factory.new_block.get_deferred()
        yield set_real_work1()
        yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)

    print 'Started successfully!'

    # watchdog: alarm every 30s; if the reactor stalls, dump a stack trace
    if hasattr(signal, 'SIGALRM'):
        signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
            sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
        signal.siginterrupt(signal.SIGALRM, False)
        task.LoopingCall(signal.alarm, 30).start(1)

    if args.irc_announce:
        from twisted.words.protocols import irc
        class IRCClient(irc.IRCClient):
            # Announces locally-found blocks on freenode, deduplicating
            # against announcements already made by other p2pool nodes.
            nickname = 'p2pool%02i' % (random.randrange(100),)
            channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
            def lineReceived(self, line):
                irc.IRCClient.lineReceived(self, line)
                # NOTE(review): the 'def signedOn(self):' header is missing from this excerpt
                irc.IRCClient.signedOn(self)
                self.factory.resetDelay()
                self.join(self.channel)
                self.watch_id = tracker.verified.added.watch(self._new_share)
                self.announced_hashes = set()
                self.delayed_messages = {}
            def privmsg(self, user, channel, message):
                # someone else announced the same block first -- cancel ours
                if channel == self.channel and message in self.delayed_messages:
                    self.delayed_messages.pop(message).cancel()
            def _new_share(self, share):
                if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                    self.announced_hashes.add(share.header_hash)
                    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    # random delay so multiple nodes don't all announce at once
                    self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
            def connectionLost(self, reason):
                tracker.verified.added.unwatch(self.watch_id)
                print 'IRC connection lost:', reason.getErrorMessage()
        class IRCClientFactory(protocol.ReconnectingClientFactory):
        reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())

    # periodic console status line (loop/def headers missing from excerpt)
    @defer.inlineCallbacks
        yield deferral.sleep(3)

        if time.time() > current_work2.value['last_update'] + 60:
            print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)

        height = tracker.get_height(current_work.value['best_share_hash'])
        this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
            len(tracker.verified.shares),
            sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')

        datums, dt = local_rate_monitor.get_datums_in_last()
        my_att_s = sum(datum['work']/dt for datum in datums)
        this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
            math.format(int(my_att_s)),
            math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
            math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',

        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)

        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
            shares, stale_orphan_shares, stale_doa_shares,
            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
            math.format(int(real_att_s)),
            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),

        if this_str != last_str or time.time() > last_time + 15:
            last_time = time.time()

    # NOTE(review): this belongs to the 'except:' of main's missing outer try
    log.err(None, 'Fatal error:')
class FixedArgumentParser(argparse.ArgumentParser):
    # ArgumentParser subclass whose @file arguments may contain multiple
    # whitespace-separated options per line (stock argparse reads exactly
    # one argument per line from files).
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        # NOTE(review): this excerpt is missing several lines from this
        # method ('new_arg_strings = []', the 'else:'/'try:' around the
        # file read, and the error() call); visible tokens preserved as-is.
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
                args_file = open(arg_string[1:])
                for arg_line in args_file.read().splitlines():
                    for arg in self.convert_arg_line_to_args(arg_line):
                        arg_strings.append(arg)
                # recurse so files may reference further files
                arg_strings = self._read_args_from_files(arg_strings)
                new_arg_strings.extend(arg_strings)
        err = sys.exc_info()[1]
        # return the modified argument list
        return new_arg_strings

    def convert_arg_line_to_args(self, arg_line):
        # split each file line into individual whitespace-separated arguments
        return [arg for arg in arg_line.split() if arg.strip()]
815 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
817 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
818 parser.add_argument('--version', action='version', version=p2pool.__version__)
819 parser.add_argument('--net',
820 help='use specified network (default: bitcoin)',
821 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
822 parser.add_argument('--testnet',
823 help='''use the network's testnet''',
824 action='store_const', const=True, default=False, dest='testnet')
825 parser.add_argument('--debug',
826 help='enable debugging mode',
827 action='store_const', const=True, default=False, dest='debug')
828 parser.add_argument('-a', '--address',
829 help='generate payouts to this address (default: <address requested from bitcoind>)',
830 type=str, action='store', default=None, dest='address')
831 parser.add_argument('--datadir',
832 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
833 type=str, action='store', default=None, dest='datadir')
834 parser.add_argument('--logfile',
835 help='''log to this file (default: data/<NET>/log)''',
836 type=str, action='store', default=None, dest='logfile')
837 parser.add_argument('--merged',
838 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
839 type=str, action='append', default=[], dest='merged_urls')
840 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
841 help='donate this percentage of work to author of p2pool (default: 0.5)',
842 type=float, action='store', default=0.5, dest='donation_percentage')
843 parser.add_argument('--irc-announce',
844 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
845 action='store_true', default=False, dest='irc_announce')
# Options controlling p2pool's own peer-to-peer interface.
p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    # store_false: flag is named "disable" but dest 'upnp' defaults to True
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
# Options for the worker (miner-facing RPC) interface.
worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    # kept as a string; parsed into (addr, port) later when building
    # worker_endpoint
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')
# Options for reaching the local bitcoind (JSON-RPC and P2P ports, RPC
# credentials). Ports/credentials left unset here may later be filled in
# from bitcoin.conf or network defaults.
bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')
# positional (no option string): zero to two values, [username, password]
bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
args = parser.parse_args()

# map --net/--testnet onto a concrete network definition object
net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

# per-network data directory (default: <script dir>/data/<net>); created on
# first run
datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

# split the BITCOIND_RPCUSERPASS positional into username/password; with one
# value the username stays None (password only), with none both stay None
if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
# No RPC password on the command line: fall back to reading credentials and
# ports from the parent network's bitcoin.conf. (Restores the list-closing
# ']:' that was truncated from the for-statement.)
if args.bitcoind_rpc_password is None:
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        # bitcoin.conf has no section headers; prepend a dummy '[x]' section
        # so RawConfigParser will accept it
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # command-line values take precedence: only fill in attributes that
        # are still None
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
# Anything still unset after command line + bitcoin.conf falls back to the
# network's defaults.
if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT
# Resolve the miner-facing listening endpoint into an (addr, port) pair:
# unset -> all interfaces on the network default port; a bare number -> all
# interfaces on that port; otherwise an explicit ADDR:PORT. (Restores the
# missing 'else:' branch — without it the rsplit lines ran unconditionally,
# clobbering the defaults and crashing on a None endpoint.)
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
else:
    # rsplit so IPv6-ish addresses with colons keep everything before the
    # final colon as the address part
    addr, port = args.worker_endpoint.rsplit(':', 1)
    worker_endpoint = addr, int(port)
# Validate the payout address up front so a typo is reported as a startup
# error rather than failing mid-run; None means "ask bitcoind later".
# (Restores the lost try/except/else structure — as truncated, parser.error
# ran unconditionally and 'e' was unbound.)
if args.address is not None:
    try:
        args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception as e:
        parser.error('error parsing address: ' + repr(e))
else:
    args.pubkey_hash = None
def separate_url(url):
    # Split a merged-mining URL of the form scheme://user:pass@host:port/...
    # into (url with credentials removed, 'user:pass').
    parts = urlparse.urlsplit(url)
    netloc = parts.netloc
    if '@' not in netloc:
        parser.error('merged url netloc must contain an "@"')
    # rsplit: credentials are everything before the *last* '@' in the netloc
    userpass, bare_netloc = netloc.rsplit('@', 1)
    stripped_url = urlparse.urlunsplit(parts._replace(netloc=bare_netloc))
    return stripped_url, userpass
# list of (url, userpass) pairs for merged mining
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# 'logging' here is the project's util.logging module (see imports), not the
# stdlib: tee everything to stderr and the logfile, timestamping each line,
# and redirect stdout/stderr (and Twisted's log) through it
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
964 if hasattr(signal, "SIGUSR1"):
965 def sigusr1(signum, frame):
966 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
968 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
969 signal.signal(signal.SIGUSR1, sigusr1)
970 task.LoopingCall(logfile.reopen).start(5)
# hand off to main() once the Twisted reactor is running; all further
# startup (bitcoind connection, P2P, worker interface) happens there
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)