From: genjix Date: Thu, 19 Apr 2012 13:29:12 +0000 (+0100) Subject: mempool subscribe works with outputs of transactions. X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=27bd2d1754b5949678d1ee7ff5efad4b3c8bef6f mempool subscribe works with outputs of transactions. --- diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index aa13238..247eae7 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -6,9 +6,125 @@ import time import history -class Backend: +class HistoryCache: def __init__(self): + self.lock = threading.Lock() + self.cache = {} + + def store(self, address, result): + with self.lock: + self.cache[address] = result + + def fetch(self, address): + try: + with self.lock: + return self.cache[address] + except KeyError: + return None + + def clear(self, addresses): + with self.lock: + for address in addresses: + if self.cache.has_key(address): + del self.cache[address] + +class MonitorAddress: + + def __init__(self, processor, cache): + self.processor = processor + self.cache = cache + self.lock = threading.Lock() + # key is hash:index, value is address + self.monitor_output = {} + # key is address + self.monitor_address = set() + # affected + self.affected = {} + + def monitor(self, address, result): + for info in result: + if not info.has_key("raw_output_script"): + continue + assert info["is_input"] == 0 + tx_hash = info["tx_hash"] + output_index = info["index"] + outpoint = "%s:%s" % (tx_hash, output_index) + with self.lock: + self.monitor_output[outpoint] = address + with self.lock: + self.monitor_address.add(address) + + def tx_stored(self, tx_desc): + tx_hash, prevouts, addrs = tx_desc + affected_addrs = set() + for prevout_hash, prevout_index in prevouts: + prevout = "%s:%s" % (prevout_hash, prevout_index) + with self.lock: + if self.monitor_output.has_key(prevout): + affected_addrs.add(self.monitor_output[prevout]) + for idx, address in addrs: + with self.lock: + if address in self.monitor_address: + affected_addrs.add(address) + with self.lock: + self.affected[tx_hash] = affected_addrs + self.cache.clear(affected_addrs) + self.notify(affected_addrs) + + def tx_confirmed(self, tx_desc): + tx_hash, prevouts, addrs = tx_desc + with self.lock: + affected_addrs = self.affected[tx_hash] + del self.affected[tx_hash] + self.cache.clear(affected_addrs) + self.notify(affected_addrs) + # add new outputs to monitor + for idx, address in addrs: + outpoint = "%s:%s" % (tx_hash, idx) + with self.lock: + if address in affected_addrs: + self.monitor_output[outpoint] = address + # delete spent outpoints + for prevout_hash, prevout_index in prevouts: + prevout = "%s:%s" % (prevout_hash, prevout_index) + with self.lock: + if self.monitor_output.has_key(prevout): + del self.monitor_output[prevout] + + def notify(self, affected_addrs): + templ_response = {"id": None, + "method": "blockchain.address.subscribe", + "params": []} + chain = self.backend.blockchain + txpool = self.backend.transaction_pool + membuf = self.backend.pool_buffer + for address in affected_addrs: + response = templ_response.copy() + response["params"].append(address) + history.payment_history(chain, txpool, membuf, address, + bind(self.send_notify, _1, response)) + + def mempool_n(self, result): + assert result is not None + if len(result) == 0: + return None + # mempool:n status + # Order by time, grab last item (latest) + last_info = sorted(result, key=lambda k: k['timestamp'])[-1] + if last_info["block_hash"] == "mempool": + last_id = "mempool:%s" % len(result) + else: + last_id = last_info["block_hash"] + return last_id + + def send_notify(self, result, response): + response["params"].append(self.mempool_n(result)) + self.processor.push_response(response) + +class Backend: + + def __init__(self, monitor): # Create 3 thread-pools each with 1 thread self.network_service = bitcoin.async_service(1) self.disk_service = bitcoin.async_service(1) @@ -34,8 +150,9 @@ class Backend: self.poller, self.transaction_pool) self.session.start(self.handle_start) - self.pool_buffer = history.MemoryPoolBuffer(self.transaction_pool, - self.blockchain) + self.pool_buffer = \ + history.MemoryPoolBuffer(self.transaction_pool, + self.blockchain, monitor) def handle_start(self, ec): if ec: @@ -146,35 +263,56 @@ class AddressGetHistory: class AddressSubscribe: - def __init__(self, backend, processor): + def __init__(self, backend, processor, cache, monitor): self.backend = backend self.processor = processor + self.cache = cache + self.monitor = monitor - def subscribe(self, session, request): + self.backend.pool_buffer.cheat = self + + def subscribe(self, request): address = str(request["params"][0]) chain = self.backend.blockchain txpool = self.backend.transaction_pool membuf = self.backend.pool_buffer history.payment_history(chain, txpool, membuf, address, - bind(self.respond, _1, request)) + bind(self.construct, _1, request)) def construct(self, result, request): if result is None: response = {"id": request["id"], "result": None, "error": {"message": "Error", "code": -4}} return - else: - response = {"id": request["id"], "result": result, "error": None} + last_id = self.monitor.mempool_n(result) + response = {"id": request["id"], "result": last_id, "error": None} + self.processor.push_response(response) + address = request["params"][0] + self.monitor.monitor(address, result) + # Cache result for get_history + self.cache.store(address, result) + + def fetch_cached(self, request): + address = request["params"][0] + cached = self.cache.fetch(address) + if cached is None: + return False + response = {"id": request["id"], "result": cached, "error": None} self.processor.push_response(response) + return True class BlockchainProcessor(Processor): def __init__(self, config): Processor.__init__(self) - self.backend = Backend() + cache = HistoryCache() + monitor = MonitorAddress(self, cache) + self.backend = Backend(monitor) + monitor.backend = self.backend self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) self.address_get_history = AddressGetHistory(self.backend, self) - self.address_subscribe = AddressSubscribe(self.backend, self) + self.address_subscribe = \ + AddressSubscribe(self.backend, self, cache, monitor) def stop(self): self.backend.stop() @@ -184,9 +322,10 @@ class BlockchainProcessor(Processor): if request["method"] == "blockchain.numblocks.subscribe": self.numblocks_subscribe.subscribe(request) elif request["method"] == "blockchain.address.subscribe": - pass + self.address_subscribe.subscribe(request) elif request["method"] == "blockchain.address.get_history": - self.address_get_history.get(request) + if not self.address_subscribe.fetch_cached(request): + self.address_get_history.get(request) elif request["method"] == "blockchain.transaction.broadcast": self.broadcast_transaction(request) diff --git a/backends/libbitcoin/history.py b/backends/libbitcoin/history.py index 0967535..4c225c2 100644 --- a/backends/libbitcoin/history.py +++ b/backends/libbitcoin/history.py @@ -27,9 +27,10 @@ expiry_queue = ExpiryQueue() class MemoryPoolBuffer: - def __init__(self, txpool, chain): + def __init__(self, txpool, chain, monitor): self.txpool = txpool self.chain = chain + self.monitor = monitor # prevout: inpoint self.lookup_input = {} # payment_address: outpoint @@ -38,20 +39,22 @@ class MemoryPoolBuffer: self.timestamps = {} def recv_tx(self, tx, handle_store): - tx_hash = bitcoin.hash_transaction(tx) + tx_hash = str(bitcoin.hash_transaction(tx)) desc = (tx_hash, [], []) for input in tx.inputs: - desc[1].append(input.previous_output) + prevout = input.previous_output + desc[1].append((str(prevout.hash), prevout.index)) for idx, output in enumerate(tx.outputs): address = bitcoin.payment_address() if address.extract(output.output_script): - desc[2].append((idx, address)) + desc[2].append((idx, str(address))) self.txpool.store(tx, bind(self.confirmed, _1, desc), bind(self.mempool_stored, _1, desc, handle_store)) def mempool_stored(self, ec, desc, handle_store): tx_hash, prevouts, addrs = desc + tx_hash = bitcoin.hash_digest(tx_hash) if ec: handle_store(ec) return @@ -65,9 +68,11 @@ class MemoryPoolBuffer: self.lookup_address[str(address)] = outpoint self.timestamps[str(tx_hash)] = int(time.time()) handle_store(ec) + self.monitor.tx_stored(desc) def confirmed(self, ec, desc): tx_hash, prevouts, addrs = desc + tx_hash = bitcoin.hash_digest(tx_hash) if ec: print "Problem confirming transaction", tx_hash, ec return @@ -82,6 +87,7 @@ class MemoryPoolBuffer: outpoint.hash, outpoint.index = tx_hash, idx self.lookup_address.delete(str(address), outpoint) del self.timestamps[str(tx_hash)] + self.monitor.tx_confirmed(desc) def check(self, output_points, address, handle): class ExtendableDict(dict): @@ -171,6 +177,9 @@ class History: bind(self.start_loading, _1, output_points)) def start_loading(self, membuf_result, output_points): + if len(membuf_result) == 0 and len(output_points) == 0: + self.handle_finish([]) + self.stopped() # Create a bunch of entry lines which are outputs and # then their corresponding input (if it exists) for outpoint in output_points: @@ -432,15 +441,21 @@ if __name__ == "__main__": print begin, " " * (12 - len(begin)), v print + class FakeMonitor: + def tx_stored(self, tx): + pass + def tx_confirmed(self, tx): + pass + service = bitcoin.async_service(1) prefix = "/home/genjix/libbitcoin/database.old" chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started) txpool = bitcoin.transaction_pool(service, chain) - membuf = MemoryPoolBuffer(txpool, chain) + membuf = MemoryPoolBuffer(txpool, chain, FakeMonitor()) membuf.recv_tx(tx_a, store_tx) membuf.recv_tx(tx_b, store_tx) raw_input() - address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR" + address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "18auo3rqfsjth3w2H9zyEz467DDFNNpMJP" #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE" print "Looking up", address payment_history(chain, txpool, membuf, address[0], finish) diff --git a/processor.py b/processor.py index be61c73..446c37f 100644 --- a/processor.py +++ b/processor.py @@ -40,7 +40,8 @@ class Processor(threading.Thread): pass def push_response(self, response): - self.dispatcher.request_dispatcher.push_response(response) + print "response", response + #self.dispatcher.request_dispatcher.push_response(response)