mempool subscribe works with outputs of transactions.
[electrum-server.git] / backends / libbitcoin / history.py
1 import bitcoin
2 from bitcoin import bind, _1, _2, _3
3 import threading
4 import multimap
5 import time
6
class ExpiryQueue(threading.Thread):
    """Daemon thread that holds references to running operations.

    Items are expected to expose a ``stopped()`` method. While the thread
    runs, every item whose ``stopped()`` returns true is dropped from
    ``items`` roughly ten times per second.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.daemon = True
        self.items = []
        self.lock = threading.Lock()

    def _drop_stopped(self):
        # Rebuild the list with only the items that are still running.
        with self.lock:
            survivors = []
            for entry in self.items:
                if not entry.stopped():
                    survivors.append(entry)
            self.items = survivors

    def run(self):
        """Garbage-collect stopped items forever, sleeping 0.1s per pass."""
        while True:
            self._drop_stopped()
            time.sleep(0.1)

    def add(self, item):
        """Register ``item`` so a reference to it is kept until it stops."""
        with self.lock:
            self.items.append(item)
25
# Module-wide queue; payment_history() adds every History it creates here.
# NOTE(review): the thread is only constructed here - no .start() call is
# visible in this file, so confirm it is started by whoever imports us.
expiry_queue = ExpiryQueue()
27
class MemoryPoolBuffer:
    """Index of transactions that the pool stored but that are unconfirmed.

    recv_tx() hands a transaction to the pool; once the pool reports it as
    stored, its inputs and outputs are indexed so that check() can report
    them for an address. The entries are removed again by confirmed().
    """

    def __init__(self, txpool, chain, monitor):
        self.txpool, self.chain, self.monitor = txpool, chain, monitor
        # str(prevout) -> input point that spends it
        self.lookup_input = {}
        # str(payment address) -> output points paying to it
        self.lookup_address = multimap.MultiMap()
        # str(tx hash) -> time at which the pool stored the transaction
        self.timestamps = {}

    def _new_point(self, factory, tx_hash, index):
        # Build an input/output point referring to (tx_hash, index).
        point = factory()
        point.hash = tx_hash
        point.index = index
        return point

    def recv_tx(self, tx, handle_store):
        """Describe ``tx`` and submit it to the transaction pool.

        The description is ``(tx_hash, prevouts, addrs)`` where prevouts is
        a list of ``(hash string, index)`` and addrs a list of
        ``(output index, address string)`` for the outputs whose address
        could be extracted. ``handle_store`` is forwarded to
        mempool_stored().
        """
        hash_text = str(bitcoin.hash_transaction(tx))
        spent = []
        for tx_input in tx.inputs:
            source = tx_input.previous_output
            spent.append((str(source.hash), source.index))
        paid = []
        for position, tx_output in enumerate(tx.outputs):
            address = bitcoin.payment_address()
            if address.extract(tx_output.output_script):
                paid.append((position, str(address)))
        desc = (hash_text, spent, paid)
        on_confirm = bind(self.confirmed, _1, desc)
        on_store = bind(self.mempool_stored, _1, desc, handle_store)
        self.txpool.store(tx, on_confirm, on_store)

    def mempool_stored(self, ec, desc, handle_store):
        """Pool callback: index the transaction unless ``ec`` is set.

        ``handle_store(ec)`` is always invoked; the monitor is only
        notified when the store succeeded.
        """
        hash_text, spent, paid = desc
        tx_hash = bitcoin.hash_digest(hash_text)
        if ec:
            handle_store(ec)
            return
        for position, prevout in enumerate(spent):
            self.lookup_input[str(prevout)] = self._new_point(
                bitcoin.input_point, tx_hash, position)
        for position, address in paid:
            self.lookup_address[str(address)] = self._new_point(
                bitcoin.output_point, tx_hash, position)
        self.timestamps[str(tx_hash)] = int(time.time())
        handle_store(ec)
        self.monitor.tx_stored(desc)

    def confirmed(self, ec, desc):
        """Pool callback: remove the transaction's entries from the index.

        On error only a message is printed and the index is left as is.
        """
        hash_text, spent, paid = desc
        tx_hash = bitcoin.hash_digest(hash_text)
        if ec:
            # Single-argument print(...) emits the same text as the print
            # statement did and is also valid under Python 3.
            print(" ".join(("Problem confirming transaction",
                            str(tx_hash), str(ec))))
            return
        print(" ".join(("Confirmed", str(tx_hash))))
        for position, prevout in enumerate(spent):
            expected = self._new_point(bitcoin.input_point, tx_hash, position)
            key = str(prevout)
            assert self.lookup_input[key] == expected
            del self.lookup_input[key]
        for position, address in paid:
            self.lookup_address.delete(str(address), self._new_point(
                bitcoin.output_point, tx_hash, position))
        del self.timestamps[str(tx_hash)]
        self.monitor.tx_confirmed(desc)

    def check(self, output_points, address, handle):
        """Report unconfirmed activity for ``address`` through ``handle``.

        ``handle`` receives a list of dict-like rows with the keys
        "tx_hash", "index", "is_input" and "timestamp": first one row per
        given output point that has a spend in the index (is_input 1), then
        one row per indexed output paying to ``address`` (is_input 0).
        """
        # A dict subclass so that callers can attach attributes to a row.
        class ExtendableDict(dict):
            pass

        def describe(point, is_input):
            row = ExtendableDict()
            row["tx_hash"] = str(point.hash)
            row["index"] = point.index
            row["is_input"] = is_input
            row["timestamp"] = self.timestamps[row["tx_hash"]]
            return row

        rows = []
        for outpoint in output_points:
            spend_key = str(outpoint)
            if spend_key in self.lookup_input:
                rows.append(describe(self.lookup_input[spend_key], 1))
        address_key = str(address)
        if self.lookup_address.has_key(address_key):
            for point in self.lookup_address[address_key]:
                rows.append(describe(point, 0))
        handle(rows)
115
class PaymentEntry:
    """One output paying to the address plus, if it exists, its spend.

    ``input_point`` is tri-state: None while the spend lookup is pending,
    False when the output is unspent, otherwise the spending input point.
    """

    def __init__(self, output_point):
        self.output_point = output_point
        self.input_point = None
        # Info dicts, filled in once the respective transaction is loaded.
        self.output_loaded = None
        self.input_loaded = None
        self.raw_output_script = None
        self.lock = threading.Lock()

    def is_loaded(self):
        """Return True once the output and, if any, the input are loaded."""
        with self.lock:
            if self.output_loaded is None:
                return False
            input_pending = self.has_input() and self.input_loaded is None
            return not input_pending

    def has_input(self):
        """Return False only when the output is known to be unspent."""
        return not (self.input_point is False)
136
class History:
    """Asynchronously collects the payment history of a single address.

    start() kicks off a chain of callbacks that load every output paying
    to the address, the spend of each output (if any) and the unconfirmed
    transactions known to the memory pool buffer. When everything is
    loaded, ``handle_finish`` receives a list of info dicts; on any error
    it receives None instead. Afterwards the object is marked as stopped.
    """

    def __init__(self, chain, txpool, membuf):
        self.chain = chain
        self.txpool = txpool
        self.membuf = membuf
        self.lock = threading.Lock()
        self._stopped = False

    def start(self, address, handle_finish):
        """Begin loading the history of ``address`` (an address string)."""
        self.statement = []
        self.membuf_result = None
        self.address = address
        self.handle_finish = handle_finish

        address = bitcoin.payment_address(address)
        # To begin we fetch all the outputs (payments in)
        # associated with this address
        self.chain.fetch_outputs(address,
            bind(self.check_membuf, _1, _2))

    def stop(self):
        """Mark the operation as finished; must only be called once."""
        with self.lock:
            assert self._stopped == False
            self._stopped = True

    def stopped(self):
        """Return True once stop() has been called."""
        with self.lock:
            return self._stopped

    def stop_on_error(self, ec):
        """Abort on error and report whether the operation is stopped.

        Several fetches are in flight at once, so more than one of them
        can fail. Only the first failure reports None to the caller and
        stops; later ones just see that we are already stopped (the same
        guard load_spend() uses).
        """
        if ec and not self.stopped():
            self.handle_finish(None)
            self.stop()
        return self.stopped()

    def check_membuf(self, ec, output_points):
        """Chain callback: ask the memory pool buffer about the address."""
        if self.stop_on_error(ec):
            return
        self.membuf.check(output_points, self.address,
            bind(self.start_loading, _1, output_points))

    def start_loading(self, membuf_result, output_points):
        """Create the statement entries and start loading their details."""
        if len(membuf_result) == 0 and len(output_points) == 0:
            # Nothing to load at all: report the empty history and mark
            # the operation as finished.
            self.handle_finish([])
            self.stop()
            return
        # Create a bunch of entry lines which are outputs and
        # then their corresponding input (if it exists)
        for outpoint in output_points:
            entry = PaymentEntry(outpoint)
            with self.lock:
                self.statement.append(entry)
            # Attempt to fetch the spend of this output
            self.chain.fetch_spend(outpoint,
                bind(self.load_spend, _1, _2, entry))
            self.load_tx_info(outpoint, entry, False)
        # Load memory pool transactions
        with self.lock:
            self.membuf_result = membuf_result
        for info in self.membuf_result:
            self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]),
                bind(self.load_pool_tx, _1, _2, info))

    def load_spend(self, ec, inpoint, entry):
        """Chain callback: record whether ``entry``'s output was spent."""
        # Need a custom self.stop_on_error(...) as a missing spend
        # is not an error in this case.
        if not self.stopped() and ec and ec != bitcoin.error.unspent_output:
            self.handle_finish(None)
            self.stop()
        if self.stopped():
            return
        with entry.lock:
            if ec == bitcoin.error.unspent_output:
                # This particular entry.output_point
                # has not been spent yet
                entry.input_point = False
            else:
                entry.input_point = inpoint
        if ec == bitcoin.error.unspent_output:
            # Attempt to stop if all the info for the inputs and outputs
            # has been loaded.
            self.finish_if_done()
        else:
            # We still have to load at least one more payment outwards
            self.load_tx_info(inpoint, entry, True)

    def _find_prevout_info(self, result, prevout):
        # Return the row in result describing the output that prevout
        # refers to. Matching on the hash alone could pick another output
        # of the same transaction, or an input row.
        prevout_hash = str(prevout.hash)
        prevout_tx = None
        for candidate in result:
            if candidate["is_input"] != 0:
                continue
            if candidate["tx_hash"] != prevout_hash:
                continue
            if candidate["index"] == prevout.index:
                prevout_tx = candidate
                break
        assert prevout_tx is not None
        return prevout_tx

    def finish_if_done(self):
        """Deliver the result if every entry and pool row is loaded."""
        with self.lock:
            # Still have more entries to finish loading, so return
            if any(not entry.is_loaded() for entry in self.statement):
                return
            # Memory buffer transactions finished loading?
            if any("height" not in info for info in self.membuf_result):
                return
        # Whole operation completed successfully! Finish up.
        result = []
        for entry in self.statement:
            if entry.input_point:
                # value of the input is simply the inverse of
                # the corresponding output
                entry.input_loaded["value"] = -entry.output_loaded["value"]
                # output should come before the input as it's chronological
                result.append(entry.output_loaded)
                result.append(entry.input_loaded)
            else:
                # Unspent outputs have a raw_output_script field
                assert entry.raw_output_script is not None
                entry.output_loaded["raw_output_script"] = \
                    entry.raw_output_script
                result.append(entry.output_loaded)
        mempool_result = []
        for info in self.membuf_result:
            if info["is_input"] == 1:
                # An unconfirmed spend takes its value from the output it
                # spends, negated.
                prevout_tx = self._find_prevout_info(
                    result, info.previous_output)
                info["value"] = -prevout_tx["value"]
            mempool_result.append(info)
        result.extend(mempool_result)
        self.handle_finish(result)
        self.stop()

    def load_tx_info(self, point, entry, is_input):
        """Start loading the info dict for ``point`` of ``entry``."""
        info = {}
        info["tx_hash"] = str(point.hash)
        info["index"] = point.index
        info["is_input"] = 1 if is_input else 0
        # Before loading the transaction, Stratum requires the hash
        # of the parent block, so we load the block depth and then
        # fetch the block header and hash it.
        self.chain.fetch_transaction_index(point.hash,
            bind(self.tx_index, _1, _2, _3, entry, info))

    def tx_index(self, ec, block_depth, offset, entry, info):
        """Chain callback: store the block depth as the row's height."""
        if self.stop_on_error(ec):
            return
        info["height"] = block_depth
        # And now for the block hash
        self.chain.fetch_block_header_by_depth(block_depth,
            bind(self.block_header, _1, _2, entry, info))

    def block_header(self, ec, blk_head, entry, info):
        """Chain callback: store block timestamp and hash, then load tx."""
        if self.stop_on_error(ec):
            return
        info["timestamp"] = blk_head.timestamp
        info["block_hash"] = str(bitcoin.hash_block_header(blk_head))
        tx_hash = bitcoin.hash_digest(info["tx_hash"])
        # Now load the actual main transaction for this input or output
        self.chain.fetch_transaction(tx_hash,
            bind(self.load_chain_tx, _1, _2, entry, info))

    def _mark_pool_info_loaded(self, info):
        # finish_if_done() treats a pool row as loaded once it has a
        # "height" key; unconfirmed transactions have no height.
        info["height"] = None
        info["block_hash"] = "mempool"

    def load_pool_tx(self, ec, tx, info):
        """Pool callback: fill in a memory pool row from its transaction."""
        if self.stop_on_error(ec):
            return
        # block_hash = mempool:5
        # inputs (load from prevtx)
        # outputs (load from tx)
        # raw_output_script (load from tx)
        # height is always None
        # value (get from finish_if_done)
        self.load_tx(tx, info)
        if info["is_input"] == 0:
            our_output = tx.outputs[info["index"]]
            info["value"] = our_output.value
            # Save serialised output script in case this output is unspent
            info["raw_output_script"] = \
                str(bitcoin.save_script(our_output.output_script))
        else:
            assert(info["is_input"] == 1)
            info.previous_output = tx.inputs[info["index"]].previous_output
        # If all the inputs are loaded
        if self.inputs_all_loaded(info["inputs"]):
            # We are the sole input
            assert(info["is_input"] == 1)
            # No load_input_pool_tx() callback will run for this row, so
            # it has to be marked as loaded here or we would never finish.
            self._mark_pool_info_loaded(info)
            self.finish_if_done()
        create_handler = lambda prevout_index, input_index: \
            bind(self.load_input_pool_tx, _1, _2,
                prevout_index, info, input_index)
        self.fetch_input_txs(tx, info, create_handler)

    def load_tx(self, tx, info):
        """Fill info["outputs"] and prepare the info["inputs"] slots."""
        # List of output addresses
        outputs = []
        for tx_out in tx.outputs:
            address = bitcoin.payment_address()
            # Attempt to extract address from output script
            if address.extract(tx_out.output_script):
                outputs.append(address.encoded())
            else:
                # ... otherwise append "Unknown"
                outputs.append("Unknown")
        info["outputs"] = outputs
        # For the inputs, we need the originator address which has to
        # be looked up in the blockchain.
        # Create list of Nones and then populate it.
        # Loading has finished when the list no longer contains any None.
        info["inputs"] = [None for i in tx.inputs]
        # If this transaction was loaded for an input, then we already
        # have a source address for at least one input.
        if info["is_input"] == 1:
            info["inputs"][info["index"]] = self.address

    def fetch_input_txs(self, tx, info, create_handler):
        """Fetch the previous transaction of every input not yet known."""
        # Load the previous_output for every input so we can get
        # the output address
        for input_index, tx_input in enumerate(tx.inputs):
            if info["is_input"] == 1 and info["index"] == input_index:
                continue
            prevout = tx_input.previous_output
            handler = create_handler(prevout.index, input_index)
            self.chain.fetch_transaction(prevout.hash, handler)

    def load_chain_tx(self, ec, tx, entry, info):
        """Chain callback: fill in a confirmed row from its transaction."""
        if self.stop_on_error(ec):
            return
        self.load_tx(tx, info)
        if info["is_input"] == 0:
            our_output = tx.outputs[info["index"]]
            info["value"] = our_output.value
            # Save serialised output script in case this output is unspent
            with entry.lock:
                entry.raw_output_script = \
                    str(bitcoin.save_script(our_output.output_script))
        # If all the inputs are loaded
        if self.inputs_all_loaded(info["inputs"]):
            # We are the sole input
            assert(info["is_input"] == 1)
            with entry.lock:
                entry.input_loaded = info
            self.finish_if_done()
        create_handler = lambda prevout_index, input_index: \
            bind(self.load_input_chain_tx, _1, _2,
                prevout_index, entry, info, input_index)
        self.fetch_input_txs(tx, info, create_handler)

    def inputs_all_loaded(self, info_inputs):
        """Return True when no slot of ``info_inputs`` is None."""
        return all(slot is not None for slot in info_inputs)

    def load_input_tx(self, tx, output_index, info, input_index):
        """Store the address of ``tx``'s output as the input's source."""
        # For our input, we load the previous tx so we can get the
        # corresponding output.
        # We need the output to extract the address.
        script = tx.outputs[output_index].output_script
        address = bitcoin.payment_address()
        if address.extract(script):
            info["inputs"][input_index] = address.encoded()
        else:
            info["inputs"][input_index] = "Unknown"

    def load_input_chain_tx(self, ec, tx, output_index,
                            entry, info, input_index):
        """Chain callback: one source address of a confirmed row loaded."""
        if self.stop_on_error(ec):
            return
        self.load_input_tx(tx, output_index, info, input_index)
        # If all the inputs are loaded, then we have finished loading
        # the info for this input-output entry pair
        if self.inputs_all_loaded(info["inputs"]):
            with entry.lock:
                if info["is_input"] == 1:
                    entry.input_loaded = info
                else:
                    entry.output_loaded = info
        self.finish_if_done()

    def load_input_pool_tx(self, ec, tx, output_index, info, input_index):
        """Chain callback: one source address of a pool row loaded."""
        if self.stop_on_error(ec):
            return
        self.load_input_tx(tx, output_index, info, input_index)
        if self.inputs_all_loaded(info["inputs"]):
            # No more inputs left to load
            # This info has finished loading
            self._mark_pool_info_loaded(info)
        self.finish_if_done()
415
def payment_history(chain, txpool, membuf, address, handle_finish):
    """Start an asynchronous history lookup for ``address``.

    The History object is registered with the module's expiry queue
    before it is started; the outcome is delivered to ``handle_finish``.
    """
    lookup = History(chain, txpool, membuf)
    expiry_queue.add(lookup)
    lookup.start(address, handle_finish)
420
# Manual exercise script: stores two hard-coded transactions through a
# MemoryPoolBuffer and then prints the payment history of two addresses.
# It needs the libbitcoin bindings and a blockchain database at `prefix`.
if __name__ == "__main__":
    ex = bitcoin.satoshi_exporter()
    # Serialized transactions as hex, each checked against its expected hash.
    tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
    tx_a = ex.load_transaction(tx_a)
    assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397"
    tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000")
    tx_b = ex.load_transaction(tx_b)
    assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac"

    # Callbacks that only print what they are given.
    def blockchain_started(ec, chain):
        print "Blockchain initialisation:", ec
    def store_tx(ec):
        print "Tx", ec
    def finish(result):
        # result is None when the lookup failed, otherwise a list of dicts
        # that are dumped as "key: value" lines.
        print "Finish"
        if result is None:
            return
        for line in result:
            for k, v in line.iteritems():
                begin = k + ":"
                print begin, " " * (12 - len(begin)), v
            print

    # Stand-in for the monitor object that MemoryPoolBuffer notifies.
    class FakeMonitor:
        def tx_stored(self, tx):
            pass
        def tx_confirmed(self, tx):
            pass

    service = bitcoin.async_service(1)
    # Hard-coded, machine-specific database location.
    prefix = "/home/genjix/libbitcoin/database.old"
    chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
    txpool = bitcoin.transaction_pool(service, chain)
    membuf = MemoryPoolBuffer(txpool, chain, FakeMonitor())
    membuf.recv_tx(tx_a, store_tx)
    membuf.recv_tx(tx_b, store_tx)
    # Wait for Enter - presumably to give the asynchronous store callbacks
    # time to run before the lookups start.
    raw_input()
    # Despite its name this is a tuple holding two addresses.
    address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "18auo3rqfsjth3w2H9zyEz467DDFNNpMJP"
    #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
    print "Looking up", address
    payment_history(chain, txpool, membuf, address[0], finish)
    payment_history(chain, txpool, membuf, address[1], finish)
    # Keep the process alive until Enter is pressed so results can print.
    raw_input()
    print "Stopping..."
465