# electrum-server -- backends/bitcoind/blockchain_processor.py
# commit: hash tree of unspents
import ast
import hashlib
from json import dumps, loads
import os
from Queue import Queue
import random
import sys
import time
import threading
import traceback
import urllib

from backends.bitcoind import deserialize
from processor import Processor, print_log
from utils import *

from storage import Storage

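# BlockchainProcessor ties the Electrum server to a local bitcoind: it keeps the
# address-history index (Storage) and the flat 'blockchain_headers' file in sync
# with the daemon, tracks the memory pool, and answers the 'blockchain.*'
# requests coming from client sessions.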
class BlockchainProcessor(Processor):

    def __init__(self, config, shared):
        Processor.__init__(self)

        self.mtimes = {}  # monitoring
        self.shared = shared
        self.config = config
        self.up_to_date = False

        self.watch_lock = threading.Lock()
        self.watch_blocks = []
        self.watch_headers = []
        self.watched_addresses = {}

        self.history_cache = {}
        self.chunk_cache = {}
        self.cache_lock = threading.Lock()
        self.headers_data = ''
        self.headers_path = config.get('leveldb', 'path_fulltree')

        self.mempool_addresses = {}
        self.mempool_hist = {}
        self.mempool_hashes = set([])
        self.mempool_lock = threading.Lock()

        self.address_queue = Queue()

        try:
            self.test_reorgs = config.getboolean('leveldb', 'test_reorgs')   # simulate random blockchain reorgs
        except:
            self.test_reorgs = False
        self.storage = Storage(config, shared, self.test_reorgs)

        self.dblock = threading.Lock()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind', 'user'),
            config.get('bitcoind', 'password'),
            config.get('bitcoind', 'host'),
            config.get('bitcoind', 'port'))

        while True:
            try:
                self.bitcoind('getinfo')
                break
            except:
                print_log('cannot contact bitcoind...')
                time.sleep(5)
                continue

        self.sent_height = 0
        self.sent_header = None

        # catch up the headers file with the database height
        self.init_headers(self.storage.height)

        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print_log("Blockchain is up to date.")
        self.memorypool_update()
        print_log("Memory pool initialized.")

        threading.Timer(10, self.main_iteration).start()

    def mtime(self, name):
        now = time.time()
        if name != '':
            delta = now - self.now
            t = self.mtimes.get(name, 0)
            self.mtimes[name] = t + delta
        self.now = now

    def print_mtime(self):
        s = ''
        for k, v in self.mtimes.items():
            s += k + ':' + "%.2f" % v + ' '
        print_log(s)

    def bitcoind(self, method, params=[]):
        postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
        try:
            respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        except:
            traceback.print_exc(file=sys.stdout)
            self.shared.stop()
            raise  # do not fall through with respdata undefined
        r = loads(respdata)
        if r['error'] is not None:
            raise BaseException(r['error'])
        return r.get('result')

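    # Illustrative sketch (not executed): a call such as self.bitcoind('getblockhash', [0])
    # POSTs the JSON-RPC body {"method": "getblockhash", "params": [0], "id": "jsonrpc"}
    # to self.bitcoind_url and returns the reply's 'result' field (here, the genesis
    # block hash). The example values are hypothetical, not taken from this code.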

    def block2header(self, b):
        return {
            "block_height": b.get('height'),
            "version": b.get('version'),
            "prev_block_hash": b.get('previousblockhash'),
            "merkle_root": b.get('merkleroot'),
            "timestamp": b.get('time'),
            "bits": int(b.get('bits'), 16),
            "nonce": b.get('nonce'),
        }

    def get_header(self, height):
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        return self.block2header(b)

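    # The dict built by block2header()/get_header() mirrors the 80-byte wire header:
    # version (4) + prev_block_hash (32) + merkle_root (32) + timestamp (4) + bits (4)
    # + nonce (4). header_to_string()/header_from_string() from utils convert between
    # this dict and the serialized form, and hash_header() below turns it into the
    # block hash.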

    def init_headers(self, db_height):
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                if height > 1:
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            self.flush_headers()
            sys.exit()

        self.flush_headers()

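    # The headers file is a dense array of 80-byte headers indexed by height (the
    # header for height h starts at byte offset h*80). init_headers() extends it from
    # bitcoind until it matches the database height, checking that each header links
    # to the previous one.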

    def hash_header(self, header):
        return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))

    def read_header(self, block_height):
        if os.path.exists(self.headers_filename):
            with open(self.headers_filename, 'rb') as f:
                f.seek(block_height * 80)
                h = f.read(80)
            if len(h) == 80:
                h = header_from_string(h)
                return h

    def read_chunk(self, index):
        with open(self.headers_filename, 'rb') as f:
            f.seek(index*2016*80)
            chunk = f.read(2016*80)
        return chunk.encode('hex')

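    # A "chunk" is one difficulty-retargeting period of 2016 headers (2016*80 bytes),
    # returned hex-encoded so blockchain.block.get_chunk lets clients fetch and
    # validate headers in bulk.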

    def write_header(self, header, sync=True):
        if not self.headers_data:
            self.headers_offset = header.get('block_height')

        self.headers_data += header_to_string(header).decode('hex')
        if sync or len(self.headers_data) > 40*100:
            self.flush_headers()

        with self.cache_lock:
            chunk_index = header.get('block_height')/2016
            if self.chunk_cache.get(chunk_index):
                self.chunk_cache.pop(chunk_index)

    def pop_header(self):
        # we need to do this only if we have not flushed yet;
        # each serialized header is 80 bytes, so drop exactly one record
        if self.headers_data:
            self.headers_data = self.headers_data[:-80]

    def flush_headers(self):
        if not self.headers_data:
            return
        with open(self.headers_filename, 'rb+') as f:
            f.seek(self.headers_offset*80)
            f.write(self.headers_data)
        self.headers_data = ''

    def get_chunk(self, i):
        # chunks are read from the headers file on disk; cache a chunk in memory once requested
        with self.cache_lock:
            chunk = self.chunk_cache.get(i)
            if not chunk:
                chunk = self.read_chunk(i)
                self.chunk_cache[i] = chunk

        return chunk

    def get_mempool_transaction(self, txid):
        try:
            raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
        except:
            return None

        vds = deserialize.BCDataStream()
        vds.write(raw_tx.decode('hex'))
        try:
            return deserialize.parse_Transaction(vds, is_coinbase=False)
        except:
            print_log("ERROR: cannot parse", txid)
            return None

    def get_history(self, addr, cache_only=False):
        with self.cache_lock:
            hist = self.history_cache.get(addr)
        if hist is not None:
            return hist
        if cache_only:
            return -1

        with self.dblock:
            try:
                h = self.storage.get_history(str(addr))
                hist = self.storage.deserialize(h)
                is_known = True
            except:
                self.shared.stop()
                raise
            if hist:
                is_known = True
            else:
                hist = []
                is_known = False

        # sort history, so that redeeming transactions sit next to the corresponding txout
        hist.sort(key=lambda tup: tup[2])

        # add memory pool
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr, []):
                hist.append((txid, 0, 0))

        # uniqueness
        hist = set(map(lambda x: (x[0], x[2]), hist))

        # convert to dict
        hist = map(lambda x: {'tx_hash': x[0], 'height': x[1]}, hist)

        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known:
            hist = ['*']

        with self.cache_lock:
            self.history_cache[addr] = hist
        return hist

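    # Illustrative sketch (not executed): for an address with one confirmed and one
    # unconfirmed transaction, get_history() returns something like
    #   [{'tx_hash': 'aa..11', 'height': 300000}, {'tx_hash': 'bb..22', 'height': 0}]
    # (hypothetical values). ['*'] marks an address that is known but currently has
    # an empty history, and -1 means "not cached" when cache_only is set.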

    def get_status(self, addr, cache_only=False):
        tx_points = self.get_history(addr, cache_only)
        if cache_only and tx_points == -1:
            return -1

        if not tx_points:
            return None
        if tx_points == ['*']:
            return '*'
        status = ''
        for tx in tx_points:
            status += tx.get('tx_hash') + ':%d:' % tx.get('height')
        return hashlib.sha256(status).digest().encode('hex')

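    # Illustrative sketch (not executed), following the status convention used above:
    # the status is the hex sha256 of the concatenated 'tx_hash:height:' pairs, e.g.
    #   hashlib.sha256('aa..11:300000:' + 'bb..22:0:').hexdigest()
    # for the hypothetical history above. A changed status tells a subscribed client
    # that the address needs to be refreshed.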

    def get_merkle(self, tx_hash, height):
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        while len(merkle) != 1:
            if len(merkle) % 2:
                merkle.append(merkle[-1])
            n = []
            while merkle:
                new_hash = Hash(merkle[0] + merkle[1])
                if merkle[0] == target_hash:
                    s.append(hash_encode(merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append(hash_encode(merkle[0]))
                    target_hash = new_hash
                n.append(new_hash)
                merkle = merkle[2:]
            merkle = n

        return {"block_height": height, "merkle": s, "pos": tx_pos}

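    # Worked example (illustrative): for a block whose tx list is [t0, t1, t2, t3]
    # and tx_hash == t1, the first level pairs (t0, t1) and (t2, t3); the returned
    # branch is [t0, Hash(t2 + t3)] with "pos" == 1, which is exactly what a client
    # needs to hash its way from t1 back up to the block's merkle root.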

    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        # NOTE: this helper still writes to self.batch_list / self.batch_txio,
        # which are not initialized by this Storage-based processor; it appears
        # to be a leftover from the previous flat-database code path.
        # keep the serialized history sorted by height
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        l = len(serialized_hist)/80
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height <= tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr

    def deserialize_block(self, block):
        txlist = block.get('tx')
        tx_hashes = []  # ordered txids
        txdict = {}     # deserialized tx
        is_coinbase = True
        for raw_tx in txlist:
            tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
            vds = deserialize.BCDataStream()
            vds.write(raw_tx.decode('hex'))
            try:
                tx = deserialize.parse_Transaction(vds, is_coinbase)
            except:
                print_log("ERROR: cannot parse", tx_hash)
                continue
            tx_hashes.append(tx_hash)
            txdict[tx_hash] = tx
            is_coinbase = False
        return tx_hashes, txdict

    def import_block(self, block, block_hash, block_height, sync, revert=False):

        touched_addr = set([])

        # deserialize transactions
        tx_hashes, txdict = self.deserialize_block(block)

        # undo info
        if revert:
            undo_info = self.storage.get_undo_info(block_height)
            tx_hashes.reverse()
        else:
            undo_info = {}

        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:
                undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
                undo_info[txid] = undo
            else:
                undo = undo_info.pop(txid)
                self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)

        if revert:
            assert undo_info == {}

        # add undo info
        if not revert:
            self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)

        # record the new chain tip (hash, height and db version)
        self.storage.db_undo.put('height', repr((block_hash, block_height, self.storage.db_version)))

        for addr in touched_addr:
            self.invalidate_cache(addr)

        self.storage.update_hashes()

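    # For a forward import, the per-transaction undo data returned by
    # storage.import_transaction() is saved under the block height so the same block
    # can later be reverted during a reorg; revert=True replays the block's
    # transactions in reverse order and consumes exactly that undo data.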

    def add_request(self, session, request):
        # see if we can get it from the cache; if not, add it to the queue
        if self.process(session, request, cache_only=True) == -1:
            self.queue.put((session, request))

    def do_subscribe(self, method, params, session):
        with self.watch_lock:
            if method == 'blockchain.numblocks.subscribe':
                if session not in self.watch_blocks:
                    self.watch_blocks.append(session)

            elif method == 'blockchain.headers.subscribe':
                if session not in self.watch_headers:
                    self.watch_headers.append(session)

            elif method == 'blockchain.address.subscribe':
                address = params[0]
                l = self.watched_addresses.get(address)
                if l is None:
                    self.watched_addresses[address] = [session]
                elif session not in l:
                    l.append(session)

    def do_unsubscribe(self, method, params, session):
        with self.watch_lock:
            if method == 'blockchain.numblocks.subscribe':
                if session in self.watch_blocks:
                    self.watch_blocks.remove(session)
            elif method == 'blockchain.headers.subscribe':
                if session in self.watch_headers:
                    self.watch_headers.remove(session)
            elif method == "blockchain.address.subscribe":
                addr = params[0]
                l = self.watched_addresses.get(addr)
                if not l:
                    return
                if session in l:
                    l.remove(session)
                if session in l:
                    print_log("error: duplicate session in watched_addresses for", addr)
                    self.shared.stop()
                if l == []:
                    self.watched_addresses.pop(addr)

    def process(self, session, request, cache_only=False):
        message_id = request['id']
        method = request['method']
        params = request.get('params', [])
        result = None
        error = None

        if method == 'blockchain.numblocks.subscribe':
            result = self.storage.height

        elif method == 'blockchain.headers.subscribe':
            result = self.header

        elif method == 'blockchain.address.subscribe':
            try:
                address = params[0]
                result = self.get_status(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.get_history':
            try:
                address = params[0]
                result = self.get_history(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.block.get_header':
            if cache_only:
                result = -1
            else:
                try:
                    height = params[0]
                    result = self.get_header(height)
                except BaseException, e:
                    error = str(e) + ': %d' % height
                    print_log("error:", error)

        elif method == 'blockchain.block.get_chunk':
            if cache_only:
                result = -1
            else:
                try:
                    index = params[0]
                    result = self.get_chunk(index)
                except BaseException, e:
                    error = str(e) + ': %d' % index
                    print_log("error:", error)

        elif method == 'blockchain.transaction.broadcast':
            try:
                txo = self.bitcoind('sendrawtransaction', params)
                print_log("sent tx:", txo)
                result = txo
            except BaseException, e:
                result = str(e)  # do not send an error
                print_log("error:", result, params)

        elif method == 'blockchain.transaction.get_merkle':
            if cache_only:
                result = -1
            else:
                try:
                    tx_hash = params[0]
                    tx_height = params[1]
                    result = self.get_merkle(tx_hash, tx_height)
                except BaseException, e:
                    error = str(e) + ': ' + repr(params)
                    print_log("get_merkle error:", error)

        elif method == 'blockchain.transaction.get':
            try:
                tx_hash = params[0]
                result = self.bitcoind('getrawtransaction', [tx_hash, 0])
            except BaseException, e:
                error = str(e) + ': ' + repr(params)
                print_log("tx get error:", error)

        else:
            error = "unknown method:%s" % method

        if cache_only and result == -1:
            return -1

        if error:
            self.push_response(session, {'id': message_id, 'error': error})
        elif result != '':
            self.push_response(session, {'id': message_id, 'result': result})

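    # Illustrative sketch (not executed): a client request such as
    #   {"id": 7, "method": "blockchain.address.get_history", "params": ["1A1zP1..."]}
    # is answered with {"id": 7, "result": [...]} on success, or with
    # {"id": 7, "error": "..."} on failure; the values shown are hypothetical.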

    def getfullblock(self, block_hash):
        block = self.bitcoind('getblock', [block_hash])

        rawtxreq = []
        for i, txid in enumerate(block['tx']):
            rawtxreq.append({
                "method": "getrawtransaction",
                "params": [txid],
                "id": i,
            })

        postdata = dumps(rawtxreq)
        try:
            respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        except:
            traceback.print_exc(file=sys.stdout)
            self.shared.stop()
            raise  # do not fall through with respdata undefined
        r = loads(respdata)
        rawtxdata = []
        for ir in r:
            if ir['error'] is not None:
                self.shared.stop()
                print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
                raise BaseException(ir['error'])
            rawtxdata.append(ir['result'])
        block['tx'] = rawtxdata
        return block

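    # getfullblock() avoids one round trip per transaction by sending a single
    # JSON-RPC batch (a list of {"method": "getrawtransaction", ...} objects);
    # bitcoind replies with a list of results in the same order, which replaces the
    # txid list in block['tx'] with raw transactions. This requires the daemon to
    # run with txindex=1.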

    def catch_up(self, sync=True):
        prh = None
        while not self.shared.stopped():

            self.mtime('')

            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.storage.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.storage.height + 1])
            next_block = self.getfullblock(next_block_hash)
            self.mtime('daemon')

            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100) == 1) if self.test_reorgs else False

            if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:

                self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
                self.storage.height = self.storage.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.storage.last_hash = next_block_hash
                self.mtime('import')

                if self.storage.height % 1000 == 0 and not sync:
                    t_daemon = self.mtimes.get('daemon')
                    t_import = self.mtimes.get('import')
                    print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
                    self.mtimes['daemon'] = 0
                    self.mtimes['import'] = 0

                if prh:
                    # after a (test) revert, re-importing the block must restore the previous root hash
                    assert prh == self.storage.get_root_hash().encode('hex')
                    prh = None

            else:
                # remember the root hash before reverting, so it can be checked after re-import
                prh = self.storage.get_root_hash().encode('hex')

                # revert current block
                block = self.getfullblock(self.storage.last_hash)
                print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
                self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
                self.pop_header()
                self.flush_headers()

                self.storage.height -= 1

                # read previous header from disk
                self.header = self.read_header(self.storage.height)
                self.storage.last_hash = self.hash_header(self.header)

        self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))

        if self.shared.stopped():
            print_log("closing database")
            self.storage.close()

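    # catch_up() advances one block per iteration: if bitcoind's next block extends
    # storage.last_hash it is imported, otherwise the current tip is reverted (reorg)
    # and the loop retries from the new tip. With test_reorgs enabled, random reverts
    # are injected and the root hash is checked against its pre-revert value (prh)
    # once the block has been re-imported.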

    def memorypool_update(self):
        mempool_hashes = set(self.bitcoind('getrawmempool'))
        touched_addresses = set([])

        for tx_hash in mempool_hashes:
            if tx_hash in self.mempool_hashes:
                continue

            tx = self.get_mempool_transaction(tx_hash)
            if not tx:
                continue

            mpa = self.mempool_addresses.get(tx_hash, [])
            for x in tx.get('inputs'):
                # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
                addr = x.get('address')
                if addr and addr not in mpa:
                    mpa.append(addr)
                    touched_addresses.add(addr)

            for x in tx.get('outputs'):
                addr = x.get('address')
                if addr and addr not in mpa:
                    mpa.append(addr)
                    touched_addresses.add(addr)

            self.mempool_addresses[tx_hash] = mpa
            self.mempool_hashes.add(tx_hash)

        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes

        # remove deprecated entries from mempool_addresses
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)
                for addr in addresses:
                    touched_addresses.add(addr)

        # rebuild mempool histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                if tx_hash not in h:
                    h.append(tx_hash)
                new_mempool_hist[addr] = h

        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist

        # invalidate cache for touched addresses
        for addr in touched_addresses:
            self.invalidate_cache(addr)

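    # memorypool_update() diffs the daemon's mempool against the previous snapshot:
    # new transactions contribute their input and output addresses, transactions that
    # left the mempool are dropped, the per-address unconfirmed history is rebuilt,
    # and every touched address has its history cache invalidated so that subscribers
    # are notified.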

    def invalidate_cache(self, address):
        with self.cache_lock:
            if address in self.history_cache:
                print_log("cache: invalidating", address)
                self.history_cache.pop(address)

        with self.watch_lock:
            sessions = self.watched_addresses.get(address)

        if sessions:
            # TODO: update cache here. if new value equals cached value, do not send notification
            self.address_queue.put((address, sessions))

    def main_iteration(self):
        if self.shared.stopped():
            print_log("blockchain processor terminating")
            self.storage.close()
            return

        with self.dblock:
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()

        if self.sent_height != self.storage.height:
            self.sent_height = self.storage.height
            for session in self.watch_blocks:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.numblocks.subscribe',
                        'params': [self.storage.height],
                        })

        if self.sent_header != self.header:
            print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
            self.sent_header = self.header
            for session in self.watch_headers:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.headers.subscribe',
                        'params': [self.header],
                        })

        while True:
            try:
                addr, sessions = self.address_queue.get(False)
            except:
                break

            status = self.get_status(addr)
            for session in sessions:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.address.subscribe',
                        'params': [addr, status],
                        })

        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        else:
            print_log("blockchain processor terminating")
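
# Illustrative sketch (not part of the original file): the server front end is
# expected to wire this processor up roughly as
#   chain_proc = BlockchainProcessor(config, shared)   # blocks until headers are caught up
#   dispatcher.register('blockchain', chain_proc)      # hypothetical wiring
# after which client requests reach it through add_request()/process().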