20eae9b8fc0af27b1960ebe6746b4b2a9aa939ba
[electrum-server.git] / backends / libbitcoin / __init__.py
1 import bitcoin
2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
4 import threading
5 import time
6
7 import history1 as history
8 import membuf
9
10 class HistoryCache:
11
12     def __init__(self):
13         self.lock = threading.Lock()
14         self.cache = {}
15
16     def store(self, address, result):
17         with self.lock:
18             self.cache[address] = result
19
20     def fetch(self, address):
21         try:
22             with self.lock:
23                 return self.cache[address]
24         except KeyError:
25             return None
26
27     def clear(self, addresses):
28         with self.lock:
29             for address in addresses:
30                 if self.cache.has_key(address):
31                     del self.cache[address]
32
33 class MonitorAddress:
34
35     def __init__(self, processor, cache, backend):
36         self.processor = processor
37         self.cache = cache
38         self.backend = backend
39         self.lock = threading.Lock()
40         # key is hash:index, value is address
41         self.monitor_output = {}
42         # key is address
43         self.monitor_address = set()
44
45         backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
46
47     def monitor(self, address, result):
48         for info in result:
49             if not info.has_key("raw_output_script"):
50                 continue
51             assert info["is_input"] == 0
52             tx_hash = info["tx_hash"]
53             output_index = info["index"]
54             outpoint = "%s:%s" % (tx_hash, output_index)
55             with self.lock:
56                 self.monitor_output[outpoint] = address
57         with self.lock:
58             self.monitor_address.add(address)
59
60     def unpack(self, tx):
61         tx_hash = bitcoin.hash_transaction(tx)
62         previous_outputs = []
63         for input in tx.inputs:
64             prevout = input.previous_output
65             prevout = "%s:%s" % (prevout.hash, prevout.index)
66             previous_outputs.append(prevout)
67         addrs = []
68         for output_index, output in enumerate(tx.outputs):
69             address = bitcoin.payment_address()
70             if address.extract(output.output_script):
71                 addrs.append((output_index, str(address)))
72         return tx_hash, previous_outputs, addrs
73
74     def effect_notify(self, tx, delete_outs):
75         affected_addrs = set()
76         tx_hash, previous_outputs, addrs = self.unpack(tx)
77         for prevout in previous_outputs:
78             try:
79                 with self.lock:
80                     affected_addrs.add(self.monitor_output[prevout])
81                     if delete_outs:
82                         del self.monitor_output[prevout]
83             except KeyError:
84                 pass
85         for idx, address in addrs:
86             with self.lock:
87                 if address in self.monitor_address:
88                     affected_addrs.add(address)
89         self.cache.clear(affected_addrs)
90         self.notify(affected_addrs)
91         # Used in confirmed txs
92         return tx_hash, addrs, affected_addrs
93
94     def tx_stored(self, tx):
95         self.effect_notify(tx, False)
96
97     def tx_confirmed(self, tx):
98         tx_hash, addrs, affected_addrs = self.effect_notify(tx, True)
99         # add new outputs to monitor
100         for idx, address in addrs:
101             outpoint = "%s:%s" % (tx_hash, idx)
102             if address in affected_addrs:
103                 with self.lock:
104                     self.monitor_output[outpoint] = address
105
106     def notify(self, affected_addrs):
107         service = self.backend.mempool_service
108         chain = self.backend.blockchain
109         txpool = self.backend.transaction_pool
110         memory_buff = self.backend.memory_buffer
111         for address in affected_addrs:
112             response = {"id": None,
113                         "method": "blockchain.address.subscribe",
114                         "params": [str(address)]}
115             history.payment_history(service, chain, txpool, memory_buff,
116                 address, bind(self.send_notify, _1, _2, response))
117
118     def mempool_n(self, result):
119         assert result is not None
120         if len(result) == 0:
121             return None
122         # mempool:n status
123         # Order by time, grab last item (latest)
124         last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
125         if last_info["block_hash"] == "mempool":
126             last_id = "mempool:%s" % len(result)
127         else:
128             last_id = last_info["block_hash"]
129         return last_id
130
131     def send_notify(self, ec, result, response):
132         if ec:
133             print "Error: Monitor.send_notify()", ec
134             return
135         assert len(response["params"]) == 1
136         response["params"].append(self.mempool_n(result))
137         self.processor.push_response(response)
138
139 class Backend:
140
141     def __init__(self):
142         # Create 3 thread-pools each with 1 thread
143         self.network_service = bitcoin.async_service(1)
144         self.disk_service = bitcoin.async_service(1)
145         self.mempool_service = bitcoin.async_service(1)
146
147         self.hosts = bitcoin.hosts(self.network_service)
148         self.handshake = bitcoin.handshake(self.network_service)
149         self.network = bitcoin.network(self.network_service)
150         self.protocol = bitcoin.protocol(self.network_service, self.hosts,
151                                          self.handshake, self.network)
152
153         db_prefix = "/home/genjix/libbitcoin/database"
154         self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
155                                                  self.blockchain_started)
156         self.poller = bitcoin.poller(self.mempool_service, self.blockchain)
157         self.transaction_pool = \
158             bitcoin.transaction_pool(self.mempool_service, self.blockchain)
159
160         self.protocol.subscribe_channel(self.monitor_tx)
161         self.session = \
162             bitcoin.session(self.network_service, self.hosts, self.handshake,
163                             self.network, self.protocol, self.blockchain,
164                             self.poller, self.transaction_pool)
165         self.session.start(self.handle_start)
166
167         self.memory_buffer = \
168             membuf.memory_buffer(self.mempool_service.internal_ptr,
169                                  self.blockchain.internal_ptr,
170                                  self.transaction_pool.internal_ptr)
171
172     def handle_start(self, ec):
173         if ec:
174             print "Error starting backend:", ec
175
176     def blockchain_started(self, ec, chain):
177         print "Blockchain initialisation:", ec
178
179     def stop(self):
180         self.session.stop(self.handle_stop)
181
182     def handle_stop(self, ec):
183         if ec:
184             print "Error stopping backend:", ec
185         print "Stopped backend"
186
187     def monitor_tx(self, node):
188         # We will be notified here when connected to new bitcoin nodes
189         # Here we subscribe to new transactions from them which we
190         # add to the transaction_pool. That way we can track which
191         # transactions we are interested in.
192         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
193         # Re-subscribe to next new node
194         self.protocol.subscribe_channel(self.monitor_tx)
195
196     def recv_tx(self, ec, tx, node):
197         if ec:
198             print "Error with new transaction:", ec
199             return
200         tx_hash = bitcoin.hash_transaction(tx)
201         self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
202         # Re-subscribe to new transactions from node
203         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
204
205     def store_tx(self, ec, tx_hash):
206         if ec:
207             print "Error storing memory pool transaction", tx_hash, ec
208         else:
209             print "Accepted transaction", tx_hash
210
211 class GhostValue:
212
213     def __init__(self):
214         self.event = threading.Event()
215         self.value = None
216
217     def get(self):
218         self.event.wait()
219         return self.value
220
221     def set(self, value):
222         self.value = value
223         self.event.set()
224
225 class NumblocksSubscribe:
226
227     def __init__(self, backend, processor):
228         self.backend = backend
229         self.processor = processor
230         self.lock = threading.Lock()
231         self.backend.blockchain.subscribe_reorganize(self.reorganize)
232         self.backend.blockchain.fetch_last_depth(self.set_last_depth)
233         self.latest = GhostValue()
234
235     def set_last_depth(self, ec, last_depth):
236         if ec:
237             print "Error retrieving last depth", ec
238         else:
239             self.latest.set(last_depth)
240
241     def reorganize(self, ec, fork_point, arrivals, replaced):
242         latest = fork_point + len(arrivals)
243         self.latest.set(latest)
244         response = {"id": None, "method": "blockchain.numblocks.subscribe",
245                     "params": [latest]}
246         self.processor.push_response(response)
247         self.backend.blockchain.subscribe_reorganize(self.reorganize)
248
249     def subscribe(self, request):
250         latest = self.latest.get()
251         response = {"id": request["id"],
252                     "result": latest,
253                     "error": None}
254         self.processor.push_response(response)
255
256 class AddressGetHistory:
257
258     def __init__(self, backend, processor):
259         self.backend = backend
260         self.processor = processor
261
262     def get(self, request):
263         address = str(request["params"][0])
264         service = self.backend.mempool_service
265         chain = self.backend.blockchain
266         txpool = self.backend.transaction_pool
267         memory_buff = self.backend.memory_buffer
268         history.payment_history(service, chain, txpool, memory_buff,
269             address, bind(self.respond, _1, _2, request))
270
271     def respond(self, ec, result, request):
272         if ec:
273             response = {"id": request["id"], "result": None,
274                         "error": {"message": str(ec), "code": -4}}
275         else:
276             response = {"id": request["id"], "result": result, "error": None}
277         self.processor.push_response(response)
278
279 class AddressSubscribe:
280
281     def __init__(self, backend, processor, cache, monitor):
282         self.backend = backend
283         self.processor = processor
284         self.cache = cache
285         self.monitor = monitor
286
287     def subscribe(self, request):
288         address = str(request["params"][0])
289         service = self.backend.mempool_service
290         chain = self.backend.blockchain
291         txpool = self.backend.transaction_pool
292         memory_buff = self.backend.memory_buffer
293         history.payment_history(service, chain, txpool, memory_buff,
294             address, bind(self.construct, _1, _2, request))
295
296     def construct(self, ec, result, request):
297         if ec:
298             response = {"id": request["id"], "result": None,
299                         "error": {"message": str(ec), "code": -4}}
300             self.processor.push_response(response)
301             return
302         last_id = self.monitor.mempool_n(result)
303         response = {"id": request["id"], "result": last_id, "error": None}
304         self.processor.push_response(response)
305         address = request["params"][0]
306         self.monitor.monitor(address, result)
307         # Cache result for get_history
308         self.cache.store(address, result)
309
310     def fetch_cached(self, request):
311         address = request["params"][0]
312         cached = self.cache.fetch(address)
313         if cached is None:
314             return False
315         response = {"id": request["id"], "result": cached, "error": None}
316         self.processor.push_response(response)
317         return True
318
319 class BlockchainProcessor(Processor):
320
321     def __init__(self, config):
322         Processor.__init__(self)
323         cache = HistoryCache()
324         self.backend = Backend()
325         monitor = MonitorAddress(self, cache, self.backend)
326         self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
327         self.address_get_history = AddressGetHistory(self.backend, self)
328         self.address_subscribe = \
329             AddressSubscribe(self.backend, self, cache, monitor)
330
331     def stop(self):
332         self.backend.stop()
333
334     def process(self, request):
335         print "New request (lib)", request
336         if request["method"] == "blockchain.numblocks.subscribe":
337             self.numblocks_subscribe.subscribe(request)
338         elif request["method"] == "blockchain.address.subscribe":
339             self.address_subscribe.subscribe(request)
340         elif request["method"] == "blockchain.address.get_history":
341             if not self.address_subscribe.fetch_cached(request):
342                 self.address_get_history.get(request)
343         elif request["method"] == "blockchain.transaction.broadcast":
344             self.broadcast_transaction(request)
345
346     def broadcast_transaction(self, request):
347         raw_tx = bitcoin.data_chunk(str(request["params"][0]))
348         exporter = bitcoin.satoshi_exporter()
349         try:
350             tx = exporter.load_transaction(raw_tx)
351         except RuntimeError:
352             response = {"id": request["id"], "result": None,
353                         "error": {"message": 
354                             "Exception while parsing the transaction data.",
355                             "code": -4}}
356         else:
357             self.backend.protocol.broadcast_transaction(tx)
358             tx_hash = str(bitcoin.hash_transaction(tx))
359             response = {"id": request["id"], "result": tx_hash, "error": None}
360         self.push_response(response)
361
362     def run(self):
363         print "Warning: pre-alpha prototype. Full of bugs."
364         while not self.shared.stopped():
365             time.sleep(1)
366