X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Flibbitcoin%2F__init__.py;h=247eae7b5fdf7f3cb5af034d3eadf3cd0c03e405;hb=27bd2d1754b5949678d1ee7ff5efad4b3c8bef6f;hp=470cca42336682b1a63873897cc0be00dcbd09f1;hpb=01a63932e6871169f2a63b41c7744d189355c19a;p=electrum-server.git diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index 470cca4..247eae7 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -1,13 +1,130 @@ import bitcoin +from bitcoin import bind, _1, _2, _3 from processor import Processor import threading import time -import composed +import history -class Backend: +class HistoryCache: def __init__(self): + self.lock = threading.Lock() + self.cache = {} + + def store(self, address, result): + with self.lock: + self.cache[address] = result + + def fetch(self, address): + try: + with self.lock: + return self.cache[address] + except KeyError: + return None + + def clear(self, addresses): + with self.lock: + for address in addresses: + if self.cache.has_key(address): + del self.cache[address] + +class MonitorAddress: + + def __init__(self, processor, cache): + self.processor = processor + self.cache = cache + self.lock = threading.Lock() + # key is hash:index, value is address + self.monitor_output = {} + # key is address + self.monitor_address = set() + # affected + self.affected = {} + + def monitor(self, address, result): + for info in result: + if not info.has_key("raw_output_script"): + continue + assert info["is_input"] == 0 + tx_hash = info["tx_hash"] + output_index = info["index"] + outpoint = "%s:%s" % (tx_hash, output_index) + with self.lock: + self.monitor_output[outpoint] = address + with self.lock: + self.monitor_address.add(address) + + def tx_stored(self, tx_desc): + tx_hash, prevouts, addrs = tx_desc + affected_addrs = set() + for prevout_hash, prevout_index in prevouts: + prevout = "%s:%s" % (prevout_hash, prevout_index) + with self.lock: + if self.monitor_output.has_key(prevout): + affected_addrs.add(self.monitor_output[prevout]) + for idx, address in addrs: + with self.lock: + if address in self.monitor_address: + affected_addrs.add(address) + with self.lock: + self.affected[tx_hash] = affected_addrs + self.cache.clear(affected_addrs) + self.notify(affected_addrs) + + def tx_confirmed(self, tx_desc): + tx_hash, prevouts, addrs = tx_desc + with self.lock: + affected_addrs = self.affected[tx_hash] + del self.affected[tx_hash] + self.cache.clear(affected_addrs) + self.notify(affected_addrs) + # add new outputs to monitor + for idx, address in addrs: + outpoint = "%s:%s" % (tx_hash, idx) + with self.lock: + if address in affected_addrs: + self.monitor_output[outpoint] = address + # delete spent outpoints + for prevout_hash, prevout_index in prevouts: + prevout = "%s:%s" % (prevout_hash, prevout_index) + with self.lock: + if self.monitor_output.has_key(prevout): + del self.monitor_output[prevout] + + def notify(self, affected_addrs): + templ_response = {"id": None, + "method": "blockchain.address.subscribe", + "params": []} + chain = self.backend.blockchain + txpool = self.backend.transaction_pool + membuf = self.backend.pool_buffer + for address in affected_addrs: + response = templ_response.copy() + response["params"].append(address) + history.payment_history(chain, txpool, membuf, address, + bind(self.send_notify, _1, response)) + + def mempool_n(self, result): + assert result is not None + if len(result) == 0: + return None + # mempool:n status + # Order by time, grab last item (latest) + last_info = sorted(result, key=lambda k: k['timestamp'])[-1] + if last_info["block_hash"] == "mempool": + last_id = "mempool:%s" % len(result) + else: + last_id = last_info["block_hash"] + return last_id + + def send_notify(self, result, response): + response["params"].append(self.mempool_n(result)) + self.processor.push_response(response) + +class Backend: + + def __init__(self, monitor): # Create 3 thread-pools each with 1 thread self.network_service = bitcoin.async_service(1) self.disk_service = bitcoin.async_service(1) @@ -20,7 +137,8 @@ class Backend: self.handshake, self.network) db_prefix = "/home/genjix/libbitcoin/database" - self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix) + self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix, + self.blockchain_started) self.poller = bitcoin.poller(self.blockchain) self.transaction_pool = \ bitcoin.transaction_pool(self.mempool_service, self.blockchain) @@ -32,10 +150,17 @@ class Backend: self.poller, self.transaction_pool) self.session.start(self.handle_start) + self.pool_buffer = \ + history.MemoryPoolBuffer(self.transaction_pool, + self.blockchain, monitor) + def handle_start(self, ec): if ec: print "Error starting backend:", ec + def blockchain_started(self, ec, chain): + print "Blockchain initialisation:", ec + def stop(self): self.session.stop(self.handle_stop) @@ -49,8 +174,7 @@ class Backend: # Here we subscribe to new transactions from them which we # add to the transaction_pool. That way we can track which # transactions we are interested in. - node.subscribe_transaction( - bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + node.subscribe_transaction(bind(self.recv_tx, _1, _2, node)) # Re-subscribe to next new node self.protocol.subscribe_channel(self.monitor_tx) @@ -59,27 +183,16 @@ class Backend: print "Error with new transaction:", ec return tx_hash = bitcoin.hash_transaction(tx) - # If we want to ignore this transaction, we can set - # the 2 handlers to be null handlers that do nothing. - self.transaction_pool.store(tx, - bitcoin.bind(self.tx_confirmed, bitcoin._1, tx_hash), - bitcoin.bind(self.handle_mempool_store, bitcoin._1, tx_hash)) + self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash)) # Re-subscribe to new transactions from node - node.subscribe_transaction( - bitcoin.bind(self.recv_tx, bitcoin._1, bitcoin._2, node)) + node.subscribe_transaction(bind(self.recv_tx, _1, _2, node)) - def handle_mempool_store(self, ec, tx_hash): + def store_tx(self, ec, tx_hash): if ec: print "Error storing memory pool transaction", tx_hash, ec else: print "Accepted transaction", tx_hash - def tx_confirmed(self, ec, tx_hash): - if ec: - print "Problem confirming transaction", tx_hash, ec - else: - print "Confirmed", tx_hash - class GhostValue: def __init__(self): @@ -113,10 +226,18 @@ class NumblocksSubscribe: def reorganize(self, ec, fork_point, arrivals, replaced): latest = fork_point + len(arrivals) self.latest.set(latest) - response = {"method": "numblocks.subscribe", "result": latest} + response = {"id": None, "method": "blockchain.numblocks.subscribe", + "result": latest} self.processor.push_response(response) self.backend.blockchain.subscribe_reorganize(self.reorganize) + def subscribe(self, request): + latest = self.latest.get() + response = {"id": request["id"], + "method": "blockchain.numblocks.subscribe", + "result": latest, + "error": None} + self.processor.push_response(response) class AddressGetHistory: @@ -125,38 +246,87 @@ class AddressGetHistory: self.processor = processor def get(self, request): - address = str(request["params"]) - composed.payment_history(self.backend.blockchain, address, - bitcoin.bind(self.respond, request, bitcoin._1)) - - def respond(self, request, result): - response = {"id": request["id"], "method": request["method"], - "params": request["params"], "result": result} + address = str(request["params"][0]) + chain = self.backend.blockchain + txpool = self.backend.transaction_pool + membuf = self.backend.pool_buffer + history.payment_history(chain, txpool, membuf, address, + bind(self.respond, _1, request)) + + def respond(self, result, request): + if result is None: + response = {"id": request["id"], "result": None, + "error": {"message": "Error", "code": -4}} + else: + response = {"id": request["id"], "result": result, "error": None} self.processor.push_response(response) +class AddressSubscribe: + + def __init__(self, backend, processor, cache, monitor): + self.backend = backend + self.processor = processor + self.cache = cache + self.monitor = monitor + + self.backend.pool_buffer.cheat = self + + def subscribe(self, request): + address = str(request["params"][0]) + chain = self.backend.blockchain + txpool = self.backend.transaction_pool + membuf = self.backend.pool_buffer + history.payment_history(chain, txpool, membuf, address, + bind(self.construct, _1, request)) + + def construct(self, result, request): + if result is None: + response = {"id": request["id"], "result": None, + "error": {"message": "Error", "code": -4}} + return + last_id = self.monitor.mempool_n(result) + response = {"id": request["id"], "result": last_id, "error": None} + self.processor.push_response(response) + address = request["params"][0] + self.monitor.monitor(address, result) + # Cache result for get_history + self.cache.store(address, result) + + def fetch_cached(self, request): + address = request["params"][0] + cached = self.cache.fetch(address) + if cached is None: + return False + response = {"id": request["id"], "result": cached, "error": None} + self.processor.push_response(response) + return True class BlockchainProcessor(Processor): def __init__(self, config): Processor.__init__(self) - self.backend = Backend() + cache = HistoryCache() + monitor = MonitorAddress(self, cache) + self.backend = Backend(monitor) + monitor.backend = self.backend self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) self.address_get_history = AddressGetHistory(self.backend, self) + self.address_subscribe = \ + AddressSubscribe(self.backend, self, cache, monitor) def stop(self): self.backend.stop() def process(self, request): print "New request (lib)", request - if request["method"] == "numblocks.subscribe": - self.numblocks_subscribe.subscribe(session, request) - elif request["method"] == "address.get_history": - self.address_get_history.get(request) - elif request["method"] == "server.banner": - self.push_response({"id": request["id"], - "method": request["method"], "params": request["params"], - "result": "libbitcoin using python-bitcoin bindings"}) - elif request["method"] == "transaction.broadcast": + if request["method"] == "blockchain.numblocks.subscribe": + self.numblocks_subscribe.subscribe(request) + elif request["method"] == "blockchain.address.subscribe": + self.address_subscribe.subscribe(request) + elif request["method"] == "blockchain.address.get_history": + if not self.address_subscribe.fetch_cached(request): + self.address_get_history.get(request) + elif request["method"] == "blockchain.transaction.broadcast": self.broadcast_transaction(request) def broadcast_transaction(self, request): @@ -165,16 +335,14 @@ class BlockchainProcessor(Processor): try: tx = exporter.load_transaction(raw_tx) except RuntimeError: - response = {"id": request["id"], "method": request["method"], - "params": request["params"], "result": None, + response = {"id": request["id"], "result": None, "error": {"message": "Exception while parsing the transaction data.", "code": -4}} else: self.backend.protocol.broadcast_transaction(tx) tx_hash = str(bitcoin.hash_transaction(tx)) - response = {"id": request["id"], "method": request["method"], - "params": request["params"], "result": tx_hash} + response = {"id": request["id"], "result": tx_hash, "error": None} self.push_response(response) def run(self):