C++ core with working memory pool
[electrum-server.git] / backends / libbitcoin / __init__.py
1 import bitcoin
2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
4 import threading
5 import time
6
7 import history1 as history
8 import membuf
9
10 class HistoryCache:
11
12     def __init__(self):
13         self.lock = threading.Lock()
14         self.cache = {}
15
16     def store(self, address, result):
17         with self.lock:
18             self.cache[address] = result
19
20     def fetch(self, address):
21         try:
22             with self.lock:
23                 return self.cache[address]
24         except KeyError:
25             return None
26
27     def clear(self, addresses):
28         with self.lock:
29             for address in addresses:
30                 if self.cache.has_key(address):
31                     del self.cache[address]
32
33 class MonitorAddress:
34
35     def __init__(self, processor, cache, backend):
36         self.processor = processor
37         self.cache = cache
38         self.backend = backend
39         self.lock = threading.Lock()
40         # key is hash:index, value is address
41         self.monitor_output = {}
42         # key is address
43         self.monitor_address = set()
44         # affected
45         self.affected = {}
46
47         backend.memory_buffer.set_handles(self.tx_stored, self.tx_confirmed)
48
49     def monitor(self, address, result):
50         for info in result:
51             if not info.has_key("raw_output_script"):
52                 continue
53             assert info["is_input"] == 0
54             tx_hash = info["tx_hash"]
55             output_index = info["index"]
56             outpoint = "%s:%s" % (tx_hash, output_index)
57             with self.lock:
58                 self.monitor_output[outpoint] = address
59         with self.lock:
60             self.monitor_address.add(address)
61
62     def unpack(self, tx):
63         tx_hash = bitcoin.hash_transaction(tx)
64         previous_outputs = []
65         for input in tx.inputs:
66             prevout = input.previous_output
67             prevout = "%s:%s" % (prevout.hash, prevout.index)
68             previous_outputs.append(prevout)
69         addrs = []
70         for output_index, output in enumerate(tx.outputs):
71             address = bitcoin.payment_address()
72             if address.extract(output.output_script):
73                 addrs.append((output_index, str(address)))
74         return tx_hash, previous_outputs, addrs
75
76     def tx_stored(self, tx):
77         affected_addrs = set()
78         tx_hash, previous_outputs, addrs = self.unpack(tx)
79         for prevout in previous_outputs:
80             with self.lock:
81                 if self.monitor_output.has_key(prevout):
82                     affected_addrs.add(self.monitor_output[prevout])
83         for idx, address in addrs:
84             with self.lock:
85                 if address in self.monitor_address:
86                     affected_addrs.add(address)
87         with self.lock:
88             self.affected[tx_hash] = affected_addrs
89         self.cache.clear(affected_addrs)
90         self.notify(affected_addrs)
91
92     def tx_confirmed(self, tx_desc):
93         tx_hash, previous_outputs, addrs = self.unpack(tx)
94         with self.lock:
95             affected_addrs = self.affected[tx_hash]
96             del self.affected[tx_hash]
97         self.cache.clear(affected_addrs)
98         self.notify(affected_addrs)
99         # add new outputs to monitor
100         for idx, address in addrs:
101             outpoint = "%s:%s" % (tx_hash, idx)
102             with self.lock:
103                 if address in affected_addrs:
104                     self.monitor_output[outpoint] = address
105         # delete spent outpoints
106         for prevout in previous_outputs:
107             with self.lock:
108                 if self.monitor_output.has_key(prevout):
109                     del self.monitor_output[prevout]
110
111     def notify(self, affected_addrs):
112         templ_response = {"id": None,
113                           "method": "blockchain.address.subscribe",
114                           "params": []}
115         service = self.backend.mempool_service
116         chain = self.backend.blockchain
117         txpool = self.backend.transaction_pool
118         memory_buff = self.backend.memory_buffer
119         for address in affected_addrs:
120             response = templ_response.copy()
121             response["params"].append(address)
122             history.payment_history(service, chain, txpool, memory_buff,
123                 address, bind(self.send_notify, _1, _2, response))
124
125     def mempool_n(self, result):
126         assert result is not None
127         if len(result) == 0:
128             return None
129         # mempool:n status
130         # Order by time, grab last item (latest)
131         last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
132         if last_info["block_hash"] == "mempool":
133             last_id = "mempool:%s" % len(result)
134         else:
135             last_id = last_info["block_hash"]
136         return last_id
137
138     def send_notify(self, ec, result, response):
139         if ec:
140             print "Error: Monitor.send_notify()", ec
141             return
142         response["params"].append(self.mempool_n(result))
143         self.processor.push_response(response)
144
145 class Backend:
146
147     def __init__(self):
148         # Create 3 thread-pools each with 1 thread
149         self.network_service = bitcoin.async_service(1)
150         self.disk_service = bitcoin.async_service(1)
151         self.mempool_service = bitcoin.async_service(1)
152
153         self.hosts = bitcoin.hosts(self.network_service)
154         self.handshake = bitcoin.handshake(self.network_service)
155         self.network = bitcoin.network(self.network_service)
156         self.protocol = bitcoin.protocol(self.network_service, self.hosts,
157                                          self.handshake, self.network)
158
159         db_prefix = "/home/genjix/libbitcoin/database"
160         self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
161                                                  self.blockchain_started)
162         self.poller = bitcoin.poller(self.mempool_service, self.blockchain)
163         self.transaction_pool = \
164             bitcoin.transaction_pool(self.mempool_service, self.blockchain)
165
166         self.protocol.subscribe_channel(self.monitor_tx)
167         self.session = \
168             bitcoin.session(self.network_service, self.hosts, self.handshake,
169                             self.network, self.protocol, self.blockchain,
170                             self.poller, self.transaction_pool)
171         self.session.start(self.handle_start)
172
173         self.memory_buffer = \
174             membuf.memory_buffer(self.mempool_service.internal_ptr,
175                                  self.blockchain.internal_ptr,
176                                  self.transaction_pool.internal_ptr)
177
178     def handle_start(self, ec):
179         if ec:
180             print "Error starting backend:", ec
181
182     def blockchain_started(self, ec, chain):
183         print "Blockchain initialisation:", ec
184
185     def stop(self):
186         self.session.stop(self.handle_stop)
187
188     def handle_stop(self, ec):
189         if ec:
190             print "Error stopping backend:", ec
191         print "Stopped backend"
192
193     def monitor_tx(self, node):
194         # We will be notified here when connected to new bitcoin nodes
195         # Here we subscribe to new transactions from them which we
196         # add to the transaction_pool. That way we can track which
197         # transactions we are interested in.
198         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
199         # Re-subscribe to next new node
200         self.protocol.subscribe_channel(self.monitor_tx)
201
202     def recv_tx(self, ec, tx, node):
203         if ec:
204             print "Error with new transaction:", ec
205             return
206         tx_hash = bitcoin.hash_transaction(tx)
207         self.memory_buffer.receive(tx, bind(self.store_tx, _1, tx_hash))
208         # Re-subscribe to new transactions from node
209         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
210
211     def store_tx(self, ec, tx_hash):
212         if ec:
213             print "Error storing memory pool transaction", tx_hash, ec
214         else:
215             print "Accepted transaction", tx_hash
216
217 class GhostValue:
218
219     def __init__(self):
220         self.event = threading.Event()
221         self.value = None
222
223     def get(self):
224         self.event.wait()
225         return self.value
226
227     def set(self, value):
228         self.value = value
229         self.event.set()
230
231 class NumblocksSubscribe:
232
233     def __init__(self, backend, processor):
234         self.backend = backend
235         self.processor = processor
236         self.lock = threading.Lock()
237         self.backend.blockchain.subscribe_reorganize(self.reorganize)
238         self.backend.blockchain.fetch_last_depth(self.set_last_depth)
239         self.latest = GhostValue()
240
241     def set_last_depth(self, ec, last_depth):
242         if ec:
243             print "Error retrieving last depth", ec
244         else:
245             self.latest.set(last_depth)
246
247     def reorganize(self, ec, fork_point, arrivals, replaced):
248         latest = fork_point + len(arrivals)
249         self.latest.set(latest)
250         response = {"id": None, "method": "blockchain.numblocks.subscribe",
251                     "result": latest}
252         self.processor.push_response(response)
253         self.backend.blockchain.subscribe_reorganize(self.reorganize)
254
255     def subscribe(self, request):
256         latest = self.latest.get()
257         response = {"id": request["id"],
258                     "method": "blockchain.numblocks.subscribe",
259                     "result": latest,
260                     "error": None}
261         self.processor.push_response(response)
262
263 class AddressGetHistory:
264
265     def __init__(self, backend, processor):
266         self.backend = backend
267         self.processor = processor
268
269     def get(self, request):
270         address = str(request["params"][0])
271         service = self.backend.mempool_service
272         chain = self.backend.blockchain
273         txpool = self.backend.transaction_pool
274         memory_buff = self.backend.memory_buffer
275         history.payment_history(service, chain, txpool, memory_buff,
276             address, bind(self.respond, _1, _2, request))
277
278     def respond(self, ec, result, request):
279         if ec:
280             response = {"id": request["id"], "result": None,
281                         "error": {"message": str(ec), "code": -4}}
282         else:
283             response = {"id": request["id"], "result": result, "error": None}
284         self.processor.push_response(response)
285
286 class AddressSubscribe:
287
288     def __init__(self, backend, processor, cache, monitor):
289         self.backend = backend
290         self.processor = processor
291         self.cache = cache
292         self.monitor = monitor
293
294     def subscribe(self, request):
295         address = str(request["params"][0])
296         service = self.backend.mempool_service
297         chain = self.backend.blockchain
298         txpool = self.backend.transaction_pool
299         memory_buff = self.backend.memory_buffer
300         history.payment_history(service, chain, txpool, memory_buff,
301             address, bind(self.construct, _1, _2, request))
302
303     def construct(self, ec, result, request):
304         if ec:
305             response = {"id": request["id"], "result": None,
306                         "error": {"message": str(ec), "code": -4}}
307             self.processor.push_response(response)
308             return
309         last_id = self.monitor.mempool_n(result)
310         response = {"id": request["id"], "result": last_id, "error": None}
311         self.processor.push_response(response)
312         address = request["params"][0]
313         self.monitor.monitor(address, result)
314         # Cache result for get_history
315         self.cache.store(address, result)
316
317     def fetch_cached(self, request):
318         address = request["params"][0]
319         cached = self.cache.fetch(address)
320         if cached is None:
321             return False
322         response = {"id": request["id"], "result": cached, "error": None}
323         self.processor.push_response(response)
324         return True
325
326 class BlockchainProcessor(Processor):
327
328     def __init__(self, config):
329         Processor.__init__(self)
330         cache = HistoryCache()
331         self.backend = Backend()
332         monitor = MonitorAddress(self, cache, self.backend)
333         self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
334         self.address_get_history = AddressGetHistory(self.backend, self)
335         self.address_subscribe = \
336             AddressSubscribe(self.backend, self, cache, monitor)
337
338     def stop(self):
339         self.backend.stop()
340
341     def process(self, request):
342         print "New request (lib)", request
343         if request["method"] == "blockchain.numblocks.subscribe":
344             self.numblocks_subscribe.subscribe(request)
345         elif request["method"] == "blockchain.address.subscribe":
346             self.address_subscribe.subscribe(request)
347         elif request["method"] == "blockchain.address.get_history":
348             if not self.address_subscribe.fetch_cached(request):
349                 self.address_get_history.get(request)
350         elif request["method"] == "blockchain.transaction.broadcast":
351             self.broadcast_transaction(request)
352
353     def broadcast_transaction(self, request):
354         raw_tx = bitcoin.data_chunk(str(request["params"]))
355         exporter = bitcoin.satoshi_exporter()
356         try:
357             tx = exporter.load_transaction(raw_tx)
358         except RuntimeError:
359             response = {"id": request["id"], "result": None,
360                         "error": {"message": 
361                             "Exception while parsing the transaction data.",
362                             "code": -4}}
363         else:
364             self.backend.protocol.broadcast_transaction(tx)
365             tx_hash = str(bitcoin.hash_transaction(tx))
366             response = {"id": request["id"], "result": tx_hash, "error": None}
367         self.push_response(response)
368
369     def run(self):
370         print "Warning: pre-alpha prototype. Full of bugs."
371         while not self.shared.stopped():
372             time.sleep(1)
373