Merge branch 'master' of github.com:spesmilo/electrum-server
[electrum-server.git] / backends / libbitcoin / history.py
index f81027a..cc2a7ad 100644 (file)
@@ -1,14 +1,40 @@
+import threading
+import time
+
 import bitcoin
 from bitcoin import bind, _1, _2, _3
-import threading
 import multimap
-import time
+
+
+class ExpiryQueue(threading.Thread):
+
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.items = []
+        threading.Thread.__init__(self)
+        self.daemon = True
+
+    def run(self):
+        # Garbage collection
+        while True:
+            with self.lock:
+                self.items = [i for i in self.items if not i.stopped()]
+            time.sleep(0.1)
+
+    def add(self, item):
+        with self.lock:
+            self.items.append(item)
+
+
+expiry_queue = ExpiryQueue()
+
 
 class MemoryPoolBuffer:
 
-    def __init__(self, service, txpool, chain):
+    def __init__(self, txpool, chain, monitor):
         self.txpool = txpool
         self.chain = chain
+        self.monitor = monitor
         # prevout: inpoint
         self.lookup_input = {}
         # payment_address: outpoint
@@ -16,34 +42,39 @@ class MemoryPoolBuffer:
         # transaction timestamps
         self.timestamps = {}
 
-    def recv_tx(self, tx):
-        tx_hash = bitcoin.hash_transaction(tx)
+    def recv_tx(self, tx, handle_store):
+        tx_hash = str(bitcoin.hash_transaction(tx))
         desc = (tx_hash, [], [])
         for input in tx.inputs:
-            desc[1].append(input.previous_output)
+            prevout = input.previous_output
+            desc[1].append((str(prevout.hash), prevout.index))
         for idx, output in enumerate(tx.outputs):
             address = bitcoin.payment_address()
             if address.extract(output.output_script):
-                desc[2].append((idx, address))
-        self.txpool.store(tx,
+                desc[2].append((idx, str(address)))
+        self.txpool.store(
+            tx,
             bind(self.confirmed, _1, desc),
-            bind(self.mempool_stored, _1, desc))
+            bind(self.mempool_stored, _1, desc, handle_store)
+        )
 
-    def mempool_stored(self, ec, desc):
+    def mempool_stored(self, ec, desc, handle_store):
         tx_hash, prevouts, addrs = desc
         if ec:
-            print "Error storing memory pool transaction", tx_hash, ec
+            handle_store(ec)
             return
-        print "Accepted transaction", tx_hash
         for idx, prevout in enumerate(prevouts):
-            inpoint = bitcoin.input_point()
-            inpoint.hash, inpoint.index = tx_hash, idx
-            self.lookup_input[str(prevout)] = inpoint
+            #inpoint = bitcoin.input_point()
+            #inpoint.hash, inpoint.index = tx_hash, idx
+            prevout = "%s:%s" % prevout
+            self.lookup_input[prevout] = tx_hash, idx
         for idx, address in addrs:
-            outpoint = bitcoin.output_point()
-            outpoint.hash, outpoint.index = tx_hash, idx
-            self.lookup_address[str(address)] = outpoint
-        self.timestamps[str(tx_hash)] = int(time.time())
+            #outpoint = bitcoin.output_point()
+            #outpoint.hash, outpoint.index = tx_hash, idx
+            self.lookup_address[str(address)] = tx_hash, idx
+        self.timestamps[tx_hash] = int(time.time())
+        handle_store(ec)
+        self.monitor.tx_stored(desc)
 
     def confirmed(self, ec, desc):
         tx_hash, prevouts, addrs = desc
@@ -52,30 +83,33 @@ class MemoryPoolBuffer:
             return
         print "Confirmed", tx_hash
         for idx, prevout in enumerate(prevouts):
-            inpoint = bitcoin.input_point()
-            inpoint.hash, inpoint.index = tx_hash, idx
-            assert self.lookup_input[str(prevout)] == inpoint
-            del self.lookup_input[str(prevout)]
+            #inpoint = bitcoin.input_point()
+            #inpoint.hash, inpoint.index = tx_hash, idx
+            prevout = "%s:%s" % prevout
+            assert self.lookup_input[prevout] == (tx_hash, idx)
+            del self.lookup_input[prevout]
         for idx, address in addrs:
-            outpoint = bitcoin.output_point()
-            outpoint.hash, outpoint.index = tx_hash, idx
+            #outpoint = bitcoin.output_point()
+            #outpoint.hash, outpoint.index = tx_hash, idx
+            outpoint = tx_hash, idx
             self.lookup_address.delete(str(address), outpoint)
-        del self.timestamps[str(tx_hash)]
+        del self.timestamps[tx_hash]
+        self.monitor.tx_confirmed(desc)
 
     def check(self, output_points, address, handle):
         class ExtendableDict(dict):
             pass
         result = []
         for outpoint in output_points:
-            if self.lookup_input.has_key(str(outpoint)):
+            if str(outpoint) in self.lookup_input:
                 point = self.lookup_input[str(outpoint)]
                 info = ExtendableDict()
-                info["tx_hash"] = str(point.hash)
-                info["index"] = point.index
+                info["tx_hash"] = point[0]
+                info["index"] = point[1]
                 info["is_input"] = 1
                 info["timestamp"] = self.timestamps[info["tx_hash"]]
                 result.append(info)
-        if self.lookup_address.has_key(str(address)):
+        if str(address) in self.lookup_address:
             addr_points = self.lookup_address[str(address)]
             for point in addr_points:
                 info = ExtendableDict()
@@ -86,6 +120,7 @@ class MemoryPoolBuffer:
                 result.append(info)
         handle(result)
 
+
 class PaymentEntry:
 
     def __init__(self, output_point):
@@ -107,13 +142,13 @@ class PaymentEntry:
     def has_input(self):
         return self.input_point is not False
 
+
 class History:
 
-    def __init__(self, service, chain, txpool, membuf, mempool_counter):
+    def __init__(self, chain, txpool, membuf):
         self.chain = chain
         self.txpool = txpool
         self.membuf = membuf
-        self.mempool_counter = mempool_counter
         self.lock = threading.Lock()
         self._stopped = False
 
@@ -126,12 +161,11 @@ class History:
         address = bitcoin.payment_address(address)
         # To begin we fetch all the outputs (payments in)
         # associated with this address
-        self.chain.fetch_outputs(address,
-            bind(self.check_membuf, _1, _2))
+        self.chain.fetch_outputs(address, bind(self.check_membuf, _1, _2))
 
     def stop(self):
         with self.lock:
-            assert self._stopped == False
+            assert self._stopped is False
             self._stopped = True
 
     def stopped(self):
@@ -147,10 +181,12 @@ class History:
     def check_membuf(self, ec, output_points):
         if self.stop_on_error(ec):
             return
-        self.membuf.check(output_points, self.address,
-            bind(self.start_loading, _1, output_points))
+        self.membuf.check(output_points, self.address, bind(self.start_loading, _1, output_points))
 
     def start_loading(self, membuf_result, output_points):
+        if len(membuf_result) == 0 and len(output_points) == 0:
+            self.handle_finish([])
+            self.stopped()
         # Create a bunch of entry lines which are outputs and
         # then their corresponding input (if it exists)
         for outpoint in output_points:
@@ -158,15 +194,13 @@ class History:
             with self.lock:
                 self.statement.append(entry)
             # Attempt to fetch the spend of this output
-            self.chain.fetch_spend(outpoint,
-                bind(self.load_spend, _1, _2, entry))
+            self.chain.fetch_spend(outpoint, bind(self.load_spend, _1, _2, entry))
             self.load_tx_info(outpoint, entry, False)
         # Load memory pool transactions
         with self.lock:
             self.membuf_result = membuf_result
         for info in self.membuf_result:
-            self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]),
-                bind(self.load_pool_tx, _1, _2, info))
+            self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]), bind(self.load_pool_tx, _1, _2, info))
 
     def load_spend(self, ec, inpoint, entry):
         # Need a custom self.stop_on_error(...) as a missing spend
@@ -197,7 +231,7 @@ class History:
             if any(not entry.is_loaded() for entry in self.statement):
                 return
             # Memory buffer transactions finished loading?
-            if any(not info.has_key("height") for info in self.membuf_result):
+            if any("height" not in info for info in self.membuf_result):
                 return
         # Whole operation completed successfully! Finish up.
         result = []
@@ -220,15 +254,6 @@ class History:
             # Lookup prevout in result
             # Set "value" field
             if info["is_input"] == 1:
-                if not self.mempool_counter.has_key(info["tx_hash"]):
-                    if not self.mempool_counter:
-                        count = 0
-                    else:
-                        count = max(self.mempool_counter.values()) + 1
-                    self.mempool_counter[info["tx_hash"]] = count
-                else:
-                    count = self.mempool_counter[info["tx_hash"]]
-                info["block_hash"] = "mempool:%s" % count
                 prevout_tx = None
                 for prevout_info in result:
                     if prevout_info["tx_hash"] == info.previous_output.hash:
@@ -248,16 +273,14 @@ class History:
         # Before loading the transaction, Stratum requires the hash
         # of the parent block, so we load the block depth and then
         # fetch the block header and hash it.
-        self.chain.fetch_transaction_index(point.hash,
-            bind(self.tx_index, _1, _2, _3, entry, info))
+        self.chain.fetch_transaction_index(point.hash, bind(self.tx_index, _1, _2, _3, entry, info))
 
     def tx_index(self, ec, block_depth, offset, entry, info):
         if self.stop_on_error(ec):
             return
         info["height"] = block_depth
         # And now for the block hash
-        self.chain.fetch_block_header_by_depth(block_depth,
-            bind(self.block_header, _1, _2, entry, info))
+        self.chain.fetch_block_header_by_depth(block_depth, bind(self.block_header, _1, _2, entry, info))
 
     def block_header(self, ec, blk_head, entry, info):
         if self.stop_on_error(ec):
@@ -266,8 +289,7 @@ class History:
         info["block_hash"] = str(bitcoin.hash_block_header(blk_head))
         tx_hash = bitcoin.hash_digest(info["tx_hash"])
         # Now load the actual main transaction for this input or output
-        self.chain.fetch_transaction(tx_hash,
-            bind(self.load_chain_tx, _1, _2, entry, info))
+        self.chain.fetch_transaction(tx_hash, bind(self.load_chain_tx, _1, _2, entry, info))
 
     def load_pool_tx(self, ec, tx, info):
         if self.stop_on_error(ec):
@@ -292,10 +314,13 @@ class History:
         if self.inputs_all_loaded(info["inputs"]):
             # We are the sole input
             assert(info["is_input"] == 1)
+            # No more inputs left to load
+            # This info has finished loading
+            info["height"] = None
+            info["block_hash"] = "mempool"
             self.finish_if_done()
         create_handler = lambda prevout_index, input_index: \
-            bind(self.load_input_pool_tx, _1, _2,
-                prevout_index, info, input_index)
+            bind(self.load_input_pool_tx, _1, _2, prevout_index, info, input_index)
         self.fetch_input_txs(tx, info, create_handler)
 
     def load_tx(self, tx, info):
@@ -349,8 +374,7 @@ class History:
                 entry.input_loaded = info
             self.finish_if_done()
         create_handler = lambda prevout_index, input_index: \
-            bind(self.load_input_chain_tx, _1, _2,
-                prevout_index, entry, info, input_index)
+            bind(self.load_input_chain_tx, _1, _2, prevout_index, entry, info, input_index)
         self.fetch_input_txs(tx, info, create_handler)
 
     def inputs_all_loaded(self, info_inputs):
@@ -393,6 +417,13 @@ class History:
             info["block_hash"] = "mempool"
         self.finish_if_done()
 
+
+def payment_history(chain, txpool, membuf, address, handle_finish):
+    h = History(chain, txpool, membuf)
+    expiry_queue.add(h)
+    h.start(address, handle_finish)
+
+
 if __name__ == "__main__":
     ex = bitcoin.satoshi_exporter()
     tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
@@ -404,6 +435,10 @@ if __name__ == "__main__":
 
     def blockchain_started(ec, chain):
         print "Blockchain initialisation:", ec
+
+    def store_tx(ec):
+        print "Tx", ec
+
     def finish(result):
         print "Finish"
         if result is None:
@@ -414,24 +449,33 @@ if __name__ == "__main__":
                 print begin, " " * (12 - len(begin)), v
             print
 
+    class FakeMonitor:
+        def tx_stored(self, tx):
+            pass
+
+        def tx_confirmed(self, tx):
+            pass
+
     service = bitcoin.async_service(1)
-    prefix = "/home/genjix/libbitcoin/database.old"
+    prefix = "/home/genjix/libbitcoin/database"
     chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
     txpool = bitcoin.transaction_pool(service, chain)
-    local_service = bitcoin.AsyncService()
-    membuf = MemoryPoolBuffer(local_service, txpool, chain)
-    membuf.recv_tx(tx_a)
-    membuf.recv_tx(tx_b)
+    membuf = MemoryPoolBuffer(txpool, chain, FakeMonitor())
+    membuf.recv_tx(tx_a, store_tx)
+    membuf.recv_tx(tx_b, store_tx)
+
+    txdat = bitcoin.data_chunk("0100000001d6cad920a04acd6c0609cd91fe4dafa1f3b933ac90e032c78fdc19d98785f2bb010000008b483045022043f8ce02784bd7231cb362a602920f2566c18e1877320bf17d4eabdac1019b2f022100f1fd06c57330683dff50e1b4571fb0cdab9592f36e3d7e98d8ce3f94ce3f255b01410453aa8d5ddef56731177915b7b902336109326f883be759ec9da9c8f1212c6fa3387629d06e5bf5e6bcc62ec5a70d650c3b1266bb0bcc65ca900cff5311cb958bffffffff0280969800000000001976a9146025cabdbf823949f85595f3d1c54c54cd67058b88ac602d2d1d000000001976a914c55c43631ab14f7c4fd9c5f153f6b9123ec32c8888ac00000000")
+    req = {"id": 110, "params": ["1GULoCDnGjhfSWzHs6zDzBxbKt9DR7uRbt"]}
+    ex = bitcoin.satoshi_exporter()
+    tx = ex.load_transaction(txdat)
+    time.sleep(4)
+    membuf.recv_tx(tx, store_tx)
+
     raw_input()
-    address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR"
+    address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1GULoCDnGjhfSWzHs6zDzBxbKt9DR7uRbt"
     #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
     print "Looking up", address
-    mempool_count = {}
-    h = History(local_service, chain, txpool, membuf, mempool_count)
-    h.start(address[0], finish)
-    raw_input()
-    h1 = History(local_service, chain, txpool, membuf, mempool_count)
-    h1.start(address[1], finish)
+    payment_history(chain, txpool, membuf, address[0], finish)
+    #payment_history(chain, txpool, membuf, address[1], finish)
     raw_input()
     print "Stopping..."
-