mempool subscribe works with outputs of transactions.
[electrum-server.git] / backends / libbitcoin / __init__.py
1 import bitcoin
2 from bitcoin import bind, _1, _2, _3
3 from processor import Processor
4 import threading
5 import time
6
7 import history 
8
9 class HistoryCache:
10
11     def __init__(self):
12         self.lock = threading.Lock()
13         self.cache = {}
14
15     def store(self, address, result):
16         with self.lock:
17             self.cache[address] = result
18
19     def fetch(self, address):
20         try:
21             with self.lock:
22                 return self.cache[address]
23         except KeyError:
24             return None
25
26     def clear(self, addresses):
27         with self.lock:
28             for address in addresses:
29                 if self.cache.has_key(address):
30                     del self.cache[address]
31
32 class MonitorAddress:
33
34     def __init__(self, processor, cache):
35         self.processor = processor
36         self.cache = cache
37         self.lock = threading.Lock()
38         # key is hash:index, value is address
39         self.monitor_output = {}
40         # key is address
41         self.monitor_address = set()
42         # affected
43         self.affected = {}
44
45     def monitor(self, address, result):
46         for info in result:
47             if not info.has_key("raw_output_script"):
48                 continue
49             assert info["is_input"] == 0
50             tx_hash = info["tx_hash"]
51             output_index = info["index"]
52             outpoint = "%s:%s" % (tx_hash, output_index)
53             with self.lock:
54                 self.monitor_output[outpoint] = address
55         with self.lock:
56             self.monitor_address.add(address)
57
58     def tx_stored(self, tx_desc):
59         tx_hash, prevouts, addrs = tx_desc
60         affected_addrs = set()
61         for prevout_hash, prevout_index in prevouts:
62             prevout = "%s:%s" % (prevout_hash, prevout_index)
63             with self.lock:
64                 if self.monitor_output.has_key(prevout):
65                     affected_addrs.add(self.monitor_output[prevout])
66         for idx, address in addrs:
67             with self.lock:
68                 if address in self.monitor_address:
69                     affected_addrs.add(address)
70         with self.lock:
71             self.affected[tx_hash] = affected_addrs
72         self.cache.clear(affected_addrs)
73         self.notify(affected_addrs)
74
75     def tx_confirmed(self, tx_desc):
76         tx_hash, prevouts, addrs = tx_desc
77         with self.lock:
78             affected_addrs = self.affected[tx_hash]
79             del self.affected[tx_hash]
80         self.cache.clear(affected_addrs)
81         self.notify(affected_addrs)
82         # add new outputs to monitor
83         for idx, address in addrs:
84             outpoint = "%s:%s" % (tx_hash, idx)
85             with self.lock:
86                 if address in affected_addrs:
87                     self.monitor_output[outpoint] = address
88         # delete spent outpoints
89         for prevout_hash, prevout_index in prevouts:
90             prevout = "%s:%s" % (prevout_hash, prevout_index)
91             with self.lock:
92                 if self.monitor_output.has_key(prevout):
93                     del self.monitor_output[prevout]
94
95     def notify(self, affected_addrs):
96         templ_response = {"id": None,
97                           "method": "blockchain.address.subscribe",
98                           "params": []}
99         chain = self.backend.blockchain
100         txpool = self.backend.transaction_pool
101         membuf = self.backend.pool_buffer
102         for address in affected_addrs:
103             response = templ_response.copy()
104             response["params"].append(address)
105             history.payment_history(chain, txpool, membuf, address,
106                 bind(self.send_notify, _1, response))
107
108     def mempool_n(self, result):
109         assert result is not None
110         if len(result) == 0:
111             return None
112         # mempool:n status
113         # Order by time, grab last item (latest)
114         last_info = sorted(result, key=lambda k: k['timestamp'])[-1]
115         if last_info["block_hash"] == "mempool":
116             last_id = "mempool:%s" % len(result)
117         else:
118             last_id = last_info["block_hash"]
119         return last_id
120
121     def send_notify(self, result, response):
122         response["params"].append(self.mempool_n(result))
123         self.processor.push_response(response)
124
125 class Backend:
126
127     def __init__(self, monitor):
128         # Create 3 thread-pools each with 1 thread
129         self.network_service = bitcoin.async_service(1)
130         self.disk_service = bitcoin.async_service(1)
131         self.mempool_service = bitcoin.async_service(1)
132
133         self.hosts = bitcoin.hosts(self.network_service)
134         self.handshake = bitcoin.handshake(self.network_service)
135         self.network = bitcoin.network(self.network_service)
136         self.protocol = bitcoin.protocol(self.network_service, self.hosts,
137                                          self.handshake, self.network)
138
139         db_prefix = "/home/genjix/libbitcoin/database"
140         self.blockchain = bitcoin.bdb_blockchain(self.disk_service, db_prefix,
141                                                  self.blockchain_started)
142         self.poller = bitcoin.poller(self.blockchain)
143         self.transaction_pool = \
144             bitcoin.transaction_pool(self.mempool_service, self.blockchain)
145
146         self.protocol.subscribe_channel(self.monitor_tx)
147         self.session = \
148             bitcoin.session(self.network_service, self.hosts, self.handshake,
149                             self.network, self.protocol, self.blockchain,
150                             self.poller, self.transaction_pool)
151         self.session.start(self.handle_start)
152
153         self.pool_buffer = \
154             history.MemoryPoolBuffer(self.transaction_pool,
155                                      self.blockchain, monitor)
156
157     def handle_start(self, ec):
158         if ec:
159             print "Error starting backend:", ec
160
161     def blockchain_started(self, ec, chain):
162         print "Blockchain initialisation:", ec
163
164     def stop(self):
165         self.session.stop(self.handle_stop)
166
167     def handle_stop(self, ec):
168         if ec:
169             print "Error stopping backend:", ec
170         print "Stopped backend"
171
172     def monitor_tx(self, node):
173         # We will be notified here when connected to new bitcoin nodes
174         # Here we subscribe to new transactions from them which we
175         # add to the transaction_pool. That way we can track which
176         # transactions we are interested in.
177         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
178         # Re-subscribe to next new node
179         self.protocol.subscribe_channel(self.monitor_tx)
180
181     def recv_tx(self, ec, tx, node):
182         if ec:
183             print "Error with new transaction:", ec
184             return
185         tx_hash = bitcoin.hash_transaction(tx)
186         self.pool_buffer.recv_tx(tx, bind(self.store_tx, _1, tx_hash))
187         # Re-subscribe to new transactions from node
188         node.subscribe_transaction(bind(self.recv_tx, _1, _2, node))
189
190     def store_tx(self, ec, tx_hash):
191         if ec:
192             print "Error storing memory pool transaction", tx_hash, ec
193         else:
194             print "Accepted transaction", tx_hash
195
196 class GhostValue:
197
198     def __init__(self):
199         self.event = threading.Event()
200         self.value = None
201
202     def get(self):
203         self.event.wait()
204         return self.value
205
206     def set(self, value):
207         self.value = value
208         self.event.set()
209
210 class NumblocksSubscribe:
211
212     def __init__(self, backend, processor):
213         self.backend = backend
214         self.processor = processor
215         self.lock = threading.Lock()
216         self.backend.blockchain.subscribe_reorganize(self.reorganize)
217         self.backend.blockchain.fetch_last_depth(self.set_last_depth)
218         self.latest = GhostValue()
219
220     def set_last_depth(self, ec, last_depth):
221         if ec:
222             print "Error retrieving last depth", ec
223         else:
224             self.latest.set(last_depth)
225
226     def reorganize(self, ec, fork_point, arrivals, replaced):
227         latest = fork_point + len(arrivals)
228         self.latest.set(latest)
229         response = {"id": None, "method": "blockchain.numblocks.subscribe",
230                     "result": latest}
231         self.processor.push_response(response)
232         self.backend.blockchain.subscribe_reorganize(self.reorganize)
233
234     def subscribe(self, request):
235         latest = self.latest.get()
236         response = {"id": request["id"],
237                     "method": "blockchain.numblocks.subscribe",
238                     "result": latest,
239                     "error": None}
240         self.processor.push_response(response)
241
242 class AddressGetHistory:
243
244     def __init__(self, backend, processor):
245         self.backend = backend
246         self.processor = processor
247
248     def get(self, request):
249         address = str(request["params"][0])
250         chain = self.backend.blockchain
251         txpool = self.backend.transaction_pool
252         membuf = self.backend.pool_buffer
253         history.payment_history(chain, txpool, membuf, address,
254             bind(self.respond, _1, request))
255
256     def respond(self, result, request):
257         if result is None:
258             response = {"id": request["id"], "result": None,
259                         "error": {"message": "Error", "code": -4}}
260         else:
261             response = {"id": request["id"], "result": result, "error": None}
262         self.processor.push_response(response)
263
264 class AddressSubscribe:
265
266     def __init__(self, backend, processor, cache, monitor):
267         self.backend = backend
268         self.processor = processor
269         self.cache = cache
270         self.monitor = monitor
271
272         self.backend.pool_buffer.cheat = self
273
274     def subscribe(self, request):
275         address = str(request["params"][0])
276         chain = self.backend.blockchain
277         txpool = self.backend.transaction_pool
278         membuf = self.backend.pool_buffer
279         history.payment_history(chain, txpool, membuf, address,
280             bind(self.construct, _1, request))
281
282     def construct(self, result, request):
283         if result is None:
284             response = {"id": request["id"], "result": None,
285                         "error": {"message": "Error", "code": -4}}
286             return
287         last_id = self.monitor.mempool_n(result)
288         response = {"id": request["id"], "result": last_id, "error": None}
289         self.processor.push_response(response)
290         address = request["params"][0]
291         self.monitor.monitor(address, result)
292         # Cache result for get_history
293         self.cache.store(address, result)
294
295     def fetch_cached(self, request):
296         address = request["params"][0]
297         cached = self.cache.fetch(address)
298         if cached is None:
299             return False
300         response = {"id": request["id"], "result": cached, "error": None}
301         self.processor.push_response(response)
302         return True
303
304 class BlockchainProcessor(Processor):
305
306     def __init__(self, config):
307         Processor.__init__(self)
308         cache = HistoryCache()
309         monitor = MonitorAddress(self, cache)
310         self.backend = Backend(monitor)
311         monitor.backend = self.backend
312         self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
313         self.address_get_history = AddressGetHistory(self.backend, self)
314         self.address_subscribe = \
315             AddressSubscribe(self.backend, self, cache, monitor)
316
317     def stop(self):
318         self.backend.stop()
319
320     def process(self, request):
321         print "New request (lib)", request
322         if request["method"] == "blockchain.numblocks.subscribe":
323             self.numblocks_subscribe.subscribe(request)
324         elif request["method"] == "blockchain.address.subscribe":
325             self.address_subscribe.subscribe(request)
326         elif request["method"] == "blockchain.address.get_history":
327             if not self.address_subscribe.fetch_cached(request):
328                 self.address_get_history.get(request)
329         elif request["method"] == "blockchain.transaction.broadcast":
330             self.broadcast_transaction(request)
331
332     def broadcast_transaction(self, request):
333         raw_tx = bitcoin.data_chunk(str(request["params"]))
334         exporter = bitcoin.satoshi_exporter()
335         try:
336             tx = exporter.load_transaction(raw_tx)
337         except RuntimeError:
338             response = {"id": request["id"], "result": None,
339                         "error": {"message": 
340                             "Exception while parsing the transaction data.",
341                             "code": -4}}
342         else:
343             self.backend.protocol.broadcast_transaction(tx)
344             tx_hash = str(bitcoin.hash_transaction(tx))
345             response = {"id": request["id"], "result": tx_hash, "error": None}
346         self.push_response(response)
347
348     def run(self):
349         print "Warning: pre-alpha prototype. Full of bugs."
350         while not self.shared.stopped():
351             time.sleep(1)
352