X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Flibbitcoin%2F__init__.py;h=8e511693e0ece7bf763d5a18283c8d441f1cbd88;hb=ce2fafb656aa4a5ae43bda3617c6bdedcf9c5a84;hp=247eae7b5fdf7f3cb5af034d3eadf3cd0c03e405;hpb=27bd2d1754b5949678d1ee7ff5efad4b3c8bef6f;p=electrum-server.git diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index 247eae7..8e51169 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -4,7 +4,8 @@ from processor import Processor import threading import time -import history +import history1 as history +import membuf class HistoryCache: @@ -31,9 +32,10 @@ class HistoryCache: class MonitorAddress: - def __init__(self, processor, cache): + def __init__(self, processor, cache, backend): self.processor = processor self.cache = cache + self.backend = backend self.lock = threading.Lock() # key is hash:index, value is address self.monitor_output = {} @@ -42,6 +44,8 @@ class MonitorAddress: # affected self.affected = {} + backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed) + def monitor(self, address, result): for info in result: if not info.has_key("raw_output_script"): @@ -55,11 +59,24 @@ class MonitorAddress: with self.lock: self.monitor_address.add(address) - def tx_stored(self, tx_desc): - tx_hash, prevouts, addrs = tx_desc + def unpack(self, tx): + tx_hash = bitcoin.hash_transaction(tx) + previous_outputs = [] + for input in tx.inputs: + prevout = input.previous_output + prevout = "%s:%s" % (prevout.hash, prevout.index) + previous_outputs.append(prevout) + addrs = [] + for output_index, output in enumerate(tx.outputs): + address = bitcoin.payment_address() + if address.extract(output.output_script): + addrs.append((output_index, str(address))) + return tx_hash, previous_outputs, addrs + + def tx_stored(self, tx): affected_addrs = set() - for prevout_hash, prevout_index in prevouts: - prevout = "%s:%s" % (prevout_hash, prevout_index) + tx_hash, previous_outputs, addrs = self.unpack(tx) + for prevout in previous_outputs: with self.lock: if self.monitor_output.has_key(prevout): affected_addrs.add(self.monitor_output[prevout]) @@ -73,7 +90,7 @@ class MonitorAddress: self.notify(affected_addrs) def tx_confirmed(self, tx_desc): - tx_hash, prevouts, addrs = tx_desc + tx_hash, previous_outputs, addrs = self.unpack(tx) with self.lock: affected_addrs = self.affected[tx_hash] del self.affected[tx_hash] @@ -86,8 +103,7 @@ class MonitorAddress: if address in affected_addrs: self.monitor_output[outpoint] = address # delete spent outpoints - for prevout_hash, prevout_index in prevouts: - prevout = "%s:%s" % (prevout_hash, prevout_index) + for prevout in previous_outputs: with self.lock: if self.monitor_output.has_key(prevout): del self.monitor_output[prevout] @@ -96,14 +112,15 @@ class MonitorAddress: templ_response = {"id": None, "method": "blockchain.address.subscribe", "params": []} + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer + memory_buff = self.backend.memory_buffer for address in affected_addrs: response = templ_response.copy() response["params"].append(address) - history.payment_history(chain, txpool, membuf, address, - bind(self.send_notify, _1, response)) + history.payment_history(service, chain, txpool, memory_buff, + address, bind(self.send_notify, _1, _2, response)) def mempool_n(self, result): assert result is not None @@ -118,13 +135,16 @@ class MonitorAddress: last_id = last_info["block_hash"] return last_id - def send_notify(self, result, response): + def send_notify(self, ec, result, response): + if ec: + print "Error: Monitor.send_notify()", ec + return response["params"].append(self.mempool_n(result)) self.processor.push_response(response) class Backend: - def __init__(self, monitor): + def __init__(self): # Create 3 thread-pools each with 1 thread self.network_service = bitcoin.async_service(1) self.disk_service = bitcoin.async_service(1) @@ -139,7 +159,7 @@ class Backend: db_prefix = "/home/genjix/libbitcoin/database" self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix, self.blockchain_started) - self.poller = bitcoin.poller(self.blockchain) + self.poller = bitcoin.poller(self.mempool_service, self.blockchain) self.transaction_pool = \ bitcoin.transaction_pool(self.mempool_service, self.blockchain) @@ -150,9 +170,10 @@ class Backend: self.poller, self.transaction_pool) self.session.start(self.handle_start) - self.pool_buffer = \ - history.MemoryPoolBuffer(self.transaction_pool, - self.blockchain, monitor) + self.memory_buffer = \ + membuf.memory_buffer(self.mempool_service.internal_ptr, + self.blockchain.internal_ptr, + self.transaction_pool.internal_ptr) def handle_start(self, ec): if ec: @@ -183,7 +204,7 @@ class Backend: print "Error with new transaction:", ec return tx_hash = bitcoin.hash_transaction(tx) - self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash)) + self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash)) # Re-subscribe to new transactions from node node.subscribe_transaction(bind(self.recv_tx, _1, _2, node)) @@ -247,16 +268,17 @@ class AddressGetHistory: def get(self, request): address = str(request["params"][0]) + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer - history.payment_history(chain, txpool, membuf, address, - bind(self.respond, _1, request)) + memory_buff = self.backend.memory_buffer + history.payment_history(service, chain, txpool, memory_buff, + address, bind(self.respond, _1, _2, request)) - def respond(self, result, request): - if result is None: + def respond(self, ec, result, request): + if ec: response = {"id": request["id"], "result": None, - "error": {"message": "Error", "code": -4}} + "error": {"message": str(ec), "code": -4}} else: response = {"id": request["id"], "result": result, "error": None} self.processor.push_response(response) @@ -269,20 +291,20 @@ class AddressSubscribe: self.cache = cache self.monitor = monitor - self.backend.pool_buffer.cheat = self - def subscribe(self, request): address = str(request["params"][0]) + service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool - membuf = self.backend.pool_buffer - history.payment_history(chain, txpool, membuf, address, - bind(self.construct, _1, request)) + memory_buff = self.backend.memory_buffer + history.payment_history(service, chain, txpool, memory_buff, + address, bind(self.construct, _1, _2, request)) - def construct(self, result, request): - if result is None: + def construct(self, ec, result, request): + if ec: response = {"id": request["id"], "result": None, - "error": {"message": "Error", "code": -4}} + "error": {"message": str(ec), "code": -4}} + self.processor.push_response(response) return last_id = self.monitor.mempool_n(result) response = {"id": request["id"], "result": last_id, "error": None} @@ -306,9 +328,8 @@ class BlockchainProcessor(Processor): def __init__(self, config): Processor.__init__(self) cache = HistoryCache() - monitor = MonitorAddress(self, cache) - self.backend = Backend(monitor) - monitor.backend = self.backend + self.backend = Backend() + monitor = MonitorAddress(self, cache, self.backend) self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) self.address_get_history = AddressGetHistory(self.backend, self) self.address_subscribe = \