X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Flibbitcoin%2F__init__.py;h=ef2eb1d81cde7cbc78f89f7db140d8ee4b5fdf9f;hb=4ce69b7ea24ead59ebbcc7ed335ea9762ae3724b;hp=aa13238234461c1e2128c17f65b59a6245276cf0;hpb=dc6b846eb9e11146218abf3f13b8f5c36817bbc7;p=electrum-server.git diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index aa13238..ef2eb1d 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -1,10 +1,144 @@ +import threading +import time + import bitcoin from bitcoin import bind, _1, _2, _3 + from processor import Processor -import threading -import time +import history1 as history +import membuf + + +class HistoryCache: + + def __init__(self): + self.lock = threading.Lock() + self.cache = {} + + def store(self, address, result): + with self.lock: + self.cache[address] = result + + def fetch(self, address): + try: + with self.lock: + return self.cache[address] + except KeyError: + return None + + def clear(self, addresses): + with self.lock: + for address in addresses: + if address in self.cache: + del self.cache[address] + + +class MonitorAddress: + + def __init__(self, processor, cache, backend): + self.processor = processor + self.cache = cache + self.backend = backend + self.lock = threading.Lock() + # key is hash:index, value is address + self.monitor_output = {} + # key is address + self.monitor_address = set() + + backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed) + + def monitor(self, address, result): + for info in result: + if "raw_output_script" not in info: + continue + assert info["is_input"] == 0 + tx_hash = info["tx_hash"] + output_index = info["index"] + outpoint = "%s:%s" % (tx_hash, output_index) + with self.lock: + self.monitor_output[outpoint] = address + with self.lock: + self.monitor_address.add(address) + + def unpack(self, tx): + tx_hash = bitcoin.hash_transaction(tx) + previous_outputs = [] + for input in tx.inputs: + prevout = input.previous_output + prevout = "%s:%s" % (prevout.hash, prevout.index) + previous_outputs.append(prevout) + addrs = [] + for output_index, output in enumerate(tx.outputs): + address = bitcoin.payment_address() + if address.extract(output.output_script): + addrs.append((output_index, str(address))) + return tx_hash, previous_outputs, addrs + + def effect_notify(self, tx, delete_outs): + affected_addrs = set() + tx_hash, previous_outputs, addrs = self.unpack(tx) + for prevout in previous_outputs: + try: + with self.lock: + affected_addrs.add(self.monitor_output[prevout]) + if delete_outs: + del self.monitor_output[prevout] + except KeyError: + pass + for idx, address in addrs: + with self.lock: + if address in self.monitor_address: + affected_addrs.add(address) + self.cache.clear(affected_addrs) + self.notify(affected_addrs) + # Used in confirmed txs + return tx_hash, addrs, affected_addrs + + def tx_stored(self, tx): + self.effect_notify(tx, False) + + def tx_confirmed(self, tx): + tx_hash, addrs, affected_addrs = self.effect_notify(tx, True) + # add new outputs to monitor + for idx, address in addrs: + outpoint = "%s:%s" % (tx_hash, idx) + if address in affected_addrs: + with self.lock: + self.monitor_output[outpoint] = address + + def notify(self, affected_addrs): + service = self.backend.mempool_service + chain = self.backend.blockchain + txpool = self.backend.transaction_pool + memory_buff = self.backend.memory_buffer + for address in affected_addrs: + response = {"id": None, + "method": "blockchain.address.subscribe", + "params": [str(address)]} + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.send_notify, _1, _2, response)) + + def mempool_n(self, result): + assert result is not None + if len(result) == 0: + return None + # mempool:n status + # Order by time, grab last item (latest) + last_info = sorted(result, key=lambda k: k['timestamp'])[-1] + if last_info["block_hash"] == "mempool": + last_id = "mempool:%s" % len(result) + else: + last_id = last_info["block_hash"] + return last_id + + def send_notify(self, ec, result, response): + if ec: + print "Error: Monitor.send_notify()", ec + return + assert len(response["params"]) == 1 + response["params"].append(self.mempool_n(result)) + self.processor.push_response(response) -import history class Backend: @@ -23,7 +157,7 @@ class Backend: db_prefix = "/home/genjix/libbitcoin/database" self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix, self.blockchain_started) - self.poller = bitcoin.poller(self.blockchain) + self.poller = bitcoin.poller(self.mempool_service, self.blockchain) self.transaction_pool = \ bitcoin.transaction_pool(self.mempool_service, self.blockchain) @@ -34,8 +168,10 @@ class Backend: self.poller, self.transaction_pool) self.session.start(self.handle_start) - self.pool_buffer = history.MemoryPoolBuffer(self.transaction_pool, - self.blockchain) + self.memory_buffer = \ + membuf.memory_buffer(self.mempool_service.internal_ptr, + self.blockchain.internal_ptr, + self.transaction_pool.internal_ptr) def handle_start(self, ec): if ec: @@ -66,7 +202,7 @@ class Backend: print "Error with new transaction:", ec return tx_hash = bitcoin.hash_transaction(tx) - self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash)) + self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash)) # Re-subscribe to new transactions from node node.subscribe_transaction(bind(self.recv_tx, _1, _2, node)) @@ -76,6 +212,7 @@ class Backend: else: print "Accepted transaction", tx_hash + class GhostValue: def __init__(self): @@ -90,6 +227,7 @@ class GhostValue: self.value = value self.event.set() + class NumblocksSubscribe: def __init__(self, backend, processor): @@ -110,18 +248,18 @@ class NumblocksSubscribe: latest = fork_point + len(arrivals) self.latest.set(latest) response = {"id": None, "method": "blockchain.numblocks.subscribe", - "result": latest} + "params": [latest]} self.processor.push_response(response) self.backend.blockchain.subscribe_reorganize(self.reorganize) def subscribe(self, request): latest = self.latest.get() response = {"id": request["id"], - "method": "blockchain.numblocks.subscribe", "result": latest, "error": None} self.processor.push_response(response) + class AddressGetHistory: def __init__(self, backend, processor): @@ -130,51 +268,74 @@ class AddressGetHistory: def get(self, request): address = str(request["params"][0]) + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer - history.payment_history(chain, txpool, membuf, address, - bind(self.respond, _1, request)) + memory_buff = self.backend.memory_buffer + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.respond, _1, _2, request)) - def respond(self, result, request): - if result is None: + def respond(self, ec, result, request): + if ec: response = {"id": request["id"], "result": None, - "error": {"message": "Error", "code": -4}} + "error": {"message": str(ec), "code": -4}} else: response = {"id": request["id"], "result": result, "error": None} self.processor.push_response(response) + class AddressSubscribe: - def __init__(self, backend, processor): + def __init__(self, backend, processor, cache, monitor): self.backend = backend self.processor = processor + self.cache = cache + self.monitor = monitor - def subscribe(self, session, request): + def subscribe(self, request): address = str(request["params"][0]) + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer - history.payment_history(chain, txpool, membuf, address, - bind(self.respond, _1, request)) + memory_buff = self.backend.memory_buffer + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.construct, _1, _2, request)) - def construct(self, result, request): - if result is None: + def construct(self, ec, result, request): + if ec: response = {"id": request["id"], "result": None, - "error": {"message": "Error", "code": -4}} + "error": {"message": str(ec), "code": -4}} + self.processor.push_response(response) return - else: - response = {"id": request["id"], "result": result, "error": None} + last_id = self.monitor.mempool_n(result) + response = {"id": request["id"], "result": last_id, "error": None} self.processor.push_response(response) + address = request["params"][0] + self.monitor.monitor(address, result) + # Cache result for get_history + self.cache.store(address, result) + + def fetch_cached(self, request): + address = request["params"][0] + cached = self.cache.fetch(address) + if cached is None: + return False + response = {"id": request["id"], "result": cached, "error": None} + self.processor.push_response(response) + return True + class BlockchainProcessor(Processor): def __init__(self, config): Processor.__init__(self) + cache = HistoryCache() self.backend = Backend() + monitor = MonitorAddress(self, cache, self.backend) self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) self.address_get_history = AddressGetHistory(self.backend, self) - self.address_subscribe = AddressSubscribe(self.backend, self) + self.address_subscribe = \ + AddressSubscribe(self.backend, self, cache, monitor) def stop(self): self.backend.stop() @@ -184,30 +345,29 @@ class BlockchainProcessor(Processor): if request["method"] == "blockchain.numblocks.subscribe": self.numblocks_subscribe.subscribe(request) elif request["method"] == "blockchain.address.subscribe": - pass + self.address_subscribe.subscribe(request) elif request["method"] == "blockchain.address.get_history": - self.address_get_history.get(request) + if not self.address_subscribe.fetch_cached(request): + self.address_get_history.get(request) elif request["method"] == "blockchain.transaction.broadcast": self.broadcast_transaction(request) def broadcast_transaction(self, request): - raw_tx = bitcoin.data_chunk(str(request["params"])) + raw_tx = bitcoin.data_chunk(str(request["params"][0])) exporter = bitcoin.satoshi_exporter() try: tx = exporter.load_transaction(raw_tx) except RuntimeError: - response = {"id": request["id"], "result": None, - "error": {"message": - "Exception while parsing the transaction data.", - "code": -4}} + response = { + "id": request["id"], + "result": None, + "error": { + "message": "Exception while parsing the transaction data.", + "code": -4, + } + } else: self.backend.protocol.broadcast_transaction(tx) tx_hash = str(bitcoin.hash_transaction(tx)) response = {"id": request["id"], "result": tx_hash, "error": None} self.push_response(response) - - def run(self): - print "Warning: pre-alpha prototype. Full of bugs." - while not self.shared.stopped(): - time.sleep(1) -