From: genjix Date: Sun, 15 Apr 2012 17:25:47 +0000 (+0100) Subject: blockchain.address.get_history memory pool transactions. X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=f6554c8e465df545abd61e0938198262be1abab6 blockchain.address.get_history memory pool transactions. --- diff --git a/backends/libbitcoin/history.py b/backends/libbitcoin/history.py index ca629a8..683d345 100644 --- a/backends/libbitcoin/history.py +++ b/backends/libbitcoin/history.py @@ -1,6 +1,89 @@ import bitcoin -from bitcoin import _1, _2, _3 +from bitcoin import bind, _1, _2, _3 import threading +import multimap +import time + +class MemoryPoolBuffer: + + def __init__(self, service, txpool, chain): + self.txpool = txpool + self.chain = chain + # prevout: inpoint + self.lookup_input = {} + # payment_address: outpoint + self.lookup_address = multimap.MultiMap() + # transaction timestamps + self.timestamps = {} + + def recv_tx(self, tx): + tx_hash = bitcoin.hash_transaction(tx) + desc = (tx_hash, [], []) + for input in tx.inputs: + desc[1].append(input.previous_output) + for idx, output in enumerate(tx.outputs): + address = bitcoin.payment_address() + if address.extract(output.output_script): + desc[2].append((idx, address)) + self.txpool.store(tx, + bind(self.confirmed, _1, desc), + bind(self.mempool_stored, _1, desc)) + + def mempool_stored(self, ec, desc): + tx_hash, prevouts, addrs = desc + if ec: + print "Error storing memory pool transaction", tx_hash, ec + return + print "Accepted transaction", tx_hash + for idx, prevout in enumerate(prevouts): + inpoint = bitcoin.input_point() + inpoint.hash, inpoint.index = tx_hash, idx + self.lookup_input[str(prevout)] = inpoint + for idx, address in addrs: + outpoint = bitcoin.output_point() + outpoint.hash, outpoint.index = tx_hash, idx + self.lookup_address[str(address)] = outpoint + self.timestamps[str(tx_hash)] = int(time.time()) + + def confirmed(self, ec, desc): + tx_hash, prevouts, addrs = desc + if ec: + print "Problem confirming transaction", tx_hash, ec + return + print "Confirmed", tx_hash + for idx, prevout in enumerate(prevouts): + inpoint = bitcoin.input_point() + inpoint.hash, inpoint.index = tx_hash, idx + assert self.lookup_input[str(prevout)] == inpoint + del self.lookup_input[str(prevout)] + for idx, address in addrs: + outpoint = bitcoin.output_point() + outpoint.hash, outpoint.index = tx_hash, idx + self.lookup_address.delete(str(address), outpoint) + del self.timestamps[str(tx_hash)] + + def check(self, output_points, address, handle): + class ExtendableDict(dict): + pass + result = [] + for outpoint in output_points: + if self.lookup_input.has_key(str(outpoint)): + point = self.lookup_input[str(outpoint)] + info = ExtendableDict() + info["tx_hash"] = str(point.hash) + info["index"] = point.index + info["is_input"] = 1 + info["timestamp"] = self.timestamps[info["tx_hash"]] + result.append(info) + if self.lookup_address.has_key(str(address)): + point = self.lookup_address[str(address)] + info = ExtendableDict() + info["tx_hash"] = str(point.hash) + info["index"] = point.index + info["is_input"] = 0 + info["timestamp"] = self.timestamps[info["tx_hash"]] + result.append(info) + handle(result) class PaymentEntry: @@ -25,11 +108,13 @@ class PaymentEntry: class History: - def __init__(self, service, chain): - self.wrap = bitcoin.Strand(service).wrap + def __init__(self, service, chain, txpool, membuf): self.chain = chain + self.txpool = txpool + self.membuf = membuf self.lock = threading.Lock() self.statement = [] + self.membuf_result = None self._stopped = False def start(self, address, handle_finish): @@ -40,7 +125,7 @@ class History: # To begin we fetch all the outputs (payments in) # associated with this address self.chain.fetch_outputs(address, - self.wrap(self.start_loading, _1, _2)) + bind(self.check_membuf, _1, _2)) def stop(self): with self.lock: @@ -57,9 +142,13 @@ class History: self.stop() return self.stopped() - def start_loading(self, ec, output_points): + def check_membuf(self, ec, output_points): if self.stop_on_error(ec): return + self.membuf.check(output_points, self.address, + bind(self.start_loading, _1, output_points)) + + def start_loading(self, membuf_result, output_points): # Create a bunch of entry lines which are outputs and # then their corresponding input (if it exists) for outpoint in output_points: @@ -68,24 +157,31 @@ class History: self.statement.append(entry) # Attempt to fetch the spend of this output self.chain.fetch_spend(outpoint, - self.wrap(self.load_spend, _1, _2, entry)) + bind(self.load_spend, _1, _2, entry)) self.load_tx_info(outpoint, entry, False) + # Load memory pool transactions + with self.lock: + self.membuf_result = membuf_result + for info in self.membuf_result: + self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]), + bind(self.load_pool_tx, _1, _2, info)) def load_spend(self, ec, inpoint, entry): # Need a custom self.stop_on_error(...) as a missing spend # is not an error in this case. - if ec and ec != bitcoin.error.missing_object: + if not self.stopped() and ec and ec != bitcoin.error.unspent_output: + self.handle_finish(None) self.stop() if self.stopped(): return with entry.lock: - if ec == bitcoin.error.missing_object: + if ec == bitcoin.error.unspent_output: # This particular entry.output_point # has not been spent yet entry.input_point = False else: entry.input_point = inpoint - if ec == bitcoin.error.missing_object: + if ec == bitcoin.error.unspent_output: # Attempt to stop if all the info for the inputs and outputs # has been loaded. self.finish_if_done() @@ -98,6 +194,9 @@ class History: # Still have more entries to finish loading, so return if any(not entry.is_loaded() for entry in self.statement): return + # Memory buffer transactions finished loading? + if any(not info.has_key("height") for info in self.membuf_result): + return # Whole operation completed successfully! Finish up. result = [] for entry in self.statement: @@ -114,6 +213,19 @@ class History: entry.output_loaded["raw_output_script"] = \ entry.raw_output_script result.append(entry.output_loaded) + mempool_result = [] + for info in self.membuf_result: + # Lookup prevout in result + # Set "value" field + if info["is_input"] == 1: + prevout_tx = None + for prevout_info in result: + if prevout_info["tx_hash"] == info.previous_output.hash: + prevout_tx = prevout_info + assert prevout_tx is not None + info["value"] = -prevout_info["value"] + mempool_result.append(info) + result.extend(mempool_result) self.handle_finish(result) self.stop() @@ -126,7 +238,7 @@ class History: # of the parent block, so we load the block depth and then # fetch the block header and hash it. self.chain.fetch_transaction_index(point.hash, - self.wrap(self.tx_index, _1, _2, _3, entry, info)) + bind(self.tx_index, _1, _2, _3, entry, info)) def tx_index(self, ec, block_depth, offset, entry, info): if self.stop_on_error(ec): @@ -134,7 +246,7 @@ class History: info["height"] = block_depth # And now for the block hash self.chain.fetch_block_header_by_depth(block_depth, - self.wrap(self.block_header, _1, _2, entry, info)) + bind(self.block_header, _1, _2, entry, info)) def block_header(self, ec, blk_head, entry, info): if self.stop_on_error(ec): @@ -144,11 +256,38 @@ class History: tx_hash = bitcoin.hash_digest(info["tx_hash"]) # Now load the actual main transaction for this input or output self.chain.fetch_transaction(tx_hash, - self.wrap(self.load_tx, _1, _2, entry, info)) + bind(self.load_chain_tx, _1, _2, entry, info)) - def load_tx(self, ec, tx, entry, info): + def load_pool_tx(self, ec, tx, info): if self.stop_on_error(ec): return + # block_hash = mempool:5 + # inputs (load from prevtx) + # outputs (load from tx) + # raw_output_script (load from tx) + # height is always None + # value (get from finish_if_done) + self.load_tx(tx, info) + if info["is_input"] == 0: + our_output = tx.outputs[info["index"]] + info["value"] = our_output.value + # Save serialised output script in case this output is unspent + info["raw_output_script"] = \ + str(bitcoin.save_script(our_output.output_script)) + else: + assert(info["is_input"] == 1) + info.previous_output = tx.inputs[info["index"]].previous_output + # If all the inputs are loaded + if self.inputs_all_loaded(info["inputs"]): + # We are the sole input + assert(info["is_input"] == 1) + self.finish_if_done() + create_handler = lambda prevout_index, input_index: \ + bind(self.load_input_pool_tx, _1, _2, + prevout_index, info, input_index) + self.fetch_input_txs(tx, info, create_handler) + + def load_tx(self, tx, info): # List of output addresses outputs = [] for tx_out in tx.outputs: @@ -169,7 +308,22 @@ class History: # have a source address for at least one input. if info["is_input"] == 1: info["inputs"][info["index"]] = self.address - else: + + def fetch_input_txs(self, tx, info, create_handler): + # Load the previous_output for every input so we can get + # the output address + for input_index, tx_input in enumerate(tx.inputs): + if info["is_input"] == 1 and info["index"] == input_index: + continue + prevout = tx_input.previous_output + handler = create_handler(prevout.index, input_index) + self.chain.fetch_transaction(prevout.hash, handler) + + def load_chain_tx(self, ec, tx, entry, info): + if self.stop_on_error(ec): + return + self.load_tx(tx, info) + if info["is_input"] == 0: our_output = tx.outputs[info["index"]] info["value"] = our_output.value # Save serialised output script in case this output is unspent @@ -183,22 +337,15 @@ class History: with entry.lock: entry.input_loaded = info self.finish_if_done() - # Load the previous_output for every input so we can get - # the output address - for input_index, tx_input in enumerate(tx.inputs): - if info["is_input"] == 1 and info["index"] == input_index: - continue - prevout = tx_input.previous_output - self.chain.fetch_transaction(prevout.hash, - self.wrap(self.load_input_tx, _1, _2, - prevout.index, entry, info, input_index)) + create_handler = lambda prevout_index, input_index: \ + bind(self.load_input_chain_tx, _1, _2, + prevout_index, entry, info, input_index) + self.fetch_input_txs(tx, info, create_handler) def inputs_all_loaded(self, info_inputs): return not [empty_in for empty_in in info_inputs if empty_in is None] - def load_input_tx(self, ec, tx, output_index, entry, info, input_index): - if self.stop_on_error(ec): - return + def load_input_tx(self, tx, output_index, info, input_index): # For our input, we load the previous tx so we can get the # corresponding output. # We need the output to extract the address. @@ -208,6 +355,12 @@ class History: info["inputs"][input_index] = address.encoded() else: info["inputs"][input_index] = "Unknown" + + def load_input_chain_tx(self, ec, tx, output_index, + entry, info, input_index): + if self.stop_on_error(ec): + return + self.load_input_tx(tx, output_index, info, input_index) # If all the inputs are loaded, then we have finished loading # the info for this input-output entry pair if self.inputs_all_loaded(info["inputs"]): @@ -218,10 +371,31 @@ class History: entry.output_loaded = info self.finish_if_done() + def load_input_pool_tx(self, ec, tx, output_index, info, input_index): + if self.stop_on_error(ec): + return + self.load_input_tx(tx, output_index, info, input_index) + if not [inp for inp in info["inputs"] if inp is None]: + # No more inputs left to load + # This info has finished loading + info["height"] = None + self.finish_if_done() + if __name__ == "__main__": + ex = bitcoin.satoshi_exporter() + tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000") + tx_a = ex.load_transaction(tx_a) + assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397" + tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000") + tx_b = ex.load_transaction(tx_b) + assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac" + def blockchain_started(ec, chain): print "Blockchain initialisation:", ec def finish(result): + print "Finish" + if result is None: + return for line in result: for k, v in line.iteritems(): begin = k + ":" @@ -229,12 +403,19 @@ if __name__ == "__main__": print service = bitcoin.async_service(1) - prefix = "/home/genjix/libbitcoin/database" + prefix = "/home/genjix/libbitcoin/database.old" chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started) - address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE" - print "Looking up", address + txpool = bitcoin.transaction_pool(service, chain) local_service = bitcoin.AsyncService() - h = History(local_service, chain) + membuf = MemoryPoolBuffer(local_service, txpool, chain) + membuf.recv_tx(tx_a) + membuf.recv_tx(tx_b) + raw_input() + #address = bitcoin.payment_address("1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D") + address = "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR" + #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE" + print "Looking up", address + h = History(local_service, chain, txpool, membuf) h.start(address, finish) raw_input() print "Stopping..." diff --git a/backends/libbitcoin/mempool_buffer.py b/backends/libbitcoin/mempool_buffer.py deleted file mode 100644 index 3ea5cc6..0000000 --- a/backends/libbitcoin/mempool_buffer.py +++ /dev/null @@ -1,82 +0,0 @@ -import bitcoin -from bitcoin import _1, _2 -import multimap - -class MemoryPoolBuffer: - - def __init__(self, service, txpool): - self.wrap = bitcoin.Strand(service).wrap - self.txpool = txpool - # prevout: inpoint - self.lookup_input = {} - # payment_address: outpoint - self.lookup_address = multimap.MultiMap() - - def recv_tx(self, tx): - tx_hash = bitcoin.hash_transaction(tx) - desc = (tx_hash, [], []) - for input in tx.inputs: - desc[1].append(input.previous_output) - for idx, output in enumerate(tx.outputs): - address = bitcoin.payment_address() - if address.extract(output.output_script): - desc[2].append((idx, address)) - self.txpool.store(tx, - self.wrap(self.confirmed, _1, desc), - self.wrap(self.mempool_stored, _1, desc)) - - def mempool_stored(self, ec, desc): - tx_hash, prevouts, addrs = desc - if ec: - print "Error storing memory pool transaction", tx_hash, ec - return - print "Accepted transaction", tx_hash - for idx, prevout in enumerate(prevouts): - inpoint = bitcoin.input_point() - inpoint.hash, inpoint.index = tx_hash, idx - self.lookup_input[prevout] = inpoint - for idx, address in addrs: - outpoint = bitcoin.output_point() - outpoint.hash, outpoint.index = tx_hash, idx - self.lookup_address[str(address)] = outpoint - - def confirmed(self, ec, desc): - tx_hash, prevouts, addrs = desc - if ec: - print "Problem confirming transaction", tx_hash, ec - return - print "Confirmed", tx_hash - for idx, prevout in enumerate(prevouts): - inpoint = bitcoin.input_point() - inpoint.hash, inpoint.index = tx_hash, idx - assert self.lookup_input[prevout] == inpoint - del self.lookup_input[prevout] - for idx, address in addrs: - outpoint = bitcoin.output_point() - outpoint.hash, outpoint.index = tx_hash, idx - self.lookup_address.delete(str(address), outpoint) - -if __name__ == "__main__": - ex = bitcoin.satoshi_exporter() - tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000") - tx_a = ex.load_transaction(tx_a) - assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397" - tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000") - tx_b = ex.load_transaction(tx_b) - assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac" - - def blockchain_started(ec, chain): - print "Blockchain initialisation:", ec - - service = bitcoin.async_service(1) - prefix = "/home/genjix/libbitcoin/database" - chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started) - txpool = bitcoin.transaction_pool(service, chain) - local_service = bitcoin.AsyncService() - membuf = MemoryPoolBuffer(local_service, txpool) - membuf.recv_tx(tx_a) - membuf.recv_tx(tx_b) - print "Started." - raw_input() - print "Stopping..." - diff --git a/backends/libbitcoin/multimap.py b/backends/libbitcoin/multimap.py index 02db35f..c0d4dca 100644 --- a/backends/libbitcoin/multimap.py +++ b/backends/libbitcoin/multimap.py @@ -25,6 +25,9 @@ class MultiMap: def __str__(self): return str(self.multi) + def has_key(self, key): + return self.multi.has_key(key) + if __name__ == "__main__": m = MultiMap() m["foo"] = 1