fix
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import os
5 from Queue import Queue
6 import random
7 import sys
8 import time
9 import threading
10 import traceback
11 import urllib
12
13 from backends.bitcoind import deserialize
14 from processor import Processor, print_log
15 from utils import *
16
17 from storage import Storage
18
19
20 class BlockchainProcessor(Processor):
21
    def __init__(self, config, shared):
        """Build the processor, wait for bitcoind, sync, then start polling.

        The constructor blocks: it retries `getinfo` every 5 seconds until
        bitcoind answers, fills in missing headers up to the storage height,
        runs `catch_up` on a timer thread and sleeps until `up_to_date` is
        set (or `shared` reports stopped). It then loads the memory pool and
        schedules the first `main_iteration` 10 seconds later.

        config -- read through `get(section, option)` / `getboolean(...)`
                  for the 'leveldb' and 'bitcoind' sections.
        shared -- object providing `stopped()` and `stop()`.
        """
        Processor.__init__(self)

        self.mtimes = {} # monitoring
        self.shared = shared
        self.config = config
        self.up_to_date = False

        # sessions subscribed to block-count / header / per-address updates
        self.watch_lock = threading.Lock()
        self.watch_blocks = []
        self.watch_headers = []
        self.watched_addresses = {}

        self.history_cache = {}
        self.chunk_cache = {}
        self.cache_lock = threading.Lock()
        # headers buffered in memory until flush_headers() writes them out
        self.headers_data = ''
        self.headers_path = config.get('leveldb', 'path_fulltree')

        self.mempool_addresses = {}
        self.mempool_hist = {}
        self.mempool_hashes = set([])
        self.mempool_lock = threading.Lock()

        # (address, sessions) pairs waiting for a status notification
        self.address_queue = Queue()

        try:
            self.test_reorgs = config.getboolean('leveldb', 'test_reorgs')   # simulate random blockchain reorgs
        except:
            # option missing or unreadable: reorg simulation stays off
            self.test_reorgs = False
        self.storage = Storage(config, shared, self.test_reorgs)

        self.dblock = threading.Lock()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind', 'user'),
            config.get('bitcoind', 'password'),
            config.get('bitcoind', 'host'),
            config.get('bitcoind', 'port'))

        # block until bitcoind answers an RPC call
        while True:
            try:
                self.bitcoind('getinfo')
                break
            except:
                print_log('cannot contact bitcoind...')
                time.sleep(5)
                continue

        self.sent_height = 0
        self.sent_header = None

        # catch_up headers
        self.init_headers(self.storage.height)

        # run the initial block sync on another thread so this loop can
        # react to a keyboard interrupt while it is in progress
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print_log("Blockchain is up to date.")
        self.memorypool_update()
        print_log("Memory pool initialized.")

        threading.Timer(10, self.main_iteration).start()
91
92
93
94     def mtime(self, name):
95         now = time.time()
96         if name != '':
97             delta = now - self.now
98             t = self.mtimes.get(name, 0)
99             self.mtimes[name] = t + delta
100         self.now = now
101
102     def print_mtime(self):
103         s = ''
104         for k, v in self.mtimes.items():
105             s += k+':'+"%.2f"%v+' '
106         print_log(s)
107
108
109     def bitcoind(self, method, params=[]):
110         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
111         try:
112             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
113         except:
114             traceback.print_exc(file=sys.stdout)
115             self.shared.stop()
116
117         r = loads(respdata)
118         if r['error'] is not None:
119             raise BaseException(r['error'])
120         return r.get('result')
121
122
123     def block2header(self, b):
124         return {
125             "block_height": b.get('height'),
126             "version": b.get('version'),
127             "prev_block_hash": b.get('previousblockhash'),
128             "merkle_root": b.get('merkleroot'),
129             "timestamp": b.get('time'),
130             "bits": int(b.get('bits'), 16),
131             "nonce": b.get('nonce'),
132         }
133
134     def get_header(self, height):
135         block_hash = self.bitcoind('getblockhash', [height])
136         b = self.bitcoind('getblock', [block_hash])
137         return self.block2header(b)
138
    def init_headers(self, db_height):
        """Extend the on-disk headers file up to `db_height`.

        The file `blockchain_headers` under `self.headers_path` holds one
        80-byte record per block. Missing headers are fetched from bitcoind
        one by one, checked against the previous header's hash, buffered,
        and flushed at the end. A keyboard interrupt flushes what was
        fetched so far and exits the process.
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                # empty file or genesis only: nothing to chain against yet
                prev_hash = None
        else:
            # create an empty file so later 'rb+' opens succeed
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                # the link check is skipped for heights 0 and 1, where
                # prev_hash may still be None
                if height > 1:
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            self.flush_headers()
            sys.exit()

        self.flush_headers()
172
173     def hash_header(self, header):
174         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
175
176     def read_header(self, block_height):
177         if os.path.exists(self.headers_filename):
178             with open(self.headers_filename, 'rb') as f:
179                 f.seek(block_height * 80)
180                 h = f.read(80)
181             if len(h) == 80:
182                 h = header_from_string(h)
183                 return h
184
185     def read_chunk(self, index):
186         with open(self.headers_filename, 'rb') as f:
187             f.seek(index*2016*80)
188             chunk = f.read(2016*80)
189         return chunk.encode('hex')
190
191     def write_header(self, header, sync=True):
192         if not self.headers_data:
193             self.headers_offset = header.get('block_height')
194
195         self.headers_data += header_to_string(header).decode('hex')
196         if sync or len(self.headers_data) > 40*100:
197             self.flush_headers()
198
199         with self.cache_lock:
200             chunk_index = header.get('block_height')/2016
201             if self.chunk_cache.get(chunk_index):
202                 self.chunk_cache.pop(chunk_index)
203
204     def pop_header(self):
205         # we need to do this only if we have not flushed
206         if self.headers_data:
207             self.headers_data = self.headers_data[:-40]
208
209     def flush_headers(self):
210         if not self.headers_data:
211             return
212         with open(self.headers_filename, 'rb+') as f:
213             f.seek(self.headers_offset*80)
214             f.write(self.headers_data)
215         self.headers_data = ''
216
217     def get_chunk(self, i):
218         # store them on disk; store the current chunk in memory
219         with self.cache_lock:
220             chunk = self.chunk_cache.get(i)
221             if not chunk:
222                 chunk = self.read_chunk(i)
223                 self.chunk_cache[i] = chunk
224
225         return chunk
226
227     def get_mempool_transaction(self, txid):
228         try:
229             raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
230         except:
231             return None
232
233         vds = deserialize.BCDataStream()
234         vds.write(raw_tx.decode('hex'))
235         try:
236             return deserialize.parse_Transaction(vds, is_coinbase=False)
237         except:
238             print_log("ERROR: cannot parse", txid)
239             return None
240
241
242     def get_history(self, addr, cache_only=False):
243         with self.cache_lock:
244             hist = self.history_cache.get(addr)
245         if hist is not None:
246             return hist
247         if cache_only:
248             return -1
249
250         with self.dblock:
251             try:
252                 hist = self.storage.get_history(addr)
253                 is_known = True
254             except:
255                 self.shared.stop()
256                 raise
257             if hist:
258                 is_known = True
259             else:
260                 hist = []
261                 is_known = False
262
263         # add memory pool
264         with self.mempool_lock:
265             for txid in self.mempool_hist.get(addr, []):
266                 hist.append({'tx_hash':txid, 'height':0})
267
268         # add something to distinguish between unused and empty addresses
269         if hist == [] and is_known:
270             hist = ['*']
271
272         with self.cache_lock:
273             self.history_cache[addr] = hist
274         return hist
275
276
277     def get_status(self, addr, cache_only=False):
278         tx_points = self.get_history(addr, cache_only)
279         if cache_only and tx_points == -1:
280             return -1
281
282         if not tx_points:
283             return None
284         if tx_points == ['*']:
285             return '*'
286         status = ''
287         for tx in tx_points:
288             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
289         return hashlib.sha256(status).digest().encode('hex')
290
    def get_merkle(self, tx_hash, height):
        """Build the merkle branch for `tx_hash` in the block at `height`.

        Returns a dict with the block height, the list of sibling hashes
        collected from the bottom level upwards ("merkle"), and the index
        of the transaction in the block ("pos"). `list.index` raises
        ValueError when the transaction is not in that block.
        """

        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        # reduce the level pair by pair until a single root remains
        while len(merkle) != 1:
            # an odd level is padded by repeating its last hash
            if len(merkle) % 2:
                merkle.append(merkle[-1])
            n = []
            while merkle:
                new_hash = Hash(merkle[0] + merkle[1])
                # when one element of the pair is the hash being tracked,
                # record its sibling and track the parent from now on
                if merkle[0] == target_hash:
                    s.append(hash_encode(merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append(hash_encode(merkle[0]))
                    target_hash = new_hash
                n.append(new_hash)
                merkle = merkle[2:]
            merkle = n

        return {"block_height": height, "merkle": s, "pos": tx_pos}
318
319
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert a history item for `addr` into its serialized history.

        The history is a string of fixed 80-byte items ordered by height;
        the new item goes right after the last item whose height is not
        greater than `tx_height`, or at the front when there is none.

        NOTE(review): `serialize_item`, `batch_list` and `batch_txio` are
        not defined in the visible part of this class -- confirm this
        method is still in use.
        """
        # keep it sorted
        # the serialized item is padded with 40 zero bytes to 80 bytes
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        l = len(serialized_hist)/80
        # scan from the newest item backwards
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            # bytes 36..38 of an item hold its height (byte order reversed
            # through rev_hex before parsing)
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height <= tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            # no existing item at or below this height: new item goes first
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
342
343
344
345
346
347
348     def deserialize_block(self, block):
349         txlist = block.get('tx')
350         tx_hashes = []  # ordered txids
351         txdict = {}     # deserialized tx
352         is_coinbase = True
353         for raw_tx in txlist:
354             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
355             vds = deserialize.BCDataStream()
356             vds.write(raw_tx.decode('hex'))
357             try:
358                 tx = deserialize.parse_Transaction(vds, is_coinbase)
359             except:
360                 print_log("ERROR: cannot parse", tx_hash)
361                 continue
362             tx_hashes.append(tx_hash)
363             txdict[tx_hash] = tx
364             is_coinbase = False
365         return tx_hashes, txdict
366
367
368
369     def import_block(self, block, block_hash, block_height, sync, revert=False):
370
371         touched_addr = set([])
372
373         # deserialize transactions
374         tx_hashes, txdict = self.deserialize_block(block)
375
376         # undo info
377         if revert:
378             undo_info = self.storage.get_undo_info(block_height)
379             tx_hashes.reverse()
380         else:
381             undo_info = {}
382
383         for txid in tx_hashes:  # must be ordered
384             tx = txdict[txid]
385             if not revert:
386                 undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
387                 undo_info[txid] = undo
388             else:
389                 undo = undo_info.pop(txid)
390                 self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)
391
392         if revert: 
393             assert undo_info == {}
394
395         # add undo info
396         if not revert:
397             self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)
398
399         # add the max
400         self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))
401
402         for addr in touched_addr:
403             self.invalidate_cache(addr)
404
405         self.storage.update_hashes()
406
407
408     def add_request(self, session, request):
409         # see if we can get if from cache. if not, add to queue
410         if self.process(session, request, cache_only=True) == -1:
411             self.queue.put((session, request))
412
413
414     def do_subscribe(self, method, params, session):
415         with self.watch_lock:
416             if method == 'blockchain.numblocks.subscribe':
417                 if session not in self.watch_blocks:
418                     self.watch_blocks.append(session)
419
420             elif method == 'blockchain.headers.subscribe':
421                 if session not in self.watch_headers:
422                     self.watch_headers.append(session)
423
424             elif method == 'blockchain.address.subscribe':
425                 address = params[0]
426                 l = self.watched_addresses.get(address)
427                 if l is None:
428                     self.watched_addresses[address] = [session]
429                 elif session not in l:
430                     l.append(session)
431
432
433     def do_unsubscribe(self, method, params, session):
434         with self.watch_lock:
435             if method == 'blockchain.numblocks.subscribe':
436                 if session in self.watch_blocks:
437                     self.watch_blocks.remove(session)
438             elif method == 'blockchain.headers.subscribe':
439                 if session in self.watch_headers:
440                     self.watch_headers.remove(session)
441             elif method == "blockchain.address.subscribe":
442                 addr = params[0]
443                 l = self.watched_addresses.get(addr)
444                 if not l:
445                     return
446                 if session in l:
447                     l.remove(session)
448                 if session in l:
449                     print "error rc!!"
450                     self.shared.stop()
451                 if l == []:
452                     self.watched_addresses.pop(addr)
453
454
455     def process(self, session, request, cache_only=False):
456         
457         message_id = request['id']
458         method = request['method']
459         params = request.get('params', [])
460         result = None
461         error = None
462
463         if method == 'blockchain.numblocks.subscribe':
464             result = self.storage.height
465
466         elif method == 'blockchain.headers.subscribe':
467             result = self.header
468
469         elif method == 'blockchain.address.subscribe':
470             try:
471                 address = str(params[0])
472                 result = self.get_status(address, cache_only)
473             except BaseException, e:
474                 error = str(e) + ': ' + address
475                 print_log("error:", error)
476
477         elif method == 'blockchain.address.get_history':
478             try:
479                 address = str(params[0])
480                 result = self.get_history(address, cache_only)
481             except BaseException, e:
482                 error = str(e) + ': ' + address
483                 print_log("error:", error)
484
485         elif method == 'blockchain.address.get_balance':
486             try:
487                 address = str(params[0])
488                 result = self.storage.get_balance(address)
489             except BaseException, e:
490                 error = str(e) + ': ' + address
491                 print_log("error:", error)
492
493         elif method == 'blockchain.address.get_path':
494             try:
495                 address = str(params[0])
496                 result = self.storage.get_address_path(address)
497             except BaseException, e:
498                 error = str(e) + ': ' + address
499                 print_log("error:", error)
500
501         elif method == 'blockchain.address.listunspent':
502             try:
503                 address = str(params[0])
504                 result = self.storage.listunspent(address)
505             except BaseException, e:
506                 error = str(e) + ': ' + address
507                 print_log("error:", error)
508
509         elif method == 'blockchain.block.get_header':
510             if cache_only:
511                 result = -1
512             else:
513                 try:
514                     height = params[0]
515                     result = self.get_header(height)
516                 except BaseException, e:
517                     error = str(e) + ': %d' % height
518                     print_log("error:", error)
519
520         elif method == 'blockchain.block.get_chunk':
521             if cache_only:
522                 result = -1
523             else:
524                 try:
525                     index = params[0]
526                     result = self.get_chunk(index)
527                 except BaseException, e:
528                     error = str(e) + ': %d' % index
529                     print_log("error:", error)
530
531         elif method == 'blockchain.transaction.broadcast':
532             try:
533                 txo = self.bitcoind('sendrawtransaction', params)
534                 print_log("sent tx:", txo)
535                 result = txo
536             except BaseException, e:
537                 result = str(e)  # do not send an error
538                 print_log("error:", result, params)
539
540         elif method == 'blockchain.transaction.get_merkle':
541             if cache_only:
542                 result = -1
543             else:
544                 try:
545                     tx_hash = params[0]
546                     tx_height = params[1]
547                     result = self.get_merkle(tx_hash, tx_height)
548                 except BaseException, e:
549                     error = str(e) + ': ' + repr(params)
550                     print_log("get_merkle error:", error)
551
552         elif method == 'blockchain.transaction.get':
553             try:
554                 tx_hash = params[0]
555                 result = self.bitcoind('getrawtransaction', [tx_hash, 0])
556             except BaseException, e:
557                 error = str(e) + ': ' + repr(params)
558                 print_log("tx get error:", error)
559
560         else:
561             error = "unknown method:%s" % method
562
563         if cache_only and result == -1:
564             return -1
565
566         if error:
567             self.push_response(session, {'id': message_id, 'error': error})
568         elif result != '':
569             self.push_response(session, {'id': message_id, 'result': result})
570
571
572     def getfullblock(self, block_hash):
573         block = self.bitcoind('getblock', [block_hash])
574
575         rawtxreq = []
576         i = 0
577         for txid in block['tx']:
578             rawtxreq.append({
579                 "method": "getrawtransaction",
580                 "params": [txid],
581                 "id": i,
582             })
583             i += 1
584
585         postdata = dumps(rawtxreq)
586         try:
587             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
588         except:
589             traceback.print_exc(file=sys.stdout)
590             self.shared.stop()
591
592         r = loads(respdata)
593         rawtxdata = []
594         for ir in r:
595             if ir['error'] is not None:
596                 self.shared.stop()
597                 print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
598                 raise BaseException(ir['error'])
599             rawtxdata.append(ir['result'])
600         block['tx'] = rawtxdata
601         return block
602
603     def catch_up(self, sync=True):
604
605         prh = None
606         while not self.shared.stopped():
607
608             self.mtime('')
609
610             # are we done yet?
611             info = self.bitcoind('getinfo')
612             self.bitcoind_height = info.get('blocks')
613             bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
614             if self.storage.last_hash == bitcoind_block_hash:
615                 self.up_to_date = True
616                 break
617
618             # not done..
619             self.up_to_date = False
620             next_block_hash = self.bitcoind('getblockhash', [self.storage.height + 1])
621             next_block = self.getfullblock(next_block_hash)
622             self.mtime('daemon')
623
624             # fixme: this is unsafe, if we revert when the undo info is not yet written
625             revert = (random.randint(1, 100) == 1) if self.test_reorgs else False
626
627             if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
628
629                 self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
630                 self.storage.height = self.storage.height + 1
631                 self.write_header(self.block2header(next_block), sync)
632                 self.storage.last_hash = next_block_hash
633                 self.mtime('import')
634             
635                 if self.storage.height % 1000 == 0 and not sync:
636                     t_daemon = self.mtimes.get('daemon')
637                     t_import = self.mtimes.get('import')
638                     print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
639                     self.mtimes['daemon'] = 0
640                     self.mtimes['import'] = 0
641
642                 if prh:
643                     rh = self.storage.get_root_hash().encode('hex')
644                     if prh != rh:
645                         print_log("root hash error", prh, rh)
646                         self.shared.stop()
647                         raise
648                     prh = None
649
650             else:
651                 prh = self.storage.get_root_hash().encode('hex')
652
653                 # revert current block
654                 block = self.getfullblock(self.storage.last_hash)
655                 print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
656                 self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
657                 self.pop_header()
658                 self.flush_headers()
659
660                 self.storage.height -= 1
661
662                 # read previous header from disk
663                 self.header = self.read_header(self.storage.height)
664                 self.storage.last_hash = self.hash_header(self.header)
665
666
667         self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
668
669         if self.shared.stopped(): 
670             print_log( "closing database" )
671             self.storage.close()
672
673
674     def memorypool_update(self):
675         mempool_hashes = set(self.bitcoind('getrawmempool'))
676         touched_addresses = set([])
677
678         for tx_hash in mempool_hashes:
679             if tx_hash in self.mempool_hashes:
680                 continue
681
682             tx = self.get_mempool_transaction(tx_hash)
683             if not tx:
684                 continue
685
686             mpa = self.mempool_addresses.get(tx_hash, [])
687             for x in tx.get('inputs'):
688                 # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
689                 addr = x.get('address')
690                 if addr and addr not in mpa:
691                     mpa.append(addr)
692                     touched_addresses.add(addr)
693
694             for x in tx.get('outputs'):
695                 addr = x.get('address')
696                 if addr and addr not in mpa:
697                     mpa.append(addr)
698                     touched_addresses.add(addr)
699
700             self.mempool_addresses[tx_hash] = mpa
701             self.mempool_hashes.add(tx_hash)
702
703         # remove older entries from mempool_hashes
704         self.mempool_hashes = mempool_hashes
705
706         # remove deprecated entries from mempool_addresses
707         for tx_hash, addresses in self.mempool_addresses.items():
708             if tx_hash not in self.mempool_hashes:
709                 self.mempool_addresses.pop(tx_hash)
710                 for addr in addresses:
711                     touched_addresses.add(addr)
712
713         # rebuild mempool histories
714         new_mempool_hist = {}
715         for tx_hash, addresses in self.mempool_addresses.items():
716             for addr in addresses:
717                 h = new_mempool_hist.get(addr, [])
718                 if tx_hash not in h:
719                     h.append(tx_hash)
720                 new_mempool_hist[addr] = h
721
722         with self.mempool_lock:
723             self.mempool_hist = new_mempool_hist
724
725         # invalidate cache for touched addresses
726         for addr in touched_addresses:
727             self.invalidate_cache(addr)
728
729
730     def invalidate_cache(self, address):
731         with self.cache_lock:
732             if address in self.history_cache:
733                 print_log("cache: invalidating", address)
734                 self.history_cache.pop(address)
735
736         with self.watch_lock:
737             sessions = self.watched_addresses.get(address)
738
739         if sessions:
740             # TODO: update cache here. if new value equals cached value, do not send notification
741             self.address_queue.put((address,sessions))
742
    def main_iteration(self):
        """Run one polling cycle and reschedule itself 10 seconds later.

        A cycle catches up with bitcoind (under the database lock),
        refreshes the memory pool, notifies block-count and header
        subscribers when the height or header changed since the last
        notification, and sends a status update for every queued address.
        When the shared stop flag is set on entry, storage is closed and
        no further cycle is scheduled.
        """
        if self.shared.stopped():
            print_log("blockchain processor terminating")
            self.storage.close()
            return

        with self.dblock:
            # t1/t2 time the catch-up for the log line below
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()

        if self.sent_height != self.storage.height:
            self.sent_height = self.storage.height
            for session in self.watch_blocks:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.numblocks.subscribe',
                        'params': [self.storage.height],
                        })

        if self.sent_header != self.header:
            print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
            self.sent_header = self.header
            for session in self.watch_headers:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.headers.subscribe',
                        'params': [self.header],
                        })

        # drain the queue of addresses whose history changed
        while True:
            try:
                # non-blocking get; any exception ends the drain loop
                addr, sessions = self.address_queue.get(False)
            except:
                break

            status = self.get_status(addr)
            for session in sessions:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.address.subscribe',
                        'params': [addr, status],
                        })

        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        else:
            print_log("blockchain processor terminating")
791         else:
792             print_log("blockchain processor terminating")