28e87c8289e56533190a9e14784812652b500b47
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import os
5 from Queue import Queue
6 import random
7 import sys
8 import time
9 import threading
10 import traceback
11 import urllib
12
13 from backends.bitcoind import deserialize
14 from processor import Processor, print_log
15 from utils import *
16
17 from storage import Storage
18
19
20 class BlockchainProcessor(Processor):
21
    def __init__(self, config, shared):
        """Set up processor state, connect to bitcoind and block until caught up.

        Reads [leveldb] path_fulltree / test_reorgs and [bitcoind]
        user / password / host / port from `config`. Retries 'getinfo' every
        5 s until bitcoind answers, brings the headers file up to
        storage.height, runs catch_up(sync=False) on a timer thread and polls
        once per second until it reports up_to_date or the server is stopped.
        Finally loads the mempool and schedules main_iteration to run in 10 s.

        :param config: presumably a ConfigParser (get/getboolean are used).
        :param shared: shared server state; stop()/stopped() are used.
        """
        Processor.__init__(self)

        self.mtimes = {} # per-phase timing totals, accumulated by mtime()
        self.shared = shared
        self.config = config
        self.up_to_date = False  # set by catch_up once storage reaches bitcoind's tip

        # subscription registries, guarded by watch_lock
        self.watch_lock = threading.Lock()
        self.watch_blocks = []
        self.watch_headers = []
        self.watched_addresses = {}

        # address-history and header-chunk caches, guarded by cache_lock
        self.history_cache = {}
        self.chunk_cache = {}
        self.cache_lock = threading.Lock()
        # header bytes buffered by write_header until flush_headers
        self.headers_data = ''
        self.headers_path = config.get('leveldb', 'path_fulltree')

        # mempool index maintained by memorypool_update; mempool_hist is
        # swapped in under mempool_lock
        self.mempool_addresses = {}
        self.mempool_hist = {}
        self.mempool_hashes = set([])
        self.mempool_lock = threading.Lock()

        # (address, sessions) pairs awaiting a status push; drained by main_iteration
        self.address_queue = Queue()

        # optional setting: any failure reading it (e.g. missing option) disables it
        try:
            self.test_reorgs = config.getboolean('leveldb', 'test_reorgs')   # simulate random blockchain reorgs
        except:
            self.test_reorgs = False
        self.storage = Storage(config, shared, self.test_reorgs)

        # held by main_iteration around catch_up and by get_history around storage reads
        self.dblock = threading.Lock()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind', 'user'),
            config.get('bitcoind', 'password'),
            config.get('bitcoind', 'host'),
            config.get('bitcoind', 'port'))

        # Wait for bitcoind to answer.
        # NOTE(review): bitcoind() calls self.shared.stop() when the HTTP
        # request itself fails, so a single failed attempt here already marks
        # the server stopped even though this loop keeps retrying — confirm
        # whether startup with bitcoind down is expected to work.
        while True:
            try:
                self.bitcoind('getinfo')
                break
            except:
                print_log('cannot contact bitcoind...')
                time.sleep(5)
                continue

        # last height / header pushed to subscribers by main_iteration
        self.sent_height = 0
        self.sent_header = None

        # catch_up headers
        self.init_headers(self.storage.height)

        # catch_up runs on its own thread; this thread only polls up_to_date
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                # bare except so Ctrl-C during the wait stops every thread
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print_log("Blockchain is up to date.")
        self.memorypool_update()
        print_log("Memory pool initialized.")

        threading.Timer(10, self.main_iteration).start()
91
92
93
94     def mtime(self, name):
95         now = time.time()
96         if name != '':
97             delta = now - self.now
98             t = self.mtimes.get(name, 0)
99             self.mtimes[name] = t + delta
100         self.now = now
101
102     def print_mtime(self):
103         s = ''
104         for k, v in self.mtimes.items():
105             s += k+':'+"%.2f"%v+' '
106         print_log(s)
107
108
109     def bitcoind(self, method, params=[]):
110         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
111         try:
112             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
113         except:
114             traceback.print_exc(file=sys.stdout)
115             self.shared.stop()
116
117         r = loads(respdata)
118         if r['error'] is not None:
119             raise BaseException(r['error'])
120         return r.get('result')
121
122
123     def block2header(self, b):
124         return {
125             "block_height": b.get('height'),
126             "version": b.get('version'),
127             "prev_block_hash": b.get('previousblockhash'),
128             "merkle_root": b.get('merkleroot'),
129             "timestamp": b.get('time'),
130             "bits": int(b.get('bits'), 16),
131             "nonce": b.get('nonce'),
132         }
133
134     def get_header(self, height):
135         block_hash = self.bitcoind('getblockhash', [height])
136         b = self.bitcoind('getblock', [block_hash])
137         return self.block2header(b)
138
    def init_headers(self, db_height):
        """Bring the on-disk headers file up to `db_height`.

        Opens (creating it if needed) <headers_path>/blockchain_headers,
        derives the current height from the file size (80 bytes per header),
        then fetches each missing header from bitcoind. Every fetched header
        is checked to link to the previous header's hash and is buffered with
        write_header(sync=False). The buffer is flushed at the end, or before
        exiting on KeyboardInterrupt. Also empties chunk_cache.

        :param db_height: target height (__init__ passes storage.height).
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            # Python 2 integer division: a trailing partial record is ignored.
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                # Link check is skipped at heights <= 1: when the file held
                # only the genesis header, prev_hash was left as None above.
                # NOTE(review): assert is stripped under `python -O`.
                if height > 1:
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            # persist what was fetched so far before exiting
            self.flush_headers()
            sys.exit()

        self.flush_headers()
172
173     def hash_header(self, header):
174         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
175
176     def read_header(self, block_height):
177         if os.path.exists(self.headers_filename):
178             with open(self.headers_filename, 'rb') as f:
179                 f.seek(block_height * 80)
180                 h = f.read(80)
181             if len(h) == 80:
182                 h = header_from_string(h)
183                 return h
184
185     def read_chunk(self, index):
186         with open(self.headers_filename, 'rb') as f:
187             f.seek(index*2016*80)
188             chunk = f.read(2016*80)
189         return chunk.encode('hex')
190
191     def write_header(self, header, sync=True):
192         if not self.headers_data:
193             self.headers_offset = header.get('block_height')
194
195         self.headers_data += header_to_string(header).decode('hex')
196         if sync or len(self.headers_data) > 40*100:
197             self.flush_headers()
198
199         with self.cache_lock:
200             chunk_index = header.get('block_height')/2016
201             if self.chunk_cache.get(chunk_index):
202                 self.chunk_cache.pop(chunk_index)
203
204     def pop_header(self):
205         # we need to do this only if we have not flushed
206         if self.headers_data:
207             self.headers_data = self.headers_data[:-40]
208
209     def flush_headers(self):
210         if not self.headers_data:
211             return
212         with open(self.headers_filename, 'rb+') as f:
213             f.seek(self.headers_offset*80)
214             f.write(self.headers_data)
215         self.headers_data = ''
216
217     def get_chunk(self, i):
218         # store them on disk; store the current chunk in memory
219         with self.cache_lock:
220             chunk = self.chunk_cache.get(i)
221             if not chunk:
222                 chunk = self.read_chunk(i)
223                 self.chunk_cache[i] = chunk
224
225         return chunk
226
227     def get_mempool_transaction(self, txid):
228         try:
229             raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
230         except:
231             return None
232
233         vds = deserialize.BCDataStream()
234         vds.write(raw_tx.decode('hex'))
235         try:
236             return deserialize.parse_Transaction(vds, is_coinbase=False)
237         except:
238             print_log("ERROR: cannot parse", txid)
239             return None
240
241
242     def get_history(self, addr, cache_only=False):
243         with self.cache_lock:
244             hist = self.history_cache.get(addr)
245         if hist is not None:
246             return hist
247         if cache_only:
248             return -1
249
250         with self.dblock:
251             try:
252                 hist = self.storage.get_history(addr)
253                 is_known = True
254             except:
255                 self.shared.stop()
256                 raise
257             if hist:
258                 is_known = True
259             else:
260                 hist = []
261                 is_known = False
262
263         # add memory pool
264         with self.mempool_lock:
265             for txid in self.mempool_hist.get(addr, []):
266                 hist.append({'tx_hash':txid, 'height':0})
267
268         # add something to distinguish between unused and empty addresses
269         if hist == [] and is_known:
270             hist = ['*']
271
272         with self.cache_lock:
273             self.history_cache[addr] = hist
274         return hist
275
276
277     def get_status(self, addr, cache_only=False):
278         tx_points = self.get_history(addr, cache_only)
279         if cache_only and tx_points == -1:
280             return -1
281
282         if not tx_points:
283             return None
284         if tx_points == ['*']:
285             return '*'
286         status = ''
287         for tx in tx_points:
288             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
289         return hashlib.sha256(status).digest().encode('hex')
290
291     def get_merkle(self, tx_hash, height):
292
293         block_hash = self.bitcoind('getblockhash', [height])
294         b = self.bitcoind('getblock', [block_hash])
295         tx_list = b.get('tx')
296         tx_pos = tx_list.index(tx_hash)
297
298         merkle = map(hash_decode, tx_list)
299         target_hash = hash_decode(tx_hash)
300         s = []
301         while len(merkle) != 1:
302             if len(merkle) % 2:
303                 merkle.append(merkle[-1])
304             n = []
305             while merkle:
306                 new_hash = Hash(merkle[0] + merkle[1])
307                 if merkle[0] == target_hash:
308                     s.append(hash_encode(merkle[1]))
309                     target_hash = new_hash
310                 elif merkle[1] == target_hash:
311                     s.append(hash_encode(merkle[0]))
312                     target_hash = new_hash
313                 n.append(new_hash)
314                 merkle = merkle[2:]
315             merkle = n
316
317         return {"block_height": height, "merkle": s, "pos": tx_pos}
318
319
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert one record into the serialized history of `addr`, kept in height order.

        The history in self.batch_list[addr] is a byte string of 80-byte
        records. The new record is placed after the last record whose height
        is <= tx_height, or prepended if there is none. A backlink from the
        packed outpoint (tx_hash + 4-byte tx_pos) to `addr` is stored in
        self.batch_txio.

        NOTE(review): serialize_item, batch_list and batch_txio are not
        defined in the visible part of this class and no visible code calls
        this method; it may be left over from an older storage backend.
        Confirm before relying on it.
        """
        # keep it sorted
        # serialize_item() output padded with 40 NUL bytes; the assert
        # requires serialize_item to produce exactly 40 bytes
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        l = len(serialized_hist)/80
        # scan from the last record backwards for the insertion point
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            # height read from bytes 36..38 with byte order reversed
            # (rev_hex) — presumably where serialize_item stores it; TODO confirm
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height <= tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            # empty history, or every record is above tx_height: prepend
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
342
343
344
345
346
347
348     def deserialize_block(self, block):
349         txlist = block.get('tx')
350         tx_hashes = []  # ordered txids
351         txdict = {}     # deserialized tx
352         is_coinbase = True
353         for raw_tx in txlist:
354             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
355             vds = deserialize.BCDataStream()
356             vds.write(raw_tx.decode('hex'))
357             try:
358                 tx = deserialize.parse_Transaction(vds, is_coinbase)
359             except:
360                 print_log("ERROR: cannot parse", tx_hash)
361                 continue
362             tx_hashes.append(tx_hash)
363             txdict[tx_hash] = tx
364             is_coinbase = False
365         return tx_hashes, txdict
366
367
368
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply one block to storage, or undo it when revert=True.

        :param block: block dict whose 'tx' list holds raw transaction hex
            (as produced by getfullblock).
        :param block_hash: hash of `block`; recorded in db_undo['height'].
        :param block_height: height of `block`.
        :param sync: not used in this method.
        :param revert: undo the block using the undo info saved at import.

        Forward: transactions are imported in block order, and the per-tx
        undo data they return is saved with write_undo_info. Revert: the
        saved undo info is loaded and transactions are reverted in reverse
        order; every entry must be consumed. In both cases
        db_undo['height'] is set to (block_hash, block_height, db_version),
        the caches of touched addresses are invalidated, and
        storage.update_hashes() is called.
        """

        touched_addr = set([])

        # deserialize transactions
        tx_hashes, txdict = self.deserialize_block(block)

        # undo info
        if revert:
            undo_info = self.storage.get_undo_info(block_height)
            tx_hashes.reverse()
        else:
            undo_info = {}

        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:
                undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
                undo_info[txid] = undo
            else:
                undo = undo_info.pop(txid)
                self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)

        # every saved undo entry must have been consumed by the revert
        if revert:
            assert undo_info == {}

        # add undo info
        if not revert:
            self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)

        # add the max
        # NOTE(review): on revert this records the block being reverted
        # (catch_up passes last_hash/height), not its parent — confirm this
        # is what Storage expects if the process stops right after a revert.
        self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))

        for addr in touched_addr:
            self.invalidate_cache(addr)

        self.storage.update_hashes()
406
407
408     def add_request(self, session, request):
409         # see if we can get if from cache. if not, add to queue
410         if self.process(session, request, cache_only=True) == -1:
411             self.queue.put((session, request))
412
413
414     def do_subscribe(self, method, params, session):
415         with self.watch_lock:
416             if method == 'blockchain.numblocks.subscribe':
417                 if session not in self.watch_blocks:
418                     self.watch_blocks.append(session)
419
420             elif method == 'blockchain.headers.subscribe':
421                 if session not in self.watch_headers:
422                     self.watch_headers.append(session)
423
424             elif method == 'blockchain.address.subscribe':
425                 address = params[0]
426                 l = self.watched_addresses.get(address)
427                 if l is None:
428                     self.watched_addresses[address] = [session]
429                 elif session not in l:
430                     l.append(session)
431
432
433     def do_unsubscribe(self, method, params, session):
434         with self.watch_lock:
435             if method == 'blockchain.numblocks.subscribe':
436                 if session in self.watch_blocks:
437                     self.watch_blocks.remove(session)
438             elif method == 'blockchain.headers.subscribe':
439                 if session in self.watch_headers:
440                     self.watch_headers.remove(session)
441             elif method == "blockchain.address.subscribe":
442                 addr = params[0]
443                 l = self.watched_addresses.get(addr)
444                 if not l:
445                     return
446                 if session in l:
447                     l.remove(session)
448                 if session in l:
449                     print "error rc!!"
450                     self.shared.stop()
451                 if l == []:
452                     self.watched_addresses.pop(addr)
453
454
455     def process(self, session, request, cache_only=False):
456         
457         message_id = request['id']
458         method = request['method']
459         params = request.get('params', [])
460         result = None
461         error = None
462
463         if method == 'blockchain.numblocks.subscribe':
464             result = self.storage.height
465
466         elif method == 'blockchain.headers.subscribe':
467             result = self.header
468
469         elif method == 'blockchain.address.subscribe':
470             try:
471                 address = str(params[0])
472                 result = self.get_status(address, cache_only)
473             except BaseException, e:
474                 error = str(e) + ': ' + address
475                 print_log("error:", error)
476
477         elif method == 'blockchain.address.get_history':
478             try:
479                 address = str(params[0])
480                 result = self.get_history(address, cache_only)
481             except BaseException, e:
482                 error = str(e) + ': ' + address
483                 print_log("error:", error)
484
485         elif method == 'blockchain.address.get_balance':
486             try:
487                 address = str(params[0])
488                 result = self.storage.get_balance(address)
489             except BaseException, e:
490                 error = str(e) + ': ' + address
491                 print_log("error:", error)
492
493         elif method == 'blockchain.address.get_proof':
494             try:
495                 address = str(params[0])
496                 result = self.storage.get_proof(address)
497             except BaseException, e:
498                 error = str(e) + ': ' + address
499                 print_log("error:", error)
500
501         elif method == 'blockchain.address.listunspent':
502             try:
503                 address = str(params[0])
504                 result = self.storage.listunspent(address)
505             except BaseException, e:
506                 error = str(e) + ': ' + address
507                 print_log("error:", error)
508
509         elif method == 'blockchain.utxo.get_address':
510             try:
511                 txid = str(params[0])
512                 pos = int(params[1])
513                 txi = (txid + int_to_hex(pos, 4)).decode('hex')
514                 result = self.storage.get_address(txi)
515             except BaseException, e:
516                 error = str(e)
517                 print_log("error:", error, txid, pos)
518
519         elif method == 'blockchain.block.get_header':
520             if cache_only:
521                 result = -1
522             else:
523                 try:
524                     height = int(params[0])
525                     result = self.get_header(height)
526                 except BaseException, e:
527                     error = str(e) + ': %d' % height
528                     print_log("error:", error)
529
530         elif method == 'blockchain.block.get_chunk':
531             if cache_only:
532                 result = -1
533             else:
534                 try:
535                     index = int(params[0])
536                     result = self.get_chunk(index)
537                 except BaseException, e:
538                     error = str(e) + ': %d' % index
539                     print_log("error:", error)
540
541         elif method == 'blockchain.transaction.broadcast':
542             try:
543                 txo = self.bitcoind('sendrawtransaction', params)
544                 print_log("sent tx:", txo)
545                 result = txo
546             except BaseException, e:
547                 result = str(e)  # do not send an error
548                 print_log("error:", result, params)
549
550         elif method == 'blockchain.transaction.get_merkle':
551             if cache_only:
552                 result = -1
553             else:
554                 try:
555                     tx_hash = params[0]
556                     tx_height = params[1]
557                     result = self.get_merkle(tx_hash, tx_height)
558                 except BaseException, e:
559                     error = str(e) + ': ' + repr(params)
560                     print_log("get_merkle error:", error)
561
562         elif method == 'blockchain.transaction.get':
563             try:
564                 tx_hash = params[0]
565                 result = self.bitcoind('getrawtransaction', [tx_hash, 0])
566             except BaseException, e:
567                 error = str(e) + ': ' + repr(params)
568                 print_log("tx get error:", error)
569
570         else:
571             error = "unknown method:%s" % method
572
573         if cache_only and result == -1:
574             return -1
575
576         if error:
577             self.push_response(session, {'id': message_id, 'error': error})
578         elif result != '':
579             self.push_response(session, {'id': message_id, 'result': result})
580
581
582     def getfullblock(self, block_hash):
583         block = self.bitcoind('getblock', [block_hash])
584
585         rawtxreq = []
586         i = 0
587         for txid in block['tx']:
588             rawtxreq.append({
589                 "method": "getrawtransaction",
590                 "params": [txid],
591                 "id": i,
592             })
593             i += 1
594
595         postdata = dumps(rawtxreq)
596         try:
597             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
598         except:
599             traceback.print_exc(file=sys.stdout)
600             self.shared.stop()
601
602         r = loads(respdata)
603         rawtxdata = []
604         for ir in r:
605             if ir['error'] is not None:
606                 self.shared.stop()
607                 print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
608                 raise BaseException(ir['error'])
609             rawtxdata.append(ir['result'])
610         block['tx'] = rawtxdata
611         return block
612
613     def catch_up(self, sync=True):
614
615         prh = None
616         while not self.shared.stopped():
617
618             self.mtime('')
619
620             # are we done yet?
621             info = self.bitcoind('getinfo')
622             self.bitcoind_height = info.get('blocks')
623             bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
624             if self.storage.last_hash == bitcoind_block_hash:
625                 self.up_to_date = True
626                 break
627
628             # not done..
629             self.up_to_date = False
630             next_block_hash = self.bitcoind('getblockhash', [self.storage.height + 1])
631             next_block = self.getfullblock(next_block_hash)
632             self.mtime('daemon')
633
634             # fixme: this is unsafe, if we revert when the undo info is not yet written
635             revert = (random.randint(1, 100) == 1) if self.test_reorgs else False
636
637             if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
638
639                 prev_root_hash = self.storage.get_root_hash()
640
641                 self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
642                 self.storage.height = self.storage.height + 1
643                 self.write_header(self.block2header(next_block), sync)
644                 self.storage.last_hash = next_block_hash
645                 self.mtime('import')
646             
647                 if self.storage.height % 1000 == 0 and not sync:
648                     t_daemon = self.mtimes.get('daemon')
649                     t_import = self.mtimes.get('import')
650                     print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
651                     self.mtimes['daemon'] = 0
652                     self.mtimes['import'] = 0
653
654             else:
655
656                 # revert current block
657                 block = self.getfullblock(self.storage.last_hash)
658                 print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
659                 self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
660                 self.pop_header()
661                 self.flush_headers()
662
663                 self.storage.height -= 1
664
665                 # read previous header from disk
666                 self.header = self.read_header(self.storage.height)
667                 self.storage.last_hash = self.hash_header(self.header)
668
669                 if prev_root_hash:
670                     assert prev_root_hash == self.storage.get_root_hash()
671                     prev_root_hash = None
672
673
674         self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
675         self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
676
677         if self.shared.stopped(): 
678             print_log( "closing database" )
679             self.storage.close()
680
681
682     def memorypool_update(self):
683         mempool_hashes = set(self.bitcoind('getrawmempool'))
684         touched_addresses = set([])
685
686         for tx_hash in mempool_hashes:
687             if tx_hash in self.mempool_hashes:
688                 continue
689
690             tx = self.get_mempool_transaction(tx_hash)
691             if not tx:
692                 continue
693
694             mpa = self.mempool_addresses.get(tx_hash, [])
695             for x in tx.get('inputs'):
696                 # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
697                 addr = x.get('address')
698                 if addr and addr not in mpa:
699                     mpa.append(addr)
700                     touched_addresses.add(addr)
701
702             for x in tx.get('outputs'):
703                 addr = x.get('address')
704                 if addr and addr not in mpa:
705                     mpa.append(addr)
706                     touched_addresses.add(addr)
707
708             self.mempool_addresses[tx_hash] = mpa
709             self.mempool_hashes.add(tx_hash)
710
711         # remove older entries from mempool_hashes
712         self.mempool_hashes = mempool_hashes
713
714         # remove deprecated entries from mempool_addresses
715         for tx_hash, addresses in self.mempool_addresses.items():
716             if tx_hash not in self.mempool_hashes:
717                 self.mempool_addresses.pop(tx_hash)
718                 for addr in addresses:
719                     touched_addresses.add(addr)
720
721         # rebuild mempool histories
722         new_mempool_hist = {}
723         for tx_hash, addresses in self.mempool_addresses.items():
724             for addr in addresses:
725                 h = new_mempool_hist.get(addr, [])
726                 if tx_hash not in h:
727                     h.append(tx_hash)
728                 new_mempool_hist[addr] = h
729
730         with self.mempool_lock:
731             self.mempool_hist = new_mempool_hist
732
733         # invalidate cache for touched addresses
734         for addr in touched_addresses:
735             self.invalidate_cache(addr)
736
737
738     def invalidate_cache(self, address):
739         with self.cache_lock:
740             if address in self.history_cache:
741                 print_log("cache: invalidating", address)
742                 self.history_cache.pop(address)
743
744         with self.watch_lock:
745             sessions = self.watched_addresses.get(address)
746
747         if sessions:
748             # TODO: update cache here. if new value equals cached value, do not send notification
749             self.address_queue.put((address,sessions))
750
751     def main_iteration(self):
752         if self.shared.stopped():
753             print_log("blockchain processor terminating")
754             self.storage.close()
755             return
756
757         with self.dblock:
758             t1 = time.time()
759             self.catch_up()
760             t2 = time.time()
761
762         self.memorypool_update()
763
764         if self.sent_height != self.storage.height:
765             self.sent_height = self.storage.height
766             for session in self.watch_blocks:
767                 self.push_response(session, {
768                         'id': None,
769                         'method': 'blockchain.numblocks.subscribe',
770                         'params': [self.storage.height],
771                         })
772
773         if self.sent_header != self.header:
774             print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
775             self.sent_header = self.header
776             for session in self.watch_headers:
777                 self.push_response(session, {
778                         'id': None,
779                         'method': 'blockchain.headers.subscribe',
780                         'params': [self.header],
781                         })
782
783         while True:
784             try:
785                 addr, sessions = self.address_queue.get(False)
786             except:
787                 break
788
789             status = self.get_status(addr)
790             for session in sessions:
791                 self.push_response(session, {
792                         'id': None,
793                         'method': 'blockchain.address.subscribe',
794                         'params': [addr, status],
795                         })
796
797         if not self.shared.stopped():
798             threading.Timer(10, self.main_iteration).start()
799         else:
800             print_log("blockchain processor terminating")