X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Fabe%2F__init__.py;h=78271d026c3e6fbd5a1703bffce0dcc87b03dfd0;hb=827d26f24ef2148c4eb22bab764a7b2798233334;hp=c60688bf261ab06206e4ae699448633ef2b6ca0b;hpb=c293b5a5d4e71f555981ba934ec53dedcce36fea;p=electrum-server.git diff --git a/backends/abe/__init__.py b/backends/abe/__init__.py index c60688b..78271d0 100644 --- a/backends/abe/__init__.py +++ b/backends/abe/__init__.py @@ -71,7 +71,8 @@ class AbeStore(Datastore_class): self.address_queue = Queue() - self.lock = threading.Lock() + self.lock = threading.Lock() # for the database + self.cache_lock = threading.Lock() # for the cache self.last_tx_id = 0 self.known_mempool_hashes = [] @@ -109,9 +110,11 @@ class AbeStore(Datastore_class): continue address = hash_to_address(chr(self.addrtype), _hash) - if self.tx_cache.has_key(address): - print "cache: invalidating", address - self.tx_cache.pop(address) + with self.cache_lock: + if self.tx_cache.has_key(address): + print "cache: invalidating", address + self.tx_cache.pop(address) + self.address_queue.put(address) outrows = self.get_tx_outputs(txid, False) @@ -122,9 +125,11 @@ class AbeStore(Datastore_class): continue address = hash_to_address(chr(self.addrtype), _hash) - if self.tx_cache.has_key(address): - print "cache: invalidating", address - self.tx_cache.pop(address) + with self.cache_lock: + if self.tx_cache.has_key(address): + print "cache: invalidating", address + self.tx_cache.pop(address) + self.address_queue.put(address) def safe_sql(self,sql, params=(), lock=True): @@ -269,13 +274,16 @@ class AbeStore(Datastore_class): raise BaseException('limit reached') return out - def get_history(self, addr): - with self.lock: + + def get_history(self, addr, cache_only=False): + with self.cache_lock: cached_version = self.tx_cache.get( addr ) if cached_version is not None: return cached_version + if cache_only: return -1 + version, binaddr = decode_check_address(addr) if binaddr is None: return None @@ -389,13 +397,15 @@ class AbeStore(Datastore_class): # cache result # do not cache mempool results because statuses are ambiguous if not address_has_mempool: - with self.lock: + with self.cache_lock: self.tx_cache[addr] = txpoints return txpoints - def get_history2(self, addr): - h = self.get_history(addr) + def get_history2(self, addr, cache_only=False): + h = self.get_history(addr, cache_only) + if cache_only and h==-1: return -1 + out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h) out2 = [] for item in out: @@ -403,9 +413,11 @@ class AbeStore(Datastore_class): return out2 - def get_status(self,addr): + def get_status(self, addr, cache_only=False): # get address status, i.e. the last block for that address. - tx_points = self.get_history(addr) + tx_points = self.get_history(addr, cache_only) + if cache_only and tx_points == -1: return -1 + if not tx_points: status = None else: @@ -416,12 +428,12 @@ class AbeStore(Datastore_class): status = status + ':%d'% len(tx_points) return status - def get_status2(self,addr): + def get_status2(self, addr, cache_only=False): # for 0.5 clients tx_points = self.get_history2(addr) - if not tx_points: - return None + if cache_only and tx_points == -1: return -1 + if not tx_points: return None status = '' for tx in tx_points: status += tx.get('tx_hash') + ':%d:' % tx.get('height') @@ -454,7 +466,7 @@ class AbeStore(Datastore_class): def get_chunk(self, index): - with self.lock: + with self.cache_lock: msg = self.chunk_cache.get(index) if msg: return msg @@ -486,7 +498,7 @@ class AbeStore(Datastore_class): #print "hash", encode(Hash(msg.decode('hex'))) #if h.get('block_height')==1:break - with self.lock: + with self.cache_lock: self.chunk_cache[index] = msg print "get_chunk", index, len(msg) return msg @@ -655,7 +667,14 @@ class BlockchainProcessor(Processor): threading.Timer(10, self.run_store_iteration).start() - def process(self, request): + + def add_request(self, request): + # see if we can get if from cache. if not, add to queue + if self.process( request, cache_only=True) == -1: + self.queue.put(request) + + + def process(self, request, cache_only = False): #print "abe process", request message_id = request['id'] @@ -673,7 +692,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.subscribe': try: address = params[0] - result = self.store.get_status(address) + result = self.store.get_status(address, cache_only) self.watch_address(address) except BaseException, e: error = str(e) + ': ' + address @@ -682,7 +701,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.subscribe2': try: address = params[0] - result = self.store.get_status2(address) + result = self.store.get_status2(address, cache_only) self.watch_address(address) except BaseException, e: error = str(e) + ': ' + address @@ -691,7 +710,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.get_history': try: address = params[0] - result = self.store.get_history( address ) + result = self.store.get_history( address, cache_only ) except BaseException, e: error = str(e) + ': ' + address print "error:", error @@ -699,40 +718,49 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.get_history2': try: address = params[0] - result = self.store.get_history2( address ) + result = self.store.get_history2( address, cache_only ) except BaseException, e: error = str(e) + ': ' + address print "error:", error elif method == 'blockchain.block.get_header': - try: - height = params[0] - result = self.store.get_block_header( height ) - except BaseException, e: - error = str(e) + ': %d'% height - print "error:", error - + if cache_only: + result = -1 + else: + try: + height = params[0] + result = self.store.get_block_header( height ) + except BaseException, e: + error = str(e) + ': %d'% height + print "error:", error + elif method == 'blockchain.block.get_chunk': - try: - index = params[0] - result = self.store.get_chunk( index ) - except BaseException, e: - error = str(e) + ': %d'% index - print "error:", error - + if cache_only: + result = -1 + else: + try: + index = params[0] + result = self.store.get_chunk( index ) + except BaseException, e: + error = str(e) + ': %d'% index + print "error:", error + elif method == 'blockchain.transaction.broadcast': txo = self.store.send_tx(params[0]) print "sent tx:", txo result = txo elif method == 'blockchain.transaction.get_merkle': - try: - tx_hash = params[0] - result = self.store.get_tx_merkle(tx_hash ) - except BaseException, e: - error = str(e) + ': ' + tx_hash - print "error:", error - + if cache_only: + result = -1 + else: + try: + tx_hash = params[0] + result = self.store.get_tx_merkle(tx_hash ) + except BaseException, e: + error = str(e) + ': ' + tx_hash + print "error:", error + elif method == 'blockchain.transaction.get': try: tx_hash = params[0] @@ -745,6 +773,7 @@ class BlockchainProcessor(Processor): else: error = "unknown method:%s"%method + if cache_only and result == -1: return -1 if error: response = { 'id':message_id, 'error':error }