1 from __future__ import division
15 from twisted.internet import defer, reactor, protocol, task
16 from twisted.web import server
17 from twisted.python import log
18 from nattraverso import portmapper, ipdiscover
20 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
21 from bitcoin import worker_interface
22 from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
23 from . import p2p, networks, web
24 import p2pool, p2pool.data as p2pool_data
26 @deferral.retry('Error getting work from bitcoind:', 3)
27 @defer.inlineCallbacks
28 def getwork(bitcoind):
30 work = yield bitcoind.rpc_getmemorypool()
31 except jsonrpc.Error, e:
32 if e.code == -32601: # Method not found
33 print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
34 raise deferral.RetrySilentlyException()
36 packed_transactions = [x.decode('hex') for x in work['transactions']]
37 defer.returnValue(dict(
38 version=work['version'],
39 previous_block_hash=int(work['previousblockhash'], 16),
40 transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
41 merkle_branch=bitcoin_data.calculate_merkle_branch([0] + map(bitcoin_data.hash256, packed_transactions), 0),
42 subsidy=work['coinbasevalue'],
44 bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
45 coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
48 @defer.inlineCallbacks
49 def main(args, net, datadir_path, merged_urls, worker_endpoint):
51 print 'p2pool (version %s)' % (p2pool.__version__,)
54 # connect to bitcoind over JSON-RPC and do initial getmemorypool
55 url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
56 print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
57 bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
58 good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
60 print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
62 temp_work = yield getwork(bitcoind)
64 print ' Current block hash: %x' % (temp_work['previous_block_hash'],)
67 # connect to bitcoind over bitcoin-p2p
68 print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
69 factory = bitcoin_p2p.ClientFactory(net.PARENT)
70 reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
71 yield factory.getProtocol() # waits until handshake is successful
75 print 'Determining payout address...'
76 if args.pubkey_hash is None:
77 address_path = os.path.join(datadir_path, 'cached_payout_address')
79 if os.path.exists(address_path):
80 with open(address_path, 'rb') as f:
81 address = f.read().strip('\r\n')
82 print ' Loaded cached address: %s...' % (address,)
86 if address is not None:
87 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
88 if not res['isvalid'] or not res['ismine']:
89 print ' Cached address is either invalid or not controlled by local bitcoind!'
93 print ' Getting payout address from bitcoind...'
94 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
96 with open(address_path, 'wb') as f:
99 my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
101 my_pubkey_hash = args.pubkey_hash
102 print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
105 my_share_hashes = set()
106 my_doa_share_hashes = set()
108 tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
109 shared_share_hashes = set()
110 ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
111 known_verified = set()
113 print "Loading shares..."
114 for i, (mode, contents) in enumerate(ss.get_shares()):
116 if contents.hash in tracker.shares:
118 shared_share_hashes.add(contents.hash)
119 contents.time_seen = 0
120 tracker.add(contents)
121 if len(tracker.shares) % 1000 == 0 and tracker.shares:
122 print " %i" % (len(tracker.shares),)
123 elif mode == 'verified_hash':
124 known_verified.add(contents)
126 raise AssertionError()
127 print " ...inserting %i verified shares..." % (len(known_verified),)
128 for h in known_verified:
129 if h not in tracker.shares:
130 ss.forget_verified_share(h)
132 tracker.verified.add(tracker.shares[h])
133 print " ...done loading %i shares!" % (len(tracker.shares),)
135 tracker.removed.watch(lambda share: ss.forget_share(share.hash))
136 tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
137 tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
139 peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
141 pre_current_work = variable.Variable(None)
142 pre_merged_work = variable.Variable({})
143 # information affecting work that should trigger a long-polling update
144 current_work = variable.Variable(None)
145 # information affecting work that should not trigger a long-polling update
146 current_work2 = variable.Variable(None)
148 requested = expiring_dict.ExpiringDict(300)
150 print 'Initializing work...'
151 @defer.inlineCallbacks
152 def set_real_work1():
153 work = yield getwork(bitcoind)
154 current_work2.set(dict(
156 transactions=work['transactions'],
157 merkle_branch=work['merkle_branch'],
158 subsidy=work['subsidy'],
159 clock_offset=time.time() - work['time'],
160 last_update=time.time(),
161 )) # second set first because everything hooks on the first
162 pre_current_work.set(dict(
163 version=work['version'],
164 previous_block=work['previous_block_hash'],
166 coinbaseflags=work['coinbaseflags'],
168 yield set_real_work1()
170 if '\ngetblock ' in (yield deferral.retry()(bitcoind.rpc_help)()):
171 height_cacher = deferral.DeferredCacher(defer.inlineCallbacks(lambda block_hash: defer.returnValue((lambda x: x['blockcount'] if 'blockcount' in x else x['height'])((yield bitcoind.rpc_getblock('%x' % (block_hash,)))))))
172 best_height_cached = variable.Variable((yield deferral.retry()(height_cacher)(pre_current_work.value['previous_block'])))
173 def get_height_rel_highest(block_hash):
174 this_height = height_cacher.call_now(block_hash, 0)
175 best_height = height_cacher.call_now(pre_current_work.value['previous_block'], 0)
176 best_height_cached.set(max(best_height_cached.value, this_height, best_height))
177 return this_height - best_height_cached.value
179 get_height_rel_highest = bitcoin_p2p.HeightTracker(bitcoind, factory, 5*net.SHARE_PERIOD*net.CHAIN_LENGTH/net.PARENT.BLOCK_PERIOD).get_height_rel_highest
181 def set_real_work2():
182 best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
184 t = dict(pre_current_work.value)
185 t['best_share_hash'] = best
186 t['mm_chains'] = pre_merged_work.value
190 for peer2, share_hash in desired:
191 if share_hash not in tracker.tails: # was received in the time tracker.think was running
193 last_request_time, count = requested.get(share_hash, (None, 0))
194 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
196 potential_peers = set()
197 for head in tracker.tails[share_hash]:
198 potential_peers.update(peer_heads.get(head, set()))
199 potential_peers = [peer for peer in potential_peers if peer.connected2]
200 if count == 0 and peer2 is not None and peer2.connected2:
203 peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
207 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
211 stops=list(set(tracker.heads) | set(
212 tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
215 requested[share_hash] = t, count + 1
216 pre_current_work.changed.watch(lambda _: set_real_work2())
217 pre_merged_work.changed.watch(lambda _: set_real_work2())
223 @defer.inlineCallbacks
224 def set_merged_work(merged_url, merged_userpass):
225 merged_proxy = jsonrpc.Proxy(merged_url, (merged_userpass,))
227 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
228 pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
229 hash=int(auxblock['hash'], 16),
230 target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
231 merged_proxy=merged_proxy,
233 yield deferral.sleep(1)
234 for merged_url, merged_userpass in merged_urls:
235 set_merged_work(merged_url, merged_userpass)
237 @pre_merged_work.changed.watch
238 def _(new_merged_work):
239 print 'Got new merged mining work!'
241 # setup p2p logic and join p2pool network
243 class Node(p2p.Node):
244 def handle_shares(self, shares, peer):
246 print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
250 if share.hash in tracker.shares:
251 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
256 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
260 if shares and peer is not None:
261 peer_heads.setdefault(shares[0].hash, set()).add(peer)
267 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
269 def handle_share_hashes(self, hashes, peer):
272 for share_hash in hashes:
273 if share_hash in tracker.shares:
275 last_request_time, count = requested.get(share_hash, (None, 0))
276 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
278 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
279 get_hashes.append(share_hash)
280 requested[share_hash] = t, count + 1
282 if hashes and peer is not None:
283 peer_heads.setdefault(hashes[0], set()).add(peer)
285 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
287 def handle_get_shares(self, hashes, parents, stops, peer):
288 parents = min(parents, 1000//len(hashes))
291 for share_hash in hashes:
292 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
293 if share.hash in stops:
296 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
297 peer.sendShares(shares)
300 submit_block = deferral.retry('Error submitting primary block: (will retry)', 10, 10)(lambda block: bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex')))
302 @tracker.verified.added.watch
304 if share.pow_hash <= share.header['bits'].target:
305 submit_block(share.as_block(tracker))
307 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
309 recent_blocks.append(dict(ts=share.timestamp, hash='%064x' % (share.header_hash,)))
311 print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
313 @defer.inlineCallbacks
316 ip, port = x.split(':')
317 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
319 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
322 if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
324 addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
326 print >>sys.stderr, "error reading addrs"
327 for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
330 if addr not in addrs:
331 addrs[addr] = (0, time.time(), time.time())
335 connect_addrs = set()
336 for addr_df in map(parse, args.p2pool_nodes):
338 connect_addrs.add((yield addr_df))
343 best_share_hash_func=lambda: current_work.value['best_share_hash'],
344 port=args.p2pool_port,
347 connect_addrs=connect_addrs,
351 task.LoopingCall(lambda: open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())).start(60)
353 # send share when the chain changes to their chain
354 def work_changed(new_work):
355 #print 'Work changed:', new_work
357 for share in tracker.get_chain(new_work['best_share_hash'], min(5, tracker.get_height(new_work['best_share_hash']))):
358 if share.hash in shared_share_hashes:
360 shared_share_hashes.add(share.hash)
363 for peer in p2p_node.peers.itervalues():
364 peer.sendShares([share for share in shares if share.peer is not peer])
366 current_work.changed.watch(work_changed)
369 for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
371 if share.hash in tracker.verified.shares:
372 ss.add_verified_hash(share.hash)
373 task.LoopingCall(save_shares).start(60)
379 @defer.inlineCallbacks
383 is_lan, lan_ip = yield ipdiscover.get_local_ip()
385 pm = yield portmapper.get_port_mapper()
386 yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
387 except defer.TimeoutError:
391 log.err(None, 'UPnP error:')
392 yield deferral.sleep(random.expovariate(1/120))
395 # start listening for workers with a JSON-RPC server
397 print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
399 if os.path.exists(os.path.join(datadir_path, 'vip_pass')):
400 with open(os.path.join(datadir_path, 'vip_pass'), 'rb') as f:
401 vip_pass = f.read().strip('\r\n')
403 vip_pass = '%016x' % (random.randrange(2**64),)
404 with open(os.path.join(datadir_path, 'vip_pass'), 'wb') as f:
406 print ' Worker password:', vip_pass, '(only required for generating graphs)'
410 removed_unstales_var = variable.Variable((0, 0, 0))
411 removed_doa_unstales_var = variable.Variable(0)
412 @tracker.verified.removed.watch
414 if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
415 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
416 removed_unstales_var.set((
417 removed_unstales_var.value[0] + 1,
418 removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
419 removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
421 if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
422 removed_doa_unstales.set(removed_doa_unstales.value + 1)
424 def get_stale_counts():
425 '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
426 my_shares = len(my_share_hashes)
427 my_doa_shares = len(my_doa_share_hashes)
428 delta = tracker.verified.get_delta(current_work.value['best_share_hash'])
429 my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
430 my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
431 orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
432 doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
434 my_shares_not_in_chain = my_shares - my_shares_in_chain
435 my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
437 return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
440 pseudoshare_received = variable.Event()
441 local_rate_monitor = math.RateMonitor(10*60)
443 class WorkerBridge(worker_interface.WorkerBridge):
445 worker_interface.WorkerBridge.__init__(self)
446 self.new_work_event = current_work.changed
447 self.recent_shares_ts_work = []
449 def preprocess_request(self, request):
450 user = request.getUser() if request.getUser() is not None else ''
452 desired_pseudoshare_target = None
454 user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
456 desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
460 desired_share_target = 2**256 - 1
462 user, min_diff_str = user.rsplit('/', 1)
464 desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
468 if random.uniform(0, 100) < args.worker_fee:
469 pubkey_hash = my_pubkey_hash
472 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
474 pubkey_hash = my_pubkey_hash
476 return pubkey_hash, desired_share_target, desired_pseudoshare_target
478 def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
479 if len(p2p_node.peers) == 0 and net.PERSIST:
480 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
481 if current_work.value['best_share_hash'] is None and net.PERSIST:
482 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
483 if time.time() > current_work2.value['last_update'] + 60:
484 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
486 if current_work.value['mm_chains']:
487 tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
488 mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
489 mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
490 merkle_root=bitcoin_data.merkle_hash(mm_hashes),
494 mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
499 new = (tracker.shares[current_work.value['best_share_hash']].timestamp if current_work.value['best_share_hash'] is not None else time.time()) > net.SWITCH_TIME
502 share_info, generate_tx = p2pool_data.new_generate_transaction(
505 previous_share_hash=current_work.value['best_share_hash'],
506 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
507 nonce=random.randrange(2**32),
508 pubkey_hash=pubkey_hash,
509 subsidy=current_work2.value['subsidy'],
510 donation=math.perfect_round(65535*args.donation_percentage/100),
511 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
512 253 if orphans > orphans_recorded_in_chain else
513 254 if doas > doas_recorded_in_chain else
515 )(*get_stale_counts()),
517 block_target=current_work.value['bits'].target,
518 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
519 desired_target=desired_share_target,
523 share_info, generate_tx = p2pool_data.generate_transaction(
526 previous_share_hash=current_work.value['best_share_hash'],
527 coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
528 nonce=struct.pack('<Q', random.randrange(2**64)),
529 new_script=bitcoin_data.pubkey_hash_to_script2(pubkey_hash),
530 subsidy=current_work2.value['subsidy'],
531 donation=math.perfect_round(65535*args.donation_percentage/100),
532 stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
533 253 if orphans > orphans_recorded_in_chain else
534 254 if doas > doas_recorded_in_chain else
536 )(*get_stale_counts()),
538 block_target=current_work.value['bits'].target,
539 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
543 target = net.PARENT.SANE_MAX_TARGET
544 if desired_pseudoshare_target is None:
545 if len(self.recent_shares_ts_work) == 50:
546 hash_rate = sum(work for ts, work in self.recent_shares_ts_work)//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
547 target = min(target, 2**256//hash_rate)
549 target = min(target, desired_pseudoshare_target)
550 target = max(target, share_info['bits'].target)
551 for aux_work in current_work.value['mm_chains'].itervalues():
552 target = max(target, aux_work['target'])
554 transactions = [generate_tx] + list(current_work2.value['transactions'])
555 packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
556 merkle_root = bitcoin_data.check_merkle_branch(bitcoin_data.hash256(packed_generate_tx), 0, current_work2.value['merkle_branch'])
558 getwork_time = time.time()
559 merkle_branch = current_work2.value['merkle_branch']
561 print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
562 bitcoin_data.target_to_difficulty(target),
563 bitcoin_data.target_to_difficulty(share_info['bits'].target),
564 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
565 len(current_work2.value['transactions']),
568 ba = bitcoin_getwork.BlockAttempt(
569 version=current_work.value['version'],
570 previous_block=current_work.value['previous_block'],
571 merkle_root=merkle_root,
572 timestamp=current_work2.value['time'],
573 bits=current_work.value['bits'],
577 received_header_hashes = set()
579 def got_response(header, request):
580 assert header['merkle_root'] == merkle_root
582 header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
583 pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
584 on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
587 if pow_hash <= header['bits'].target or p2pool.DEBUG:
588 submit_block(dict(header=header, txs=transactions))
589 if pow_hash <= header['bits'].target:
591 print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
593 recent_blocks.append(dict(ts=time.time(), hash='%064x' % (header_hash,)))
595 log.err(None, 'Error while processing potential block:')
597 for aux_work, index, hashes in mm_later:
599 if pow_hash <= aux_work['target'] or p2pool.DEBUG:
600 df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
601 pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
602 bitcoin_data.aux_pow_type.pack(dict(
605 block_hash=header_hash,
606 merkle_branch=merkle_branch,
609 merkle_branch=bitcoin_data.calculate_merkle_branch(hashes, index),
611 parent_block_header=header,
616 if result != (pow_hash <= aux_work['target']):
617 print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
619 print 'Merged block submittal result: %s' % (result,)
622 log.err(err, 'Error submitting merged block:')
624 log.err(None, 'Error while processing merged mining POW:')
626 if pow_hash <= share_info['bits'].target:
628 min_header = dict(header);del min_header['merkle_root']
629 hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], p2pool_data.gentx_before_refhash)
630 share = p2pool_data.NewShare(net, None, min_header, share_info, hash_link=hash_link, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
632 share = p2pool_data.Share(net, None, header, share_info, merkle_branch=merkle_branch, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
633 print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
635 p2pool_data.format_hash(share.hash),
636 p2pool_data.format_hash(share.previous_hash),
637 time.time() - getwork_time,
638 ' DEAD ON ARRIVAL' if not on_time else '',
640 my_share_hashes.add(share.hash)
642 my_doa_share_hashes.add(share.hash)
646 tracker.verified.add(share)
650 if pow_hash <= header['bits'].target or p2pool.DEBUG:
651 for peer in p2p_node.peers.itervalues():
652 peer.sendShares([share])
653 shared_share_hashes.add(share.hash)
655 log.err(None, 'Error forwarding block solution:')
657 if pow_hash > target:
658 print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
659 print ' Hash: %56x' % (pow_hash,)
660 print ' Target: %56x' % (target,)
661 elif header_hash in received_header_hashes:
662 print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
664 received_header_hashes.add(header_hash)
666 pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, request.getUser() if request.getPassword() == vip_pass else None)
667 self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
668 while len(self.recent_shares_ts_work) > 50:
669 self.recent_shares_ts_work.pop(0)
670 local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=request.getUser()))
674 return ba, got_response
676 get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
678 web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, recent_blocks, pseudoshare_received)
679 worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
681 deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
683 with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
690 @defer.inlineCallbacks
693 flag = factory.new_block.get_deferred()
695 yield set_real_work1()
698 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
703 print 'Started successfully!'
707 if hasattr(signal, 'SIGALRM'):
708 signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
709 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
711 signal.siginterrupt(signal.SIGALRM, False)
712 task.LoopingCall(signal.alarm, 30).start(1)
714 if args.irc_announce:
715 from twisted.words.protocols import irc
716 class IRCClient(irc.IRCClient):
717 nickname = 'p2pool%02i' % (random.randrange(100),)
718 channel = '#p2pool' if net.NAME == 'bitcoin' else '#p2pool-alt'
719 def lineReceived(self, line):
721 irc.IRCClient.lineReceived(self, line)
723 irc.IRCClient.signedOn(self)
724 self.factory.resetDelay()
725 self.join(self.channel)
726 self.watch_id = tracker.verified.added.watch(self._new_share)
727 self.announced_hashes = set()
728 self.delayed_messages = {}
729 def privmsg(self, user, channel, message):
730 if channel == self.channel and message in self.delayed_messages:
731 self.delayed_messages.pop(message).cancel()
732 def _new_share(self, share):
733 if share.pow_hash <= share.header['bits'].target and share.header_hash not in self.announced_hashes and abs(share.timestamp - time.time()) < 10*60:
734 self.announced_hashes.add(share.header_hash)
735 message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
736 self.delayed_messages[message] = reactor.callLater(random.expovariate(1/5), lambda: (self.say(self.channel, message), self.delayed_messages.pop(message)))
737 def connectionLost(self, reason):
738 tracker.verified.added.unwatch(self.watch_id)
739 print 'IRC connection lost:', reason.getErrorMessage()
740 class IRCClientFactory(protocol.ReconnectingClientFactory):
742 reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
744 @defer.inlineCallbacks
749 yield deferral.sleep(3)
751 if time.time() > current_work2.value['last_update'] + 60:
752 print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
754 height = tracker.get_height(current_work.value['best_share_hash'])
755 this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
757 len(tracker.verified.shares),
760 sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
761 ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
763 datums, dt = local_rate_monitor.get_datums_in_last()
764 my_att_s = sum(datum['work']/dt for datum in datums)
765 this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
766 math.format(int(my_att_s)),
768 math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
769 math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
773 (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
774 stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
775 real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720)) / (1 - stale_prop)
777 this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
778 shares, stale_orphan_shares, stale_doa_shares,
779 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
780 math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
781 get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
783 this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
784 math.format(int(real_att_s)),
786 math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
789 if this_str != last_str or time.time() > last_time + 15:
792 last_time = time.time()
797 log.err(None, 'Fatal error:')
801 class FixedArgumentParser(argparse.ArgumentParser):
802 def _read_args_from_files(self, arg_strings):
803 # expand arguments referencing files
805 for arg_string in arg_strings:
807 # for regular arguments, just add them back into the list
808 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
809 new_arg_strings.append(arg_string)
811 # replace arguments referencing files with the file content
814 args_file = open(arg_string[1:])
817 for arg_line in args_file.read().splitlines():
818 for arg in self.convert_arg_line_to_args(arg_line):
819 arg_strings.append(arg)
820 arg_strings = self._read_args_from_files(arg_strings)
821 new_arg_strings.extend(arg_strings)
825 err = sys.exc_info()[1]
828 # return the modified argument list
829 return new_arg_strings
831 def convert_arg_line_to_args(self, arg_line):
832 return [arg for arg in arg_line.split() if arg.strip()]
835 realnets=dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
837 parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
838 parser.add_argument('--version', action='version', version=p2pool.__version__)
839 parser.add_argument('--net',
840 help='use specified network (default: bitcoin)',
841 action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
842 parser.add_argument('--testnet',
843 help='''use the network's testnet''',
844 action='store_const', const=True, default=False, dest='testnet')
845 parser.add_argument('--debug',
846 help='enable debugging mode',
847 action='store_const', const=True, default=False, dest='debug')
848 parser.add_argument('-a', '--address',
849 help='generate payouts to this address (default: <address requested from bitcoind>)',
850 type=str, action='store', default=None, dest='address')
851 parser.add_argument('--datadir',
852 help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
853 type=str, action='store', default=None, dest='datadir')
854 parser.add_argument('--logfile',
855 help='''log to this file (default: data/<NET>/log)''',
856 type=str, action='store', default=None, dest='logfile')
857 parser.add_argument('--merged',
858 help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
859 type=str, action='append', default=[], dest='merged_urls')
860 parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
861 help='donate this percentage of work to author of p2pool (default: 0.5)',
862 type=float, action='store', default=0.5, dest='donation_percentage')
863 parser.add_argument('--irc-announce',
864 help='announce any blocks found on irc://irc.freenode.net/#p2pool',
865 action='store_true', default=False, dest='irc_announce')
867 p2pool_group = parser.add_argument_group('p2pool interface')
868 p2pool_group.add_argument('--p2pool-port', metavar='PORT',
869 help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
870 type=int, action='store', default=None, dest='p2pool_port')
871 p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
872 help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
873 type=str, action='append', default=[], dest='p2pool_nodes')
874 parser.add_argument('--disable-upnp',
875 help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
876 action='store_false', default=True, dest='upnp')
878 worker_group = parser.add_argument_group('worker interface')
879 worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
880 help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
881 type=str, action='store', default=None, dest='worker_endpoint')
882 worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
883 help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
884 type=float, action='store', default=0, dest='worker_fee')
886 bitcoind_group = parser.add_argument_group('bitcoind interface')
887 bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
888 help='connect to this address (default: 127.0.0.1)',
889 type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
890 bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
891 help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
892 type=int, action='store', default=None, dest='bitcoind_rpc_port')
893 bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
894 help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
895 type=int, action='store', default=None, dest='bitcoind_p2p_port')
897 bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
898 help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
899 type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
901 args = parser.parse_args()
906 net_name = args.net_name + ('_testnet' if args.testnet else '')
907 net = networks.nets[net_name]
909 datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
910 if not os.path.exists(datadir_path):
911 os.makedirs(datadir_path)
913 if len(args.bitcoind_rpc_userpass) > 2:
914 parser.error('a maximum of two arguments are allowed')
915 args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
917 if args.bitcoind_rpc_password is None:
918 if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
919 parser.error('This network has no configuration file function. Manually enter your RPC password.')
920 conf_path = net.PARENT.CONF_FILE_FUNC()
921 if not os.path.exists(conf_path):
922 parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
923 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
926 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
927 with open(conf_path, 'rb') as f:
928 cp = ConfigParser.RawConfigParser()
929 cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
930 for conf_name, var_name, var_type in [
931 ('rpcuser', 'bitcoind_rpc_username', str),
932 ('rpcpassword', 'bitcoind_rpc_password', str),
933 ('rpcport', 'bitcoind_rpc_port', int),
934 ('port', 'bitcoind_p2p_port', int),
936 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
937 setattr(args, var_name, var_type(cp.get('x', conf_name)))
938 if args.bitcoind_rpc_password is None:
939 parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
941 if args.bitcoind_rpc_username is None:
942 args.bitcoind_rpc_username = ''
944 if args.bitcoind_rpc_port is None:
945 args.bitcoind_rpc_port = net.PARENT.RPC_PORT
947 if args.bitcoind_p2p_port is None:
948 args.bitcoind_p2p_port = net.PARENT.P2P_PORT
950 if args.p2pool_port is None:
951 args.p2pool_port = net.P2P_PORT
953 if args.worker_endpoint is None:
954 worker_endpoint = '', net.WORKER_PORT
955 elif ':' not in args.worker_endpoint:
956 worker_endpoint = '', int(args.worker_endpoint)
958 addr, port = args.worker_endpoint.rsplit(':', 1)
959 worker_endpoint = addr, int(port)
961 if args.address is not None:
963 args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
965 parser.error('error parsing address: ' + repr(e))
967 args.pubkey_hash = None
969 def separate_url(url):
970 s = urlparse.urlsplit(url)
971 if '@' not in s.netloc:
972 parser.error('merged url netloc must contain an "@"')
973 userpass, new_netloc = s.netloc.rsplit('@', 1)
974 return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
975 merged_urls = map(separate_url, args.merged_urls)
977 if args.logfile is None:
978 args.logfile = os.path.join(datadir_path, 'log')
980 logfile = logging.LogFile(args.logfile)
981 pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
982 sys.stdout = logging.AbortPipe(pipe)
983 sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
984 if hasattr(signal, "SIGUSR1"):
985 def sigusr1(signum, frame):
986 print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
988 print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
989 signal.signal(signal.SIGUSR1, sigusr1)
990 task.LoopingCall(logfile.reopen).start(5)
992 reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)