do not queue requests that can be answered using the cache
authorThomasV <thomasv@gitorious>
Thu, 8 Nov 2012 13:47:01 +0000 (17:47 +0400)
committerThomasV <thomasv@gitorious>
Thu, 8 Nov 2012 13:47:01 +0000 (17:47 +0400)
backends/abe/__init__.py
processor.py

index dc322fd..78271d0 100644 (file)
@@ -274,13 +274,16 @@ class AbeStore(Datastore_class):
             raise BaseException('limit reached')
         return out
 
-    def get_history(self, addr):
 
+
+    def get_history(self, addr, cache_only=False):
         with self.cache_lock:
             cached_version = self.tx_cache.get( addr )
             if cached_version is not None:
                 return cached_version
 
+        if cache_only: return -1
+
         version, binaddr = decode_check_address(addr)
         if binaddr is None:
             return None
@@ -399,8 +402,10 @@ class AbeStore(Datastore_class):
         
         return txpoints
 
-    def get_history2(self, addr):
-        h = self.get_history(addr)
+    def get_history2(self, addr, cache_only=False):
+        h = self.get_history(addr, cache_only)
+        if cache_only and h==-1: return -1
+
         out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h)
         out2 = []
         for item in out:
@@ -408,9 +413,11 @@ class AbeStore(Datastore_class):
         return out2
 
 
-    def get_status(self,addr):
+    def get_status(self, addr, cache_only=False):
         # get address status, i.e. the last block for that address.
-        tx_points = self.get_history(addr)
+        tx_points = self.get_history(addr, cache_only)
+        if cache_only and tx_points == -1: return -1
+
         if not tx_points:
             status = None
         else:
@@ -421,12 +428,12 @@ class AbeStore(Datastore_class):
                 status = status + ':%d'% len(tx_points)
         return status
 
-    def get_status2(self,addr):
+    def get_status2(self, addr, cache_only=False):
         # for 0.5 clients
         tx_points = self.get_history2(addr)
-        if not tx_points:
-            return None
+        if cache_only and tx_points == -1: return -1
 
+        if not tx_points: return None
         status = ''
         for tx in tx_points:
             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
@@ -660,7 +667,14 @@ class BlockchainProcessor(Processor):
 
         threading.Timer(10, self.run_store_iteration).start()
 
-    def process(self, request):
+
+    def add_request(self, request):
+        # see if we can get if from cache. if not, add to queue
+        if self.process( request, cache_only=True) == -1:
+            self.queue.put(request)
+
+
+    def process(self, request, cache_only = False):
         #print "abe process", request
 
         message_id = request['id']
@@ -678,7 +692,7 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.subscribe':
             try:
                 address = params[0]
-                result = self.store.get_status(address)
+                result = self.store.get_status(address, cache_only)
                 self.watch_address(address)
             except BaseException, e:
                 error = str(e) + ': ' + address
@@ -687,7 +701,7 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.subscribe2':
             try:
                 address = params[0]
-                result = self.store.get_status2(address)
+                result = self.store.get_status2(address, cache_only)
                 self.watch_address(address)
             except BaseException, e:
                 error = str(e) + ': ' + address
@@ -696,7 +710,7 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.get_history':
             try:
                 address = params[0]
-                result = self.store.get_history( address ) 
+                result = self.store.get_history( address, cache_only )
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print "error:", error
@@ -704,40 +718,49 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.get_history2':
             try:
                 address = params[0]
-                result = self.store.get_history2( address ) 
+                result = self.store.get_history2( address, cache_only )
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print "error:", error
 
         elif method == 'blockchain.block.get_header':
-            try:
-                height = params[0]
-                result = self.store.get_block_header( height ) 
-            except BaseException, e:
-                error = str(e) + ': %d'% height
-                print "error:", error
-
+            if cache_only: 
+                result = -1
+            else:
+                try:
+                    height = params[0]
+                    result = self.store.get_block_header( height ) 
+                except BaseException, e:
+                    error = str(e) + ': %d'% height
+                    print "error:", error
+                    
         elif method == 'blockchain.block.get_chunk':
-            try:
-                index = params[0]
-                result = self.store.get_chunk( index ) 
-            except BaseException, e:
-                error = str(e) + ': %d'% index
-                print "error:", error
-
+            if cache_only:
+                result = -1
+            else:
+                try:
+                    index = params[0]
+                    result = self.store.get_chunk( index ) 
+                except BaseException, e:
+                    error = str(e) + ': %d'% index
+                    print "error:", error
+                    
         elif method == 'blockchain.transaction.broadcast':
             txo = self.store.send_tx(params[0])
             print "sent tx:", txo
             result = txo 
 
         elif method == 'blockchain.transaction.get_merkle':
-            try:
-                tx_hash = params[0]
-                result = self.store.get_tx_merkle(tx_hash ) 
-            except BaseException, e:
-                error = str(e) + ': ' + tx_hash
-                print "error:", error
-
+            if cache_only:
+                result = -1
+            else:
+                try:
+                    tx_hash = params[0]
+                    result = self.store.get_tx_merkle(tx_hash ) 
+                except BaseException, e:
+                    error = str(e) + ': ' + tx_hash
+                    print "error:", error
+                    
         elif method == 'blockchain.transaction.get':
             try:
                 tx_hash = params[0]
@@ -750,6 +773,7 @@ class BlockchainProcessor(Processor):
         else:
             error = "unknown method:%s"%method
 
+        if cache_only and result == -1: return -1
 
         if error:
             response = { 'id':message_id, 'error':error }
index 749f383..dfe6128 100644 (file)
@@ -40,6 +40,9 @@ class Processor(threading.Thread):
     def process(self, request):
         pass
 
+    def add_request(self, request):
+        self.queue.put(request)
+
     def push_response(self, response):
         #print "response", response
         self.dispatcher.request_dispatcher.push_response(response)
@@ -162,7 +165,7 @@ class RequestDispatcher(threading.Thread):
             print "error: no processor for", prefix
             return
 
-        p.queue.put(request)
+        p.add_request(request)
 
         if method in ['server.version']:
             session.version = params[0]