from __future__ import division

import argparse
import base64
import ConfigParser
import json
import os
import random
import signal
import StringIO
import sys
import time
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, jsonrpc, variable, deferral, math, logging, pack
from . import p2p, networks, web
import p2pool, p2pool.data as p2pool_data

@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    try:
        work = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error, e:
        if e.code == -32601: # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
        raise
    packed_transactions = [x.decode('hex') for x in work['transactions']]
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([0] + map(bitcoin_data.hash256, packed_transactions), 0), # using 0 is a bit of a hack, but will always work when index=0
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
    ))
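
# getwork() returns a normalized version of bitcoind's getmemorypool response:
# hashes become ints, transactions are unpacked, and 'bits' is accepted either
# as a hex string or as a plain integer (presumably because bitcoind versions
# differ in how they encode it)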

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or newer!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable({})
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
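        # requested maps share_hash -> (last_request_time, count); together with
        # the last_request_time - 5 < t < last_request_time + 10*1.5**count
        # window checked below, it rate-limits re-requests of the same missing
        # share with exponential backoff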
        
        print 'Initializing work...'
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                merkle_link=work['merkle_link'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # set the 'second' variable first, since everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
                coinbaseflags=work['coinbaseflags'],
            ))
        yield set_real_work1()
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: pre_current_work.value['previous_block'], net)
        
        def set_real_work2():
            best, desired = tracker.think(get_height_rel_highest, pre_current_work.value['previous_block'], pre_current_work.value['bits'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['mm_chains'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        set_real_work2()
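        # set_real_work2 re-runs whenever either precursor variable changes, so
        # current_work always combines the freshest bitcoind work with the best
        # known share-chain head and any merged-mining work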
        
        @defer.inlineCallbacks
        def set_merged_work(merged_url, merged_userpass):
            merged_proxy = jsonrpc.Proxy(merged_url, dict(Authorization='Basic ' + base64.b64encode(merged_userpass)))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(pre_merged_work.value, **{auxblock['chainid']: dict(
                    hash=int(auxblock['hash'], 16),
                    target=pack.IntType(256).unpack(auxblock['target'].decode('hex')),
                    merged_proxy=merged_proxy,
                )}))
                yield deferral.sleep(1)
        for merged_url, merged_userpass in merged_urls:
            set_merged_work(merged_url, merged_userpass)
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print 'Got new merged mining work!'
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
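        
        # found blocks are submitted along two redundant paths below: the bitcoin
        # P2P connection and the getmemorypool JSON-RPC call; only the RPC path
        # returns an acceptance result that can be checked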
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
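        
        # any verified share whose proof-of-work also meets the block target is a
        # complete bitcoin block found by a peer; pass it along immediately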
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                            current_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        elif os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs.txt"
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        # broadcast the new head whenever the best chain changes
        current_work.changed.watch(lambda new_work: broadcast_share(new_work['best_share_hash']))
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
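        # the last 2*CHAIN_LENGTH shares of the current best chain are rewritten
        # to the ShareStore once a minute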
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
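            # note: random.expovariate(1/120) above yields exponentially
            # distributed delays averaging 120 seconds, so the UPnP mapping is
            # refreshed with jitter rather than on a fixed schedule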
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        # track our shares that expired out of the tracker while still in the
        # best chain, so the stale counts below stay accurate
        removed_unstales_var = variable.Variable((0, 0, 0))
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0),
                ))
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
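            # illustrative example: if this instance made 100 shares and 90 are
            # still in the chain, the first tuple partitions the 10 missing ones
            # into orphans and DOAs, and the last tuple is how many of our shares
            # the chain itself records as 'orphan' and 'doa'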
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            delta = tracker.verified.get_delta_to_last(current_work.value['best_share_hash'])
            my_shares_in_chain = delta.my_count + removed_unstales_var.value[0]
            my_doa_shares_in_chain = delta.my_doa_count + removed_doa_unstales_var.value
            orphans_recorded_in_chain = delta.my_orphan_announce_count + removed_unstales_var.value[1]
            doas_recorded_in_chain = delta.my_dead_announce_count + removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        pseudoshare_received = variable.Event()
        share_received = variable.Event()
        local_rate_monitor = math.RateMonitor(10*60)
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                self.recent_shares_ts_work = []
            
            def get_user_details(self, request):
                user = request.getUser() if request.getUser() is not None else ''
                
                desired_pseudoshare_target = None
                if '+' in user:
                    user, desired_pseudoshare_difficulty_str = user.rsplit('+', 1)
                    try:
                        desired_pseudoshare_target = bitcoin_data.difficulty_to_target(float(desired_pseudoshare_difficulty_str))
                    except:
                        pass
                
                desired_share_target = 2**256 - 1
                if '/' in user:
                    user, min_diff_str = user.rsplit('/', 1)
                    try:
                        desired_share_target = bitcoin_data.difficulty_to_target(float(min_diff_str))
                    except:
                        pass
                
                if random.uniform(0, 100) < args.worker_fee:
                    pubkey_hash = my_pubkey_hash
                else:
                    try:
                        pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                    except: # not a valid address
                        pubkey_hash = my_pubkey_hash
                
                return user, pubkey_hash, desired_share_target, desired_pseudoshare_target
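            
            # so miner usernames take the form ADDRESS/MIN_DIFFICULTY+PSEUDOSHARE_DIFFICULTY,
            # with both suffixes optional; an unparseable address (and a
            # worker_fee percentage of all requests) pays out to this node's
            # address instead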
            def preprocess_request(self, request):
                user, pubkey_hash, desired_share_target, desired_pseudoshare_target = self.get_user_details(request)
                return pubkey_hash, desired_share_target, desired_pseudoshare_target
            
            def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
                if current_work.value['mm_chains']:
                    tree, size = bitcoin_data.make_auxpow_tree(current_work.value['mm_chains'])
                    mm_hashes = [current_work.value['mm_chains'].get(tree.get(i), dict(hash=0))['hash'] for i in xrange(size)]
                    mm_data = '\xfa\xbemm' + bitcoin_data.aux_pow_coinbase_type.pack(dict(
                        merkle_root=bitcoin_data.merkle_hash(mm_hashes),
                        size=size,
                        nonce=0,
                    ))
                    mm_later = [(aux_work, mm_hashes.index(aux_work['hash']), mm_hashes) for chain_id, aux_work in current_work.value['mm_chains'].iteritems()]
                else:
                    mm_data = ''
                    mm_later = []
                
                share_type = p2pool_data.NewShare
                if current_work.value['best_share_hash'] is not None:
                    previous_share = tracker.shares[current_work.value['best_share_hash']]
                    if isinstance(previous_share, p2pool_data.Share):
                        # Share -> NewShare switchover is only valid if 95% of hashes in [net.CHAIN_LENGTH*9//10, net.CHAIN_LENGTH) desire the new version
                        if tracker.get_height(previous_share.hash) < net.CHAIN_LENGTH:
                            share_type = p2pool_data.Share
                        else:
                            counts = p2pool_data.get_desired_version_counts(tracker,
                                tracker.get_nth_parent_hash(previous_share.hash, net.CHAIN_LENGTH*9//10), net.CHAIN_LENGTH//10)
                            if counts.get(2, 0) < sum(counts.itervalues())*95//100:
                                share_type = p2pool_data.Share
                
                share_info, generate_tx = share_type.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase=(mm_data + current_work.value['coinbaseflags'])[:100],
                        nonce=random.randrange(2**32),
                        pubkey_hash=pubkey_hash,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            'orphan' if orphans > orphans_recorded_in_chain else
                            'doa' if doas > doas_recorded_in_chain else
                            None
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    desired_target=desired_share_target,
                    ref_merkle_link=dict(branch=[], index=0),
                    net=net,
                )
                
                if desired_pseudoshare_target is None:
                    target = 2**256-1
                    if len(self.recent_shares_ts_work) == 50:
                        hash_rate = sum(work for ts, work in self.recent_shares_ts_work[1:])//(self.recent_shares_ts_work[-1][0] - self.recent_shares_ts_work[0][0])
                        if hash_rate:
                            target = min(target, int(2**256/hash_rate))
                else:
                    target = desired_pseudoshare_target
                target = max(target, share_info['bits'].target)
                for aux_work in current_work.value['mm_chains'].itervalues():
                    target = max(target, aux_work['target'])
                target = math.clip(target, net.PARENT.SANE_TARGET_RANGE)
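                # the resulting pseudoshare target is never harder than the real
                # share target, and the hash-rate estimate above (from the last
                # 50 timestamped pseudoshares) aims for roughly one pseudoshare
                # per second per worker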
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                packed_generate_tx = bitcoin_data.tx_type.pack(generate_tx)
                merkle_root = bitcoin_data.check_merkle_link(bitcoin_data.hash256(packed_generate_tx), current_work2.value['merkle_link'])
                
                getwork_time = time.time()
                merkle_link = current_work2.value['merkle_link']
                
                print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(target),
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
                bits = current_work.value['bits']
                previous_block = current_work.value['previous_block']
                ba = bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
                
                received_header_hashes = set()
                
                def got_response(header, request):
                    header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header))
                    pow_hash = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header))
                    try:
                        if pow_hash <= header['bits'].target or p2pool.DEBUG:
                            submit_block(dict(header=header, txs=transactions), ignore_failure=False)
                            if pow_hash <= header['bits'].target:
                                print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash)
                    except:
                        log.err(None, 'Error while processing potential block:')
                    
                    user, _, _, _ = self.get_user_details(request)
                    assert header['merkle_root'] == merkle_root
                    assert header['previous_block'] == previous_block
                    assert header['bits'] == bits
                    
                    on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                    
                    for aux_work, index, hashes in mm_later:
                        try:
                            if pow_hash <= aux_work['target'] or p2pool.DEBUG:
                                df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(aux_work['merged_proxy'].rpc_getauxblock)(
                                    pack.IntType(256, 'big').pack(aux_work['hash']).encode('hex'),
                                    bitcoin_data.aux_pow_type.pack(dict(
                                        merkle_tx=dict(
                                            tx=transactions[0],
                                            block_hash=header_hash,
                                            merkle_link=merkle_link,
                                        ),
                                        merkle_link=bitcoin_data.calculate_merkle_link(hashes, index),
                                        parent_block_header=header,
                                    )).encode('hex'),
                                )
                                @df.addCallback
                                def _(result):
                                    if result != (pow_hash <= aux_work['target']):
                                        print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                                    else:
                                        print 'Merged block submittal result: %s' % (result,)
                                @df.addErrback
                                def _(err):
                                    log.err(err, 'Error submitting merged block:')
                        except:
                            log.err(None, 'Error while processing merged mining POW:')
                    
                    if pow_hash <= share_info['bits'].target and header_hash not in received_header_hashes:
                        min_header = dict(header); del min_header['merkle_root']
                        hash_link = p2pool_data.prefix_to_hash_link(packed_generate_tx[:-32-4], share_type.gentx_before_refhash)
                        share = share_type(net, None, dict(
                            min_header=min_header, share_info=share_info, hash_link=hash_link,
                            ref_merkle_link=dict(branch=[], index=0),
                        ), merkle_link=merkle_link, other_txs=transactions[1:] if pow_hash <= header['bits'].target else None)
                        
                        print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                            request.getUser(),
                            p2pool_data.format_hash(share.hash),
                            p2pool_data.format_hash(share.previous_hash),
                            time.time() - getwork_time,
                            ' DEAD ON ARRIVAL' if not on_time else '',
                        )
                        my_share_hashes.add(share.hash)
                        if not on_time:
                            my_doa_share_hashes.add(share.hash)
                        
                        tracker.add(share)
                        if not p2pool.DEBUG:
                            tracker.verified.add(share)
                        set_real_work2()
                        
                        try:
                            if pow_hash <= header['bits'].target or p2pool.DEBUG:
                                for peer in p2p_node.peers.itervalues():
                                    peer.sendShares([share])
                                shared_share_hashes.add(share.hash)
                        except:
                            log.err(None, 'Error forwarding block solution:')
                        
                        share_received.happened(bitcoin_data.target_to_average_attempts(share.target), not on_time)
                    
                    if pow_hash > target:
                        print 'Worker %s submitted share with hash > target:' % (request.getUser(),)
                        print '    Hash:   %56x' % (pow_hash,)
                        print '    Target: %56x' % (target,)
                    elif header_hash in received_header_hashes:
                        print >>sys.stderr, 'Worker %s @ %s submitted share more than once!' % (request.getUser(), request.getClientIP())
                    else:
                        received_header_hashes.add(header_hash)
                        
                        pseudoshare_received.happened(bitcoin_data.target_to_average_attempts(target), not on_time, user)
                        self.recent_shares_ts_work.append((time.time(), bitcoin_data.target_to_average_attempts(target)))
                        while len(self.recent_shares_ts_work) > 50:
                            self.recent_shares_ts_work.pop(0)
                        local_rate_monitor.add_datum(dict(work=bitcoin_data.target_to_average_attempts(target), dead=not on_time, user=user))
                    
                    return on_time
                
                return ba, got_response
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, current_work.value['best_share_hash'], current_work.value['bits'].target, current_work2.value['subsidy'], net)
        
        web_root = web.get_web_root(tracker, current_work, current_work2, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, args.worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received)
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass # touch the ready_flag file to signal that startup is complete
        
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
        print 'You can increase this amount with the --give-author argument! (or decrease it, if you must)'
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
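        # the LoopingCall re-arms a 30-second alarm every second, so SIGALRM only
        # fires (dumping a stack trace) if the reactor stalls for 30+ seconds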
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if message not in self.recent_messages:
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - current_work2.value['last_update']),)
                    
                    height = tracker.get_height(current_work.value['best_share_hash'])
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.shares),
                        len(tracker.shares),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.shares[current_work.value['best_share_hash']].max_target / my_att_s) if my_att_s and current_work.value['best_share_hash'] else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / current_work.value['bits'].target / real_att_s),
                        )
                    
                    for warning in p2pool_data.get_warnings(tracker, current_work, net):
                        print >>sys.stderr, '#'*40
                        print >>sys.stderr, '>>> Warning: ' + warning
                        print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

class FixedArgumentParser(argparse.ArgumentParser):
    def _read_args_from_files(self, arg_strings):
        # expand arguments referencing files
        new_arg_strings = []
        for arg_string in arg_strings:
            # for regular arguments, just add them back into the list
            if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                new_arg_strings.append(arg_string)
            # replace arguments referencing files with the file content
            else:
                try:
                    args_file = open(arg_string[1:])
                    try:
                        arg_strings = []
                        for arg_line in args_file.read().splitlines():
                            for arg in self.convert_arg_line_to_args(arg_line):
                                arg_strings.append(arg)
                        arg_strings = self._read_args_from_files(arg_strings)
                        new_arg_strings.extend(arg_strings)
                    finally:
                        args_file.close()
                except IOError:
                    err = sys.exc_info()[1]
                    self.error(str(err))
        
        # return the modified argument list
        return new_arg_strings
    
    def convert_arg_line_to_args(self, arg_line):
        return [arg for arg in arg_line.split() if arg.strip()]
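
# FixedArgumentParser exists because stock argparse reads only one argument per
# line from @-prefixed files; splitting each line on whitespace lets a config
# file hold 'option value' pairs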

realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
parser.add_argument('--version', action='version', version=p2pool.__version__)
parser.add_argument('--net',
    help='use specified network (default: bitcoin)',
    action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
parser.add_argument('--testnet',
    help='''use the network's testnet''',
    action='store_const', const=True, default=False, dest='testnet')
parser.add_argument('--debug',
    help='enable debugging mode',
    action='store_const', const=True, default=False, dest='debug')
parser.add_argument('-a', '--address',
    help='generate payouts to this address (default: <address requested from bitcoind>)',
    type=str, action='store', default=None, dest='address')
parser.add_argument('--datadir',
    help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
    type=str, action='store', default=None, dest='datadir')
parser.add_argument('--logfile',
    help='''log to this file (default: data/<NET>/log)''',
    type=str, action='store', default=None, dest='logfile')
parser.add_argument('--merged',
    help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
    type=str, action='append', default=[], dest='merged_urls')
parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
    help='donate this percentage of work towards the development of p2pool (default: 0.5)',
    type=float, action='store', default=0.5, dest='donation_percentage')
parser.add_argument('--iocp',
    help='use the Windows IOCP API in order to avoid errors due to large numbers of open sockets',
    action='store_true', default=False, dest='iocp')
parser.add_argument('--irc-announce',
    help='announce any blocks found on irc://irc.freenode.net/#p2pool',
    action='store_true', default=False, dest='irc_announce')
parser.add_argument('--no-bugreport',
    help='disable submitting caught exceptions to the author',
    action='store_true', default=False, dest='no_bugreport')

p2pool_group = parser.add_argument_group('p2pool interface')
p2pool_group.add_argument('--p2pool-port', metavar='PORT',
    help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='p2pool_port')
p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
    help='connect to an existing p2pool node at ADDR listening on port PORT (defaults to the default p2pool P2P port) in addition to the builtin addresses',
    type=str, action='append', default=[], dest='p2pool_nodes')
parser.add_argument('--disable-upnp',
    help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
    action='store_false', default=True, dest='upnp')
p2pool_group.add_argument('--max-conns', metavar='CONNS',
    help='maximum incoming connections (default: 40)',
    type=int, action='store', default=40, dest='p2pool_conns')

worker_group = parser.add_argument_group('worker interface')
worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
    help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
    type=str, action='store', default=None, dest='worker_endpoint')
worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
    help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
    type=float, action='store', default=0, dest='worker_fee')

bitcoind_group = parser.add_argument_group('bitcoind interface')
bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
    help='connect to this address (default: 127.0.0.1)',
    type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
    help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_rpc_port')
bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
    help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
    type=int, action='store', default=None, dest='bitcoind_p2p_port')

bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
    help='bitcoind RPC interface username, then password, space-separated (providing only one will cause the username to default to empty; providing neither will cause P2Pool to read them from bitcoin.conf)',
    type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

args = parser.parse_args()

if args.debug:
    p2pool.DEBUG = True

net_name = args.net_name + ('_testnet' if args.testnet else '')
net = networks.nets[net_name]

datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
if not os.path.exists(datadir_path):
    os.makedirs(datadir_path)

if len(args.bitcoind_rpc_userpass) > 2:
    parser.error('a maximum of two arguments is allowed')
args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

if args.bitcoind_rpc_password is None:
    if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
        parser.error('This network has no configuration file function. Manually enter your RPC password.')
    conf_path = net.PARENT.CONF_FILE_FUNC()
    if not os.path.exists(conf_path):
        parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
            '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
            '''\r\n'''
            '''server=1\r\n'''
            '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
    with open(conf_path, 'rb') as f:
        cp = ConfigParser.RawConfigParser()
        cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                setattr(args, var_name, var_type(cp.get('x', conf_name)))
    if args.bitcoind_rpc_password is None:
        parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

if args.bitcoind_rpc_username is None:
    args.bitcoind_rpc_username = ''

if args.bitcoind_rpc_port is None:
    args.bitcoind_rpc_port = net.PARENT.RPC_PORT

if args.bitcoind_p2p_port is None:
    args.bitcoind_p2p_port = net.PARENT.P2P_PORT

if args.p2pool_port is None:
    args.p2pool_port = net.P2P_PORT

if args.worker_endpoint is None:
    worker_endpoint = '', net.WORKER_PORT
elif ':' not in args.worker_endpoint:
    worker_endpoint = '', int(args.worker_endpoint)
else:
    addr, port = args.worker_endpoint.rsplit(':', 1)
    worker_endpoint = addr, int(port)

if args.address is not None:
    try:
        args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
    except Exception, e:
        parser.error('error parsing address: ' + repr(e))
else:
    args.pubkey_hash = None
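
# each --merged URL embeds its credentials; separate_url splits e.g.
# 'http://ncuser:ncpass@127.0.0.1:10332/' into
# ('http://127.0.0.1:10332/', 'ncuser:ncpass')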
def separate_url(url):
    s = urlparse.urlsplit(url)
    if '@' not in s.netloc:
        parser.error('merged url netloc must contain an "@"')
    userpass, new_netloc = s.netloc.rsplit('@', 1)
    return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
merged_urls = map(separate_url, args.merged_urls)

if args.logfile is None:
    args.logfile = os.path.join(datadir_path, 'log')

logfile = logging.LogFile(args.logfile)
pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
sys.stdout = logging.AbortPipe(pipe)
sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
if hasattr(signal, "SIGUSR1"):
    def sigusr1(signum, frame):
        print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
        logfile.reopen()
        print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
    signal.signal(signal.SIGUSR1, sigusr1)
task.LoopingCall(logfile.reopen).start(5)
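# from here on, stdout and stderr both flow through a timestamping tee into the
# logfile (stderr lines prefixed with '> '); the 5-second reopen loop presumably
# also lets externally rotated logfiles be picked up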

class ErrorReporter(object):
    def __init__(self):
        self.last_sent = None
    
    def emit(self, eventDict):
        if not eventDict["isError"]:
            return
        
        if self.last_sent is not None and time.time() < self.last_sent + 5:
            return
        self.last_sent = time.time()
        
        if 'failure' in eventDict:
            text = ((eventDict.get('why') or 'Unhandled Error')
                + '\n' + eventDict['failure'].getTraceback())
        else:
            text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
        
        from twisted.web import client
        client.getPage(
            url='http://u.forre.st/p2pool_error.cgi',
            method='POST',
            postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
            timeout=15,
        ).addBoth(lambda x: None)
if not args.no_bugreport:
    log.addObserver(ErrorReporter().emit)

reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
reactor.run()