do not queue requests that can be answered using the cache
[electrum-server.git] / backends / abe / __init__.py
index c60688b..78271d0 100644 (file)
@@ -71,7 +71,8 @@ class AbeStore(Datastore_class):
 
         self.address_queue = Queue()
 
-        self.lock = threading.Lock()
+        self.lock = threading.Lock()        # for the database
+        self.cache_lock = threading.Lock()  # for the cache
         self.last_tx_id = 0
         self.known_mempool_hashes = []
 
@@ -109,9 +110,11 @@ class AbeStore(Datastore_class):
                 continue
 
             address = hash_to_address(chr(self.addrtype), _hash)
-            if self.tx_cache.has_key(address):
-                print "cache: invalidating", address
-                self.tx_cache.pop(address)
+            with self.cache_lock:
+                if self.tx_cache.has_key(address):
+                    print "cache: invalidating", address
+                    self.tx_cache.pop(address)
+
             self.address_queue.put(address)
 
         outrows = self.get_tx_outputs(txid, False)
@@ -122,9 +125,11 @@ class AbeStore(Datastore_class):
                 continue
 
             address = hash_to_address(chr(self.addrtype), _hash)
-            if self.tx_cache.has_key(address):
-                print "cache: invalidating", address
-                self.tx_cache.pop(address)
+            with self.cache_lock:
+                if self.tx_cache.has_key(address):
+                    print "cache: invalidating", address
+                    self.tx_cache.pop(address)
+
             self.address_queue.put(address)
 
     def safe_sql(self,sql, params=(), lock=True):
@@ -269,13 +274,16 @@ class AbeStore(Datastore_class):
             raise BaseException('limit reached')
         return out
 
-    def get_history(self, addr):
 
-        with self.lock:
+
+    def get_history(self, addr, cache_only=False):
+        with self.cache_lock:
             cached_version = self.tx_cache.get( addr )
             if cached_version is not None:
                 return cached_version
 
+        if cache_only: return -1
+
         version, binaddr = decode_check_address(addr)
         if binaddr is None:
             return None
@@ -389,13 +397,15 @@ class AbeStore(Datastore_class):
         # cache result
         # do not cache mempool results because statuses are ambiguous
         if not address_has_mempool:
-            with self.lock:
+            with self.cache_lock:
                 self.tx_cache[addr] = txpoints
         
         return txpoints
 
-    def get_history2(self, addr):
-        h = self.get_history(addr)
+    def get_history2(self, addr, cache_only=False):
+        h = self.get_history(addr, cache_only)
+        if cache_only and h==-1: return -1
+
         out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h)
         out2 = []
         for item in out:
@@ -403,9 +413,11 @@ class AbeStore(Datastore_class):
         return out2
 
 
-    def get_status(self,addr):
+    def get_status(self, addr, cache_only=False):
         # get address status, i.e. the last block for that address.
-        tx_points = self.get_history(addr)
+        tx_points = self.get_history(addr, cache_only)
+        if cache_only and tx_points == -1: return -1
+
         if not tx_points:
             status = None
         else:
@@ -416,12 +428,12 @@ class AbeStore(Datastore_class):
                 status = status + ':%d'% len(tx_points)
         return status
 
-    def get_status2(self,addr):
+    def get_status2(self, addr, cache_only=False):
         # for 0.5 clients
         tx_points = self.get_history2(addr)
-        if not tx_points:
-            return None
+        if cache_only and tx_points == -1: return -1
 
+        if not tx_points: return None
         status = ''
         for tx in tx_points:
             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
@@ -454,7 +466,7 @@ class AbeStore(Datastore_class):
         
 
     def get_chunk(self, index):
-        with self.lock:
+        with self.cache_lock:
             msg = self.chunk_cache.get(index)
             if msg: return msg
 
@@ -486,7 +498,7 @@ class AbeStore(Datastore_class):
             #print "hash", encode(Hash(msg.decode('hex')))
             #if h.get('block_height')==1:break
 
-        with self.lock:
+        with self.cache_lock:
             self.chunk_cache[index] = msg
         print "get_chunk", index, len(msg)
         return msg
@@ -655,7 +667,14 @@ class BlockchainProcessor(Processor):
 
         threading.Timer(10, self.run_store_iteration).start()
 
-    def process(self, request):
+
+    def add_request(self, request):
+        # see if we can get if from cache. if not, add to queue
+        if self.process( request, cache_only=True) == -1:
+            self.queue.put(request)
+
+
+    def process(self, request, cache_only = False):
         #print "abe process", request
 
         message_id = request['id']
@@ -673,7 +692,7 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.subscribe':
             try:
                 address = params[0]
-                result = self.store.get_status(address)
+                result = self.store.get_status(address, cache_only)
                 self.watch_address(address)
             except BaseException, e:
                 error = str(e) + ': ' + address
@@ -682,7 +701,7 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.subscribe2':
             try:
                 address = params[0]
-                result = self.store.get_status2(address)
+                result = self.store.get_status2(address, cache_only)
                 self.watch_address(address)
             except BaseException, e:
                 error = str(e) + ': ' + address
@@ -691,7 +710,7 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.get_history':
             try:
                 address = params[0]
-                result = self.store.get_history( address ) 
+                result = self.store.get_history( address, cache_only )
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print "error:", error
@@ -699,40 +718,49 @@ class BlockchainProcessor(Processor):
         elif method == 'blockchain.address.get_history2':
             try:
                 address = params[0]
-                result = self.store.get_history2( address ) 
+                result = self.store.get_history2( address, cache_only )
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print "error:", error
 
         elif method == 'blockchain.block.get_header':
-            try:
-                height = params[0]
-                result = self.store.get_block_header( height ) 
-            except BaseException, e:
-                error = str(e) + ': %d'% height
-                print "error:", error
-
+            if cache_only: 
+                result = -1
+            else:
+                try:
+                    height = params[0]
+                    result = self.store.get_block_header( height ) 
+                except BaseException, e:
+                    error = str(e) + ': %d'% height
+                    print "error:", error
+                    
         elif method == 'blockchain.block.get_chunk':
-            try:
-                index = params[0]
-                result = self.store.get_chunk( index ) 
-            except BaseException, e:
-                error = str(e) + ': %d'% index
-                print "error:", error
-
+            if cache_only:
+                result = -1
+            else:
+                try:
+                    index = params[0]
+                    result = self.store.get_chunk( index ) 
+                except BaseException, e:
+                    error = str(e) + ': %d'% index
+                    print "error:", error
+                    
         elif method == 'blockchain.transaction.broadcast':
             txo = self.store.send_tx(params[0])
             print "sent tx:", txo
             result = txo 
 
         elif method == 'blockchain.transaction.get_merkle':
-            try:
-                tx_hash = params[0]
-                result = self.store.get_tx_merkle(tx_hash ) 
-            except BaseException, e:
-                error = str(e) + ': ' + tx_hash
-                print "error:", error
-
+            if cache_only:
+                result = -1
+            else:
+                try:
+                    tx_hash = params[0]
+                    result = self.store.get_tx_merkle(tx_hash ) 
+                except BaseException, e:
+                    error = str(e) + ': ' + tx_hash
+                    print "error:", error
+                    
         elif method == 'blockchain.transaction.get':
             try:
                 tx_hash = params[0]
@@ -745,6 +773,7 @@ class BlockchainProcessor(Processor):
         else:
             error = "unknown method:%s"%method
 
+        if cache_only and result == -1: return -1
 
         if error:
             response = { 'id':message_id, 'error':error }