Merge branch 'master' of github.com:spesmilo/electrum-server
[electrum-server.git] / backends / libbitcoin / history.py
1 import threading
2 import time
3
4 import bitcoin
5 from bitcoin import bind, _1, _2, _3
6 import multimap
7
8
9 class ExpiryQueue(threading.Thread):
10
11     def __init__(self):
12         self.lock = threading.Lock()
13         self.items = []
14         threading.Thread.__init__(self)
15         self.daemon = True
16
17     def run(self):
18         # Garbage collection
19         while True:
20             with self.lock:
21                 self.items = [i for i in self.items if not i.stopped()]
22             time.sleep(0.1)
23
24     def add(self, item):
25         with self.lock:
26             self.items.append(item)
27
28
29 expiry_queue = ExpiryQueue()
30
31
32 class MemoryPoolBuffer:
33
34     def __init__(self, txpool, chain, monitor):
35         self.txpool = txpool
36         self.chain = chain
37         self.monitor = monitor
38         # prevout: inpoint
39         self.lookup_input = {}
40         # payment_address: outpoint
41         self.lookup_address = multimap.MultiMap()
42         # transaction timestamps
43         self.timestamps = {}
44
45     def recv_tx(self, tx, handle_store):
46         tx_hash = str(bitcoin.hash_transaction(tx))
47         desc = (tx_hash, [], [])
48         for input in tx.inputs:
49             prevout = input.previous_output
50             desc[1].append((str(prevout.hash), prevout.index))
51         for idx, output in enumerate(tx.outputs):
52             address = bitcoin.payment_address()
53             if address.extract(output.output_script):
54                 desc[2].append((idx, str(address)))
55         self.txpool.store(
56             tx,
57             bind(self.confirmed, _1, desc),
58             bind(self.mempool_stored, _1, desc, handle_store)
59         )
60
61     def mempool_stored(self, ec, desc, handle_store):
62         tx_hash, prevouts, addrs = desc
63         if ec:
64             handle_store(ec)
65             return
66         for idx, prevout in enumerate(prevouts):
67             #inpoint = bitcoin.input_point()
68             #inpoint.hash, inpoint.index = tx_hash, idx
69             prevout = "%s:%s" % prevout
70             self.lookup_input[prevout] = tx_hash, idx
71         for idx, address in addrs:
72             #outpoint = bitcoin.output_point()
73             #outpoint.hash, outpoint.index = tx_hash, idx
74             self.lookup_address[str(address)] = tx_hash, idx
75         self.timestamps[tx_hash] = int(time.time())
76         handle_store(ec)
77         self.monitor.tx_stored(desc)
78
79     def confirmed(self, ec, desc):
80         tx_hash, prevouts, addrs = desc
81         if ec:
82             print "Problem confirming transaction", tx_hash, ec
83             return
84         print "Confirmed", tx_hash
85         for idx, prevout in enumerate(prevouts):
86             #inpoint = bitcoin.input_point()
87             #inpoint.hash, inpoint.index = tx_hash, idx
88             prevout = "%s:%s" % prevout
89             assert self.lookup_input[prevout] == (tx_hash, idx)
90             del self.lookup_input[prevout]
91         for idx, address in addrs:
92             #outpoint = bitcoin.output_point()
93             #outpoint.hash, outpoint.index = tx_hash, idx
94             outpoint = tx_hash, idx
95             self.lookup_address.delete(str(address), outpoint)
96         del self.timestamps[tx_hash]
97         self.monitor.tx_confirmed(desc)
98
99     def check(self, output_points, address, handle):
100         class ExtendableDict(dict):
101             pass
102         result = []
103         for outpoint in output_points:
104             if str(outpoint) in self.lookup_input:
105                 point = self.lookup_input[str(outpoint)]
106                 info = ExtendableDict()
107                 info["tx_hash"] = point[0]
108                 info["index"] = point[1]
109                 info["is_input"] = 1
110                 info["timestamp"] = self.timestamps[info["tx_hash"]]
111                 result.append(info)
112         if str(address) in self.lookup_address:
113             addr_points = self.lookup_address[str(address)]
114             for point in addr_points:
115                 info = ExtendableDict()
116                 info["tx_hash"] = str(point.hash)
117                 info["index"] = point.index
118                 info["is_input"] = 0
119                 info["timestamp"] = self.timestamps[info["tx_hash"]]
120                 result.append(info)
121         handle(result)
122
123
124 class PaymentEntry:
125
126     def __init__(self, output_point):
127         self.lock = threading.Lock()
128         self.output_point = output_point
129         self.output_loaded = None
130         self.input_point = None
131         self.input_loaded = None
132         self.raw_output_script = None
133
134     def is_loaded(self):
135         with self.lock:
136             if self.output_loaded is None:
137                 return False
138             elif self.has_input() and self.input_loaded is None:
139                 return False
140         return True
141
142     def has_input(self):
143         return self.input_point is not False
144
145
146 class History:
147
148     def __init__(self, chain, txpool, membuf):
149         self.chain = chain
150         self.txpool = txpool
151         self.membuf = membuf
152         self.lock = threading.Lock()
153         self._stopped = False
154
155     def start(self, address, handle_finish):
156         self.statement = []
157         self.membuf_result = None
158         self.address = address
159         self.handle_finish = handle_finish
160
161         address = bitcoin.payment_address(address)
162         # To begin we fetch all the outputs (payments in)
163         # associated with this address
164         self.chain.fetch_outputs(address, bind(self.check_membuf, _1, _2))
165
166     def stop(self):
167         with self.lock:
168             assert self._stopped is False
169             self._stopped = True
170
171     def stopped(self):
172         with self.lock:
173             return self._stopped
174
175     def stop_on_error(self, ec):
176         if ec:
177             self.handle_finish(None)
178             self.stop()
179         return self.stopped()
180
181     def check_membuf(self, ec, output_points):
182         if self.stop_on_error(ec):
183             return
184         self.membuf.check(output_points, self.address, bind(self.start_loading, _1, output_points))
185
186     def start_loading(self, membuf_result, output_points):
187         if len(membuf_result) == 0 and len(output_points) == 0:
188             self.handle_finish([])
189             self.stopped()
190         # Create a bunch of entry lines which are outputs and
191         # then their corresponding input (if it exists)
192         for outpoint in output_points:
193             entry = PaymentEntry(outpoint)
194             with self.lock:
195                 self.statement.append(entry)
196             # Attempt to fetch the spend of this output
197             self.chain.fetch_spend(outpoint, bind(self.load_spend, _1, _2, entry))
198             self.load_tx_info(outpoint, entry, False)
199         # Load memory pool transactions
200         with self.lock:
201             self.membuf_result = membuf_result
202         for info in self.membuf_result:
203             self.txpool.fetch(bitcoin.hash_digest(info["tx_hash"]), bind(self.load_pool_tx, _1, _2, info))
204
205     def load_spend(self, ec, inpoint, entry):
206         # Need a custom self.stop_on_error(...) as a missing spend
207         # is not an error in this case.
208         if not self.stopped() and ec and ec != bitcoin.error.unspent_output:
209             self.handle_finish(None)
210             self.stop()
211         if self.stopped():
212             return
213         with entry.lock:
214             if ec == bitcoin.error.unspent_output:
215                 # This particular entry.output_point
216                 # has not been spent yet
217                 entry.input_point = False
218             else:
219                 entry.input_point = inpoint
220         if ec == bitcoin.error.unspent_output:
221             # Attempt to stop if all the info for the inputs and outputs
222             # has been loaded.
223             self.finish_if_done()
224         else:
225             # We still have to load at least one more payment outwards
226             self.load_tx_info(inpoint, entry, True)
227
228     def finish_if_done(self):
229         with self.lock:
230             # Still have more entries to finish loading, so return
231             if any(not entry.is_loaded() for entry in self.statement):
232                 return
233             # Memory buffer transactions finished loading?
234             if any("height" not in info for info in self.membuf_result):
235                 return
236         # Whole operation completed successfully! Finish up.
237         result = []
238         for entry in self.statement:
239             if entry.input_point:
240                 # value of the input is simply the inverse of
241                 # the corresponding output
242                 entry.input_loaded["value"] = -entry.output_loaded["value"]
243                 # output should come before the input as it's chronological
244                 result.append(entry.output_loaded)
245                 result.append(entry.input_loaded)
246             else:
247                 # Unspent outputs have a raw_output_script field
248                 assert entry.raw_output_script is not None
249                 entry.output_loaded["raw_output_script"] = \
250                     entry.raw_output_script
251                 result.append(entry.output_loaded)
252         mempool_result = []
253         for info in self.membuf_result:
254             # Lookup prevout in result
255             # Set "value" field
256             if info["is_input"] == 1:
257                 prevout_tx = None
258                 for prevout_info in result:
259                     if prevout_info["tx_hash"] == info.previous_output.hash:
260                         prevout_tx = prevout_info
261                 assert prevout_tx is not None
262                 info["value"] = -prevout_info["value"]
263             mempool_result.append(info)
264         result.extend(mempool_result)
265         self.handle_finish(result)
266         self.stop()
267
268     def load_tx_info(self, point, entry, is_input):
269         info = {}
270         info["tx_hash"] = str(point.hash)
271         info["index"] = point.index
272         info["is_input"] = 1 if is_input else 0
273         # Before loading the transaction, Stratum requires the hash
274         # of the parent block, so we load the block depth and then
275         # fetch the block header and hash it.
276         self.chain.fetch_transaction_index(point.hash, bind(self.tx_index, _1, _2, _3, entry, info))
277
278     def tx_index(self, ec, block_depth, offset, entry, info):
279         if self.stop_on_error(ec):
280             return
281         info["height"] = block_depth
282         # And now for the block hash
283         self.chain.fetch_block_header_by_depth(block_depth, bind(self.block_header, _1, _2, entry, info))
284
285     def block_header(self, ec, blk_head, entry, info):
286         if self.stop_on_error(ec):
287             return
288         info["timestamp"] = blk_head.timestamp
289         info["block_hash"] = str(bitcoin.hash_block_header(blk_head))
290         tx_hash = bitcoin.hash_digest(info["tx_hash"])
291         # Now load the actual main transaction for this input or output
292         self.chain.fetch_transaction(tx_hash, bind(self.load_chain_tx, _1, _2, entry, info))
293
294     def load_pool_tx(self, ec, tx, info):
295         if self.stop_on_error(ec):
296             return
297         # block_hash = mempool:5
298         # inputs (load from prevtx)
299         # outputs (load from tx)
300         # raw_output_script (load from tx)
301         # height is always None
302         # value (get from finish_if_done)
303         self.load_tx(tx, info)
304         if info["is_input"] == 0:
305             our_output = tx.outputs[info["index"]]
306             info["value"] = our_output.value
307             # Save serialised output script in case this output is unspent
308             info["raw_output_script"] = \
309                 str(bitcoin.save_script(our_output.output_script))
310         else:
311             assert(info["is_input"] == 1)
312             info.previous_output = tx.inputs[info["index"]].previous_output
313         # If all the inputs are loaded
314         if self.inputs_all_loaded(info["inputs"]):
315             # We are the sole input
316             assert(info["is_input"] == 1)
317             # No more inputs left to load
318             # This info has finished loading
319             info["height"] = None
320             info["block_hash"] = "mempool"
321             self.finish_if_done()
322         create_handler = lambda prevout_index, input_index: \
323             bind(self.load_input_pool_tx, _1, _2, prevout_index, info, input_index)
324         self.fetch_input_txs(tx, info, create_handler)
325
326     def load_tx(self, tx, info):
327         # List of output addresses
328         outputs = []
329         for tx_out in tx.outputs:
330             address = bitcoin.payment_address()
331             # Attempt to extract address from output script
332             if address.extract(tx_out.output_script):
333                 outputs.append(address.encoded())
334             else:
335                 # ... otherwise append "Unknown"
336                 outputs.append("Unknown")
337         info["outputs"] = outputs
338         # For the inputs, we need the originator address which has to
339         # be looked up in the blockchain.
340         # Create list of Nones and then populate it.
341         # Loading has finished when list is no longer all None.
342         info["inputs"] = [None for i in tx.inputs]
343         # If this transaction was loaded for an input, then we already
344         # have a source address for at least one input.
345         if info["is_input"] == 1:
346             info["inputs"][info["index"]] = self.address
347
348     def fetch_input_txs(self, tx, info, create_handler):
349         # Load the previous_output for every input so we can get
350         # the output address
351         for input_index, tx_input in enumerate(tx.inputs):
352             if info["is_input"] == 1 and info["index"] == input_index:
353                 continue
354             prevout = tx_input.previous_output
355             handler = create_handler(prevout.index, input_index)
356             self.chain.fetch_transaction(prevout.hash, handler)
357
358     def load_chain_tx(self, ec, tx, entry, info):
359         if self.stop_on_error(ec):
360             return
361         self.load_tx(tx, info)
362         if info["is_input"] == 0:
363             our_output = tx.outputs[info["index"]]
364             info["value"] = our_output.value
365             # Save serialised output script in case this output is unspent
366             with entry.lock:
367                 entry.raw_output_script = \
368                     str(bitcoin.save_script(our_output.output_script))
369         # If all the inputs are loaded
370         if self.inputs_all_loaded(info["inputs"]):
371             # We are the sole input
372             assert(info["is_input"] == 1)
373             with entry.lock:
374                 entry.input_loaded = info
375             self.finish_if_done()
376         create_handler = lambda prevout_index, input_index: \
377             bind(self.load_input_chain_tx, _1, _2, prevout_index, entry, info, input_index)
378         self.fetch_input_txs(tx, info, create_handler)
379
380     def inputs_all_loaded(self, info_inputs):
381         return not [empty_in for empty_in in info_inputs if empty_in is None]
382
383     def load_input_tx(self, tx, output_index, info, input_index):
384         # For our input, we load the previous tx so we can get the
385         # corresponding output.
386         # We need the output to extract the address.
387         script = tx.outputs[output_index].output_script
388         address = bitcoin.payment_address()
389         if address.extract(script):
390             info["inputs"][input_index] = address.encoded()
391         else:
392             info["inputs"][input_index] = "Unknown"
393
394     def load_input_chain_tx(self, ec, tx, output_index,
395                             entry, info, input_index):
396         if self.stop_on_error(ec):
397             return
398         self.load_input_tx(tx, output_index, info, input_index)
399         # If all the inputs are loaded, then we have finished loading
400         # the info for this input-output entry pair
401         if self.inputs_all_loaded(info["inputs"]):
402             with entry.lock:
403                 if info["is_input"] == 1:
404                     entry.input_loaded = info
405                 else:
406                     entry.output_loaded = info
407         self.finish_if_done()
408
409     def load_input_pool_tx(self, ec, tx, output_index, info, input_index):
410         if self.stop_on_error(ec):
411             return
412         self.load_input_tx(tx, output_index, info, input_index)
413         if not [inp for inp in info["inputs"] if inp is None]:
414             # No more inputs left to load
415             # This info has finished loading
416             info["height"] = None
417             info["block_hash"] = "mempool"
418         self.finish_if_done()
419
420
421 def payment_history(chain, txpool, membuf, address, handle_finish):
422     h = History(chain, txpool, membuf)
423     expiry_queue.add(h)
424     h.start(address, handle_finish)
425
426
427 if __name__ == "__main__":
428     ex = bitcoin.satoshi_exporter()
429     tx_a = bitcoin.data_chunk("0100000003d0406a31f628e18f5d894b2eaf4af719906dc61be4fb433a484ed870f6112d15000000008b48304502210089c11db8c1524d8839243803ac71e536f3d876e8265bbb3bc4a722a5d0bd40aa022058c3e59a7842ef1504b1c2ce048f9af2d69bbf303401dced1f68b38d672098a10141046060f6c8e355b94375eec2cc1d231f8044e811552d54a7c4b36fe8ee564861d07545c6c9d5b9f60d16e67d683b93486c01d3bd3b64d142f48af70bb7867d0ffbffffffff6152ed1552b1f2635317cea7be06615a077fc0f4aa62795872836c4182ca0f25000000008b48304502205f75a468ddb08070d235f76cb94c3f3e2a75e537bc55d087cc3e2a1559b7ac9b022100b17e4c958aaaf9b93359f5476aa5ed438422167e294e7207d5cfc105e897ed91014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff01c52914dcb0f3d8822e5a9e3374e5893a7b6033c9cfce5a8e5e6a1b3222a5cb010000008c4930460221009561f7206cc98f40f3eab5f3308b12846d76523bd07b5f058463f387694452b2022100b2684ec201760fa80b02954e588f071e46d0ff16562c1ab393888416bf8fcc44014104a7108ec63464d6735302085124f3b7a06aa8f9363eab1f85f49a21689b286eb80fbabda7f838d9b6bff8550b377ad790b41512622518801c5230463dbbff6001ffffffff02407e0f00000000001976a914c3b98829108923c41b3c1ba6740ecb678752fd5e88ac40420f00000000001976a914424648ea6548cc1c4ea707c7ca58e6131791785188ac00000000")
430     tx_a = ex.load_transaction(tx_a)
431     assert bitcoin.hash_transaction(tx_a) == "e72e4f025695446cfd5c5349d1720beb38801f329a00281f350cb7e847153397"
432     tx_b = bitcoin.data_chunk("0100000001e269f0d74b8e6849233953715bc0be3ba6727afe0bc5000d015758f9e67dde34000000008c4930460221008e305e3fdf4420203a8cced5be20b73738a3b51186dfda7c6294ee6bebe331b7022100c812ded044196132f5e796dbf4b566b6ee3246cc4915eca3cf07047bcdf24a9301410493b6ce24182a58fc3bd0cbee0ddf5c282e00c0c10b1293c7a3567e95bfaaf6c9a431114c493ba50398ad0a82df06254605d963d6c226db615646fadd083ddfd9ffffffff020f9c1208000000001976a91492fffb2cb978d539b6bcd12c968b263896c6aacf88ac8e3f7600000000001976a914654dc745e9237f86b5fcdfd7e01165af2d72909588ac00000000")
433     tx_b = ex.load_transaction(tx_b)
434     assert bitcoin.hash_transaction(tx_b) == "acfda6dbf4ae1b102326bfb7c9541702d5ebb0339bc57bd74d36746855be8eac"
435
436     def blockchain_started(ec, chain):
437         print "Blockchain initialisation:", ec
438
439     def store_tx(ec):
440         print "Tx", ec
441
442     def finish(result):
443         print "Finish"
444         if result is None:
445             return
446         for line in result:
447             for k, v in line.iteritems():
448                 begin = k + ":"
449                 print begin, " " * (12 - len(begin)), v
450             print
451
452     class FakeMonitor:
453         def tx_stored(self, tx):
454             pass
455
456         def tx_confirmed(self, tx):
457             pass
458
459     service = bitcoin.async_service(1)
460     prefix = "/home/genjix/libbitcoin/database"
461     chain = bitcoin.bdb_blockchain(service, prefix, blockchain_started)
462     txpool = bitcoin.transaction_pool(service, chain)
463     membuf = MemoryPoolBuffer(txpool, chain, FakeMonitor())
464     membuf.recv_tx(tx_a, store_tx)
465     membuf.recv_tx(tx_b, store_tx)
466
467     txdat = bitcoin.data_chunk("0100000001d6cad920a04acd6c0609cd91fe4dafa1f3b933ac90e032c78fdc19d98785f2bb010000008b483045022043f8ce02784bd7231cb362a602920f2566c18e1877320bf17d4eabdac1019b2f022100f1fd06c57330683dff50e1b4571fb0cdab9592f36e3d7e98d8ce3f94ce3f255b01410453aa8d5ddef56731177915b7b902336109326f883be759ec9da9c8f1212c6fa3387629d06e5bf5e6bcc62ec5a70d650c3b1266bb0bcc65ca900cff5311cb958bffffffff0280969800000000001976a9146025cabdbf823949f85595f3d1c54c54cd67058b88ac602d2d1d000000001976a914c55c43631ab14f7c4fd9c5f153f6b9123ec32c8888ac00000000")
468     req = {"id": 110, "params": ["1GULoCDnGjhfSWzHs6zDzBxbKt9DR7uRbt"]}
469     ex = bitcoin.satoshi_exporter()
470     tx = ex.load_transaction(txdat)
471     time.sleep(4)
472     membuf.recv_tx(tx, store_tx)
473
474     raw_input()
475     address = "1Jqu2PVGDvNv4La113hgCJsvRUCDb3W65D", "1GULoCDnGjhfSWzHs6zDzBxbKt9DR7uRbt"
476     #address = "1Pbn3DLXfjqF1fFV9YPdvpvyzejZwkHhZE"
477     print "Looking up", address
478     payment_history(chain, txpool, membuf, address[0], finish)
479     #payment_history(chain, txpool, membuf, address[1], finish)
480     raw_input()
481     print "Stopping..."