1 from __future__ import division
15 from twisted.internet import defer, error, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 except jsonrpc.Error, e:
32 if e.code == -32601: # Method not found
33 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34 raise deferral.RetrySilentlyException()
36 packed_transactions = [x.decode('hex') for x in work['transactions']]
37 defer.returnValue(dict(
38 version=work['version'],
39 previous_block_hash=int(work['previousblockhash'], 16),
40 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41 merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
42 subsidy=work['coinbasevalue'],
44 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    # Main p2pool entry point: connect to bitcoind (RPC + P2P), join the
    # p2pool share chain, and serve work to miners.
    # NOTE(review): this file is an excerpt — interleaved lines of the
    # original (try/except headers, guards, closing parens) are missing
    # throughout; indentation below is reconstructed.
    print 'p2pool (version %s)' % (p2pool.__version__,)
    
    # connect to bitcoind over JSON-RPC and do initial getmemorypool
    url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
    print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
    bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
    # RPC_CHECK verifies we are talking to the daemon for the right coin/network
    good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
    print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!" # NOTE(review): guard ('if not good:' + return) elided in excerpt
    temp_work = yield getwork(bitcoind)
    print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
    
    # connect to bitcoind over bitcoin-p2p
    print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
    factory = bitcoin_p2p.ClientFactory(net.PARENT)
    reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
    yield factory.getProtocol() # waits until handshake is successful
    # Determine where block payouts for this node's shares should go:
    # either the --address argument, or a (cached) address from bitcoind.
    print 'Determining payout address...'
    if args.pubkey_hash is None:
        address_path = os.path.join(datadir_path, 'cached_payout_address')
        
        if os.path.exists(address_path):
            with open(address_path, 'rb') as f:
                address = f.read().strip('\r\n')
            print ' Loaded cached address: %s...' % (address,)
        # NOTE(review): 'else: address = None' branch elided in excerpt
        
        if address is not None:
            # re-validate the cached address against the local wallet
            res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
            if not res['isvalid'] or not res['ismine']:
                print ' Cached address is either invalid or not controlled by local bitcoind!'
                # NOTE(review): 'address = None' reset elided in excerpt
        
        print ' Getting payout address from bitcoind...' # NOTE(review): 'if address is None:' guard elided in excerpt
        address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
        
        with open(address_path, 'wb') as f: # NOTE(review): body (f.write(address)) elided in excerpt
        
        my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        my_pubkey_hash = args.pubkey_hash # NOTE(review): enclosing 'else:' elided in excerpt — this is the --address branch
    print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
    # Sets of hashes of shares this node produced (and the dead-on-arrival
    # subset), used later for stale-rate accounting.
    my_share_hashes = set()
    my_doa_share_hashes = set()
    
    tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
    shared_share_hashes = set() # shares we know our peers already have
    ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
    known_verified = set()
    
    # Replay the on-disk share store into the in-memory tracker.
    print "Loading shares..."
    for i, (mode, contents) in enumerate(ss.get_shares()):
        # NOTE(review): "if mode == 'share':" branch header elided in excerpt
            if contents.hash in tracker.shares: # NOTE(review): body ('continue') elided in excerpt
            shared_share_hashes.add(contents.hash)
            contents.time_seen = 0 # loaded from disk, not freshly received
            tracker.add(contents)
            if len(tracker.shares) % 1000 == 0 and tracker.shares:
                print " %i" % (len(tracker.shares),)
        elif mode == 'verified_hash':
            known_verified.add(contents)
            raise AssertionError() # NOTE(review): enclosing 'else:' elided in excerpt — unknown share-store record type
    print " ...inserting %i verified shares..." % (len(known_verified),)
    for h in known_verified:
        if h not in tracker.shares:
            ss.forget_verified_share(h) # verified hash with no matching share — drop it
            tracker.verified.add(tracker.shares[h]) # NOTE(review): enclosing 'else:' elided in excerpt
    print " ...done loading %i shares!" % (len(tracker.shares),)
    
    # keep the on-disk store and shared-set in sync with the tracker
    tracker.removed.watch(lambda share: ss.forget_share(share.hash))
    tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
    tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
    peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
    
    # raw work, before best-share selection is merged in
    pre_current_work = variable.Variable(None)
    pre_merged_work = variable.Variable({})
    # information affecting work that should trigger a long-polling update
    current_work = variable.Variable(None)
    # information affecting work that should not trigger a long-polling update
    current_work2 = variable.Variable(None)
    
    requested = expiring_dict.ExpiringDict(300) # share_hash -> (last_request_time, request_count)
    
    print 'Initializing work...'
    @defer.inlineCallbacks
    def set_real_work1():
        # Poll bitcoind for a fresh block template and push it into the
        # two work variables.
        work = yield getwork(bitcoind)
        current_work2.set(dict( # NOTE(review): one field of the original dict elided in excerpt
            transactions=work['transactions'],
            merkle_branch=work['merkle_branch'],
            subsidy=work['subsidy'],
            clock_offset=time.time() - work['time'],
            last_update=time.time(),
        )) # second set first because everything hooks on the first
        pre_current_work.set(dict(
            version=work['version'],
            previous_block=work['previous_block_hash'],
            coinbaseflags=work['coinbaseflags'], # NOTE(review): a field and the closing ')) ' elided in excerpt
    yield set_real_work1()
    # Pick a block-height oracle: prefer bitcoind's getblock RPC when
    # available, otherwise fall back to tracking headers over bitcoin-p2p.
    if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
        height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((yield bitcoind.rpc_getblock('%x' % (block_hash,)))['blockcount'])))
        best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
        def get_height_rel_highest(block_hash):
            # Height of block_hash relative to the best height seen so far
            # (non-positive for blocks at or behind the tip); 0 is used as
            # the call_now fallback while the cache is cold.
            this_height = height_cacher.call_now(block_hash, 0)
            best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
            best_height_cached.set(max(best_height_cached.value, this_height, best_height))
            return this_height - best_height_cached.value
        get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest # NOTE(review): enclosing 'else:' elided in excerpt
    def set_real_work2():
        # Combine raw bitcoind work with the tracker's choice of best share
        # and current merged-mining work, then request any shares peers
        # advertised that we are missing.
        best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
        
        t = dict(pre_current_work.value)
        t['best_share_hash'] = best
        t['mm_chains'] = pre_merged_work.value
        # NOTE(review): current_work.set(t) and 't = time.time()' elided in excerpt
        for peer2, share_hash in desired:
            if share_hash not in tracker.tails: # was received in the time tracker.think was running
                # NOTE(review): 'continue' elided in excerpt
            last_request_time, count = requested.get(share_hash, (None, 0))
            if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                # NOTE(review): 'continue' (rate-limit re-requests) elided in excerpt
            potential_peers = set()
            for head in tracker.tails[share_hash]:
                potential_peers.update(peer_heads.get(head, set()))
            potential_peers = [peer for peer in potential_peers if peer.connected2]
            if count == 0 and peer2 is not None and peer2.connected2:
                # NOTE(review): 'peer = peer2' and enclosing else elided in excerpt
                peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
            
            print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
            # NOTE(review): peer.send_getshares(hashes=..., parents=..., ...) header elided in excerpt
                stops=list(set(tracker.heads) | set(
                    # stop walking once the peer hits chains we already have
                    tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
            requested[share_hash] = t, count + 1
    pre_current_work.changed.watch(lambda _: set_real_work2())
    pre_merged_work.changed.watch(lambda _: set_real_work2())
    @defer.inlineCallbacks
    def set_merged_work(merged_url, merged_userpass):
        # Poll a merged-mining daemon for aux work, keyed by chain id.
        merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
        # NOTE(review): enclosing 'while True:' elided in excerpt
            auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
            pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                hash=int(auxblock['hash'], 16),
                target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                merged_proxy=merged_proxy,
            # NOTE(review): closing ')})) ' elided in excerpt
            yield deferral.sleep(1)
    for merged_url, merged_userpass in merged_urls:
        set_merged_work(merged_url, merged_userpass)
    
    @pre_merged_work.changed.watch
    def _(new_merged_work):
        print 'Got new merged mining work!'
    # setup p2p logic and join p2pool network
    
    class Node(p2p.Node):
        # p2pool P2P node: ingests shares and share-hash announcements from
        # peers and serves share requests.
        def handle_shares(self, shares, peer):
            print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
            # NOTE(review): new_count counter and per-share loop header elided in excerpt
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    # NOTE(review): 'continue' elided in excerpt
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                # NOTE(review): tracker.add(share) etc. elided in excerpt
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
        def handle_share_hashes(self, hashes, peer):
            # Request any announced shares we don't already have.
            # NOTE(review): 't = time.time()' / 'get_hashes = []' elided in excerpt
            for share_hash in hashes:
                if share_hash in tracker.shares:
                    # NOTE(review): 'continue' elided in excerpt
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    # NOTE(review): 'continue' (rate limit) elided in excerpt
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if hashes and peer is not None:
                peer_heads.setdefault(hashes[0], set()).add(peer)
            # NOTE(review): 'if get_hashes:' guard elided in excerpt
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def handle_get_shares(self, hashes, parents, stops, peer):
            parents = min(parents, 1000//len(hashes)) # cap total response size
            # NOTE(review): 'stops = set(stops)' / 'shares = []' elided in excerpt
            for share_hash in hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
                        # NOTE(review): 'break' / shares.append elided in excerpt
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
    
    @tracker.verified.added.watch
    # NOTE(review): watcher header 'def _(share):' elided in excerpt —
    # submits any verified share that meets the block target to bitcoind.
        if share.pow_hash <= share.header['bits'].target:
            if factory.conn.value is not None:
                factory.conn.value.send_block(block=share.as_block(tracker))
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!' # NOTE(review): enclosing 'else:' elided in excerpt
            print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
            recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
    print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
    
    @defer.inlineCallbacks
    # NOTE(review): 'def parse(x):' header and "if ':' in x:" guard elided
    # in excerpt — resolves "host[:port]" to an (ip, port) tuple.
        ip, port = x.split(':')
        defer.returnValue(((yield reactor.resolve(ip)), int(port)))
        defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT)) # NOTE(review): 'else:' branch — default P2P port
    
    # load previously-seen peer addresses, seeded with bootstrap nodes
    if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
        # NOTE(review): 'try:' elided in excerpt
            addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt')))) # NOTE(review): eval of a local data file — trusted input only
            print >>sys.stderr, "error reading addrs" # NOTE(review): 'except:' elided in excerpt
    for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
        # NOTE(review): 'addr = yield addr_df' elided in excerpt
        if addr not in addrs:
            addrs[addr] = (0, time.time(), time.time())
    
    connect_addrs = set()
    for addr_df in map(parse, args.p2pool_nodes):
        connect_addrs.add((yield addr_df))
    
    # NOTE(review): 'p2p_node = Node(' constructor header and several
    # keyword arguments elided in excerpt
        best_share_hash_func=lambda: current_work.value['best_share_hash'],
        port=args.p2pool_port,
        connect_addrs=connect_addrs,
    
    # NOTE(review): 'def save_addrs():' header elided in excerpt —
    # periodically persists the peer address store
        open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
    task.LoopingCall(save_addrs).start(60)
    # send share when the chain changes to their chain
    def work_changed(new_work):
        # Broadcast our recent, not-yet-shared shares when the best chain moves.
        #print 'Work changed:', new_work
        # NOTE(review): 'shares = []' elided in excerpt
        for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
            if share.hash in shared_share_hashes:
                # NOTE(review): 'break' elided in excerpt — older shares already shared
            shared_share_hashes.add(share.hash)
            # NOTE(review): 'shares.append(share)' elided in excerpt
        
        for peer in p2p_node.peers.itervalues():
            peer.sendShares([share for share in shares if share.peer is not peer])
    
    current_work.changed.watch(work_changed)
    
    # NOTE(review): 'def save_shares():' header elided in excerpt —
    # periodically persists the recent share chain to disk
        for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
            # NOTE(review): ss.add_share(share) elided in excerpt
            if share.hash in tracker.verified.shares:
                ss.add_verified_hash(share.hash)
    task.LoopingCall(save_shares).start(60)
    @defer.inlineCallbacks
    # NOTE(review): 'def upnp_thread():' header, 'while True:'/'try:' and
    # the 'if is_lan:' guard elided in excerpt — best-effort periodic UPnP
    # port forwarding of the p2pool P2P port.
        is_lan, lan_ip = yield ipdiscover.get_local_ip()
            pm = yield portmapper.get_port_mapper()
            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
        except defer.TimeoutError:
            # NOTE(review): 'pass' and broad 'except:' with DEBUG guard elided in excerpt
            log.err(None, "UPnP error:")
        yield deferral.sleep(random.expovariate(1/120)) # re-attempt on average every ~2 minutes
    # start listening for workers with a JSON-RPC server
    
    print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
    
    # vip_pass gates per-miner graphing; load it or generate one on first run
    if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
        with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
            vip_pass = f.read().strip('\r\n')
        vip_pass = '%016x' % (random.randrange(2**64),) # NOTE(review): enclosing 'else:' elided in excerpt
        with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f: # NOTE(review): body (f.write(vip_pass)) elided in excerpt
    print ' Worker password:', vip_pass, '(only required for generating graphs)'
    
    # counts of our stale shares that have been pruned from the tracker:
    # (total, orphans-recorded, doas-recorded) and doa total respectively
    removed_unstales_var = variable.Variable((0, 0, 0))
    removed_doa_unstales_var = variable.Variable(0)
415 @tracker.verified.removed.watch
417 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
418 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
419 removed_unstales_var.set((
420 removed_unstales_var.value[0] + 1,
421 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
422 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
424 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
425 removed_doa_unstales.set(removed_doa_unstales.value + 1)
427 def get_stale_counts():
428 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
429 my_shares = len(my_share_hashes)
430 my_doa_shares = len(my_doa_share_hashes)
431 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
432 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
433 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
434 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
435 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
437 my_shares_not_in_chain = my_shares - my_shares_in_chain
438 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
440 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
    local_rate_monitor = math.RateMonitor(10*60) # rolling 10-minute window of local share submissions
    
    class WorkerBridge(worker_interface.WorkerBridge):
        # Bridges miner JSON-RPC getwork requests to p2pool share generation.
        # NOTE(review): this excerpt is missing many interleaved lines of the
        # original (method headers, try/except and else lines, closing
        # parens); indentation below is reconstructed.
            worker_interface.WorkerBridge.__init__(self) # NOTE(review): 'def __init__(self):' header elided in excerpt
            self.new_work_event = current_work.changed
            self.recent_shares_ts_work = [] # (timestamp, attempts) of recent local shares, for difficulty retargeting
        
        def preprocess_request(self, request):
            # Parse an "address/mindiff" worker name into a payout pubkey
            # hash and a target cap; fall back to our own payout address.
            user = request.getUser() if request.getUser() is not None else ''
            pubkey_hash = my_pubkey_hash
            max_target = 2**256 - 1
            # NOTE(review): "if '/' in user:" and 'try:' guards elided in excerpt
                user, min_diff_str = user.rsplit('/', 1)
                    max_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT) # NOTE(review): surrounding try/except elided in excerpt
            if random.uniform(0, 100) < args.worker_fee:
                pubkey_hash = my_pubkey_hash # worker fee: this share pays the node operator instead
            return pubkey_hash, max_target
        
        def get_work(self, pubkey_hash, max_target):
            # Build a block attempt for a miner plus a callback to handle
            # whatever it submits.
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if current_work.value['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            # build merged-mining commitment for the coinbase, if any
            if current_work.value['mm_chains']:
                tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                    merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                # NOTE(review): remaining fields, closing '))' and the empty 'else:' branch elided in excerpt
                mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
            
            new = time.time() > net.SWITCH_TIME # which share format to generate
            # NOTE(review): 'if new:' guard and several generate-transaction
            # keyword arguments / closing parens elided in excerpt
                share_info, generate_tx = p2pool_data.new_generate_transaction(
                    previous_share_hash=current_work.value['best_share_hash'],
                    coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                    nonce=random.randrange(2**32),
                    pubkey_hash=pubkey_hash,
                    subsidy=current_work2.value['subsidy'],
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                        253 if orphans > orphans_recorded_in_chain else
                        254 if doas > doas_recorded_in_chain else
                    )(*get_stale_counts()),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=max_target,
                # legacy (pre-switch) share format
                share_info, generate_tx = p2pool_data.generate_transaction(
                    previous_share_hash=current_work.value['best_share_hash'],
                    coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                    nonce=struct.pack('<Q', random.randrange(2**64)),
                    new_script=bitcoin_data.pubkey_hash_to_script2(pubkey_hash),
                    subsidy=current_work2.value['subsidy'],
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                        253 if orphans > orphans_recorded_in_chain else
                        254 if doas > doas_recorded_in_chain else
                    )(*get_stale_counts()),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
            
            # pseudo-share target: easy enough for ~5s/share locally, never
            # easier than the real share target or any aux-chain target
            target = net.PARENT.SANE_MAX_TARGET
            if len(self.recent_shares_ts_work) == 50:
                hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                target = min(target, 2**256//(hash_rate * 5))
            target = max(target, share_info['bits'].target)
            for aux_work in current_work.value['mm_chains'].itervalues():
                target = max(target, aux_work['target'])
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
            merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
            
            getwork_time = time.time()
            merkle_branch = current_work2.value['merkle_branch']
            
            print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                bitcoin_data.target_to_difficulty(target),
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                len(current_work2.value['transactions']),
            
            ba = bitcoin_getwork.BlockAttempt(
                version=current_work.value['version'],
                previous_block=current_work.value['previous_block'],
                merkle_root=merkle_root,
                timestamp=current_work2.value['time'],
                bits=current_work.value['bits'],
            # NOTE(review): share_target argument and closing ')' elided in excerpt
            
            received_header_hashes = set() # dedupe repeated submissions of the same header
            
            def got_response(header, request):
                # Handle a solved header from a miner: maybe a block, maybe
                # a merged-mining block, maybe a share, at least a pseudo-share.
                assert header['merkle_root'] == merkle_root
                
                header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                
                if pow_hash <= header['bits'].target or p2pool.DEBUG: # NOTE(review): surrounding 'try:' elided in excerpt
                    @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
                    # NOTE(review): 'def submit_block():' header elided in excerpt
                        if factory.conn.value is None:
                            print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash) # NOTE(review): '%32x' here vs '%064x' elsewhere — likely inconsistent padding; confirm against upstream
                            raise deferral.RetrySilentlyException()
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    if pow_hash <= header['bits'].target:
                        print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                        recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
                    log.err(None, 'Error while processing potential block:') # NOTE(review): enclosing 'except:' elided in excerpt
                
                for aux_work, index, hashes in mm_later:
                    # submit merged-mining proof to each aux chain we beat
                    if pow_hash <= aux_work['target'] or p2pool.DEBUG: # NOTE(review): surrounding 'try:' elided in excerpt
                        df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                            pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                            bitcoin_data.aux_pow_type.pack(dict(
                                # NOTE(review): merkle_tx wrapper fields elided in excerpt
                                block_hash=header_hash,
                                merkle_branch=merkle_branch,
                                merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
                                parent_block_header=header,
                        # NOTE(review): closing parens and '@df.addCallback / def _(result):' elided in excerpt
                            if result != (pow_hash <= aux_work['target']):
                                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                print 'Merged block submittal result: %s' % (result,) # NOTE(review): enclosing 'else:' elided in excerpt
                            log.err(err, 'Error submitting merged block:') # NOTE(review): '@df.addErrback / def _(err):' elided in excerpt
                        log.err(None, 'Error while processing merged mining POW:') # NOTE(review): enclosing 'except:' elided in excerpt
                
                if pow_hash <= share_info['bits'].target:
                    # a real share — wrap it in the current share format
                    # NOTE(review): 'if new:' guard elided in excerpt
                    min_header = dict(header);del min_header['merkle_root']
                    hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
                    share = p2pool_data.NewShare(net, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                    share = p2pool_data.Share(net, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None) # NOTE(review): enclosing 'else:' elided in excerpt
                    print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                        p2pool_data.format_hash(share.hash),
                        p2pool_data.format_hash(share.previous_hash),
                        time.time() - getwork_time,
                        ' DEAD ON ARRIVAL' if not on_time else '',
                    my_share_hashes.add(share.hash)
                    my_doa_share_hashes.add(share.hash) # NOTE(review): 'if not on_time:' guard elided in excerpt
                    tracker.verified.add(share) # NOTE(review): preceding tracker.add(share) etc. elided in excerpt
                    
                    if pow_hash <= header['bits'].target or p2pool.DEBUG: # NOTE(review): surrounding 'try:' elided in excerpt
                        for peer in p2p_node.peers.itervalues():
                            peer.sendShares([share])
                        shared_share_hashes.add(share.hash)
                        log.err(None, 'Error forwarding block solution:') # NOTE(review): enclosing 'except:' elided in excerpt
                
                if pow_hash <= target and header_hash not in received_header_hashes:
                    # record a pseudo-share for rate graphs/monitoring
                    reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                    if request.getPassword() == vip_pass:
                        reactor.callLater(1, grapher.add_localminer_point, request.getUser(), bitcoin_data.target_to_average_attempts(target), not on_time)
                    self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                    while len(self.recent_shares_ts_work) > 50:
                        self.recent_shares_ts_work.pop(0)
                    local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
                
                if header_hash in received_header_hashes:
                    print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                received_header_hashes.add(header_hash)
                
                if pow_hash > target:
                    print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                    print '    Hash:   %56x' % (pow_hash,)
                    print '    Target: %56x' % (target,)
                # NOTE(review): return value of got_response elided in excerpt
            
            return ba, got_response
    def get_current_txouts():
        # Compute the current payout map (script -> satoshis) from the
        # cumulative weights of the share chain, with any rounding remainder
        # going to the donation script.
        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'],
            min(tracker.get_height(current_work.value['best_share_hash']), net.REAL_CHAIN_LENGTH),
            65535*net.SPREAD*bitcoin_data.target_to_average_attempts(current_work.value['bits'].target),
        # NOTE(review): closing ')' of the call elided in excerpt
        res = dict((script, current_work2.value['subsidy']*weight//total_weight) for script, weight in weights.iteritems())
        res[p2pool_data.DONATION_SCRIPT] = res.get(p2pool_data.DONATION_SCRIPT, 0) + current_work2.value['subsidy'] - sum(res.itervalues())
        # NOTE(review): 'return res' elided in excerpt
    
    web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks)
    worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
    def attempt_listen():
        # Bind the worker HTTP endpoint; retry every second until the port is free.
        # NOTE(review): 'try:' elided in excerpt
            reactor.listenTCP(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        except error.CannotListenError, e:
            print >>sys.stderr, 'Error binding to worker port: %s. Retrying in 1 second.' % (e.socketError,)
            reactor.callLater(1, attempt_listen)
    # NOTE(review): initial attempt_listen() call elided in excerpt
    
    # touch a flag file so wrapper scripts know startup finished
    with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
    
    @defer.inlineCallbacks
    # NOTE(review): work-poller function header, 'while True:' and its
    # try/except elided in excerpt — re-polls bitcoind on new-block events
    # or every 15 seconds, whichever comes first.
        flag = factory.new_block.get_deferred()
        yield set_real_work1()
        yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
    
    print 'Started successfully!'
    # Watchdog: if the reactor thread stalls for 30s, dump a stack trace.
    if hasattr(signal, 'SIGALRM'):
        signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
            sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
        # NOTE(review): closing '))' elided in excerpt
        signal.siginterrupt(signal.SIGALRM, False) # don't let the alarm interrupt syscalls
        task.LoopingCall(signal.alarm, 30).start(1) # re-arm every second; fires only if the loop stalls
    # Optionally announce found blocks on IRC.
    if args.irc_announce:
        from twisted.words.protocols import irc
        class IRCClient(irc.IRCClient):
            nickname = 'p2pool%02i' % (random.randrange(100),)
            channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
            def lineReceived(self, line):
                # NOTE(review): DEBUG line-dump elided in excerpt
                irc.IRCClient.lineReceived(self, line)
            # NOTE(review): 'def signedOn(self):' header elided in excerpt
                irc.IRCClient.signedOn(self)
                self.factory.resetDelay()
                self.join(self.channel)
                self.watch_id = tracker.verified.added.watch(self._new_share)
                self.announced_hashes = set()
                self.delayed_messages = {}
            def privmsg(self, user, channel, message):
                # if someone else announces the same block first, cancel ours
                if channel == self.channel and message in self.delayed_messages:
                    self.delayed_messages.pop(message).cancel()
            def _new_share(self, share):
                # announce verified shares that are actually blocks, once,
                # and only if they're recent
                if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
                    self.announced_hashes.add(share.header_hash)
                    message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    # random delay so multiple nodes don't all announce at once
                    self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
            def connectionLost(self, reason):
                tracker.verified.added.unwatch(self.watch_id)
                print 'IRC connection lost:', reason.getErrorMessage()
        class IRCClientFactory(protocol.ReconnectingClientFactory):
            # NOTE(review): 'protocol = IRCClient' elided in excerpt
        reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
    @defer.inlineCallbacks
    # NOTE(review): status-thread function header, 'last_str'/'last_time'
    # initialization, 'while True:' and 'try:' elided in excerpt — prints a
    # periodic status line when it changes or every 15 seconds.
        yield deferral.sleep(3)
            if time.time() > current_work2.value['last_update'] + 60:
                print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
            
            height = tracker.get_height(current_work.value['best_share_hash'])
            this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                # NOTE(review): several format arguments elided in excerpt
                len(tracker.verified.shares),
                sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
            
            datums, dt = local_rate_monitor.get_datums_in_last()
            my_att_s = sum(datum['work']/dt for datum in datums) # local attempts/second over the window
            this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                math.format(int(my_att_s)),
                math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
            # NOTE(review): closing ')' and 'if height > 2:' guard elided in excerpt
            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
            real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
            
            this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                shares, stale_orphan_shares, stale_doa_shares,
                math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                # efficiency: our stale rate relative to the pool's stale rate
                math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
            this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                math.format(int(real_att_s)),
                math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
            
            if this_str != last_str or time.time() > last_time + 15:
                # NOTE(review): 'print this_str' / 'last_str = this_str' elided in excerpt
                last_time = time.time()
        log.err(None, 'Fatal error:') # NOTE(review): enclosing 'except:' (main's top-level error handler) elided in excerpt
class FixedArgumentParser(argparse.ArgumentParser):
    '''ArgumentParser whose @file expansion recurses (a referenced file may
    itself reference further files) and splits each line on whitespace
    instead of argparse's default one-argument-per-line.
    '''
    
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        # Fix(review): the excerpt had lost the accumulator initialization
        # and the try/except around the file read; restored so the error
        # path ('err = sys.exc_info()[1]') has a matching handler.
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            else:
                # replace arguments referencing files with the file content
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        # recurse so @file references inside the file also expand
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        # one file line may hold several whitespace-separated arguments
        return [arg for arg in arg_line.split() if arg.strip()]
# Command-line interface definition.
# NOTE(review): this excerpt shows these statements unindented; they may sit
# inside a run()/main() wrapper in the original — confirm before relying on
# module-level execution order.
realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work to author of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
889 worker_group = parser.add_argument_group('worker interface')
890 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
891 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
892 type=str, action='store', default=None, dest='worker_endpoint')
893 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
894 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
895 type=float, action='store', default=0, dest='worker_fee')
897 bitcoind_group = parser.add_argument_group('bitcoind interface')
898 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
899 help='connect to this address (default: 127.0.0.1)',
900 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
901 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
902 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
903 type=int, action='store', default=None, dest='bitcoind_rpc_port')
904 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
905 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
906 type=int, action='store', default=None, dest='bitcoind_p2p_port')
908 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
909 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
910 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
912 args = parser.parse_args()
# Resolve the concrete network object; testnet variants are registered in
# networks.nets under '<name>_testnet'.
net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

# Per-network data directory (share chain, logs); created on first run.
datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments are allowed')
# Left-pad with Nones so 0/1/2 positional values map onto (username, password):
# none given -> (None, None); one -> (None, password); two -> (user, pass).
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

# No password on the command line: fall back to the coin daemon's own
# configuration file (location supplied per-network by CONF_FILE_FUNC).
if args.bitcoind_rpc_password is None:
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            # NOTE(review): additional literal lines of the suggested config
            # file text appear to be missing from this view.
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    # bitcoin.conf has no section headers, so prepend a dummy '[x]' section
    # to satisfy RawConfigParser.
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        # Copy values from the conf file only where the command line gave none.
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
            # NOTE(review): the list terminator ']:' is not visible in this view
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))

# Remaining precedence: command line > bitcoin.conf > per-network defaults.
if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

# Worker endpoint accepts 'PORT' or 'ADDR:PORT'; an empty address string
# means "listen on all interfaces".
if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
# NOTE(review): the 'else:' introducing this branch is not visible in this view
addr, port = args.worker_endpoint.rsplit(':', 1)
worker_endpoint = addr, int(port)

# Decode the payout address into a pubkey hash for share generation.
if args.address is not None:
    # NOTE(review): the try:/except wrapping this parse (binding 'e') is not
    # visible in this view — confirm against the full file.
    args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    parser.error('error parsing address: ' + repr(e))
# NOTE(review): presumably the 'else:' branch when no address was given
args.pubkey_hash = None
def separate_url(url):
    # Split a merged-mining URL of the form scheme://user:pass@host:port/...
    # into (URL with credentials removed, 'user:pass') so the credentials can
    # be passed separately for JSON-RPC authentication.
    split = urlparse.urlsplit(url)
    if '@' not in split.netloc:
        parser.error('merged url netloc must contain an "@"')
    # rsplit keeps any '@' inside the userinfo part intact.
    userpass, host = split.netloc.rsplit('@', 1)
    bare_url = urlparse.urlunsplit(split._replace(netloc=host))
    return bare_url, userpass
# Split credentials out of every merged-mining URL -> (url, 'user:pass').
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

# Tee stdout/stderr into a timestamped, reopenable log file (util.logging
# pipes, not the stdlib logging module); stderr lines are prefixed with '> '
# so they stand out in the combined log.
logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
# On platforms that have SIGUSR1 (i.e. not Windows), reopen the log file on
# signal so external log rotation can be used.
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        # NOTE(review): the logfile.reopen() call between these two prints
        # appears to be missing from this view.
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
# Also reopen every 5 seconds as a fallback rotation mechanism.
task.LoopingCall(logfile.reopen).start(5)

# Defer the real startup until the Twisted reactor is running.
reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)