X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Fabe%2F__init__.py;h=4e2341a3734203b7e166d8a871924f28a6e3dd9c;hb=a157f1287914f991a91a3c5c9814a6170bdb6a34;hp=dc322fdd7c3ae7a5d3f2509f6c59e5f3f27660ad;hpb=c2e78253513bbe058f318dbb82f0d5c80b771f2e;p=electrum-server.git diff --git a/backends/abe/__init__.py b/backends/abe/__init__.py index dc322fd..4e2341a 100644 --- a/backends/abe/__init__.py +++ b/backends/abe/__init__.py @@ -274,13 +274,16 @@ class AbeStore(Datastore_class): raise BaseException('limit reached') return out - def get_history(self, addr): + + def get_history(self, addr, cache_only=False): with self.cache_lock: cached_version = self.tx_cache.get( addr ) if cached_version is not None: return cached_version + if cache_only: return -1 + version, binaddr = decode_check_address(addr) if binaddr is None: return None @@ -399,8 +402,10 @@ class AbeStore(Datastore_class): return txpoints - def get_history2(self, addr): - h = self.get_history(addr) + def get_history2(self, addr, cache_only=False): + h = self.get_history(addr, cache_only) + if cache_only and h==-1: return -1 + out = map(lambda x: {'tx_hash':x['tx_hash'], 'height':x['height']}, h) out2 = [] for item in out: @@ -408,9 +413,11 @@ class AbeStore(Datastore_class): return out2 - def get_status(self,addr): + def get_status(self, addr, cache_only=False): # get address status, i.e. the last block for that address. - tx_points = self.get_history(addr) + tx_points = self.get_history(addr, cache_only) + if cache_only and tx_points == -1: return -1 + if not tx_points: status = None else: @@ -421,12 +428,12 @@ class AbeStore(Datastore_class): status = status + ':%d'% len(tx_points) return status - def get_status2(self,addr): + def get_status2(self, addr, cache_only=False): # for 0.5 clients - tx_points = self.get_history2(addr) - if not tx_points: - return None + tx_points = self.get_history2(addr, cache_only) + if cache_only and tx_points == -1: return -1 + if not tx_points: return None status = '' for tx in tx_points: status += tx.get('tx_hash') + ':%d:' % tx.get('height') @@ -475,7 +482,7 @@ class AbeStore(Datastore_class): prev_block_hash, block_height FROM chain_summary - WHERE block_height >= %d AND block_height< %d AND in_longest = 1"""%(index*2016, (index+1)*2016) + WHERE block_height >= %d AND block_height< %d AND in_longest = 1 ORDER BY block_height"""%(index*2016, (index+1)*2016) out = self.safe_sql(sql) msg = '' @@ -572,16 +579,20 @@ class AbeStore(Datastore_class): ds = BCDataStream.BCDataStream() postdata = dumps({"method": 'getrawmempool', 'params': [], 'id':'jsonrpc'}) respdata = urllib.urlopen(store.bitcoind_url, postdata).read() + r = loads(respdata) if r['error'] != None: print r['error'] return mempool_hashes = r.get('result') + num_new_tx = 0 + for tx_hash in mempool_hashes: if tx_hash in store.known_mempool_hashes: continue store.known_mempool_hashes.append(tx_hash) + num_new_tx += 1 postdata = dumps({"method": 'getrawtransaction', 'params': [tx_hash], 'id':'jsonrpc'}) respdata = urllib.urlopen(store.bitcoind_url, postdata).read() @@ -603,6 +614,7 @@ class AbeStore(Datastore_class): store.commit() store.known_mempool_hashes = mempool_hashes + return num_new_tx def send_tx(self,tx): @@ -619,14 +631,22 @@ class AbeStore(Datastore_class): def main_iteration(self): with self.lock: + t1 = time.time() self.catch_up() - self.memorypool_update() + t2 = time.time() + time_catch_up = t2 - t1 + n = self.memorypool_update() + time_mempool = time.time() - t2 height = self.get_block_number( self.chain_id ) - try: self.chunk_cache.pop(height/2016) - except: pass + + with self.cache_lock: + try: + self.chunk_cache.pop(height/2016) + except: + pass block_header = self.get_block_header( height ) - return block_header + return block_header, time_catch_up, time_mempool, n @@ -654,13 +674,20 @@ class BlockchainProcessor(Processor): self.watched_addresses = [] # catch_up first - self.block_header = self.store.main_iteration() + self.block_header, time_catch_up, time_mempool, n = self.store.main_iteration() self.block_number = self.block_header.get('block_height') print "blockchain: %d blocks"%self.block_number threading.Timer(10, self.run_store_iteration).start() - def process(self, request): + + def add_request(self, request): + # see if we can get if from cache. if not, add to queue + if self.process( request, cache_only = True) == -1: + self.queue.put(request) + + + def process(self, request, cache_only = False): #print "abe process", request message_id = request['id'] @@ -678,7 +705,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.subscribe': try: address = params[0] - result = self.store.get_status(address) + result = self.store.get_status(address, cache_only) self.watch_address(address) except BaseException, e: error = str(e) + ': ' + address @@ -687,7 +714,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.subscribe2': try: address = params[0] - result = self.store.get_status2(address) + result = self.store.get_status2(address, cache_only) self.watch_address(address) except BaseException, e: error = str(e) + ': ' + address @@ -696,7 +723,7 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.get_history': try: address = params[0] - result = self.store.get_history( address ) + result = self.store.get_history( address, cache_only ) except BaseException, e: error = str(e) + ': ' + address print "error:", error @@ -704,40 +731,49 @@ class BlockchainProcessor(Processor): elif method == 'blockchain.address.get_history2': try: address = params[0] - result = self.store.get_history2( address ) + result = self.store.get_history2( address, cache_only ) except BaseException, e: error = str(e) + ': ' + address print "error:", error elif method == 'blockchain.block.get_header': - try: - height = params[0] - result = self.store.get_block_header( height ) - except BaseException, e: - error = str(e) + ': %d'% height - print "error:", error - + if cache_only: + result = -1 + else: + try: + height = params[0] + result = self.store.get_block_header( height ) + except BaseException, e: + error = str(e) + ': %d'% height + print "error:", error + elif method == 'blockchain.block.get_chunk': - try: - index = params[0] - result = self.store.get_chunk( index ) - except BaseException, e: - error = str(e) + ': %d'% index - print "error:", error - + if cache_only: + result = -1 + else: + try: + index = params[0] + result = self.store.get_chunk( index ) + except BaseException, e: + error = str(e) + ': %d'% index + print "error:", error + elif method == 'blockchain.transaction.broadcast': txo = self.store.send_tx(params[0]) print "sent tx:", txo result = txo elif method == 'blockchain.transaction.get_merkle': - try: - tx_hash = params[0] - result = self.store.get_tx_merkle(tx_hash ) - except BaseException, e: - error = str(e) + ': ' + tx_hash - print "error:", error - + if cache_only: + result = -1 + else: + try: + tx_hash = params[0] + result = self.store.get_tx_merkle(tx_hash ) + except BaseException, e: + error = str(e) + ': ' + tx_hash + print "error:", error + elif method == 'blockchain.transaction.get': try: tx_hash = params[0] @@ -750,6 +786,7 @@ class BlockchainProcessor(Processor): else: error = "unknown method:%s"%method + if cache_only and result == -1: return -1 if error: response = { 'id':message_id, 'error':error } @@ -767,9 +804,7 @@ class BlockchainProcessor(Processor): def run_store_iteration(self): try: - t1 = time.time() - block_header = self.store.main_iteration() - t2 = time.time() - t1 + block_header, time_catch_up, time_mempool, n = self.store.main_iteration() except: traceback.print_exc(file=sys.stdout) print "terminating" @@ -779,9 +814,11 @@ class BlockchainProcessor(Processor): print "exit timer" return + #print "block number: %d (%.3fs) mempool:%d (%.3fs)"%(self.block_number, time_catch_up, n, time_mempool) + if self.block_number != block_header.get('block_height'): self.block_number = block_header.get('block_height') - print "block number: %d (%.3f seconds)"%(self.block_number, t2) + print "block number: %d (%.3fs)"%(self.block_number, time_catch_up) self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] }) if self.block_header != block_header: