mempool subscribe works with outputs of transactions.
[electrum-server.git] / backends / libbitcoin / __init__.py
index aa13238..247eae7 100644 (file)
@@ -6,9 +6,125 @@ import time
 
 import history 
 
-class Backend:
+class HistoryCache:
 
     def __init__(self):
+        self.lock = threading.Lock()
+        self.cache = {}
+
+    def store(self, address, result):
+        with self.lock:
+            self.cache[address] = result
+
+    def fetch(self, address):
+        try:
+            with self.lock:
+                return self.cache[address]
+        except KeyError:
+            return None
+
+    def clear(self, addresses):
+        with self.lock:
+            for address in addresses:
+                if self.cache.has_key(address):
+                    del self.cache[address]
+
+class MonitorAddress:
+
+    def __init__(self, processor, cache):
+        self.processor = processor
+        self.cache = cache
+        self.lock = threading.Lock()
+        # key is hash:index, value is address
+        self.monitor_output = {}
+        # key is address
+        self.monitor_address = set()
+        # affected
+        self.affected = {}
+
+    def monitor(self, address, result):
+        for info in result:
+            if not info.has_key("raw_output_script"):
+                continue
+            assert info["is_input"] == 0
+            tx_hash = info["tx_hash"]
+            output_index = info["index"]
+            outpoint = "%s:%s" % (tx_hash, output_index)
+            with self.lock:
+                self.monitor_output[outpoint] = address
+        with self.lock:
+            self.monitor_address.add(address)
+
+    def tx_stored(self, tx_desc):
+        tx_hash, prevouts, addrs = tx_desc
+        affected_addrs = set()
+        for prevout_hash, prevout_index in prevouts:
+            prevout = "%s:%s" % (prevout_hash, prevout_index)
+            with self.lock:
+                if self.monitor_output.has_key(prevout):
+                    affected_addrs.add(self.monitor_output[prevout])
+        for idx, address in addrs:
+            with self.lock:
+                if address in self.monitor_address:
+                    affected_addrs.add(address)
+        with self.lock:
+            self.affected[tx_hash] = affected_addrs
+        self.cache.clear(affected_addrs)
+        self.notify(affected_addrs)
+
+    def tx_confirmed(self, tx_desc):
+        tx_hash, prevouts, addrs = tx_desc
+        with self.lock:
+            affected_addrs = self.affected[tx_hash]
+            del self.affected[tx_hash]
+        self.cache.clear(affected_addrs)
+        self.notify(affected_addrs)
+        # add new outputs to monitor
+        for idx, address in addrs:
+            outpoint = "%s:%s" % (tx_hash, idx)
+            with self.lock:
+                if address in affected_addrs:
+                    self.monitor_output[outpoint] = address
+        # delete spent outpoints
+        for prevout_hash, prevout_index in prevouts:
+            prevout = "%s:%s" % (prevout_hash, prevout_index)
+            with self.lock:
+                if self.monitor_output.has_key(prevout):
+                    del self.monitor_output[prevout]
+
+    def notify(self, affected_addrs):
+        templ_response = {"id": None,
+                          "method": "blockchain.address.subscribe",
+                          "params": []}
+        chain = self.backend.blockchain
+        txpool = self.backend.transaction_pool
+        membuf = self.backend.pool_buffer
+        for address in affected_addrs:
+            response = templ_response.copy()
+            response["params"].append(address)
+            history.payment_history(chain, txpool, membuf, address,
+                bind(self.send_notify, _1, response))
+
+    def mempool_n(self, result):
+        assert result is not None
+        if len(result) == 0:
+            return None
+        # mempool:n status
+        # Order by time, grab last item (latest)
+        last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
+        if last_info["block_hash"] == "mempool":
+            last_id = "mempool:%s" % len(result)
+        else:
+            last_id = last_info["block_hash"]
+        return last_id
+
+    def send_notify(self, result, response):
+        response["params"].append(self.mempool_n(result))
+        self.processor.push_response(response)
+
+class Backend:
+
+    def __init__(self, monitor):
         # Create 3 thread-pools each with 1 thread
         self.network_service = bitcoin.async_service(1)
         self.disk_service = bitcoin.async_service(1)
@@ -34,8 +150,9 @@ class Backend:
                             self.poller, self.transaction_pool)
         self.session.start(self.handle_start)
 
-        self.pool_buffer = history.MemoryPoolBuffer(self.transaction_pool,
-                                                    self.blockchain)
+        self.pool_buffer = \
+            history.MemoryPoolBuffer(self.transaction_pool,
+                                     self.blockchain, monitor)
 
     def handle_start(self, ec):
         if ec:
@@ -146,35 +263,56 @@ class AddressGetHistory:
 
 class AddressSubscribe:
 
-    def __init__(self, backend, processor):
+    def __init__(self, backend, processor, cache, monitor):
         self.backend = backend
         self.processor = processor
+        self.cache = cache
+        self.monitor = monitor
 
-    def subscribe(self, session, request):
+        self.backend.pool_buffer.cheat = self
+
+    def subscribe(self, request):
         address = str(request["params"][0])
         chain = self.backend.blockchain
         txpool = self.backend.transaction_pool
         membuf = self.backend.pool_buffer
         history.payment_history(chain, txpool, membuf, address,
-            bind(self.respond, _1, request))
+            bind(self.construct, _1, request))
 
     def construct(self, result, request):
         if result is None:
             response = {"id": request["id"], "result": None,
                         "error": {"message": "Error", "code": -4}}
             return
-        else:
-            response = {"id": request["id"], "result": result, "error": None}
+        last_id = self.monitor.mempool_n(result)
+        response = {"id": request["id"], "result": last_id, "error": None}
+        self.processor.push_response(response)
+        address = request["params"][0]
+        self.monitor.monitor(address, result)
+        # Cache result for get_history
+        self.cache.store(address, result)
+
+    def fetch_cached(self, request):
+        address = request["params"][0]
+        cached = self.cache.fetch(address)
+        if cached is None:
+            return False
+        response = {"id": request["id"], "result": cached, "error": None}
         self.processor.push_response(response)
+        return True
 
 class BlockchainProcessor(Processor):
 
     def __init__(self, config):
         Processor.__init__(self)
-        self.backend = Backend()
+        cache = HistoryCache()
+        monitor = MonitorAddress(self, cache)
+        self.backend = Backend(monitor)
+        monitor.backend = self.backend
         self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
         self.address_get_history = AddressGetHistory(self.backend, self)
-        self.address_subscribe = AddressSubscribe(self.backend, self)
+        self.address_subscribe = \
+            AddressSubscribe(self.backend, self, cache, monitor)
 
     def stop(self):
         self.backend.stop()
@@ -184,9 +322,10 @@ class BlockchainProcessor(Processor):
         if request["method"] == "blockchain.numblocks.subscribe":
             self.numblocks_subscribe.subscribe(request)
         elif request["method"] == "blockchain.address.subscribe":
-            pass
+            self.address_subscribe.subscribe(request)
         elif request["method"] == "blockchain.address.get_history":
-            self.address_get_history.get(request)
+            if not self.address_subscribe.fetch_cached(request):
+                self.address_get_history.get(request)
         elif request["method"] == "blockchain.transaction.broadcast":
             self.broadcast_transaction(request)