blockchain.address.get_history memory pool transactions.
authorgenjix <fake@lol.u>
Sun, 15 Apr 2012 17:25:47 +0000 (18:25 +0100)
committergenjix <fake@lol.u>
Sun, 15 Apr 2012 17:25:47 +0000 (18:25 +0100)
backends/libbitcoin/history.py
backends/libbitcoin/mempool_buffer.py [deleted file]
backends/libbitcoin/multimap.py

index ca629a8..683d345 100644 (file)
@@ -1,6 +1,89 @@
 import bitcoin
-from bitcoin import _1, _2, _3
+from bitcoin import bind, _1, _2, _3
 import threading
+import multimap
+import time
+
+class MemoryPoolBuffer:
+
+    def __init__(self, service, txpool, chain):
+        self.txpool = txpool
+        self.chain = chain
+        # prevout: inpoint
+        self.lookup_input = {}
+        # payment_address: outpoint
+        self.lookup_address = multimap.MultiMap()
+        # transaction timestamps
+        self.timestamps = {}
+
+    def recv_tx(self, tx):
+        tx_hash = bitcoin.hash_transaction(tx)
+        desc = (tx_hash, [], [])
+        for input in tx.inputs:
+            desc[1].append(input.previous_output)
+        for idx, output in enumerate(tx.outputs):
+            address = bitcoin.payment_address()
+            if address.extract(output.output_script):
+                desc[2].append((idx, address))
+        self.txpool.store(tx,
+            bind(self.confirmed, _1, desc),
+            bind(self.mempool_stored, _1, desc))
+
+    def mempool_stored(self, ec, desc):
+        tx_hash, prevouts, addrs = desc
+        if ec:
+            print "Error storing memory pool transaction", tx_hash, ec
+            return
+        print "Accepted transaction", tx_hash
+        for idx, prevout in enumerate(prevouts):
+            inpoint = bitcoin.input_point()
+            inpoint.hash, inpoint.index = tx_hash, idx
+            self.lookup_input[str(prevout)] = inpoint
+        for idx, address in addrs:
+            outpoint = bitcoin.output_point()
+            outpoint.hash, outpoint.index = tx_hash, idx
+            self.lookup_address[str(address)] = outpoint
+        self.timestamps[str(tx_hash)] = int(time.time())
+
+    def confirmed(self, ec, desc):
+        tx_hash, prevouts, addrs = desc
+        if ec:
+            print "Problem confirming transaction", tx_hash, ec
+            return
+        print "Confirmed", tx_hash
+        for idx, prevout in enumerate(prevouts):
+            inpoint = bitcoin.input_point()
+            inpoint.hash, inpoint.index = tx_hash, idx
+            assert self.lookup_input[str(prevout)] == inpoint
+            del self.lookup_input[str(prevout)]
+        for idx, address in addrs:
+            outpoint = bitcoin.output_point()
+            outpoint.hash, outpoint.index = tx_hash, idx
+            self.lookup_address.delete(str(address), outpoint)
+        del self.timestamps[str(tx_hash)]
+
+    def check(self, output_points, address, handle):
+        class ExtendableDict(dict):
+            pass
+        result = []
+        for outpoint in output_points:
+            if self.lookup_input.has_key(str(outpoint)):
+                point = self.lookup_input[str(outpoint)]
+                info = ExtendableDict()
+                info["tx_hash"] = str(point.hash)
+                info["index"] = point.index
+                info["is_input"] = 1
+                info["timestamp"] = self.timestamps[info["tx_hash"]]
+                result.append(info)
+        if self.lookup_address.has_key(str(address)):
+            point = self.lookup_address[str(address)]
+            info = ExtendableDict()
+            info["tx_hash"] = str(point.hash)
+            info["index"] = point.index
+            info["is_input"] = 0
+            info["timestamp"] = self.timestamps[info["tx_hash"]]
+            result.append(info)
+        handle(result)
 
 class PaymentEntry:
 
@@ -25,11 +108,13 @@ class PaymentEntry:
 
 class History:
 
-    def __init__(self, service, chain):
-        self.wrap = bitcoin.Strand(service).wrap
+    def __init__(self, service, chain, txpool, membuf):
         self.chain = chain
+        self.txpool = txpool
+        self.membuf = membuf
         self.lock = threading.Lock()
         self.statement = []
+        self.membuf_result = None
         self._stopped = False
 
     def start(self, address, handle_finish):
@@ -40,7 +125,7 @@ class History:
         # To begin we fetch all the outputs (payments in)
         # associated with this address
         self.chain.fetch_outputs(address,
-            self.wrap(self.start_loading, _1, _2))
+            bind(self.check_membuf, _1, _2))
 
     def stop(self):
         with self.lock:
@@ -57,9 +142,13 @@ class History:
             self.stop()
         return self.stopped()
 
-    def start_loading(self, ec, output_points):
+    def check_membuf(self, ec, output_points):
         if self.stop_on_error(ec):
             return
+        self.membuf.check(output_points, self.address,
+            bind(self.start_loading, _1, output_points))
+
+    def start_loading(self, membuf_result, output_points):
         # Create a bunch of entry lines which are outputs and
         # then their corresponding input (if it exists)
         for outpoint in output_points:
@@ -68,24 +157,31 @@ class History:
                 self.statement.append(entry)
             # Attempt to fetch the spend of this output
             self.chain.fetch_spend(outpoint,
-                self.wrap(self.load_spend, _1, _2, entry))
+                bind(self.load_spend, _1, _2, entry))
             self.load_tx_info(outpoint, entry, False)
+        # Load memory pool transactions
+        with self.lock:
+            self.membuf_result = membuf_result
+        for info in self.membuf_result:
+            self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]),
+                bind(self.load_pool_tx, _1, _2, info))
 
     def load_spend(self, ec, inpoint, entry):
         # Need a custom self.stop_on_error(...) as a missing spend
         # is not an error in this case.
-        if ec and ec != bitcoin.error.missing_object:
+        if not self.stopped() and ec and ec != bitcoin.error.unspent_output:
+            self.handle_finish(None)
             self.stop()
         if self.stopped():
             return
         with entry.lock:
-            if ec == bitcoin.error.missing_object:
+            if ec == bitcoin.error.unspent_output:
                 # This particular entry.output_point
                 # has not been spent yet
                 entry.input_point = False
             else:
                 entry.input_point = inpoint
-        if ec == bitcoin.error.missing_object:
+        if ec == bitcoin.error.unspent_output:
             # Attempt to stop if all the info for the inputs and outputs
             # has been loaded.
             self.finish_if_done()
@@ -98,6 +194,9 @@ class History:
             # Still have more entries to finish loading, so return
             if any(not entry.is_loaded() for entry in self.statement):
                 return
+            # Memory buffer transactions finished loading?
+            if any(not info.has_key("height") for info in self.membuf_result):
+                return
         # Whole operation completed successfully! Finish up.
         result = []
         for entry in self.statement:
@@ -114,6 +213,19 @@ class History:
                 entry.output_loaded["raw_output_script"] = \
                     entry.raw_output_script
                 result.append(entry.output_loaded)
+        mempool_result = []
+        for info in self.membuf_result:
+            # Lookup prevout in result
+            # Set "value" field
+            if info["is_input"] == 1:
+                prevout_tx = None
+                for prevout_info in result:
+                    if prevout_info["tx_hash"] == info.previous_output.hash:
+                        prevout_tx = prevout_info
+                assert prevout_tx is not None
+                info["value"] = -prevout_info["value"]
+            mempool_result.append(info)
+        result.extend(mempool_result)
         self.handle_finish(result)
         self.stop()
 
@@ -126,7 +238,7 @@ class History:
         # of the parent block, so we load the block depth and then
         # fetch the block header and hash it.
         self.chain.fetch_transaction_index(point.hash,
-            self.wrap(self.tx_index, _1, _2, _3, entry, info))
+            bind(self.tx_index, _1, _2, _3, entry, info))
 
     def tx_index(self, ec, block_depth, offset, entry, info):
         if self.stop_on_error(ec):
@@ -134,7 +246,7 @@ class History:
         info["height"] = block_depth
         # And now for the block hash
         self.chain.fetch_block_header_by_depth(block_depth,
-            self.wrap(self.block_header, _1, _2, entry, info))
+            bind(self.block_header, _1, _2, entry, info))
 
     def block_header(self, ec, blk_head, entry, info):
         if self.stop_on_error(ec):
@@ -144,11 +256,38 @@ class History:
         tx_hash = bitcoin.hash_digest(info["tx_hash"])
         # Now load the actual main transaction for this input or output
         self.chain.fetch_transaction(tx_hash,
-            self.wrap(self.load_tx, _1, _2, entry, info))
+            bind(self.load_chain_tx, _1, _2, entry, info))
 
-    def load_tx(self, ec, tx, entry, info):
+    def load_pool_tx(self, ec, tx, info):
         if self.stop_on_error(ec):
             return
+        # block_hash = mempool:5
+        # inputs (load from prevtx)
+        # outputs (load from tx)
+        # raw_output_script (load from tx)
+        # height is always None
+        # value (get from finish_if_done)
+        self.load_tx(tx, info)
+        if info["is_input"] == 0:
+            our_output = tx.outputs[info["index"]]
+            info["value"] = our_output.value
+            # Save serialised output script in case this output is unspent
+            info["raw_output_script"] = \
+                str(bitcoin.save_script(our_output.output_script))
+        else:
+            assert(info["is_input"] == 1)
+            info.previous_output = tx.inputs[info["index"]].previous_output
+        # If all the inputs are loaded
+        if self.inputs_all_loaded(info["inputs"]):
+            # We are the sole input
+            assert(info["is_input"] == 1)
+            self.finish_if_done()
+        create_handler = lambda prevout_index, input_index: \
+            bind(self.load_input_pool_tx, _1, _2,
+                prevout_index, info, input_index)
+        self.fetch_input_txs(tx, info, create_handler)
+
+    def load_tx(self, tx, info):
         # List of output addresses
         outputs = []
         for tx_out in tx.outputs:
@@ -169,7 +308,22 @@ class History:
         # have a source address for at least one input.
         if info["is_input"] == 1:
             info["inputs"][info["index"]] = self.address
-        else:
+
+    def fetch_input_txs(self, tx, info, create_handler):
+        # Load the previous_output for every input so we can get
+        # the output address
+        for input_index, tx_input in enumerate(tx.inputs):
+            if info["is_input"] == 1 and info["index"] == input_index:
+                continue
+            prevout = tx_input.previous_output
+            handler = create_handler(prevout.index, input_index)
+            self.chain.fetch_transaction(prevout.hash, handler)
+
+    def load_chain_tx(self, ec, tx, entry, info):
+        if self.stop_on_error(ec):
+            return
+        self.load_tx(tx, info)
+        if info["is_input"] == 0:
             our_output = tx.outputs[info["index"]]
             info["value"] = our_output.value
             # Save serialised output script in case this output is unspent
@@ -183,22 +337,15 @@ class History:
             with entry.lock:
                 entry.input_loaded = info
             self.finish_if_done()
-        # Load the previous_output for every input so we can get
-        # the output address
-        for input_index, tx_input in enumerate(tx.inputs):
-            if info["is_input"] == 1 and info["index"] == input_index:
-                continue
-            prevout = tx_input.previous_output
-            self.chain.fetch_transaction(prevout.hash,
-                self.wrap(self.load_input_tx, _1, _2,
-                     prevout.index, entry, info, input_index))
+        create_handler = lambda prevout_index, input_index: \
+            bind(self.load_input_chain_tx, _1, _2,
+                prevout_index, entry, info, input_index)
+        self.fetch_input_txs(tx, info, create_handler)
 
     def inputs_all_loaded(self, info_inputs):
         return not [empty_in for empty_in in info_inputs if empty_in is None]
 
-    def load_input_tx(self, ec, tx, output_index, entry, info, input_index):
-        if self.stop_on_error(ec):
-            return
+    def load_input_tx(self, tx, output_index, info, input_index):
         # For our input, we load the previous tx so we can get the
         # corresponding output.
         # We need the output to extract the address.
@@ -208,6 +355,12 @@ class History:
             info["inputs"][input_index] = address.encoded()
         else:
             info["inputs"][input_index] = "Unknown"
+
+    def load_input_chain_tx(self, ec, tx, output_index,
+                            entry, info, input_index):
+        if self.stop_on_error(ec):
+            return
+        self.load_input_tx(tx, output_index, info, input_index)
         # If all the inputs are loaded, then we have finished loading
         # the info for this input-output entry pair
         if self.inputs_all_loaded(info["inputs"]):
@@ -218,10 +371,31 @@ class History:
                     entry.output_loaded = info
         self.finish_if_done()
 
+    def load_input_pool_tx(self, ec, tx, output_index, info, input_index):
+        if self.stop_on_error(ec):
+            return
+        self.load_input_tx(tx, output_index, info, input_index)
+        if not [inp for inp in info["inputs"] if inp is None]:
+            # No more inputs left to load
+            # This info has finished loading
+            info["height"] = None
+        self.finish_if_done()
+
 if __name__ == "__main__":
+    ex = bitcoin.satoshi_exporter()
+    tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
+    tx_a = ex.load_transaction(tx_a)
+    assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397"
+    tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000")
+    tx_b = ex.load_transaction(tx_b)
+    assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac"
+
     def blockchain_started(ec, chain):
         print "Blockchain initialisation:", ec
     def finish(result):
+        print "Finish"
+        if result is None:
+            return
         for line in result:
             for k, v in line.iteritems():
                 begin = k + ":"
@@ -229,12 +403,19 @@ if __name__ == "__main__":
             print
 
     service = bitcoin.async_service(1)
-    prefix = "/home/genjix/libbitcoin/database"
+    prefix = "/home/genjix/libbitcoin/database.old"
     chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
-    address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
-    print "Looking up", address
+    txpool = bitcoin.transaction_pool(service, chain)
     local_service = bitcoin.AsyncService()
-    h = History(local_service, chain)
+    membuf = MemoryPoolBuffer(local_service, txpool, chain)
+    membuf.recv_tx(tx_a)
+    membuf.recv_tx(tx_b)
+    raw_input()
+    #address = bitcoin.payment_address("1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D")
+    address = "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR"
+    #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
+    print "Looking up", address
+    h = History(local_service, chain, txpool, membuf)
     h.start(address, finish)
     raw_input()
     print "Stopping..."
diff --git a/backends/libbitcoin/mempool_buffer.py b/backends/libbitcoin/mempool_buffer.py
deleted file mode 100644 (file)
index 3ea5cc6..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-import bitcoin
-from bitcoin import _1, _2
-import multimap
-
-class MemoryPoolBuffer:
-
-    def __init__(self, service, txpool):
-        self.wrap = bitcoin.Strand(service).wrap
-        self.txpool = txpool
-        # prevout: inpoint
-        self.lookup_input = {}
-        # payment_address: outpoint
-        self.lookup_address = multimap.MultiMap()
-
-    def recv_tx(self, tx):
-        tx_hash = bitcoin.hash_transaction(tx)
-        desc = (tx_hash, [], [])
-        for input in tx.inputs:
-            desc[1].append(input.previous_output)
-        for idx, output in enumerate(tx.outputs):
-            address = bitcoin.payment_address()
-            if address.extract(output.output_script):
-                desc[2].append((idx, address))
-        self.txpool.store(tx,
-            self.wrap(self.confirmed, _1, desc),
-            self.wrap(self.mempool_stored, _1, desc))
-
-    def mempool_stored(self, ec, desc):
-        tx_hash, prevouts, addrs = desc
-        if ec:
-            print "Error storing memory pool transaction", tx_hash, ec
-            return
-        print "Accepted transaction", tx_hash
-        for idx, prevout in enumerate(prevouts):
-            inpoint = bitcoin.input_point()
-            inpoint.hash, inpoint.index = tx_hash, idx
-            self.lookup_input[prevout] = inpoint
-        for idx, address in addrs:
-            outpoint = bitcoin.output_point()
-            outpoint.hash, outpoint.index = tx_hash, idx
-            self.lookup_address[str(address)] = outpoint
-
-    def confirmed(self, ec, desc):
-        tx_hash, prevouts, addrs = desc
-        if ec:
-            print "Problem confirming transaction", tx_hash, ec
-            return
-        print "Confirmed", tx_hash
-        for idx, prevout in enumerate(prevouts):
-            inpoint = bitcoin.input_point()
-            inpoint.hash, inpoint.index = tx_hash, idx
-            assert self.lookup_input[prevout] == inpoint
-            del self.lookup_input[prevout]
-        for idx, address in addrs:
-            outpoint = bitcoin.output_point()
-            outpoint.hash, outpoint.index = tx_hash, idx
-            self.lookup_address.delete(str(address), outpoint)
-
-if __name__ == "__main__":
-    ex = bitcoin.satoshi_exporter()
-    tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
-    tx_a = ex.load_transaction(tx_a)
-    assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397"
-    tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000")
-    tx_b = ex.load_transaction(tx_b)
-    assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac"
-
-    def blockchain_started(ec, chain):
-        print "Blockchain initialisation:", ec
-
-    service = bitcoin.async_service(1)
-    prefix = "/home/genjix/libbitcoin/database"
-    chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
-    txpool = bitcoin.transaction_pool(service, chain)
-    local_service = bitcoin.AsyncService()
-    membuf = MemoryPoolBuffer(local_service, txpool)
-    membuf.recv_tx(tx_a)
-    membuf.recv_tx(tx_b)
-    print "Started."
-    raw_input()
-    print "Stopping..."
-
index 02db35f..c0d4dca 100644 (file)
@@ -25,6 +25,9 @@ class MultiMap:
     def __str__(self):
         return str(self.multi)
 
+    def has_key(self, key):
+        return self.multi.has_key(key)
+
 if __name__ == "__main__":
     m = MultiMap()
     m["foo"] = 1