From: Forrest Voight Date: Fri, 19 Oct 2012 00:56:33 +0000 (-0400) Subject: refactored p2pool node implementation from p2pool.main to p2pool.node. dedicated... X-Git-Tag: 9.0~39 X-Git-Url: https://git.novaco.in/?p=p2pool.git;a=commitdiff_plain;h=764a90c3ab7c0fe29a85fd588333d17a1138ac6a refactored p2pool node implementation from p2pool.main to p2pool.node. dedicated to luke-jr. --- diff --git a/p2pool/bitcoin/helper.py b/p2pool/bitcoin/helper.py new file mode 100644 index 0000000..3d5810b --- /dev/null +++ b/p2pool/bitcoin/helper.py @@ -0,0 +1,78 @@ +import sys +import time + +from twisted.internet import defer + +import p2pool +from p2pool.bitcoin import data as bitcoin_data +from p2pool.util import deferral, jsonrpc + +@deferral.retry('Error while checking Bitcoin connection:', 1) +@defer.inlineCallbacks +def check(bitcoind, net): + if not (yield net.PARENT.RPC_CHECK(bitcoind)): + print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!" + raise deferral.RetrySilentlyException() + if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']): + print >>sys.stderr, ' Bitcoin version too old! Upgrade to 0.6.4 or newer!' + raise deferral.RetrySilentlyException() + +@deferral.retry('Error getting work from bitcoind:', 3) +@defer.inlineCallbacks +def getwork(bitcoind, use_getblocktemplate=False): + def go(): + if use_getblocktemplate: + return bitcoind.rpc_getblocktemplate(dict(mode='template')) + else: + return bitcoind.rpc_getmemorypool() + try: + work = yield go() + except jsonrpc.Error_for_code(-32601): # Method not found + use_getblocktemplate = not use_getblocktemplate + try: + work = yield go() + except jsonrpc.Error_for_code(-32601): # Method not found + print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!' + raise deferral.RetrySilentlyException() + packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']] + if 'height' not in work: + work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1 + elif p2pool.DEBUG: + assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1 + defer.returnValue(dict( + version=work['version'], + previous_block=int(work['previousblockhash'], 16), + transactions=map(bitcoin_data.tx_type.unpack, packed_transactions), + transaction_hashes=map(bitcoin_data.hash256, packed_transactions), + subsidy=work['coinbasevalue'], + time=work['time'] if 'time' in work else work['curtime'], + bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']), + coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '', + height=work['height'], + last_update=time.time(), + use_getblocktemplate=use_getblocktemplate, + )) + +@deferral.retry('Error submitting primary block: (will retry)', 10, 10) +def submit_block_p2p(block, factory, net): + if factory.conn.value is None: + print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))) + raise deferral.RetrySilentlyException() + factory.conn.value.send_block(block=block) + +@deferral.retry('Error submitting block: (will retry)', 10, 10) +@defer.inlineCallbacks +def submit_block_rpc(block, ignore_failure, bitcoind, bitcoind_work, net): + if bitcoind_work.value['use_getblocktemplate']: + result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex')) + success = result is None + else: + result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex')) + success = result + success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target + if (not success and success_expected and not ignore_failure) or (success and not success_expected): + print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected) + +def submit_block(block, ignore_failure, factory, bitcoind, bitcoind_work, net): + submit_block_p2p(block, factory, net) + submit_block_rpc(block, ignore_failure, bitcoind, bitcoind_work, net) diff --git a/p2pool/data.py b/p2pool/data.py index 2aa0efc..e24e90b 100644 --- a/p2pool/data.py +++ b/p2pool/data.py @@ -601,7 +601,7 @@ class WeightsSkipList(forest.TrackerSkipList): return math.add_dicts(*math.flatten_linked_list(weights_list)), total_weight, total_donation_weight class OkayTracker(forest.Tracker): - def __init__(self, net, my_share_hashes, my_doa_share_hashes): + def __init__(self, net): forest.Tracker.__init__(self, delta_type=forest.get_attributedelta_type(dict(forest.AttributeDelta.attrs, work=lambda share: bitcoin_data.target_to_average_attempts(share.target), min_work=lambda share: bitcoin_data.target_to_average_attempts(share.max_target), @@ -609,10 +609,6 @@ class OkayTracker(forest.Tracker): self.net = net self.verified = forest.SubsetTracker(delta_type=forest.get_attributedelta_type(dict(forest.AttributeDelta.attrs, work=lambda share: bitcoin_data.target_to_average_attempts(share.target), - my_count=lambda share: 1 if share.hash in my_share_hashes else 0, - my_doa_count=lambda share: 1 if share.hash in my_doa_share_hashes else 0, - my_orphan_announce_count=lambda share: 1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 'orphan' else 0, - my_dead_announce_count=lambda share: 1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 'doa' else 0, )), subset_of=self) self.get_cumulative_weights = WeightsSkipList(self) diff --git a/p2pool/main.py b/p2pool/main.py index ff85aac..7eb6d3e 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -19,46 +19,10 @@ from twisted.python import log from nattraverso import portmapper, ipdiscover import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data -from bitcoin import worker_interface, height_tracker +from bitcoin import worker_interface, helper from util import fixargparse, jsonrpc, variable, deferral, math, logging -from . import p2p, networks, web, work -import p2pool, p2pool.data as p2pool_data - -@deferral.retry('Error getting work from bitcoind:', 3) -@defer.inlineCallbacks -def getwork(bitcoind, use_getblocktemplate=False): - def go(): - if use_getblocktemplate: - return bitcoind.rpc_getblocktemplate(dict(mode='template')) - else: - return bitcoind.rpc_getmemorypool() - try: - work = yield go() - except jsonrpc.Error_for_code(-32601): # Method not found - use_getblocktemplate = not use_getblocktemplate - try: - work = yield go() - except jsonrpc.Error_for_code(-32601): # Method not found - print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!' - raise deferral.RetrySilentlyException() - packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']] - if 'height' not in work: - work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1 - elif p2pool.DEBUG: - assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1 - defer.returnValue(dict( - version=work['version'], - previous_block=int(work['previousblockhash'], 16), - transactions=map(bitcoin_data.tx_type.unpack, packed_transactions), - transaction_hashes=map(bitcoin_data.hash256, packed_transactions), - subsidy=work['coinbasevalue'], - time=work['time'] if 'time' in work else work['curtime'], - bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']), - coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '', - height=work['height'], - last_update=time.time(), - use_getblocktemplate=use_getblocktemplate, - )) +from . import networks, web, work +import p2pool, p2pool.data as p2pool_data, p2pool.node as p2pool_node @defer.inlineCallbacks def main(args, net, datadir_path, merged_urls, worker_endpoint): @@ -66,8 +30,6 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): print 'p2pool (version %s)' % (p2pool.__version__,) print - traffic_happened = variable.Event() - @defer.inlineCallbacks def connect_p2p(): # connect to bitcoind over bitcoin-p2p @@ -86,17 +48,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port) print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username) bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30) - @deferral.retry('Error while checking Bitcoin connection:', 1) - @defer.inlineCallbacks - def check(): - if not (yield net.PARENT.RPC_CHECK(bitcoind)): - print >>sys.stderr, " Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!" - raise deferral.RetrySilentlyException() - if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']): - print >>sys.stderr, ' Bitcoin version too old! Upgrade to 0.6.4 or newer!' - raise deferral.RetrySilentlyException() - yield check() - temp_work = yield getwork(bitcoind) + yield helper.check(bitcoind, net) + temp_work = yield helper.getwork(bitcoind) bitcoind_warning_var = variable.Variable(None) @defer.inlineCallbacks @@ -144,222 +97,49 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): print ' ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT) print - my_share_hashes = set() - my_doa_share_hashes = set() - - tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes) - shared_share_hashes = set() ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net) + shares = {} known_verified = set() print "Loading shares..." for i, (mode, contents) in enumerate(ss.get_shares()): if mode == 'share': - if contents.hash in tracker.items: - continue - shared_share_hashes.add(contents.hash) contents.time_seen = 0 - tracker.add(contents) - if len(tracker.items) % 1000 == 0 and tracker.items: - print " %i" % (len(tracker.items),) + shares[contents.hash] = contents + if len(shares) % 1000 == 0 and shares: + print " %i" % (len(shares),) elif mode == 'verified_hash': known_verified.add(contents) else: raise AssertionError() - print " ...inserting %i verified shares..." % (len(known_verified),) - for h in known_verified: - if h not in tracker.items: - ss.forget_verified_share(h) - continue - tracker.verified.add(tracker.items[h]) - print " ...done loading %i shares!" % (len(tracker.items),) + print " ...done loading %i shares (%i verified)!" % (len(shares), len(known_verified)) print - tracker.removed.watch(lambda share: ss.forget_share(share.hash)) - tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash)) - tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash)) - - print 'Initializing work...' - - # BITCOIND WORK - bitcoind_work = variable.Variable((yield getwork(bitcoind))) - @defer.inlineCallbacks - def work_poller(): - while True: - flag = factory.new_block.get_deferred() - try: - bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate']))) - except: - log.err() - yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True) - work_poller() - - # PEER WORK - - best_block_header = variable.Variable(None) - def handle_header(new_header): - # check that header matches current target - if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target): - return - bitcoind_best_block = bitcoind_work.value['previous_block'] - if (best_block_header.value is None - or ( - new_header['previous_block'] == bitcoind_best_block and - bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block - ) # new is child of current and previous is current - or ( - bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and - best_block_header.value['previous_block'] != bitcoind_best_block - )): # new is current and previous is not a child of current - best_block_header.set(new_header) - @defer.inlineCallbacks - def poll_header(): - handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block']))) - bitcoind_work.changed.watch(lambda _: poll_header()) - yield deferral.retry('Error while requesting best block header:')(poll_header)() + print 'Initializing work...' - # BEST SHARE + node = p2pool_node.Node(factory, bitcoind, shares.values(), known_verified, net) + yield node.start() - known_txs_var = variable.Variable({}) # hash -> tx - mining_txs_var = variable.Variable({}) # hash -> tx - get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net) + for share_hash in shares: + if share_hash not in node.tracker.items: + ss.forget_share(share_hash) + for share_hash in known_verified: + if share_hash not in node.tracker.verified.items: + ss.forget_verified_share(share_hash) + del shares, known_verified + node.tracker.removed.watch(lambda share: ss.forget_share(share.hash)) + node.tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash)) - best_share_var = variable.Variable(None) - desired_var = variable.Variable(None) - def set_best_share(): - best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value) - - best_share_var.set(best) - desired_var.set(desired) - bitcoind_work.changed.watch(lambda _: set_best_share()) - set_best_share() + def save_shares(): + for share in node.tracker.get_chain(node.best_share_var.value, min(node.tracker.get_height(node.best_share_var.value), 2*net.CHAIN_LENGTH)): + ss.add_share(share) + if share.hash in node.tracker.verified.items: + ss.add_verified_hash(share.hash) + task.LoopingCall(save_shares).start(60) print ' ...success!' print - # setup p2p logic and join p2pool network - - # update mining_txs according to getwork results - @bitcoind_work.changed.run_and_watch - def _(_=None): - new_mining_txs = {} - new_known_txs = dict(known_txs_var.value) - for tx_hash, tx in zip(bitcoind_work.value['transaction_hashes'], bitcoind_work.value['transactions']): - new_mining_txs[tx_hash] = tx - new_known_txs[tx_hash] = tx - mining_txs_var.set(new_mining_txs) - known_txs_var.set(new_known_txs) - # add p2p transactions from bitcoind to known_txs - @factory.new_tx.watch - def _(tx): - new_known_txs = dict(known_txs_var.value) - new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx - known_txs_var.set(new_known_txs) - # forward transactions seen to bitcoind - @known_txs_var.transitioned.watch - @defer.inlineCallbacks - def _(before, after): - yield deferral.sleep(random.expovariate(1/1)) - for tx_hash in set(after) - set(before): - factory.conn.value.send_tx(tx=after[tx_hash]) - - class Node(p2p.Node): - def handle_shares(self, shares, peer): - if len(shares) > 5: - print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None) - - new_count = 0 - for share in shares: - if share.hash in tracker.items: - #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),) - continue - - new_count += 1 - - #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None) - - tracker.add(share) - - if new_count: - set_best_share() - - if len(shares) > 5: - print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH) - - @defer.inlineCallbacks - def handle_share_hashes(self, hashes, peer): - new_hashes = [x for x in hashes if x not in tracker.items] - if not new_hashes: - return - try: - shares = yield peer.get_shares( - hashes=new_hashes, - parents=0, - stops=[], - ) - except: - log.err(None, 'in handle_share_hashes:') - else: - self.handle_shares(shares, peer) - - def handle_get_shares(self, hashes, parents, stops, peer): - parents = min(parents, 1000//len(hashes)) - stops = set(stops) - shares = [] - for share_hash in hashes: - for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))): - if share.hash in stops: - break - shares.append(share) - print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1]) - return shares - - def handle_bestblock(self, header, peer): - if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target: - raise p2p.PeerMisbehavingError('received block header fails PoW test') - handle_header(header) - - @deferral.retry('Error submitting primary block: (will retry)', 10, 10) - def submit_block_p2p(block): - if factory.conn.value is None: - print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))) - raise deferral.RetrySilentlyException() - factory.conn.value.send_block(block=block) - - @deferral.retry('Error submitting block: (will retry)', 10, 10) - @defer.inlineCallbacks - def submit_block_rpc(block, ignore_failure): - if bitcoind_work.value['use_getblocktemplate']: - result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex')) - success = result is None - else: - result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex')) - success = result - success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target - if (not success and success_expected and not ignore_failure) or (success and not success_expected): - print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected) - - def submit_block(block, ignore_failure): - submit_block_p2p(block) - submit_block_rpc(block, ignore_failure) - - @tracker.verified.added.watch - def _(share): - if share.pow_hash <= share.header['bits'].target: - block = share.as_block(tracker, known_txs_var.value) - if block is None: - print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) - return - submit_block(block, ignore_failure=True) - print - print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) - print - def spread(): - if (get_height_rel_highest(share.header['previous_block']) > -5 or - bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]): - broadcast_share(share.hash) - spread() - reactor.callLater(5, spread) # so get_height_rel_highest can update print 'Joining p2pool network using port %i...' % (args.p2pool_port,) @@ -393,91 +173,14 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): except: log.err() - p2p_node = Node( - best_share_hash_func=lambda: best_share_var.value, - port=args.p2pool_port, - net=net, - addr_store=addrs, - connect_addrs=connect_addrs, - max_incoming_conns=args.p2pool_conns, - traffic_happened=traffic_happened, - known_txs_var=known_txs_var, - mining_txs_var=mining_txs_var, - ) - p2p_node.start() - - def forget_old_txs(): - new_known_txs = {} - for peer in p2p_node.peers.itervalues(): - new_known_txs.update(peer.remembered_txs) - new_known_txs.update(mining_txs_var.value) - for share in tracker.get_chain(best_share_var.value, min(120, tracker.get_height(best_share_var.value))): - for tx_hash in share.new_transaction_hashes: - if tx_hash in known_txs_var.value: - new_known_txs[tx_hash] = known_txs_var.value[tx_hash] - known_txs_var.set(new_known_txs) - task.LoopingCall(forget_old_txs).start(10) + node.p2p_node = p2pool_node.P2PNode(node, args.p2pool_port, args.p2pool_conns, addrs, connect_addrs) + node.p2p_node.start() def save_addrs(): with open(os.path.join(datadir_path, 'addrs'), 'wb') as f: - f.write(json.dumps(p2p_node.addr_store.items())) + f.write(json.dumps(node.p2p_node.addr_store.items())) task.LoopingCall(save_addrs).start(60) - @best_block_header.changed.watch - def _(header): - for peer in p2p_node.peers.itervalues(): - peer.send_bestblock(header=header) - - @defer.inlineCallbacks - def broadcast_share(share_hash): - shares = [] - for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))): - if share.hash in shared_share_hashes: - break - shared_share_hashes.add(share.hash) - shares.append(share) - - for peer in list(p2p_node.peers.itervalues()): - yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash]) - - # send share when the chain changes to their chain - best_share_var.changed.watch(broadcast_share) - - def save_shares(): - for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)): - ss.add_share(share) - if share.hash in tracker.verified.items: - ss.add_verified_hash(share.hash) - task.LoopingCall(save_shares).start(60) - - @apply - @defer.inlineCallbacks - def download_shares(): - while True: - desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0) - peer2, share_hash = random.choice(desired) - - if len(p2p_node.peers) == 0: - yield deferral.sleep(1) - continue - peer = random.choice(p2p_node.peers.values()) - - print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr) - try: - shares = yield peer.get_shares( - hashes=[share_hash], - parents=500, - stops=[], - ) - except: - log.err(None, 'in download_shares:') - continue - - if not shares: - yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has - continue - p2p_node.handle_shares(shares, peer) - print ' ...success!' print @@ -502,10 +205,8 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1]) - get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net) - - wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share) - web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened, args.donation_percentage) + wb = work.WorkerBridge(node, my_pubkey_hash, args.donation_percentage, merged_urls, args.worker_fee) + web_root = web.get_web_root(wb, datadir_path, bitcoind_warning_var) worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/')) deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0]) @@ -561,7 +262,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages): self.say(self.channel, message) self._remember_message(message) - self.watch_id = tracker.verified.added.watch(new_share) + self.watch_id = node.tracker.verified.added.watch(new_share) self.recent_messages = [] def joined(self, channel): self.in_channel = True @@ -575,7 +276,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): if channel == self.channel: self._remember_message(message) def connectionLost(self, reason): - tracker.verified.added.unwatch(self.watch_id) + node.tracker.verified.added.unwatch(self.watch_id) print 'IRC connection lost:', reason.getErrorMessage() class IRCClientFactory(protocol.ReconnectingClientFactory): protocol = IRCClient @@ -588,13 +289,13 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): while True: yield deferral.sleep(3) try: - height = tracker.get_height(best_share_var.value) + height = node.tracker.get_height(node.best_share_var.value) this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % ( height, - len(tracker.verified.items), - len(tracker.items), - len(p2p_node.peers), - sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming), + len(node.tracker.verified.items), + len(node.tracker.items), + len(node.p2p_node.peers), + sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming), ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '') datums, dt = wb.local_rate_monitor.get_datums_in_last() @@ -603,27 +304,27 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): math.format(int(my_att_s)), math.format_dt(dt), math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95), - math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???', + math.format_dt(2**256 / node.tracker.items[node.best_share_var.value].max_target / my_att_s) if my_att_s and node.best_share_var.value else '???', ) if height > 2: (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts() - stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height)) - real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop) + stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, min(60*60//net.SHARE_PERIOD, height)) + real_att_s = p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop) this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % ( shares, stale_orphan_shares, stale_doa_shares, math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95), math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)), - get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL, + node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL, ) this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % ( math.format(int(real_att_s)), 100*stale_prop, - math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s), + math.format_dt(2**256 / node.bitcoind_work.value['bits'].target / real_att_s), ) - for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value): + for warning in p2pool_data.get_warnings(node.tracker, node.best_share_var.value, net, bitcoind_warning_var.value, node.bitcoind_work.value): print >>sys.stderr, '#'*40 print >>sys.stderr, '>>> Warning: ' + warning print >>sys.stderr, '#'*40 diff --git a/p2pool/node.py b/p2pool/node.py new file mode 100644 index 0000000..406787c --- /dev/null +++ b/p2pool/node.py @@ -0,0 +1,281 @@ +import random +import sys + +from twisted.internet import defer, reactor, task +from twisted.python import log + +from p2pool import data as p2pool_data, p2p +from p2pool.bitcoin import data as bitcoin_data, helper, height_tracker +from p2pool.util import deferral, variable + + +class P2PNode(p2p.Node): + def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs): + self.node = node + p2p.Node.__init__(self, + best_share_hash_func=lambda: node.best_share_var.value, + port=p2pool_port, + net=node.net, + addr_store=addrs, + connect_addrs=connect_addrs, + max_incoming_conns=p2pool_conns, + known_txs_var=node.known_txs_var, + mining_txs_var=node.mining_txs_var, + ) + + def handle_shares(self, shares, peer): + if len(shares) > 5: + print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None) + + new_count = 0 + for share in shares: + if share.hash in self.node.tracker.items: + #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),) + continue + + new_count += 1 + + #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None) + + self.node.tracker.add(share) + + if new_count: + self.node.set_best_share() + + if len(shares) > 5: + print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(self.node.tracker.items), 2*self.node.net.CHAIN_LENGTH) + + @defer.inlineCallbacks + def handle_share_hashes(self, hashes, peer): + new_hashes = [x for x in hashes if x not in self.node.tracker.items] + if not new_hashes: + return + try: + shares = yield peer.get_shares( + hashes=new_hashes, + parents=0, + stops=[], + ) + except: + log.err(None, 'in handle_share_hashes:') + else: + self.handle_shares(shares, peer) + + def handle_get_shares(self, hashes, parents, stops, peer): + parents = min(parents, 1000//len(hashes)) + stops = set(stops) + shares = [] + for share_hash in hashes: + for share in self.node.tracker.get_chain(share_hash, min(parents + 1, self.node.tracker.get_height(share_hash))): + if share.hash in stops: + break + shares.append(share) + print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1]) + return shares + + def handle_bestblock(self, header, peer): + if self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target: + raise p2p.PeerMisbehavingError('received block header fails PoW test') + self.node.handle_header(header) + + @defer.inlineCallbacks + def broadcast_share(self, share_hash): + shares = [] + for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))): + if share.hash in self.shared_share_hashes: + break + self.shared_share_hashes.add(share.hash) + shares.append(share) + + for peer in list(self.peers.itervalues()): + yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash]) + + def start(self): + p2p.Node.start(self) + + self.shared_share_hashes = set(self.node.tracker.items) + self.node.tracker.removed.watch_weakref(self, lambda self, share: self.shared_share_hashes.discard(share.hash)) + + @apply + @defer.inlineCallbacks + def download_shares(): + while True: + desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0) + peer2, share_hash = random.choice(desired) + + if len(self.peers) == 0: + yield deferral.sleep(1) + continue + peer = random.choice(self.peers.values()) + + print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr) + try: + shares = yield peer.get_shares( + hashes=[share_hash], + parents=500, + stops=[], + ) + except: + log.err(None, 'in download_shares:') + continue + + if not shares: + yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has + continue + self.handle_shares(shares, peer) + + + @self.node.best_block_header.changed.watch + def _(header): + for peer in self.peers.itervalues(): + peer.send_bestblock(header=header) + + # send share when the chain changes to their chain + self.node.best_share_var.changed.watch(self.broadcast_share) + + @self.node.tracker.verified.added.watch + def _(share): + if not (share.pow_hash <= share.header['bits'].target): + return + + def spread(): + if (self.node.get_height_rel_highest(share.header['previous_block']) > -5 or + self.node.bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]): + self.broadcast_share(share.hash) + spread() + reactor.callLater(5, spread) # so get_height_rel_highest can update + + +class Node(object): + def __init__(self, factory, bitcoind, shares, known_verified_share_hashes, net): + self.factory = factory + self.bitcoind = bitcoind + self.net = net + + self.tracker = p2pool_data.OkayTracker(self.net) + + for share in shares: + self.tracker.add(share) + + for share_hash in known_verified_share_hashes: + if share_hash in self.tracker.items: + self.tracker.verified.add(self.tracker.items[share_hash]) + + self.p2p_node = None # overwritten externally + + @defer.inlineCallbacks + def start(self): + stop_signal = variable.Event() + self.stop = stop_signal.happened + + # BITCOIND WORK + + self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind))) + @defer.inlineCallbacks + def work_poller(): + while True: + flag = self.factory.new_block.get_deferred() + try: + self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate']))) + except: + log.err() + yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True) + work_poller() + + # PEER WORK + + self.best_block_header = variable.Variable(None) + def handle_header(new_header): + # check that header matches current target + if not (self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target): + return + bitcoind_best_block = self.bitcoind_work.value['previous_block'] + if (self.best_block_header.value is None + or ( + new_header['previous_block'] == bitcoind_best_block and + bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block + ) # new is child of current and previous is current + or ( + bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and + self.best_block_header.value['previous_block'] != bitcoind_best_block + )): # new is current and previous is not a child of current + self.best_block_header.set(new_header) + self.handle_header = handle_header + @defer.inlineCallbacks + def poll_header(): + handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block']))) + self.bitcoind_work.changed.watch(lambda _: poll_header()) + yield deferral.retry('Error while requesting best block header:')(poll_header)() + + # BEST SHARE + + self.known_txs_var = variable.Variable({}) # hash -> tx + self.mining_txs_var = variable.Variable({}) # hash -> tx + self.get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(self.bitcoind, self.factory, lambda: self.bitcoind_work.value['previous_block'], self.net) + + self.best_share_var = variable.Variable(None) + self.desired_var = variable.Variable(None) + self.bitcoind_work.changed.watch(lambda _: self.set_best_share()) + self.set_best_share() + + # setup p2p logic and join p2pool network + + # update mining_txs according to getwork results + @self.bitcoind_work.changed.run_and_watch + def _(_=None): + new_mining_txs = {} + new_known_txs = dict(self.known_txs_var.value) + for tx_hash, tx in zip(self.bitcoind_work.value['transaction_hashes'], self.bitcoind_work.value['transactions']): + new_mining_txs[tx_hash] = tx + new_known_txs[tx_hash] = tx + self.mining_txs_var.set(new_mining_txs) + self.known_txs_var.set(new_known_txs) + # add p2p transactions from bitcoind to known_txs + @self.factory.new_tx.watch + def _(tx): + new_known_txs = dict(self.known_txs_var.value) + new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx + self.known_txs_var.set(new_known_txs) + # forward transactions seen to bitcoind + @self.known_txs_var.transitioned.watch + @defer.inlineCallbacks + def _(before, after): + yield deferral.sleep(random.expovariate(1/1)) + for tx_hash in set(after) - set(before): + self.factory.conn.value.send_tx(tx=after[tx_hash]) + + @self.tracker.verified.added.watch + def _(share): + if not (share.pow_hash <= share.header['bits'].target): + return + + block = share.as_block(self.tracker, self.known_txs_var.value) + if block is None: + print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) + return + helper.submit_block(block, True, self.factory, self.bitcoind, self.bitcoind_work, self.net) + print + print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash) + print + + def forget_old_txs(): + new_known_txs = {} + if self.p2p_node is not None: + for peer in self.p2p_node.peers.itervalues(): + new_known_txs.update(peer.remembered_txs) + new_known_txs.update(self.mining_txs_var.value) + for share in self.tracker.get_chain(self.best_share_var.value, min(120, self.tracker.get_height(self.best_share_var.value))): + for tx_hash in share.new_transaction_hashes: + if tx_hash in self.known_txs_var.value: + new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash] + self.known_txs_var.set(new_known_txs) + task.LoopingCall(forget_old_txs).start(10) + + def set_best_share(self): + best, desired = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value) + + self.best_share_var.set(best) + self.desired_var.set(desired) + + def get_current_txouts(self): + return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net) diff --git a/p2pool/p2p.py b/p2pool/p2p.py index 7d3c860..22a731f 100644 --- a/p2pool/p2p.py +++ b/p2pool/p2p.py @@ -552,17 +552,17 @@ class SingleClientFactory(protocol.ReconnectingClientFactory): self.node.lost_conn(proto, reason) class Node(object): - def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, traffic_happened=variable.Event(), known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})): + def __init__(self, best_share_hash_func, port, net, addr_store={}, connect_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000, known_txs_var=variable.Variable({}), mining_txs_var=variable.Variable({})): self.best_share_hash_func = best_share_hash_func self.port = port self.net = net self.addr_store = dict(addr_store) self.connect_addrs = connect_addrs self.preferred_storage = preferred_storage - self.traffic_happened = traffic_happened self.known_txs_var = known_txs_var self.mining_txs_var = mining_txs_var + self.traffic_happened = variable.Event() self.nonce = random.randrange(2**64) self.peers = {} self.bans = {} # address -> end_time diff --git a/p2pool/web.py b/p2pool/web.py index 3eafa1b..ba7e9db 100644 --- a/p2pool/web.py +++ b/p2pool/web.py @@ -44,21 +44,22 @@ def _atomic_write(filename, data): os.remove(filename) os.rename(filename + '.new', filename) -def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, get_stale_counts, my_pubkey_hash, local_rate_monitor, worker_fee, p2p_node, my_share_hashes, pseudoshare_received, share_received, best_share_var, bitcoin_warning_var, traffic_happened, donation_percentage): +def get_web_root(wb, datadir_path, bitcoind_warning_var): + node = wb.node start_time = time.time() web_root = resource.Resource() def get_users(): - height, last = tracker.get_height_and_last(best_share_var.value) - weights, total_weight, donation_weight = tracker.get_cumulative_weights(best_share_var.value, min(height, 720), 65535*2**256) + height, last = node.tracker.get_height_and_last(node.best_share_var.value) + weights, total_weight, donation_weight = node.tracker.get_cumulative_weights(node.best_share_var.value, min(height, 720), 65535*2**256) res = {} for script in sorted(weights, key=lambda s: weights[s]): - res[bitcoin_data.script2_to_address(script, net.PARENT)] = weights[script]/total_weight + res[bitcoin_data.script2_to_address(script, node.net.PARENT)] = weights[script]/total_weight return res def get_current_scaled_txouts(scale, trunc=0): - txouts = get_current_txouts() + txouts = node.get_current_txouts() total = sum(txouts.itervalues()) results = dict((script, value*scale//total) for script, value in txouts.iteritems()) if trunc > 0: @@ -84,15 +85,15 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, total = int(float(total)*1e8) trunc = int(float(trunc)*1e8) return json.dumps(dict( - (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8) + (bitcoin_data.script2_to_address(script, node.net.PARENT), value/1e8) for script, value in get_current_scaled_txouts(total, trunc).iteritems() - if bitcoin_data.script2_to_address(script, net.PARENT) is not None + if bitcoin_data.script2_to_address(script, node.net.PARENT) is not None )) def get_local_rates(): miner_hash_rates = {} miner_dead_hash_rates = {} - datums, dt = local_rate_monitor.get_datums_in_last() + datums, dt = wb.local_rate_monitor.get_datums_in_last() for datum in datums: miner_hash_rates[datum['user']] = miner_hash_rates.get(datum['user'], 0) + datum['work']/dt if datum['dead']: @@ -101,43 +102,43 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, def get_global_stats(): # averaged over last hour - if tracker.get_height(best_share_var.value) < 10: + if node.tracker.get_height(node.best_share_var.value) < 10: return None - lookbehind = min(tracker.get_height(best_share_var.value), 3600//net.SHARE_PERIOD) + lookbehind = min(node.tracker.get_height(node.best_share_var.value), 3600//node.net.SHARE_PERIOD) - nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, lookbehind) - stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, lookbehind) + nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, lookbehind) + stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, lookbehind) return dict( pool_nonstale_hash_rate=nonstale_hash_rate, pool_hash_rate=nonstale_hash_rate/(1 - stale_prop), pool_stale_prop=stale_prop, - min_difficulty=bitcoin_data.target_to_difficulty(tracker.items[best_share_var.value].max_target), + min_difficulty=bitcoin_data.target_to_difficulty(node.tracker.items[node.best_share_var.value].max_target), ) def get_local_stats(): - if tracker.get_height(best_share_var.value) < 10: + if node.tracker.get_height(node.best_share_var.value) < 10: return None - lookbehind = min(tracker.get_height(best_share_var.value), 3600//net.SHARE_PERIOD) + lookbehind = min(node.tracker.get_height(node.best_share_var.value), 3600//node.net.SHARE_PERIOD) - global_stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, lookbehind) + global_stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, lookbehind) - my_unstale_count = sum(1 for share in tracker.get_chain(best_share_var.value, lookbehind) if share.hash in my_share_hashes) - my_orphan_count = sum(1 for share in tracker.get_chain(best_share_var.value, lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 'orphan') - my_doa_count = sum(1 for share in tracker.get_chain(best_share_var.value, lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 'doa') + my_unstale_count = sum(1 for share in node.tracker.get_chain(node.best_share_var.value, lookbehind) if share.hash in wb.my_share_hashes) + my_orphan_count = sum(1 for share in node.tracker.get_chain(node.best_share_var.value, lookbehind) if share.hash in wb.my_share_hashes and share.share_data['stale_info'] == 'orphan') + my_doa_count = sum(1 for share in node.tracker.get_chain(node.best_share_var.value, lookbehind) if share.hash in wb.my_share_hashes and share.share_data['stale_info'] == 'doa') my_share_count = my_unstale_count + my_orphan_count + my_doa_count my_stale_count = my_orphan_count + my_doa_count my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None my_work = sum(bitcoin_data.target_to_average_attempts(share.target) - for share in tracker.get_chain(best_share_var.value, lookbehind - 1) - if share.hash in my_share_hashes) - actual_time = (tracker.items[best_share_var.value].timestamp - - tracker.items[tracker.get_nth_parent_hash(best_share_var.value, lookbehind - 1)].timestamp) + for share in node.tracker.get_chain(node.best_share_var.value, lookbehind - 1) + if share.hash in wb.my_share_hashes) + actual_time = (node.tracker.items[node.best_share_var.value].timestamp - + node.tracker.items[node.tracker.get_nth_parent_hash(node.best_share_var.value, lookbehind - 1)].timestamp) share_att_s = my_work / actual_time miner_hash_rates, miner_dead_hash_rates = get_local_rates() - (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts() + (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts() return dict( my_hash_rates_in_last_hour=dict( @@ -163,8 +164,8 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, efficiency_if_miner_perfect=(1 - stale_orphan_shares/shares)/(1 - global_stale_prop) if shares else None, # ignores dead shares because those are miner's fault and indicated by pseudoshare rejection efficiency=(1 - (stale_orphan_shares+stale_doa_shares)/shares)/(1 - global_stale_prop) if shares else None, peers=dict( - incoming=sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming), - outgoing=sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming), + incoming=sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming), + outgoing=sum(1 for peer in node.p2p_node.peers.itervalues() if not peer.incoming), ), shares=dict( total=shares, @@ -172,11 +173,11 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, dead=stale_doa_shares, ), uptime=time.time() - start_time, - attempts_to_share=bitcoin_data.target_to_average_attempts(tracker.items[best_share_var.value].max_target), - attempts_to_block=bitcoin_data.target_to_average_attempts(bitcoind_work.value['bits'].target), - block_value=bitcoind_work.value['subsidy']*1e-8, - warnings=p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoin_warning_var.value, bitcoind_work.value), - donation_proportion=donation_percentage/100, + attempts_to_share=bitcoin_data.target_to_average_attempts(node.tracker.items[node.best_share_var.value].max_target), + attempts_to_block=bitcoin_data.target_to_average_attempts(node.bitcoind_work.value['bits'].target), + block_value=node.bitcoind_work.value['subsidy']*1e-8, + warnings=p2pool_data.get_warnings(node.tracker, node.best_share_var.value, node.net, bitcoind_warning_var.value, node.bitcoind_work.value), + donation_proportion=wb.donation_percentage/100, ) class WebInterface(deferred_resource.DeferredResource): @@ -194,18 +195,18 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, res = yield self.func(*self.args) defer.returnValue(json.dumps(res) if self.mime_type == 'application/json' else res) - web_root.putChild('rate', WebInterface(lambda: p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, 720)/(1-p2pool_data.get_average_stale_prop(tracker, best_share_var.value, 720)))) - web_root.putChild('difficulty', WebInterface(lambda: bitcoin_data.target_to_difficulty(tracker.items[best_share_var.value].max_target))) + web_root.putChild('rate', WebInterface(lambda: p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, 720)/(1-p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, 720)))) + web_root.putChild('difficulty', WebInterface(lambda: bitcoin_data.target_to_difficulty(node.tracker.items[node.best_share_var.value].max_target))) web_root.putChild('users', WebInterface(get_users)) - web_root.putChild('user_stales', WebInterface(lambda: dict((bitcoin_data.pubkey_hash_to_address(ph, net.PARENT), prop) for ph, prop in - p2pool_data.get_user_stale_props(tracker, best_share_var.value, tracker.get_height(best_share_var.value)).iteritems()))) - web_root.putChild('fee', WebInterface(lambda: worker_fee)) - web_root.putChild('current_payouts', WebInterface(lambda: dict((bitcoin_data.script2_to_address(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))) + web_root.putChild('user_stales', WebInterface(lambda: dict((bitcoin_data.pubkey_hash_to_address(ph, node.net.PARENT), prop) for ph, prop in + p2pool_data.get_user_stale_props(node.tracker, node.best_share_var.value, node.tracker.get_height(node.best_share_var.value)).iteritems()))) + web_root.putChild('fee', WebInterface(lambda: wb.worker_fee)) + web_root.putChild('current_payouts', WebInterface(lambda: dict((bitcoin_data.script2_to_address(script, node.net.PARENT), value/1e8) for script, value in node.get_current_txouts().iteritems()))) web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain')) web_root.putChild('global_stats', WebInterface(get_global_stats)) web_root.putChild('local_stats', WebInterface(get_local_stats)) - web_root.putChild('peer_addresses', WebInterface(lambda: ['%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port) for peer in p2p_node.peers.itervalues()])) - web_root.putChild('peer_txpool_sizes', WebInterface(lambda: dict(('%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port), peer.remembered_txs_size) for peer in p2p_node.peers.itervalues()))) + web_root.putChild('peer_addresses', WebInterface(lambda: ['%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port) for peer in node.p2p_node.peers.itervalues()])) + web_root.putChild('peer_txpool_sizes', WebInterface(lambda: dict(('%s:%i' % (peer.transport.getPeer().host, peer.transport.getPeer().port), peer.remembered_txs_size) for peer in node.p2p_node.peers.itervalues()))) web_root.putChild('pings', WebInterface(defer.inlineCallbacks(lambda: defer.returnValue( dict([(a, (yield b)) for a, b in [( @@ -213,19 +214,19 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, defer.inlineCallbacks(lambda peer=peer: defer.returnValue( min([(yield peer.do_ping().addCallback(lambda x: x/0.001).addErrback(lambda fail: None)) for i in xrange(3)]) ))() - ) for peer in list(p2p_node.peers.itervalues())] + ) for peer in list(node.p2p_node.peers.itervalues())] ]) )))) - web_root.putChild('peer_versions', WebInterface(lambda: dict(('%s:%i' % peer.addr, peer.other_sub_version) for peer in p2p_node.peers.itervalues()))) - web_root.putChild('payout_addr', WebInterface(lambda: bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT))) + web_root.putChild('peer_versions', WebInterface(lambda: dict(('%s:%i' % peer.addr, peer.other_sub_version) for peer in node.p2p_node.peers.itervalues()))) + web_root.putChild('payout_addr', WebInterface(lambda: bitcoin_data.pubkey_hash_to_address(wb.my_pubkey_hash, node.net.PARENT))) web_root.putChild('recent_blocks', WebInterface(lambda: [dict( ts=s.timestamp, hash='%064x' % s.header_hash, number=pack.IntType(24).unpack(s.share_data['coinbase'][1:4]), share='%064x' % s.hash, - ) for s in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 24*60*60//net.SHARE_PERIOD)) if s.pow_hash <= s.header['bits'].target])) + ) for s in node.tracker.get_chain(node.best_share_var.value, min(node.tracker.get_height(node.best_share_var.value), 24*60*60//node.net.SHARE_PERIOD)) if s.pow_hash <= s.header['bits'].target])) web_root.putChild('uptime', WebInterface(lambda: time.time() - start_time)) - web_root.putChild('stale_rates', WebInterface(lambda: p2pool_data.get_stale_counts(tracker, best_share_var.value, 720, rates=True))) + web_root.putChild('stale_rates', WebInterface(lambda: p2pool_data.get_stale_counts(node.tracker, node.best_share_var.value, 720, rates=True))) new_root = resource.Resource() web_root.putChild('web', new_root) @@ -241,31 +242,31 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, while stat_log and stat_log[0]['time'] < time.time() - 24*60*60: stat_log.pop(0) - lookbehind = 3600//net.SHARE_PERIOD - if tracker.get_height(best_share_var.value) < lookbehind: + lookbehind = 3600//node.net.SHARE_PERIOD + if node.tracker.get_height(node.best_share_var.value) < lookbehind: return None - global_stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, lookbehind) - (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts() + global_stale_prop = p2pool_data.get_average_stale_prop(node.tracker, node.best_share_var.value, lookbehind) + (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts() miner_hash_rates, miner_dead_hash_rates = get_local_rates() stat_log.append(dict( time=time.time(), - pool_hash_rate=p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, lookbehind)/(1-global_stale_prop), + pool_hash_rate=p2pool_data.get_pool_attempts_per_second(node.tracker, node.best_share_var.value, lookbehind)/(1-global_stale_prop), pool_stale_prop=global_stale_prop, local_hash_rates=miner_hash_rates, local_dead_hash_rates=miner_dead_hash_rates, shares=shares, stale_shares=stale_orphan_shares + stale_doa_shares, stale_shares_breakdown=dict(orphan=stale_orphan_shares, doa=stale_doa_shares), - current_payout=get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, + current_payout=node.get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(wb.my_pubkey_hash), 0)*1e-8, peers=dict( - incoming=sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming), - outgoing=sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming), + incoming=sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming), + outgoing=sum(1 for peer in node.p2p_node.peers.itervalues() if not peer.incoming), ), - attempts_to_share=bitcoin_data.target_to_average_attempts(tracker.items[best_share_var.value].max_target), - attempts_to_block=bitcoin_data.target_to_average_attempts(bitcoind_work.value['bits'].target), - block_value=bitcoind_work.value['subsidy']*1e-8, + attempts_to_share=bitcoin_data.target_to_average_attempts(node.tracker.items[node.best_share_var.value].max_target), + attempts_to_block=bitcoin_data.target_to_average_attempts(node.bitcoind_work.value['bits'].target), + block_value=node.bitcoind_work.value['subsidy']*1e-8, )) with open(os.path.join(datadir_path, 'stats'), 'wb') as f: @@ -274,15 +275,15 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, new_root.putChild('log', WebInterface(lambda: stat_log)) def get_share(share_hash_str): - if int(share_hash_str, 16) not in tracker.items: + if int(share_hash_str, 16) not in node.tracker.items: return None - share = tracker.items[int(share_hash_str, 16)] + share = node.tracker.items[int(share_hash_str, 16)] return dict( parent='%064x' % share.previous_hash, - children=['%064x' % x for x in sorted(tracker.reverse.get(share.hash, set()), key=lambda sh: -len(tracker.reverse.get(sh, set())))], # sorted from most children to least children + children=['%064x' % x for x in sorted(node.tracker.reverse.get(share.hash, set()), key=lambda sh: -len(node.tracker.reverse.get(sh, set())))], # sorted from most children to least children local=dict( - verified=share.hash in tracker.verified.items, + verified=share.hash in node.tracker.verified.items, time_first_seen=start_time if share.time_seen == 0 else share.time_seen, peer_first_received_from=share.peer.addr if share.peer is not None else None, ), @@ -290,7 +291,7 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, timestamp=share.timestamp, target=share.target, max_target=share.max_target, - payout_address=bitcoin_data.script2_to_address(share.new_script, net.PARENT), + payout_address=bitcoin_data.script2_to_address(share.new_script, node.net.PARENT), donation=share.share_data['donation']/65535, stale_info=share.share_data['stale_info'], nonce=share.share_data['nonce'], @@ -315,21 +316,21 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, ), ) new_root.putChild('share', WebInterface(lambda share_hash_str: get_share(share_hash_str))) - new_root.putChild('heads', WebInterface(lambda: ['%064x' % x for x in tracker.heads])) - new_root.putChild('verified_heads', WebInterface(lambda: ['%064x' % x for x in tracker.verified.heads])) - new_root.putChild('tails', WebInterface(lambda: ['%064x' % x for t in tracker.tails for x in tracker.reverse.get(t, set())])) - new_root.putChild('verified_tails', WebInterface(lambda: ['%064x' % x for t in tracker.verified.tails for x in tracker.verified.reverse.get(t, set())])) - new_root.putChild('best_share_hash', WebInterface(lambda: '%064x' % best_share_var.value)) + new_root.putChild('heads', WebInterface(lambda: ['%064x' % x for x in node.tracker.heads])) + new_root.putChild('verified_heads', WebInterface(lambda: ['%064x' % x for x in node.tracker.verified.heads])) + new_root.putChild('tails', WebInterface(lambda: ['%064x' % x for t in node.tracker.tails for x in node.tracker.reverse.get(t, set())])) + new_root.putChild('verified_tails', WebInterface(lambda: ['%064x' % x for t in node.tracker.verified.tails for x in node.tracker.verified.reverse.get(t, set())])) + new_root.putChild('best_share_hash', WebInterface(lambda: '%064x' % node.best_share_var.value)) def get_share_data(share_hash_str): - if int(share_hash_str, 16) not in tracker.items: + if int(share_hash_str, 16) not in node.tracker.items: return '' - share = tracker.items[int(share_hash_str, 16)] + share = node.tracker.items[int(share_hash_str, 16)] return p2pool_data.share_type.pack(share.as_share1a()) new_root.putChild('share_data', WebInterface(lambda share_hash_str: get_share_data(share_hash_str), 'application/octet-stream')) new_root.putChild('currency_info', WebInterface(lambda: dict( - symbol=net.PARENT.SYMBOL, - block_explorer_url_prefix=net.PARENT.BLOCK_EXPLORER_URL_PREFIX, - address_explorer_url_prefix=net.PARENT.ADDRESS_EXPLORER_URL_PREFIX, + symbol=node.net.PARENT.SYMBOL, + block_explorer_url_prefix=node.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, + address_explorer_url_prefix=node.net.PARENT.ADDRESS_EXPLORER_URL_PREFIX, ))) new_root.putChild('version', WebInterface(lambda: p2pool.__version__)) @@ -392,7 +393,7 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, 'traffic_rate': graph.DataStreamDescription(dataview_descriptions, is_gauge=False, multivalues=True), }, hd_obj) task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))).start(100) - @pseudoshare_received.watch + @wb.pseudoshare_received.watch def _(work, dead, user): t = time.time() hd.datastreams['local_hash_rate'].add_datum(t, work) @@ -402,33 +403,33 @@ def get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, hd.datastreams['miner_hash_rates'].add_datum(t, {user: work}) if dead: hd.datastreams['miner_dead_hash_rates'].add_datum(t, {user: work}) - @share_received.watch + @wb.share_received.watch def _(work, dead): t = time.time() hd.datastreams['local_share_hash_rate'].add_datum(t, work) if dead: hd.datastreams['local_dead_share_hash_rate'].add_datum(t, work) - @traffic_happened.watch + @node.p2p_node.traffic_happened.watch def _(name, bytes): hd.datastreams['traffic_rate'].add_datum(time.time(), {name: bytes}) def add_point(): - if tracker.get_height(best_share_var.value) < 10: + if node.tracker.get_height(node.best_share_var.value) < 10: return None - lookbehind = min(net.CHAIN_LENGTH, 60*60//net.SHARE_PERIOD, tracker.get_height(best_share_var.value)) + lookbehind = min(node.net.CHAIN_LENGTH, 60*60//node.net.SHARE_PERIOD, node.tracker.get_height(node.best_share_var.value)) t = time.time() - hd.datastreams['pool_rates'].add_datum(t, p2pool_data.get_stale_counts(tracker, best_share_var.value, lookbehind, rates=True)) + hd.datastreams['pool_rates'].add_datum(t, p2pool_data.get_stale_counts(node.tracker, node.best_share_var.value, lookbehind, rates=True)) - current_txouts = get_current_txouts() - hd.datastreams['current_payout'].add_datum(t, current_txouts.get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8) + current_txouts = node.get_current_txouts() + hd.datastreams['current_payout'].add_datum(t, current_txouts.get(bitcoin_data.pubkey_hash_to_script2(wb.my_pubkey_hash), 0)*1e-8) miner_hash_rates, miner_dead_hash_rates = get_local_rates() - current_txouts_by_address = dict((bitcoin_data.script2_to_address(script, net.PARENT), amount) for script, amount in current_txouts.iteritems()) + current_txouts_by_address = dict((bitcoin_data.script2_to_address(script, node.net.PARENT), amount) for script, amount in current_txouts.iteritems()) hd.datastreams['current_payouts'].add_datum(t, dict((user, current_txouts_by_address[user]*1e-8) for user in miner_hash_rates if user in current_txouts_by_address)) - hd.datastreams['incoming_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming)) - hd.datastreams['outgoing_peers'].add_datum(t, sum(1 for peer in p2p_node.peers.itervalues() if not peer.incoming)) + hd.datastreams['incoming_peers'].add_datum(t, sum(1 for peer in node.p2p_node.peers.itervalues() if peer.incoming)) + hd.datastreams['outgoing_peers'].add_datum(t, sum(1 for peer in node.p2p_node.peers.itervalues() if not peer.incoming)) - vs = p2pool_data.get_desired_version_counts(tracker, best_share_var.value, lookbehind) + vs = p2pool_data.get_desired_version_counts(node.tracker, node.best_share_var.value, lookbehind) vs_total = sum(vs.itervalues()) hd.datastreams['desired_versions'].add_datum(t, dict((str(k), v/vs_total) for k, v in vs.iteritems())) task.LoopingCall(add_point).start(5) diff --git a/p2pool/work.py b/p2pool/work.py index b389100..bace060 100644 --- a/p2pool/work.py +++ b/p2pool/work.py @@ -9,29 +9,19 @@ from twisted.internet import defer from twisted.python import log import bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data -from bitcoin import script, worker_interface -from util import jsonrpc, variable, deferral, math, pack +from bitcoin import helper, script, worker_interface +from util import forest, jsonrpc, variable, deferral, math, pack import p2pool, p2pool.data as p2pool_data class WorkerBridge(worker_interface.WorkerBridge): - def __init__(self, my_pubkey_hash, net, donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, worker_fee, p2p_node, submit_block, set_best_share, broadcast_share): + def __init__(self, node, my_pubkey_hash, donation_percentage, merged_urls, worker_fee): worker_interface.WorkerBridge.__init__(self) self.recent_shares_ts_work = [] + self.node = node self.my_pubkey_hash = my_pubkey_hash - self.net = net self.donation_percentage = donation_percentage - self.bitcoind_work = bitcoind_work - self.best_block_header = best_block_header - self.best_share_var = best_share_var - self.tracker = tracker - self.my_share_hashes = my_share_hashes - self.my_doa_share_hashes = my_doa_share_hashes self.worker_fee = worker_fee - self.p2p_node = p2p_node - self.submit_block = submit_block - self.set_best_share = set_best_share - self.broadcast_share = broadcast_share self.pseudoshare_received = variable.Event() self.share_received = variable.Event() @@ -40,16 +30,27 @@ class WorkerBridge(worker_interface.WorkerBridge): self.removed_unstales_var = variable.Variable((0, 0, 0)) self.removed_doa_unstales_var = variable.Variable(0) - @tracker.verified.removed.watch + + self.my_share_hashes = set() + self.my_doa_share_hashes = set() + + self.tracker_view = forest.TrackerView(self.node.tracker, forest.get_attributedelta_type(dict(forest.AttributeDelta.attrs, + my_count=lambda share: 1 if share.hash in self.my_share_hashes else 0, + my_doa_count=lambda share: 1 if share.hash in self.my_doa_share_hashes else 0, + my_orphan_announce_count=lambda share: 1 if share.hash in self.my_share_hashes and share.share_data['stale_info'] == 'orphan' else 0, + my_dead_announce_count=lambda share: 1 if share.hash in self.my_share_hashes and share.share_data['stale_info'] == 'doa' else 0, + ))) + + @self.node.tracker.verified.removed.watch def _(share): - if share.hash in self.my_share_hashes and tracker.is_child_of(share.hash, self.best_share_var.value): + if share.hash in self.my_share_hashes and self.node.tracker.is_child_of(share.hash, self.node.best_share_var.value): assert share.share_data['stale_info'] in [None, 'orphan', 'doa'] # we made these shares in this instance self.removed_unstales_var.set(( self.removed_unstales_var.value[0] + 1, self.removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 'orphan' else 0), self.removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 'doa' else 0), )) - if share.hash in self.my_doa_share_hashes and self.tracker.is_child_of(share.hash, self.best_share_var.value): + if share.hash in self.my_doa_share_hashes and self.node.tracker.is_child_of(share.hash, self.node.best_share_var.value): self.removed_doa_unstales_var.set(self.removed_doa_unstales_var.value + 1) # MERGED WORK @@ -78,9 +79,9 @@ class WorkerBridge(worker_interface.WorkerBridge): self.current_work = variable.Variable(None) def compute_work(): - t = self.bitcoind_work.value - bb = self.best_block_header.value - if bb is not None and bb['previous_block'] == t['previous_block'] and net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(bb)) <= t['bits'].target: + t = self.node.bitcoind_work.value + bb = self.node.best_block_header.value + if bb is not None and bb['previous_block'] == t['previous_block'] and self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(bb)) <= t['bits'].target: print 'Skipping from block %x to block %x!' % (bb['previous_block'], bitcoin_data.hash256(bitcoin_data.block_header_type.pack(bb))) t = dict( @@ -92,13 +93,13 @@ class WorkerBridge(worker_interface.WorkerBridge): time=bb['timestamp'] + 600, # better way? transactions=[], merkle_link=bitcoin_data.calculate_merkle_link([None], 0), - subsidy=net.PARENT.SUBSIDY_FUNC(self.bitcoind_work.value['height']), - last_update=self.bitcoind_work.value['last_update'], + subsidy=self.node.net.PARENT.SUBSIDY_FUNC(self.node.bitcoind_work.value['height']), + last_update=self.node.bitcoind_work.value['last_update'], ) self.current_work.set(t) - self.bitcoind_work.changed.watch(lambda _: compute_work()) - self.best_block_header.changed.watch(lambda _: compute_work()) + self.node.bitcoind_work.changed.watch(lambda _: compute_work()) + self.node.best_block_header.changed.watch(lambda _: compute_work()) compute_work() self.new_work_event = variable.Event() @@ -108,13 +109,13 @@ class WorkerBridge(worker_interface.WorkerBridge): if any(before[x] != after[x] for x in ['version', 'previous_block', 'bits']) or (not before['transactions'] and after['transactions']): self.new_work_event.happened() self.merged_work.changed.watch(lambda _: self.new_work_event.happened()) - self.best_share_var.changed.watch(lambda _: self.new_work_event.happened()) + self.node.best_share_var.changed.watch(lambda _: self.new_work_event.happened()) def get_stale_counts(self): '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)''' my_shares = len(self.my_share_hashes) my_doa_shares = len(self.my_doa_share_hashes) - delta = self.tracker.verified.get_delta_to_last(self.best_share_var.value) + delta = self.tracker_view.get_delta_to_last(self.node.best_share_var.value) my_shares_in_chain = delta.my_count + self.removed_unstales_var.value[0] my_doa_shares_in_chain = delta.my_doa_count + self.removed_doa_unstales_var.value orphans_recorded_in_chain = delta.my_orphan_announce_count + self.removed_unstales_var.value[1] @@ -148,7 +149,7 @@ class WorkerBridge(worker_interface.WorkerBridge): pubkey_hash = self.my_pubkey_hash else: try: - pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.net.PARENT) + pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, self.node.net.PARENT) except: # XXX blah pubkey_hash = self.my_pubkey_hash @@ -159,9 +160,9 @@ class WorkerBridge(worker_interface.WorkerBridge): return pubkey_hash, desired_share_target, desired_pseudoshare_target def get_work(self, pubkey_hash, desired_share_target, desired_pseudoshare_target): - if len(self.p2p_node.peers) == 0 and self.net.PERSIST: + if len(self.node.p2p_node.peers) == 0 and self.node.net.PERSIST: raise jsonrpc.Error_for_code(-12345)(u'p2pool is not connected to any peers') - if self.best_share_var.value is None and self.net.PERSIST: + if self.node.best_share_var.value is None and self.node.net.PERSIST: raise jsonrpc.Error_for_code(-12345)(u'p2pool is downloading shares') if time.time() > self.current_work.value['last_update'] + 60: raise jsonrpc.Error_for_code(-12345)(u'lost contact with bitcoind') @@ -183,25 +184,25 @@ class WorkerBridge(worker_interface.WorkerBridge): tx_map = dict(zip(tx_hashes, self.current_work.value['transactions'])) share_type = p2pool_data.NewShare - if self.best_share_var.value is not None: - previous_share = self.tracker.items[self.best_share_var.value] + if self.node.best_share_var.value is not None: + previous_share = self.node.tracker.items[self.node.best_share_var.value] if isinstance(previous_share, p2pool_data.Share): # Share -> NewShare only valid if 85% of hashes in [net.CHAIN_LENGTH*9//10, net.CHAIN_LENGTH] for new version - if self.tracker.get_height(previous_share.hash) < self.net.CHAIN_LENGTH: + if self.node.tracker.get_height(previous_share.hash) < self.node.net.CHAIN_LENGTH: share_type = p2pool_data.Share - elif time.time() < 1351383661 and self.net.NAME == 'bitcoin': + elif time.time() < 1351383661 and self.node.net.NAME == 'bitcoin': share_type = p2pool_data.Share else: - counts = p2pool_data.get_desired_version_counts(self.tracker, - self.tracker.get_nth_parent_hash(previous_share.hash, self.net.CHAIN_LENGTH*9//10), self.net.CHAIN_LENGTH//10) + counts = p2pool_data.get_desired_version_counts(self.node.tracker, + self.node.tracker.get_nth_parent_hash(previous_share.hash, self.node.net.CHAIN_LENGTH*9//10), self.node.net.CHAIN_LENGTH//10) if counts.get(p2pool_data.NewShare.VERSION, 0) < sum(counts.itervalues())*95//100: share_type = p2pool_data.Share if True: share_info, gentx, other_transaction_hashes, get_share = share_type.generate_transaction( - tracker=self.tracker, + tracker=self.node.tracker, share_data=dict( - previous_share_hash=self.best_share_var.value, + previous_share_hash=self.node.best_share_var.value, coinbase=(script.create_push_script([ self.current_work.value['height'], ] + ([mm_data] if mm_data else []) + [ @@ -222,7 +223,7 @@ class WorkerBridge(worker_interface.WorkerBridge): desired_target=desired_share_target, ref_merkle_link=dict(branch=[], index=0), desired_other_transaction_hashes=tx_hashes, - net=self.net, + net=self.node.net, known_txs=tx_map, ) @@ -241,7 +242,7 @@ class WorkerBridge(worker_interface.WorkerBridge): target = max(target, share_info['bits'].target) for aux_work, index, hashes in mm_later: target = max(target, aux_work['target']) - target = math.clip(target, self.net.PARENT.SANE_TARGET_RANGE) + target = math.clip(target, self.node.net.PARENT.SANE_TARGET_RANGE) getwork_time = time.time() lp_count = self.new_work_event.times @@ -250,7 +251,7 @@ class WorkerBridge(worker_interface.WorkerBridge): print 'New work for worker! Difficulty: %.06f Share difficulty: %.06f Total block value: %.6f %s including %i transactions' % ( bitcoin_data.target_to_difficulty(target), bitcoin_data.target_to_difficulty(share_info['bits'].target), - self.current_work.value['subsidy']*1e-8, self.net.PARENT.SYMBOL, + self.current_work.value['subsidy']*1e-8, self.node.net.PARENT.SYMBOL, len(self.current_work.value['transactions']), ) @@ -267,13 +268,13 @@ class WorkerBridge(worker_interface.WorkerBridge): def got_response(header, request): header_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)) - pow_hash = self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) + pow_hash = self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) try: if pow_hash <= header['bits'].target or p2pool.DEBUG: - self.submit_block(dict(header=header, txs=transactions), ignore_failure=False) + helper.submit_block(dict(header=header, txs=transactions), False, self.node.factory, self.node.bitcoind, self.node.bitcoind_work, self.node.net) if pow_hash <= header['bits'].target: print - print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash) + print 'GOT BLOCK FROM MINER! Passing to bitcoind! %s%064x' % (self.node.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, header_hash) print except: log.err(None, 'Error while processing potential block:') @@ -326,14 +327,14 @@ class WorkerBridge(worker_interface.WorkerBridge): if not on_time: self.my_doa_share_hashes.add(share.hash) - self.tracker.add(share) + self.node.tracker.add(share) if not p2pool.DEBUG: - self.tracker.verified.add(share) - self.set_best_share() + self.node.tracker.verified.add(share) + self.node.set_best_share() try: if pow_hash <= header['bits'].target or p2pool.DEBUG: - self.broadcast_share(share.hash) + self.node.p2p_node.broadcast_share(share.hash) except: log.err(None, 'Error forwarding block solution:')