X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=blobdiff_plain;f=backends%2Flibbitcoin%2F__init__.py;h=ef2eb1d81cde7cbc78f89f7db140d8ee4b5fdf9f;hp=f1664c9368bff5caf24a48a85765602489eb894f;hb=240db7c28d2e6a0d0078c42f81b2b110615e88c6;hpb=4565e5be9d30d3d28f9319f613cd5311081fc4d7 diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index f1664c9..ef2eb1d 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -1,12 +1,14 @@ -import bitcoin -from bitcoin import bind, _1, _2, _3 -from processor import Processor import threading import time +import bitcoin +from bitcoin import bind, _1, _2, _3 + +from processor import Processor import history1 as history import membuf + class HistoryCache: def __init__(self): @@ -27,9 +29,10 @@ class HistoryCache: def clear(self, addresses): with self.lock: for address in addresses: - if self.cache.has_key(address): + if address in self.cache: del self.cache[address] + class MonitorAddress: def __init__(self, processor, cache, backend): @@ -41,14 +44,12 @@ class MonitorAddress: self.monitor_output = {} # key is address self.monitor_address = set() - # affected - self.affected = {} backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed) def monitor(self, address, result): for info in result: - if not info.has_key("raw_output_script"): + if "raw_output_script" not in info: continue assert info["is_input"] == 0 tx_hash = info["tx_hash"] @@ -73,40 +74,37 @@ class MonitorAddress: addrs.append((output_index, str(address))) return tx_hash, previous_outputs, addrs - def tx_stored(self, tx): + def effect_notify(self, tx, delete_outs): affected_addrs = set() tx_hash, previous_outputs, addrs = self.unpack(tx) for prevout in previous_outputs: - with self.lock: - if self.monitor_output.has_key(prevout): + try: + with self.lock: affected_addrs.add(self.monitor_output[prevout]) + if delete_outs: + del self.monitor_output[prevout] + except KeyError: + pass for idx, address in addrs: with self.lock: if address in self.monitor_address: affected_addrs.add(address) - with self.lock: - self.affected[tx_hash] = affected_addrs self.cache.clear(affected_addrs) self.notify(affected_addrs) + # Used in confirmed txs + return tx_hash, addrs, affected_addrs + + def tx_stored(self, tx): + self.effect_notify(tx, False) def tx_confirmed(self, tx): - tx_hash, previous_outputs, addrs = self.unpack(tx) - with self.lock: - affected_addrs = self.affected[tx_hash] - del self.affected[tx_hash] - self.cache.clear(affected_addrs) - self.notify(affected_addrs) + tx_hash, addrs, affected_addrs = self.effect_notify(tx, True) # add new outputs to monitor for idx, address in addrs: outpoint = "%s:%s" % (tx_hash, idx) - with self.lock: - if address in affected_addrs: + if address in affected_addrs: + with self.lock: self.monitor_output[outpoint] = address - # delete spent outpoints - for prevout in previous_outputs: - with self.lock: - if self.monitor_output.has_key(prevout): - del self.monitor_output[prevout] def notify(self, affected_addrs): service = self.backend.mempool_service @@ -117,8 +115,8 @@ class MonitorAddress: response = {"id": None, "method": "blockchain.address.subscribe", "params": [str(address)]} - history.payment_history(service, chain, txpool, memory_buff, - address, bind(self.send_notify, _1, _2, response)) + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.send_notify, _1, _2, response)) def mempool_n(self, result): assert result is not None @@ -141,6 +139,7 @@ class MonitorAddress: response["params"].append(self.mempool_n(result)) self.processor.push_response(response) + class Backend: def __init__(self): @@ -213,6 +212,7 @@ class Backend: else: print "Accepted transaction", tx_hash + class GhostValue: def __init__(self): @@ -227,6 +227,7 @@ class GhostValue: self.value = value self.event.set() + class NumblocksSubscribe: def __init__(self, backend, processor): @@ -247,18 +248,18 @@ class NumblocksSubscribe: latest = fork_point + len(arrivals) self.latest.set(latest) response = {"id": None, "method": "blockchain.numblocks.subscribe", - "result": latest} + "params": [latest]} self.processor.push_response(response) self.backend.blockchain.subscribe_reorganize(self.reorganize) def subscribe(self, request): latest = self.latest.get() response = {"id": request["id"], - "method": "blockchain.numblocks.subscribe", "result": latest, "error": None} self.processor.push_response(response) + class AddressGetHistory: def __init__(self, backend, processor): @@ -271,8 +272,8 @@ class AddressGetHistory: chain = self.backend.blockchain txpool = self.backend.transaction_pool memory_buff = self.backend.memory_buffer - history.payment_history(service, chain, txpool, memory_buff, - address, bind(self.respond, _1, _2, request)) + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.respond, _1, _2, request)) def respond(self, ec, result, request): if ec: @@ -282,6 +283,7 @@ class AddressGetHistory: response = {"id": request["id"], "result": result, "error": None} self.processor.push_response(response) + class AddressSubscribe: def __init__(self, backend, processor, cache, monitor): @@ -296,8 +298,8 @@ class AddressSubscribe: chain = self.backend.blockchain txpool = self.backend.transaction_pool memory_buff = self.backend.memory_buffer - history.payment_history(service, chain, txpool, memory_buff, - address, bind(self.construct, _1, _2, request)) + history.payment_history(service, chain, txpool, memory_buff, address, + bind(self.construct, _1, _2, request)) def construct(self, ec, result, request): if ec: @@ -322,6 +324,7 @@ class AddressSubscribe: self.processor.push_response(response) return True + class BlockchainProcessor(Processor): def __init__(self, config): @@ -355,18 +358,16 @@ class BlockchainProcessor(Processor): try: tx = exporter.load_transaction(raw_tx) except RuntimeError: - response = {"id": request["id"], "result": None, - "error": {"message": - "Exception while parsing the transaction data.", - "code": -4}} + response = { + "id": request["id"], + "result": None, + "error": { + "message": "Exception while parsing the transaction data.", + "code": -4, + } + } else: self.backend.protocol.broadcast_transaction(tx) tx_hash = str(bitcoin.hash_transaction(tx)) response = {"id": request["id"], "result": tx_hash, "error": None} self.push_response(response) - - def run(self): - print "Warning: pre-alpha prototype. Full of bugs." - while not self.shared.stopped(): - time.sleep(1) -