X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Flibbitcoin%2F__init__.py;h=e6092f779b9b85ef548daade899a995011ce526d;hb=2e04e2c499984d5a8f510b33ca040058bec59c8c;hp=8e511693e0ece7bf763d5a18283c8d441f1cbd88;hpb=ce2fafb656aa4a5ae43bda3617c6bdedcf9c5a84;p=electrum-server.git diff --git a/backends/libbitcoin/__init__.py b/backends/libbitcoin/__init__.py index 8e51169..e6092f7 100644 --- a/backends/libbitcoin/__init__.py +++ b/backends/libbitcoin/__init__.py @@ -41,8 +41,6 @@ class MonitorAddress: self.monitor_output = {} # key is address self.monitor_address = set() - # affected - self.affected = {} backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed) @@ -73,52 +71,47 @@ class MonitorAddress: addrs.append((output_index, str(address))) return tx_hash, previous_outputs, addrs - def tx_stored(self, tx): + def effect_notify(self, tx, delete_outs): affected_addrs = set() tx_hash, previous_outputs, addrs = self.unpack(tx) for prevout in previous_outputs: - with self.lock: - if self.monitor_output.has_key(prevout): + try: + with self.lock: affected_addrs.add(self.monitor_output[prevout]) + if delete_outs: + del self.monitor_output[prevout] + except KeyError: + pass for idx, address in addrs: with self.lock: if address in self.monitor_address: affected_addrs.add(address) - with self.lock: - self.affected[tx_hash] = affected_addrs self.cache.clear(affected_addrs) self.notify(affected_addrs) + # Used in confirmed txs + return tx_hash, addrs, affected_addrs - def tx_confirmed(self, tx_desc): - tx_hash, previous_outputs, addrs = self.unpack(tx) - with self.lock: - affected_addrs = self.affected[tx_hash] - del self.affected[tx_hash] - self.cache.clear(affected_addrs) - self.notify(affected_addrs) + def tx_stored(self, tx): + self.effect_notify(tx, False) + + def tx_confirmed(self, tx): + tx_hash, addrs, affected_addrs = self.effect_notify(tx, True) # add new outputs to monitor for idx, address in addrs: outpoint = "%s:%s" % (tx_hash, idx) - with self.lock: - if address in affected_addrs: + if address in affected_addrs: + with self.lock: self.monitor_output[outpoint] = address - # delete spent outpoints - for prevout in previous_outputs: - with self.lock: - if self.monitor_output.has_key(prevout): - del self.monitor_output[prevout] def notify(self, affected_addrs): - templ_response = {"id": None, - "method": "blockchain.address.subscribe", - "params": []} service = self.backend.mempool_service chain = self.backend.blockchain txpool = self.backend.transaction_pool memory_buff = self.backend.memory_buffer for address in affected_addrs: - response = templ_response.copy() - response["params"].append(address) + response = {"id": None, + "method": "blockchain.address.subscribe", + "params": [str(address)]} history.payment_history(service, chain, txpool, memory_buff, address, bind(self.send_notify, _1, _2, response)) @@ -139,6 +132,7 @@ class MonitorAddress: if ec: print "Error: Monitor.send_notify()", ec return + assert len(response["params"]) == 1 response["params"].append(self.mempool_n(result)) self.processor.push_response(response) @@ -248,14 +242,13 @@ class NumblocksSubscribe: latest = fork_point + len(arrivals) self.latest.set(latest) response = {"id": None, "method": "blockchain.numblocks.subscribe", - "result": latest} + "params": [latest]} self.processor.push_response(response) self.backend.blockchain.subscribe_reorganize(self.reorganize) def subscribe(self, request): latest = self.latest.get() response = {"id": request["id"], - "method": "blockchain.numblocks.subscribe", "result": latest, "error": None} self.processor.push_response(response) @@ -351,7 +344,7 @@ class BlockchainProcessor(Processor): self.broadcast_transaction(request) def broadcast_transaction(self, request): - raw_tx = bitcoin.data_chunk(str(request["params"])) + raw_tx = bitcoin.data_chunk(str(request["params"][0])) exporter = bitcoin.satoshi_exporter() try: tx = exporter.load_transaction(raw_tx) @@ -366,8 +359,3 @@ class BlockchainProcessor(Processor): response = {"id": request["id"], "result": tx_hash, "error": None} self.push_response(response) - def run(self): - print "Warning: pre-alpha prototype. Full of bugs." - while not self.shared.stopped(): - time.sleep(1) -