history mempool counter.
[electrum-server.git] / backends / libbitcoin / history.py
1 import bitcoin
2 from bitcoin import bind, _1, _2, _3
3 import threading
4 import multimap
5 import time
6
7 class MemoryPoolBuffer:
8
9     def __init__(self, service, txpool, chain):
10         self.txpool = txpool
11         self.chain = chain
12         # prevout: inpoint
13         self.lookup_input = {}
14         # payment_address: outpoint
15         self.lookup_address = multimap.MultiMap()
16         # transaction timestamps
17         self.timestamps = {}
18
19     def recv_tx(self, tx):
20         tx_hash = bitcoin.hash_transaction(tx)
21         desc = (tx_hash, [], [])
22         for input in tx.inputs:
23             desc[1].append(input.previous_output)
24         for idx, output in enumerate(tx.outputs):
25             address = bitcoin.payment_address()
26             if address.extract(output.output_script):
27                 desc[2].append((idx, address))
28         self.txpool.store(tx,
29             bind(self.confirmed, _1, desc),
30             bind(self.mempool_stored, _1, desc))
31
32     def mempool_stored(self, ec, desc):
33         tx_hash, prevouts, addrs = desc
34         if ec:
35             print "Error storing memory pool transaction", tx_hash, ec
36             return
37         print "Accepted transaction", tx_hash
38         for idx, prevout in enumerate(prevouts):
39             inpoint = bitcoin.input_point()
40             inpoint.hash, inpoint.index = tx_hash, idx
41             self.lookup_input[str(prevout)] = inpoint
42         for idx, address in addrs:
43             outpoint = bitcoin.output_point()
44             outpoint.hash, outpoint.index = tx_hash, idx
45             self.lookup_address[str(address)] = outpoint
46         self.timestamps[str(tx_hash)] = int(time.time())
47
48     def confirmed(self, ec, desc):
49         tx_hash, prevouts, addrs = desc
50         if ec:
51             print "Problem confirming transaction", tx_hash, ec
52             return
53         print "Confirmed", tx_hash
54         for idx, prevout in enumerate(prevouts):
55             inpoint = bitcoin.input_point()
56             inpoint.hash, inpoint.index = tx_hash, idx
57             assert self.lookup_input[str(prevout)] == inpoint
58             del self.lookup_input[str(prevout)]
59         for idx, address in addrs:
60             outpoint = bitcoin.output_point()
61             outpoint.hash, outpoint.index = tx_hash, idx
62             self.lookup_address.delete(str(address), outpoint)
63         del self.timestamps[str(tx_hash)]
64
65     def check(self, output_points, address, handle):
66         class ExtendableDict(dict):
67             pass
68         result = []
69         for outpoint in output_points:
70             if self.lookup_input.has_key(str(outpoint)):
71                 point = self.lookup_input[str(outpoint)]
72                 info = ExtendableDict()
73                 info["tx_hash"] = str(point.hash)
74                 info["index"] = point.index
75                 info["is_input"] = 1
76                 info["timestamp"] = self.timestamps[info["tx_hash"]]
77                 result.append(info)
78         if self.lookup_address.has_key(str(address)):
79             addr_points = self.lookup_address[str(address)]
80             for point in addr_points:
81                 info = ExtendableDict()
82                 info["tx_hash"] = str(point.hash)
83                 info["index"] = point.index
84                 info["is_input"] = 0
85                 info["timestamp"] = self.timestamps[info["tx_hash"]]
86                 result.append(info)
87         handle(result)
88
class PaymentEntry:
    """One line of an address statement: an output and, if spent, its spend.

    `input_point` is tri-state:
      * None  - the spend lookup has not reported back yet
      * False - the output is unspent
      * other - the input point that spends the output
    """

    def __init__(self, output_point):
        self.lock = threading.Lock()
        self.output_point = output_point
        # Info dicts, filled in once the matching transaction is loaded.
        self.output_loaded = None
        self.input_loaded = None
        self.input_point = None
        self.raw_output_script = None

    def is_loaded(self):
        """Return True once the output and any spending input are loaded."""
        with self.lock:
            output_pending = self.output_loaded is None
            input_pending = self.has_input() and self.input_loaded is None
            return not (output_pending or input_pending)

    def has_input(self):
        """Return False only when the output is known to be unspent."""
        # A still-unknown spend (None) counts as "has input" on purpose,
        # so the entry is not treated as loaded too early.
        return self.input_point is not False
109
class History:
    """Asynchronously assemble the history ("statement") of one address.

    `start` launches a chain of callbacks against the blockchain, the
    transaction pool and the memory pool buffer.  The outcome is passed to
    `handle_finish` exactly once: a list of info dicts on success, or None
    if any lookup reported an error.

    `mempool_counter` is a caller-owned mapping (tx hash string -> int)
    shared between History objects; it numbers the "mempool:N" block
    hashes given to unconfirmed spends.
    """

    def __init__(self, service, chain, txpool, membuf, mempool_counter):
        # NOTE: `service` is accepted but not used by this class.
        self.chain = chain
        self.txpool = txpool
        self.membuf = membuf
        self.mempool_counter = mempool_counter
        self.lock = threading.Lock()
        self._stopped = False

    def start(self, address, handle_finish):
        """Begin loading the history of `address` (an encoded string)."""
        self.statement = []
        self.membuf_result = None
        self.address = address
        self.handle_finish = handle_finish

        address = bitcoin.payment_address(address)
        # To begin we fetch all the outputs (payments in)
        # associated with this address
        self.chain.fetch_outputs(address,
            bind(self.check_membuf, _1, _2))

    def stop(self):
        """Mark the operation as stopped; must not be called twice."""
        with self.lock:
            assert self._stopped == False
            self._stopped = True

    def stopped(self):
        """Return True once the operation has finished or been stopped."""
        with self.lock:
            return self._stopped

    def _report_once(self, result):
        # Atomically claim the right to finish, then deliver `result`.
        # Returns False (and reports nothing) if already stopped, so
        # several failing or racing callbacks cannot call handle_finish
        # twice or trip the assertion in stop().
        with self.lock:
            if self._stopped:
                return False
            self._stopped = True
        self.handle_finish(result)
        return True

    def stop_on_error(self, ec):
        """Report failure on a truthy `ec`; return True if stopped."""
        if ec:
            self._report_once(None)
        return self.stopped()

    def check_membuf(self, ec, output_points):
        """fetch_outputs callback: ask the memory pool buffer next."""
        if self.stop_on_error(ec):
            return
        self.membuf.check(output_points, self.address,
            bind(self.start_loading, _1, output_points))

    def start_loading(self, membuf_result, output_points):
        """Create the statement entries and start every per-entry load."""
        # Create a bunch of entry lines which are outputs and
        # then their corresponding input (if it exists)
        entries = [PaymentEntry(outpoint) for outpoint in output_points]
        # Publish the complete state before any fetch is started, so an
        # early callback cannot see a partial statement or a membuf_result
        # that is still None.
        with self.lock:
            self.statement.extend(entries)
            self.membuf_result = membuf_result
            nothing_to_load = not self.statement and not membuf_result
        if nothing_to_load:
            # No callback would ever fire for an address without history;
            # finish right away with an empty result.
            self.finish_if_done()
            return
        for entry in entries:
            # Attempt to fetch the spend of this output
            self.chain.fetch_spend(entry.output_point,
                bind(self.load_spend, _1, _2, entry))
            self.load_tx_info(entry.output_point, entry, False)
        # Load memory pool transactions
        for info in membuf_result:
            self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]),
                bind(self.load_pool_tx, _1, _2, info))

    def load_spend(self, ec, inpoint, entry):
        """fetch_spend callback: record whether `entry` has been spent."""
        # Need a custom self.stop_on_error(...) as a missing spend
        # is not an error in this case.
        if ec and ec != bitcoin.error.unspent_output:
            self._report_once(None)
        if self.stopped():
            return
        unspent = ec == bitcoin.error.unspent_output
        with entry.lock:
            # False marks "known to be unspent" (see PaymentEntry.has_input)
            entry.input_point = False if unspent else inpoint
        if unspent:
            # Attempt to stop if all the info for the inputs and outputs
            # has been loaded.
            self.finish_if_done()
        else:
            # We still have to load at least one more payment outwards
            self.load_tx_info(inpoint, entry, True)

    def _mempool_count(self, tx_hash):
        # Number assigned to `tx_hash` in the shared mempool counter;
        # an unseen hash gets the next number after the current maximum.
        counter = self.mempool_counter
        if tx_hash not in counter:
            if not counter:
                counter[tx_hash] = 0
            else:
                counter[tx_hash] = max(counter.values()) + 1
        return counter[tx_hash]

    def _find_prevout_info(self, chain_infos, prevout):
        # Find the output info in `chain_infos` that `prevout` refers to,
        # matching on hash and output index; None if it is not there.
        prevout_hash = str(prevout.hash)
        for candidate in chain_infos:
            if (candidate["is_input"] == 0
                    and candidate["tx_hash"] == prevout_hash
                    and candidate["index"] == prevout.index):
                return candidate
        return None

    def finish_if_done(self):
        """Deliver the result if every entry has finished loading."""
        with self.lock:
            if self._stopped or self.membuf_result is None:
                return
            # Still have more entries to finish loading, so return
            if any(not entry.is_loaded() for entry in self.statement):
                return
            # Memory buffer transactions finished loading?
            if any("height" not in info for info in self.membuf_result):
                return
        # Whole operation completed successfully! Finish up.
        result = []
        for entry in self.statement:
            if entry.input_point:
                # value of the input is simply the inverse of
                # the corresponding output
                entry.input_loaded["value"] = -entry.output_loaded["value"]
                # output should come before the input as it's chronological
                result.append(entry.output_loaded)
                result.append(entry.input_loaded)
            else:
                # Unspent outputs have a raw_output_script field
                assert entry.raw_output_script is not None
                entry.output_loaded["raw_output_script"] = \
                    entry.raw_output_script
                result.append(entry.output_loaded)
        for info in self.membuf_result:
            if info["is_input"] == 1:
                count = self._mempool_count(info["tx_hash"])
                info["block_hash"] = "mempool:%s" % count
                # An unconfirmed spend is worth the negated value of the
                # output it spends; look that output up among the chain
                # entries collected above.
                prevout_tx = self._find_prevout_info(
                    result, info.previous_output)
                assert prevout_tx is not None
                info["value"] = -prevout_tx["value"]
        # Memory pool entries go after all the chain entries.
        result.extend(self.membuf_result)
        self._report_once(result)

    def load_tx_info(self, point, entry, is_input):
        """Start loading the info dict for the transaction at `point`."""
        info = {}
        info["tx_hash"] = str(point.hash)
        info["index"] = point.index
        info["is_input"] = 1 if is_input else 0
        # Before loading the transaction, Stratum requires the hash
        # of the parent block, so we load the block depth and then
        # fetch the block header and hash it.
        self.chain.fetch_transaction_index(point.hash,
            bind(self.tx_index, _1, _2, _3, entry, info))

    def tx_index(self, ec, block_depth, offset, entry, info):
        """fetch_transaction_index callback: store the height."""
        if self.stop_on_error(ec):
            return
        info["height"] = block_depth
        # And now for the block hash
        self.chain.fetch_block_header_by_depth(block_depth,
            bind(self.block_header, _1, _2, entry, info))

    def block_header(self, ec, blk_head, entry, info):
        """Block header callback: store timestamp and block hash."""
        if self.stop_on_error(ec):
            return
        info["timestamp"] = blk_head.timestamp
        info["block_hash"] = str(bitcoin.hash_block_header(blk_head))
        tx_hash = bitcoin.hash_digest(info["tx_hash"])
        # Now load the actual main transaction for this input or output
        self.chain.fetch_transaction(tx_hash,
            bind(self.load_chain_tx, _1, _2, entry, info))

    def _mark_pool_tx_loaded(self, info):
        # Memory pool transactions have no height; finish_if_done treats
        # the presence of the "height" key as "finished loading".
        info["height"] = None
        info["block_hash"] = "mempool"

    def load_pool_tx(self, ec, tx, info):
        """txpool.fetch callback: fill in `info` for a memory pool entry."""
        if self.stop_on_error(ec):
            return
        # block_hash = mempool:5
        # inputs (load from prevtx)
        # outputs (load from tx)
        # raw_output_script (load from tx)
        # height is always None
        # value (get from finish_if_done)
        self.load_tx(tx, info)
        if info["is_input"] == 0:
            our_output = tx.outputs[info["index"]]
            info["value"] = our_output.value
            # Save serialised output script in case this output is unspent
            info["raw_output_script"] = \
                str(bitcoin.save_script(our_output.output_script))
        else:
            assert(info["is_input"] == 1)
            info.previous_output = tx.inputs[info["index"]].previous_output
        # If all the inputs are loaded
        if self.inputs_all_loaded(info["inputs"]):
            # We are the sole input
            assert(info["is_input"] == 1)
            # No input callback will run for this transaction, so it has
            # to be marked as loaded here or it would never complete.
            self._mark_pool_tx_loaded(info)
            self.finish_if_done()
        create_handler = lambda prevout_index, input_index: \
            bind(self.load_input_pool_tx, _1, _2,
                prevout_index, info, input_index)
        self.fetch_input_txs(tx, info, create_handler)

    def load_tx(self, tx, info):
        """Fill info["outputs"] and prepare the info["inputs"] slots."""
        # List of output addresses
        outputs = []
        for tx_out in tx.outputs:
            address = bitcoin.payment_address()
            # Attempt to extract address from output script
            if address.extract(tx_out.output_script):
                outputs.append(address.encoded())
            else:
                # ... otherwise append "Unknown"
                outputs.append("Unknown")
        info["outputs"] = outputs
        # For the inputs, we need the originator address which has to
        # be looked up in the blockchain.
        # Create list of Nones and then populate it.
        # Loading has finished when no None is left in the list.
        info["inputs"] = [None for _ in tx.inputs]
        # If this transaction was loaded for an input, then we already
        # have a source address for at least one input.
        if info["is_input"] == 1:
            info["inputs"][info["index"]] = self.address

    def fetch_input_txs(self, tx, info, create_handler):
        """Fetch the previous transaction of every still-unknown input."""
        # Load the previous_output for every input so we can get
        # the output address
        for input_index, tx_input in enumerate(tx.inputs):
            # Our own input was already filled in by load_tx
            if info["is_input"] == 1 and info["index"] == input_index:
                continue
            prevout = tx_input.previous_output
            handler = create_handler(prevout.index, input_index)
            self.chain.fetch_transaction(prevout.hash, handler)

    def load_chain_tx(self, ec, tx, entry, info):
        """fetch_transaction callback: fill in `info` for a chain entry."""
        if self.stop_on_error(ec):
            return
        self.load_tx(tx, info)
        if info["is_input"] == 0:
            our_output = tx.outputs[info["index"]]
            info["value"] = our_output.value
            # Save serialised output script in case this output is unspent
            with entry.lock:
                entry.raw_output_script = \
                    str(bitcoin.save_script(our_output.output_script))
        # If all the inputs are loaded
        if self.inputs_all_loaded(info["inputs"]):
            # We are the sole input
            assert(info["is_input"] == 1)
            with entry.lock:
                entry.input_loaded = info
            self.finish_if_done()
        create_handler = lambda prevout_index, input_index: \
            bind(self.load_input_chain_tx, _1, _2,
                prevout_index, entry, info, input_index)
        self.fetch_input_txs(tx, info, create_handler)

    def inputs_all_loaded(self, info_inputs):
        """Return True when no slot of `info_inputs` is None."""
        return all(slot is not None for slot in info_inputs)

    def load_input_tx(self, tx, output_index, info, input_index):
        """Store the address that funded input `input_index` in `info`."""
        # For our input, we load the previous tx so we can get the
        # corresponding output.
        # We need the output to extract the address.
        script = tx.outputs[output_index].output_script
        address = bitcoin.payment_address()
        if address.extract(script):
            info["inputs"][input_index] = address.encoded()
        else:
            info["inputs"][input_index] = "Unknown"

    def load_input_chain_tx(self, ec, tx, output_index,
                            entry, info, input_index):
        """Previous-transaction callback for a chain entry."""
        if self.stop_on_error(ec):
            return
        self.load_input_tx(tx, output_index, info, input_index)
        # If all the inputs are loaded, then we have finished loading
        # the info for this input-output entry pair
        if self.inputs_all_loaded(info["inputs"]):
            with entry.lock:
                if info["is_input"] == 1:
                    entry.input_loaded = info
                else:
                    entry.output_loaded = info
        self.finish_if_done()

    def load_input_pool_tx(self, ec, tx, output_index, info, input_index):
        """Previous-transaction callback for a memory pool entry."""
        if self.stop_on_error(ec):
            return
        self.load_input_tx(tx, output_index, info, input_index)
        if self.inputs_all_loaded(info["inputs"]):
            # No more inputs left to load
            # This info has finished loading
            self._mark_pool_tx_loaded(info)
        self.finish_if_done()
395
396 if __name__ == "__main__":
397     ex = bitcoin.satoshi_exporter()
398     tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
399     tx_a = ex.load_transaction(tx_a)
400     assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397"
401     tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000")
402     tx_b = ex.load_transaction(tx_b)
403     assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac"
404
405     def blockchain_started(ec, chain):
406         print "Blockchain initialisation:", ec
407     def finish(result):
408         print "Finish"
409         if result is None:
410             return
411         for line in result:
412             for k, v in line.iteritems():
413                 begin = k + ":"
414                 print begin, " " * (12 - len(begin)), v
415             print
416
417     service = bitcoin.async_service(1)
418     prefix = "/home/genjix/libbitcoin/database.old"
419     chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
420     txpool = bitcoin.transaction_pool(service, chain)
421     local_service = bitcoin.AsyncService()
422     membuf = MemoryPoolBuffer(local_service, txpool, chain)
423     membuf.recv_tx(tx_a)
424     membuf.recv_tx(tx_b)
425     raw_input()
426     address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1EMnecJFwihf2pf4nE2m8fUNFKVRMWKqhR"
427     #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
428     print "Looking up", address
429     mempool_count = {}
430     h = History(local_service, chain, txpool, membuf, mempool_count)
431     h.start(address[0], finish)
432     raw_input()
433     h1 = History(local_service, chain, txpool, membuf, mempool_count)
434     h1.start(address[1], finish)
435     raw_input()
436     print "Stopping..."
437