mempool subscribe works with outputs of transactions.
authorgenjix <fake@lol.u>
Thu, 19 Apr 2012 13:29:12 +0000 (14:29 +0100)
committergenjix <fake@lol.u>
Thu, 19 Apr 2012 13:29:12 +0000 (14:29 +0100)
backends/libbitcoin/__init__.py
backends/libbitcoin/history.py
processor.py

index aa13238..247eae7 100644 (file)
@@ -6,9 +6,125 @@ import time
 
 import history 
 
-class Backend:
+class HistoryCache:
 
     def __init__(self):
+        self.lock = threading.Lock()
+        self.cache = {}
+
+    def store(self, address, result):
+        with self.lock:
+            self.cache[address] = result
+
+    def fetch(self, address):
+        try:
+            with self.lock:
+                return self.cache[address]
+        except KeyError:
+            return None
+
+    def clear(self, addresses):
+        with self.lock:
+            for address in addresses:
+                if self.cache.has_key(address):
+                    del self.cache[address]
+
+class MonitorAddress:
+
+    def __init__(self, processor, cache):
+        self.processor = processor
+        self.cache = cache
+        self.lock = threading.Lock()
+        # key is hash:index, value is address
+        self.monitor_output = {}
+        # key is address
+        self.monitor_address = set()
+        # affected
+        self.affected = {}
+
+    def monitor(self, address, result):
+        for info in result:
+            if not info.has_key("raw_output_script"):
+                continue
+            assert info["is_input"] == 0
+            tx_hash = info["tx_hash"]
+            output_index = info["index"]
+            outpoint = "%s:%s" % (tx_hash, output_index)
+            with self.lock:
+                self.monitor_output[outpoint] = address
+        with self.lock:
+            self.monitor_address.add(address)
+
+    def tx_stored(self, tx_desc):
+        tx_hash, prevouts, addrs = tx_desc
+        affected_addrs = set()
+        for prevout_hash, prevout_index in prevouts:
+            prevout = "%s:%s" % (prevout_hash, prevout_index)
+            with self.lock:
+                if self.monitor_output.has_key(prevout):
+                    affected_addrs.add(self.monitor_output[prevout])
+        for idx, address in addrs:
+            with self.lock:
+                if address in self.monitor_address:
+                    affected_addrs.add(address)
+        with self.lock:
+            self.affected[tx_hash] = affected_addrs
+        self.cache.clear(affected_addrs)
+        self.notify(affected_addrs)
+
+    def tx_confirmed(self, tx_desc):
+        tx_hash, prevouts, addrs = tx_desc
+        with self.lock:
+            affected_addrs = self.affected[tx_hash]
+            del self.affected[tx_hash]
+        self.cache.clear(affected_addrs)
+        self.notify(affected_addrs)
+        # add new outputs to monitor
+        for idx, address in addrs:
+            outpoint = "%s:%s" % (tx_hash, idx)
+            with self.lock:
+                if address in affected_addrs:
+                    self.monitor_output[outpoint] = address
+        # delete spent outpoints
+        for prevout_hash, prevout_index in prevouts:
+            prevout = "%s:%s" % (prevout_hash, prevout_index)
+            with self.lock:
+                if self.monitor_output.has_key(prevout):
+                    del self.monitor_output[prevout]
+
+    def notify(self, affected_addrs):
+        templ_response = {"id": None,
+                          "method": "blockchain.address.subscribe",
+                          "params": []}
+        chain = self.backend.blockchain
+        txpool = self.backend.transaction_pool
+        membuf = self.backend.pool_buffer
+        for address in affected_addrs:
+            response = templ_response.copy()
+            response["params"].append(address)
+            history.payment_history(chain, txpool, membuf, address,
+                bind(self.send_notify, _1, response))
+
+    def mempool_n(self, result):
+        assert result is not None
+        if len(result) == 0:
+            return None
+        # mempool:n status
+        # Order by time, grab last item (latest)
+        last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
+        if last_info["block_hash"] == "mempool":
+            last_id = "mempool:%s" % len(result)
+        else:
+            last_id = last_info["block_hash"]
+        return last_id
+
+    def send_notify(self, result, response):
+        response["params"].append(self.mempool_n(result))
+        self.processor.push_response(response)
+
+class Backend:
+
+    def __init__(self, monitor):
         # Create 3 thread-pools each with 1 thread
         self.network_service = bitcoin.async_service(1)
         self.disk_service = bitcoin.async_service(1)
@@ -34,8 +150,9 @@ class Backend:
                             self.poller, self.transaction_pool)
         self.session.start(self.handle_start)
 
-        self.pool_buffer = history.MemoryPoolBuffer(self.transaction_pool,
-                                                    self.blockchain)
+        self.pool_buffer = \
+            history.MemoryPoolBuffer(self.transaction_pool,
+                                     self.blockchain, monitor)
 
     def handle_start(self, ec):
         if ec:
@@ -146,35 +263,56 @@ class AddressGetHistory:
 
 class AddressSubscribe:
 
-    def __init__(self, backend, processor):
+    def __init__(self, backend, processor, cache, monitor):
         self.backend = backend
         self.processor = processor
+        self.cache = cache
+        self.monitor = monitor
 
-    def subscribe(self, session, request):
+        self.backend.pool_buffer.cheat = self
+
+    def subscribe(self, request):
         address = str(request["params"][0])
         chain = self.backend.blockchain
         txpool = self.backend.transaction_pool
         membuf = self.backend.pool_buffer
         history.payment_history(chain, txpool, membuf, address,
-            bind(self.respond, _1, request))
+            bind(self.construct, _1, request))
 
     def construct(self, result, request):
         if result is None:
             response = {"id": request["id"], "result": None,
                         "error": {"message": "Error", "code": -4}}
             return
-        else:
-            response = {"id": request["id"], "result": result, "error": None}
+        last_id = self.monitor.mempool_n(result)
+        response = {"id": request["id"], "result": last_id, "error": None}
+        self.processor.push_response(response)
+        address = request["params"][0]
+        self.monitor.monitor(address, result)
+        # Cache result for get_history
+        self.cache.store(address, result)
+
+    def fetch_cached(self, request):
+        address = request["params"][0]
+        cached = self.cache.fetch(address)
+        if cached is None:
+            return False
+        response = {"id": request["id"], "result": cached, "error": None}
         self.processor.push_response(response)
+        return True
 
 class BlockchainProcessor(Processor):
 
     def __init__(self, config):
         Processor.__init__(self)
-        self.backend = Backend()
+        cache = HistoryCache()
+        monitor = MonitorAddress(self, cache)
+        self.backend = Backend(monitor)
+        monitor.backend = self.backend
         self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
         self.address_get_history = AddressGetHistory(self.backend, self)
-        self.address_subscribe = AddressSubscribe(self.backend, self)
+        self.address_subscribe = \
+            AddressSubscribe(self.backend, self, cache, monitor)
 
     def stop(self):
         self.backend.stop()
@@ -184,9 +322,10 @@ class BlockchainProcessor(Processor):
         if request["method"] == "blockchain.numblocks.subscribe":
             self.numblocks_subscribe.subscribe(request)
         elif request["method"] == "blockchain.address.subscribe":
-            pass
+            self.address_subscribe.subscribe(request)
         elif request["method"] == "blockchain.address.get_history":
-            self.address_get_history.get(request)
+            if not self.address_subscribe.fetch_cached(request):
+                self.address_get_history.get(request)
         elif request["method"] == "blockchain.transaction.broadcast":
             self.broadcast_transaction(request)
 
index 0967535..4c225c2 100644 (file)
@@ -27,9 +27,10 @@ expiry_queue = ExpiryQueue()
 
 class MemoryPoolBuffer:
 
-    def __init__(self, txpool, chain):
+    def __init__(self, txpool, chain, monitor):
         self.txpool = txpool
         self.chain = chain
+        self.monitor = monitor
         # prevout: inpoint
         self.lookup_input = {}
         # payment_address: outpoint
@@ -38,20 +39,22 @@ class MemoryPoolBuffer:
         self.timestamps = {}
 
     def recv_tx(self, tx, handle_store):
-        tx_hash = bitcoin.hash_transaction(tx)
+        tx_hash = str(bitcoin.hash_transaction(tx))
         desc = (tx_hash, [], [])
         for input in tx.inputs:
-            desc[1].append(input.previous_output)
+            prevout = input.previous_output
+            desc[1].append((str(prevout.hash), prevout.index))
         for idx, output in enumerate(tx.outputs):
             address = bitcoin.payment_address()
             if address.extract(output.output_script):
-                desc[2].append((idx, address))
+                desc[2].append((idx, str(address)))
         self.txpool.store(tx,
             bind(self.confirmed, _1, desc),
             bind(self.mempool_stored, _1, desc, handle_store))
 
     def mempool_stored(self, ec, desc, handle_store):
         tx_hash, prevouts, addrs = desc
+        tx_hash = bitcoin.hash_digest(tx_hash)
         if ec:
             handle_store(ec)
             return
@@ -65,9 +68,11 @@ class MemoryPoolBuffer:
             self.lookup_address[str(address)] = outpoint
         self.timestamps[str(tx_hash)] = int(time.time())
         handle_store(ec)
+        self.monitor.tx_stored(desc)
 
     def confirmed(self, ec, desc):
         tx_hash, prevouts, addrs = desc
+        tx_hash = bitcoin.hash_digest(tx_hash)
         if ec:
             print "Problem confirming transaction", tx_hash, ec
             return
@@ -82,6 +87,7 @@ class MemoryPoolBuffer:
             outpoint.hash, outpoint.index = tx_hash, idx
             self.lookup_address.delete(str(address), outpoint)
         del self.timestamps[str(tx_hash)]
+        self.monitor.tx_confirmed(desc)
 
     def check(self, output_points, address, handle):
         class ExtendableDict(dict):
@@ -171,6 +177,9 @@ class History:
             bind(self.start_loading, _1, output_points))
 
     def start_loading(self, membuf_result, output_points):
+        if len(membuf_result) == 0 and len(output_points) == 0:
+            self.handle_finish([])
+            self.stopped()
         # Create a bunch of entry lines which are outputs and
         # then their corresponding input (if it exists)
         for outpoint in output_points:
@@ -432,15 +441,21 @@ if __name__ == "__main__":
                 print begin, " " * (12 - len(begin)), v
             print
 
+    class FakeMonitor:
+        def tx_stored(self, tx):
+            pass
+        def tx_confirmed(self, tx):
+            pass
+
     service = bitcoin.async_service(1)
     prefix = "/home/genjix/libbitcoin/database.old"
     chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
     txpool = bitcoin.transaction_pool(service, chain)
-    membuf = MemoryPoolBuffer(txpool, chain)
+    membuf = MemoryPoolBuffer(txpool, chain, FakeMonitor())
     membuf.recv_tx(tx_a, store_tx)
     membuf.recv_tx(tx_b, store_tx)
     raw_input()
-    address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR"
+    address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "18auo3rqfsjth3w2H9zyEz467DDFNNpMJP"
     #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
     print "Looking up", address
     payment_history(chain, txpool, membuf, address[0], finish)
index be61c73..446c37f 100644 (file)
@@ -40,7 +40,8 @@ class Processor(threading.Thread):
         pass
 
     def push_response(self, response):
-        self.dispatcher.request_dispatcher.push_response(response)
+        print "response", response
+        #self.dispatcher.request_dispatcher.push_response(response)