Fix reorg issues
[electrum-server.git] / backends / libbitcoin / __init__.py
1 import threading
2 import time
3
4 import bitcoin
5 from bitcoin import bind, _1, _2, _3
6
7 from processor import Processor
8 import history1 as history
9 import membuf
10
11
class HistoryCache:
    """Address -> history mapping whose accesses are guarded by a lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.cache = {}

    def store(self, address, result):
        """Remember `result` as the cached history of `address`."""
        self.lock.acquire()
        try:
            self.cache[address] = result
        finally:
            self.lock.release()

    def fetch(self, address):
        """Return the cached history of `address`, or None when absent."""
        with self.lock:
            return self.cache.get(address)

    def clear(self, addresses):
        """Forget the cached history of every address in `addresses`.

        Addresses that are not cached are silently skipped.
        """
        with self.lock:
            for stale in addresses:
                self.cache.pop(stale, None)
34
35
class MonitorAddress:
    """Watch subscribed addresses and notify when a transaction touches them.

    `tx_stored` and `tx_confirmed` are registered with the backend's memory
    buffer in `__init__`. Each call works out which monitored addresses the
    transaction affects, drops their cached history and pushes a
    "blockchain.address.subscribe" notification for each of them.
    """

    def __init__(self, processor, cache, backend):
        self.processor = processor
        self.cache = cache
        self.backend = backend
        # Guards monitor_output and monitor_address.
        self.lock = threading.Lock()
        # key is hash:index, value is address
        self.monitor_output = {}
        # key is address
        self.monitor_address = set()

        backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)

    def monitor(self, address, result):
        """Start monitoring `address` and the outputs listed in `result`.

        `result` is an iterable of history rows (dicts). Rows without a
        "raw_output_script" key are skipped; the remaining rows must be
        outputs (`is_input == 0`) and contribute a "tx_hash:index" outpoint.
        """
        outpoints = []
        for info in result:
            if "raw_output_script" not in info:
                continue
            assert info["is_input"] == 0
            outpoints.append("%s:%s" % (info["tx_hash"], info["index"]))
        # Register everything in one critical section so a concurrent
        # transaction callback never observes a half-registered address.
        with self.lock:
            for outpoint in outpoints:
                self.monitor_output[outpoint] = address
            self.monitor_address.add(address)

    def unpack(self, tx):
        """Return `(tx_hash, previous_outputs, addrs)` for `tx`.

        `previous_outputs` lists the "hash:index" outpoints spent by the
        inputs; `addrs` lists `(output_index, address)` for every output
        whose script yields a payment address.
        """
        tx_hash = bitcoin.hash_transaction(tx)
        previous_outputs = []
        for tx_input in tx.inputs:
            prevout = tx_input.previous_output
            previous_outputs.append("%s:%s" % (prevout.hash, prevout.index))
        addrs = []
        for output_index, output in enumerate(tx.outputs):
            address = bitcoin.payment_address()
            if address.extract(output.output_script):
                addrs.append((output_index, str(address)))
        return tx_hash, previous_outputs, addrs

    def effect_notify(self, tx, delete_outs):
        """Invalidate and notify every monitored address affected by `tx`.

        An address is affected when `tx` spends one of its monitored outputs
        or pays to it. When `delete_outs` is true the spent outpoints are
        removed from the monitored set.

        Returns `(tx_hash, addrs, affected_addrs)`, used for confirmed txs.
        """
        affected_addrs = set()
        tx_hash, previous_outputs, addrs = self.unpack(tx)
        # One lock acquisition gives a consistent view of both tables.
        with self.lock:
            for prevout in previous_outputs:
                if prevout not in self.monitor_output:
                    continue
                affected_addrs.add(self.monitor_output[prevout])
                if delete_outs:
                    del self.monitor_output[prevout]
            for _, address in addrs:
                if address in self.monitor_address:
                    affected_addrs.add(address)
        self.cache.clear(affected_addrs)
        self.notify(affected_addrs)
        # Used in confirmed txs
        return tx_hash, addrs, affected_addrs

    def tx_stored(self, tx):
        """Memory-buffer handle: notify, keeping spent outpoints monitored."""
        self.effect_notify(tx, False)

    def tx_confirmed(self, tx):
        """Memory-buffer handle: notify, drop spent outpoints, add new ones."""
        tx_hash, addrs, affected_addrs = self.effect_notify(tx, True)
        # add new outputs to monitor
        with self.lock:
            for idx, address in addrs:
                if address in affected_addrs:
                    self.monitor_output["%s:%s" % (tx_hash, idx)] = address

    def notify(self, affected_addrs):
        """Request a fresh history for each address and notify on completion."""
        service = self.backend.mempool_service
        chain = self.backend.blockchain
        txpool = self.backend.transaction_pool
        memory_buff = self.backend.memory_buffer
        for address in affected_addrs:
            response = {"id": None,
                        "method": "blockchain.address.subscribe",
                        "params": [str(address)]}
            history.payment_history(service, chain, txpool, memory_buff, address,
                                    bind(self.send_notify, _1, _2, response))

    def mempool_n(self, result):
        """Return the status string for the history `result`.

        None for an empty history. Otherwise the row with the greatest
        "timestamp" decides: "mempool:<number of rows>" when its block hash
        is "mempool", else that row's block hash.
        """
        assert result is not None
        if len(result) == 0:
            return None
        # mempool:n status
        # Order by time, grab last item (latest). A stable sort is kept on
        # purpose: among equal timestamps the last row in `result` wins.
        last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
        if last_info["block_hash"] == "mempool":
            last_id = "mempool:%s" % len(result)
        else:
            last_id = last_info["block_hash"]
        return last_id

    def send_notify(self, ec, result, response):
        """History callback: append the status to `response` and push it.

        On error (`ec` truthy) the error is printed and nothing is pushed.
        """
        if ec:
            # Single-argument form: same output as the Python 2 print
            # statement, and it also parses on Python 3.
            print("%s %s" % ("Error: Monitor.send_notify()", ec))
            return
        assert len(response["params"]) == 1
        response["params"].append(self.mempool_n(result))
        self.processor.push_response(response)
141
142
class Backend:
    """Owns the libbitcoin services, blockchain, transaction pool and session.

    Transactions announced by connected nodes are handed to
    `memory_buffer.receive`.
    """

    def __init__(self, db_prefix="/home/genjix/libbitcoin/database"):
        """Create the services and start the network session.

        db_prefix -- directory passed to `bitcoin.bdb_blockchain`; defaults
                     to the path that used to be hard-coded here.
        """
        # Create 3 thread-pools each with 1 thread
        self.network_service = bitcoin.async_service(1)
        self.disk_service = bitcoin.async_service(1)
        self.mempool_service = bitcoin.async_service(1)

        self.hosts = bitcoin.hosts(self.network_service)
        self.handshake = bitcoin.handshake(self.network_service)
        self.network = bitcoin.network(self.network_service)
        self.protocol = bitcoin.protocol(self.network_service, self.hosts,
                                         self.handshake, self.network)

        self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
                                                 self.blockchain_started)
        self.poller = bitcoin.poller(self.mempool_service, self.blockchain)
        self.transaction_pool = \
            bitcoin.transaction_pool(self.mempool_service, self.blockchain)

        # Build the memory buffer before any channel subscription or session
        # start: recv_tx() reads self.memory_buffer and may be invoked from a
        # service thread as soon as a node connects.
        self.memory_buffer = \
            membuf.memory_buffer(self.mempool_service.internal_ptr,
                                 self.blockchain.internal_ptr,
                                 self.transaction_pool.internal_ptr)

        self.protocol.subscribe_channel(self.monitor_tx)
        self.session = \
            bitcoin.session(self.network_service, self.hosts, self.handshake,
                            self.network, self.protocol, self.blockchain,
                            self.poller, self.transaction_pool)
        self.session.start(self.handle_start)

    # The print calls below use a single pre-formatted argument: identical
    # output under the Python 2 print statement, and also valid on Python 3.

    def handle_start(self, ec):
        """Session start callback: report a failure, if any."""
        if ec:
            print("%s %s" % ("Error starting backend:", ec))

    def blockchain_started(self, ec, chain):
        """Blockchain initialisation callback: print the resulting code."""
        print("%s %s" % ("Blockchain initialisation:", ec))

    def stop(self):
        """Ask the session to stop; completion is reported by handle_stop."""
        self.session.stop(self.handle_stop)

    def handle_stop(self, ec):
        """Session stop callback."""
        if ec:
            print("%s %s" % ("Error stopping backend:", ec))
        print("Stopped backend")

    def monitor_tx(self, node):
        """Channel callback: subscribe to transactions from a new node."""
        # We will be notified here when connected to new bitcoin nodes
        # Here we subscribe to new transactions from them which we
        # add to the transaction_pool. That way we can track which
        # transactions we are interested in.
        node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
        # Re-subscribe to next new node
        self.protocol.subscribe_channel(self.monitor_tx)

    def recv_tx(self, ec, tx, node):
        """Transaction callback: hand `tx` to the memory buffer.

        On error the message is printed and the node is not re-subscribed.
        """
        if ec:
            print("%s %s" % ("Error with new transaction:", ec))
            return
        tx_hash = bitcoin.hash_transaction(tx)
        self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
        # Re-subscribe to new transactions from node
        node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))

    def store_tx(self, ec, tx_hash):
        """Memory-buffer callback: print whether `tx_hash` was accepted."""
        if ec:
            print("%s %s %s" % ("Error storing memory pool transaction",
                                tx_hash, ec))
        else:
            print("%s %s" % ("Accepted transaction", tx_hash))
214
215
class GhostValue:
    """A value that readers can wait for until a writer has set it."""

    def __init__(self):
        self.event = threading.Event()
        self.value = None

    def get(self, timeout=None):
        """Return the value, waiting until `set` has been called.

        timeout -- maximum number of seconds to wait. The default, None,
                   waits indefinitely (the original behaviour). If the wait
                   times out before any `set`, None is returned.
        """
        self.event.wait(timeout)
        return self.value

    def set(self, value):
        """Store `value` and release every current and future `get`."""
        # Assign before signalling so a woken reader sees the new value.
        self.value = value
        self.event.set()
229
230
class NumblocksSubscribe:
    """Serve "blockchain.numblocks.subscribe" and push height updates."""

    def __init__(self, backend, processor):
        self.backend = backend
        self.processor = processor
        self.lock = threading.Lock()
        # Must exist before the callbacks below are registered: both of them
        # use self.latest and may be invoked before __init__ returns.
        self.latest = GhostValue()
        self.backend.blockchain.subscribe_reorganize(self.reorganize)
        self.backend.blockchain.fetch_last_depth(self.set_last_depth)

    def set_last_depth(self, ec, last_depth):
        """fetch_last_depth callback: record the initial height.

        On error the message is printed and no height is recorded.
        NOTE(review): in that case subscribe() keeps waiting until a
        reorganize supplies a height -- confirm this is acceptable.
        """
        if ec:
            # Single-argument form: same output as the Python 2 print
            # statement, and it also parses on Python 3.
            print("%s %s" % ("Error retrieving last depth", ec))
        else:
            self.latest.set(last_depth)

    def reorganize(self, ec, fork_point, arrivals, replaced):
        """Reorganize callback: publish the new height and re-subscribe.

        The new height is `fork_point + len(arrivals)`.
        """
        if ec:
            # The other arguments cannot be trusted on error, so no height
            # is computed or published.
            # NOTE(review): we do not re-subscribe here, presumably an error
            # means the blockchain service is shutting down -- confirm.
            print("%s %s" % ("Error in reorganize notification", ec))
            return
        latest = fork_point + len(arrivals)
        self.latest.set(latest)
        response = {"id": None, "method": "blockchain.numblocks.subscribe",
                    "params": [latest]}
        self.processor.push_response(response)
        self.backend.blockchain.subscribe_reorganize(self.reorganize)

    def subscribe(self, request):
        """Answer `request` with the latest known height (waits for it)."""
        latest = self.latest.get()
        response = {"id": request["id"],
                    "result": latest,
                    "error": None}
        self.processor.push_response(response)
261
262
class AddressGetHistory:
    """Answer "blockchain.address.get_history" with a fresh history fetch."""

    def __init__(self, backend, processor):
        self.backend = backend
        self.processor = processor

    def get(self, request):
        """Fetch the history of the address in `request["params"][0]`.

        The result is delivered asynchronously to `respond`.
        """
        address = str(request["params"][0])
        backend = self.backend
        history.payment_history(
            backend.mempool_service,
            backend.blockchain,
            backend.transaction_pool,
            backend.memory_buffer,
            address,
            bind(self.respond, _1, _2, request))

    def respond(self, ec, result, request):
        """History callback: push `result`, or an error with code -4."""
        reply = {"id": request["id"], "result": result, "error": None}
        if ec:
            reply["result"] = None
            reply["error"] = {"message": str(ec), "code": -4}
        self.processor.push_response(reply)
285
286
class AddressSubscribe:
    """Serve "blockchain.address.subscribe" and cached history lookups."""

    def __init__(self, backend, processor, cache, monitor):
        self.backend = backend
        self.processor = processor
        self.cache = cache
        self.monitor = monitor

    @staticmethod
    def _address(request):
        """Return the request's address parameter as a plain `str`.

        Every entry point goes through this helper so the history fetch,
        the monitor and the cache all use the same key.
        """
        return str(request["params"][0])

    def subscribe(self, request):
        """Fetch the address history; `construct` completes the request."""
        address = self._address(request)
        service = self.backend.mempool_service
        chain = self.backend.blockchain
        txpool = self.backend.transaction_pool
        memory_buff = self.backend.memory_buffer
        history.payment_history(service, chain, txpool, memory_buff, address,
                                bind(self.construct, _1, _2, request))

    def construct(self, ec, result, request):
        """History callback: reply with the status and start monitoring.

        On error (`ec` truthy) an error response with code -4 is pushed and
        nothing is monitored or cached.
        """
        if ec:
            response = {"id": request["id"], "result": None,
                        "error": {"message": str(ec), "code": -4}}
            self.processor.push_response(response)
            return
        last_id = self.monitor.mempool_n(result)
        response = {"id": request["id"], "result": last_id, "error": None}
        self.processor.push_response(response)
        address = self._address(request)
        self.monitor.monitor(address, result)
        # Cache result for get_history
        self.cache.store(address, result)

    def fetch_cached(self, request):
        """Answer `request` from the cache if possible.

        Returns True when a cached history was pushed as the response,
        False when nothing is cached for the address.
        """
        cached = self.cache.fetch(self._address(request))
        if cached is None:
            return False
        response = {"id": request["id"], "result": cached, "error": None}
        self.processor.push_response(response)
        return True
326
327
class BlockchainProcessor(Processor):
    """Processor that answers blockchain.* requests via the libbitcoin backend.

    Handles numblocks.subscribe, address.subscribe, address.get_history and
    transaction.broadcast; any other method is ignored.
    """

    def __init__(self, config):
        Processor.__init__(self)
        cache = HistoryCache()
        self.backend = Backend()
        monitor = MonitorAddress(self, cache, self.backend)
        self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
        self.address_get_history = AddressGetHistory(self.backend, self)
        self.address_subscribe = \
            AddressSubscribe(self.backend, self, cache, monitor)

    def stop(self):
        """Stop the backend session."""
        self.backend.stop()

    def process(self, request):
        """Route `request` to its handler based on `request["method"]`."""
        # Single-argument form: same output as the Python 2 print statement.
        print("%s %s" % ("New request (lib)", request))
        method = request["method"]
        if method == "blockchain.numblocks.subscribe":
            self.numblocks_subscribe.subscribe(request)
            return
        if method == "blockchain.address.subscribe":
            self.address_subscribe.subscribe(request)
            return
        if method == "blockchain.address.get_history":
            # Serve from the subscription cache, else fetch from the backend.
            served = self.address_subscribe.fetch_cached(request)
            if not served:
                self.address_get_history.get(request)
            return
        if method == "blockchain.transaction.broadcast":
            self.broadcast_transaction(request)

    def broadcast_transaction(self, request):
        """Parse the raw transaction in `request`, broadcast it and reply.

        The reply carries the transaction hash on success, or an error with
        code -4 when parsing raises RuntimeError.
        """
        raw_tx = bitcoin.data_chunk(str(request["params"][0]))
        exporter = bitcoin.satoshi_exporter()
        result = None
        error = None
        try:
            tx = exporter.load_transaction(raw_tx)
        except RuntimeError:
            error = {
                "message": "Exception while parsing the transaction data.",
                "code": -4,
            }
        else:
            self.backend.protocol.broadcast_transaction(tx)
            result = str(bitcoin.hash_transaction(tx))
        self.push_response(
            {"id": request["id"], "result": result, "error": error})