From 827d26f24ef2148c4eb22bab764a7b2798233334 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Thu, 8 Nov 2012 17:47:01 +0400 Subject: [PATCH] do not queue requests that can be answered using the cache --- backends/abe/__init__.py | 92 +++++++++++++++++++++++++++++----------------- processor.py | 5 ++- 2 files changed, 62 insertions(+), 35 deletions(-) diff --git a/backends/abe/__init__.py b/backends/abe/__init__.py index dc322fd..78271d0 100644 --- a/backends/abe/__init__.py +++ b/backends/abe/__init__.py @@ -274,13 +274,16 @@ class AbeStore(Datastore_class): raise BaseException('limit reached') return out - def get_history(self, addr): + + def get_history(self, addr, cache_only=False): with self.cache_lock: cached_version = self.tx_cache.get( addr ) if cached_version is not None: return cached_version + if cache_only: return -1 + version, binaddr = decode_check_address(addr) if binaddr is None: return None @@ -399,8 +402,10 @@ class AbeStore(Datastore_class): return txpoints - def get_history2(self, addr): - h = self.get_history(addr) + def get_history2(self, addr, cache_only=False): + h = self.get_history(addr, cache_only) + if cache_only and h==-1: return -1 + out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h) out2 = [] for item in out: @@ -408,9 +413,11 @@ class AbeStore(Datastore_class): return out2 - def get_status(self,addr): + def get_status(self, addr, cache_only=False): # get address status, i.e. the last block for that address. - tx_points = self.get_history(addr) + tx_points = self.get_history(addr, cache_only) + if cache_only and tx_points == -1: return -1 + if not tx_points: status = None else: @@ -421,12 +428,12 @@ class AbeStore(Datastore_class): status = status + ':%d'% len(tx_points) return status - def get_status2(self,addr): + def get_status2(self, addr, cache_only=False): # for 0.5 clients tx_points = self.get_history2(addr) - if not tx_points: - return None + if cache_only and tx_points == -1: return -1 + if not tx_points: return None status = '' for tx in tx_points: status += tx.get('tx_hash') + ':%d:' % tx.get('height') @@ -660,7 +667,14 @@ class BlockchainProcessor(Processor): threading.Timer(10, self.run_store_iteration).start() - def process(self, request): + + def add_request(self, request): + # see if we can get if from cache. if not, add to queue + if self.process( request, cache_only=True) == -1: + self.queue.put(request) + + + def process(self, request, cache_only = False): #print "abe process", request message_id = request['id'] @@ -678,7 +692,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.subscribe': try: address = params[0] - result = self.store.get_status(address) + result = self.store.get_status(address, cache_only) self.watch_address(address) except BaseException, e: error = str(e) + ': ' + address @@ -687,7 +701,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.subscribe2': try: address = params[0] - result = self.store.get_status2(address) + result = self.store.get_status2(address, cache_only) self.watch_address(address) except BaseException, e: error = str(e) + ': ' + address @@ -696,7 +710,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.get_history': try: address = params[0] - result = self.store.get_history( address ) + result = self.store.get_history( address, cache_only ) except BaseException, e: error = str(e) + ': ' + address print "error:", error @@ -704,40 +718,49 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.get_history2': try: address = params[0] - result = self.store.get_history2( address ) + result = self.store.get_history2( address, cache_only ) except BaseException, e: error = str(e) + ': ' + address print "error:", error elif method == 'blockchain.block.get_header': - try: - height = params[0] - result = self.store.get_block_header( height ) - except BaseException, e: - error = str(e) + ': %d'% height - print "error:", error - + if cache_only: + result = -1 + else: + try: + height = params[0] + result = self.store.get_block_header( height ) + except BaseException, e: + error = str(e) + ': %d'% height + print "error:", error + elif method == 'blockchain.block.get_chunk': - try: - index = params[0] - result = self.store.get_chunk( index ) - except BaseException, e: - error = str(e) + ': %d'% index - print "error:", error - + if cache_only: + result = -1 + else: + try: + index = params[0] + result = self.store.get_chunk( index ) + except BaseException, e: + error = str(e) + ': %d'% index + print "error:", error + elif method == 'blockchain.transaction.broadcast': txo = self.store.send_tx(params[0]) print "sent tx:", txo result = txo elif method == 'blockchain.transaction.get_merkle': - try: - tx_hash = params[0] - result = self.store.get_tx_merkle(tx_hash ) - except BaseException, e: - error = str(e) + ': ' + tx_hash - print "error:", error - + if cache_only: + result = -1 + else: + try: + tx_hash = params[0] + result = self.store.get_tx_merkle(tx_hash ) + except BaseException, e: + error = str(e) + ': ' + tx_hash + print "error:", error + elif method == 'blockchain.transaction.get': try: tx_hash = params[0] @@ -750,6 +773,7 @@ class BlockchainProcessor(Processor): else: error = "unknown method:%s"%method + if cache_only and result == -1: return -1 if error: response = { 'id':message_id, 'error':error } diff --git a/processor.py b/processor.py index 749f383..dfe6128 100644 --- a/processor.py +++ b/processor.py @@ -40,6 +40,9 @@ class Processor(threading.Thread): def process(self, request): pass + def add_request(self, request): + self.queue.put(request) + def push_response(self, response): #print "response", response self.dispatcher.request_dispatcher.push_response(response) @@ -162,7 +165,7 @@ class RequestDispatcher(threading.Thread): print "error: no processor for", prefix return - p.queue.put(request) + p.add_request(request) if method in ['server.version']: session.version = params[0] -- 1.7.1