X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Flibbitcoin%2Fhistory.py;h=4c225c2fbed37f84ecf3efc6486e034a0e38f1c1;hb=27bd2d1754b5949678d1ee7ff5efad4b3c8bef6f;hp=683d34574aabccad7f9b195718abe913e7e29667;hpb=f6554c8e465df545abd61e0938198262be1abab6;p=electrum-server.git diff --git a/backends/libbitcoin/history.py b/backends/libbitcoin/history.py index 683d345..4c225c2 100644 --- a/backends/libbitcoin/history.py +++ b/backends/libbitcoin/history.py @@ -4,11 +4,33 @@ import threading import multimap import time +class ExpiryQueue(threading.Thread): + + def __init__(self): + self.lock = threading.Lock() + self.items = [] + threading.Thread.__init__(self) + self.daemon = True + + def run(self): + # Garbage collection + while True: + with self.lock: + self.items = [i for i in self.items if not i.stopped()] + time.sleep(0.1) + + def add(self, item): + with self.lock: + self.items.append(item) + +expiry_queue = ExpiryQueue() + class MemoryPoolBuffer: - def __init__(self, service, txpool, chain): + def __init__(self, txpool, chain, monitor): self.txpool = txpool self.chain = chain + self.monitor = monitor # prevout: inpoint self.lookup_input = {} # payment_address: outpoint @@ -16,25 +38,26 @@ class MemoryPoolBuffer: # transaction timestamps self.timestamps = {} - def recv_tx(self, tx): - tx_hash = bitcoin.hash_transaction(tx) + def recv_tx(self, tx, handle_store): + tx_hash = str(bitcoin.hash_transaction(tx)) desc = (tx_hash, [], []) for input in tx.inputs: - desc[1].append(input.previous_output) + prevout = input.previous_output + desc[1].append((str(prevout.hash), prevout.index)) for idx, output in enumerate(tx.outputs): address = bitcoin.payment_address() if address.extract(output.output_script): - desc[2].append((idx, address)) + desc[2].append((idx, str(address))) self.txpool.store(tx, bind(self.confirmed, _1, desc), - bind(self.mempool_stored, _1, desc)) + bind(self.mempool_stored, _1, desc, handle_store)) - def mempool_stored(self, ec, desc): + def mempool_stored(self, ec, desc, handle_store): tx_hash, prevouts, addrs = desc + tx_hash = bitcoin.hash_digest(tx_hash) if ec: - print "Error storing memory pool transaction", tx_hash, ec + handle_store(ec) return - print "Accepted transaction", tx_hash for idx, prevout in enumerate(prevouts): inpoint = bitcoin.input_point() inpoint.hash, inpoint.index = tx_hash, idx @@ -44,9 +67,12 @@ class MemoryPoolBuffer: outpoint.hash, outpoint.index = tx_hash, idx self.lookup_address[str(address)] = outpoint self.timestamps[str(tx_hash)] = int(time.time()) + handle_store(ec) + self.monitor.tx_stored(desc) def confirmed(self, ec, desc): tx_hash, prevouts, addrs = desc + tx_hash = bitcoin.hash_digest(tx_hash) if ec: print "Problem confirming transaction", tx_hash, ec return @@ -61,6 +87,7 @@ class MemoryPoolBuffer: outpoint.hash, outpoint.index = tx_hash, idx self.lookup_address.delete(str(address), outpoint) del self.timestamps[str(tx_hash)] + self.monitor.tx_confirmed(desc) def check(self, output_points, address, handle): class ExtendableDict(dict): @@ -76,13 +103,14 @@ class MemoryPoolBuffer: info["timestamp"] = self.timestamps[info["tx_hash"]] result.append(info) if self.lookup_address.has_key(str(address)): - point = self.lookup_address[str(address)] - info = ExtendableDict() - info["tx_hash"] = str(point.hash) - info["index"] = point.index - info["is_input"] = 0 - info["timestamp"] = self.timestamps[info["tx_hash"]] - result.append(info) + addr_points = self.lookup_address[str(address)] + for point in addr_points: + info = ExtendableDict() + info["tx_hash"] = str(point.hash) + info["index"] = point.index + info["is_input"] = 0 + info["timestamp"] = self.timestamps[info["tx_hash"]] + result.append(info) handle(result) class PaymentEntry: @@ -108,16 +136,16 @@ class PaymentEntry: class History: - def __init__(self, service, chain, txpool, membuf): + def __init__(self, chain, txpool, membuf): self.chain = chain self.txpool = txpool self.membuf = membuf self.lock = threading.Lock() - self.statement = [] - self.membuf_result = None self._stopped = False def start(self, address, handle_finish): + self.statement = [] + self.membuf_result = None self.address = address self.handle_finish = handle_finish @@ -149,6 +177,9 @@ class History: bind(self.start_loading, _1, output_points)) def start_loading(self, membuf_result, output_points): + if len(membuf_result) == 0 and len(output_points) == 0: + self.handle_finish([]) + self.stopped() # Create a bunch of entry lines which are outputs and # then their corresponding input (if it exists) for outpoint in output_points: @@ -379,8 +410,14 @@ class History: # No more inputs left to load # This info has finished loading info["height"] = None + info["block_hash"] = "mempool" self.finish_if_done() +def payment_history(chain, txpool, membuf, address, handle_finish): + h = History(chain, txpool, membuf) + expiry_queue.add(h) + h.start(address, handle_finish) + if __name__ == "__main__": ex = bitcoin.satoshi_exporter() tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000") @@ -392,6 +429,8 @@ if __name__ == "__main__": def blockchain_started(ec, chain): print "Blockchain initialisation:", ec + def store_tx(ec): + print "Tx", ec def finish(result): print "Finish" if result is None: @@ -402,21 +441,25 @@ if __name__ == "__main__": print begin, " " * (12 - len(begin)), v print + class FakeMonitor: + def tx_stored(self, tx): + pass + def tx_confirmed(self, tx): + pass + service = bitcoin.async_service(1) prefix = "/home/genjix/libbitcoin/database.old" chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started) txpool = bitcoin.transaction_pool(service, chain) - local_service = bitcoin.AsyncService() - membuf = MemoryPoolBuffer(local_service, txpool, chain) - membuf.recv_tx(tx_a) - membuf.recv_tx(tx_b) + membuf = MemoryPoolBuffer(txpool, chain, FakeMonitor()) + membuf.recv_tx(tx_a, store_tx) + membuf.recv_tx(tx_b, store_tx) raw_input() - #address = bitcoin.payment_address("1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D") - address = "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR" + address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "18auo3rqfsjth3w2H9zyEz467DDFNNpMJP" #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE" print "Looking up", address - h = History(local_service, chain, txpool, membuf) - h.start(address, finish) + payment_history(chain, txpool, membuf, address[0], finish) + payment_history(chain, txpool, membuf, address[1], finish) raw_input() print "Stopping..."