# electrum-server.git / backends / bitcoind / blockchain_processor.py
from json import dumps, loads
import leveldb, urllib
import deserialize
import ast, time, threading, hashlib
from Queue import Queue
import traceback, sys, os

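# Bitcoin-style helpers: Hash is double-SHA256; hash_encode/hash_decode convert
# between raw bytes and the reversed (display) hex order used for txids and
# block hashes.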
Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
hash_encode = lambda x: x[::-1].encode('hex')
hash_decode = lambda x: x.decode('hex')[::-1]


def rev_hex(s):
    return s.decode('hex')[::-1].encode('hex')


def int_to_hex(i, length=1):
    s = hex(i)[2:].rstrip('L')
    s = "0"*(2*length - len(s)) + s
    return rev_hex(s)

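# Serialize a header dict into the 80-byte block header layout
# (version, prev hash, merkle root, timestamp, bits, nonce), as a hex string.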
def header_to_string(res):
    pbh = res.get('prev_block_hash')
    if pbh is None: pbh = '0'*64
    s = int_to_hex(res.get('version'),4) \
        + rev_hex(pbh) \
        + rev_hex(res.get('merkle_root')) \
        + int_to_hex(int(res.get('timestamp')),4) \
        + int_to_hex(int(res.get('bits')),4) \
        + int_to_hex(int(res.get('nonce')),4)
    return s

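# Inverse of header_to_string: parse an 80-byte raw header into a dict.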
def header_from_string(s):
    hex_to_int = lambda s: int(s[::-1].encode('hex'), 16)
    h = {}
    h['version'] = hex_to_int(s[0:4])
    h['prev_block_hash'] = hash_encode(s[4:36])
    h['merkle_root'] = hash_encode(s[36:68])
    h['timestamp'] = hex_to_int(s[68:72])
    h['bits'] = hex_to_int(s[72:76])
    h['nonce'] = hex_to_int(s[76:80])
    return h



from processor import Processor, print_log

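# BlockchainProcessor indexes the bitcoind blockchain into LevelDB
# (address -> serialized history, plus a flat headers file) and answers
# Electrum protocol requests against that index.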
class BlockchainProcessor(Processor):

    def __init__(self, config, shared):
        Processor.__init__(self)

        self.shared = shared
        self.up_to_date = False
        self.watched_addresses = []
        self.history_cache = {}
        self.chunk_cache = {}
        self.cache_lock = threading.Lock()
        self.headers_data = ''

        self.mempool_hist = {}
        self.known_mempool_hashes = []
        self.address_queue = Queue()
        self.dbpath = config.get('leveldb', 'path')

        self.dblock = threading.Lock()
        try:
            self.db = leveldb.LevelDB(self.dbpath)
        except:
            traceback.print_exc(file=sys.stdout)
            self.shared.stop()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind','user'),
            config.get('bitcoind','password'),
            config.get('bitcoind','host'),
            config.get('bitcoind','port'))

        self.height = 0
        self.sent_height = 0
        self.sent_header = None

        try:
            hist = self.deserialize(self.db.Get('0'))
            hh, self.height, _ = hist[0]
            self.block_hashes = [hh]
            print_log( "hist", hist )
        except:
            #traceback.print_exc(file=sys.stdout)
            print_log('initializing database')
            self.height = 0
            self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]

        # catch_up headers
        self.init_headers(self.height)

        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print "blockchain is up to date."

        threading.Timer(10, self.main_iteration).start()


    def bitcoind(self, method, params=[]):
        postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
        respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
        r = loads(respdata)
        if r['error'] is not None:
            raise BaseException(r['error'])
        return r.get('result')

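    # Address histories are stored as a concatenation of fixed-size 40-byte
    # records: 32-byte txid + 4-byte position + 4-byte block height.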
    def serialize(self, h):
        s = ''
        for txid, txpos, height in h:
            s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
        return s.decode('hex')


    def deserialize(self, s):
        h = []
        while s:
            txid = s[0:32].encode('hex')
            txpos = int( rev_hex( s[32:36].encode('hex') ), 16 )
            height = int( rev_hex( s[36:40].encode('hex') ), 16 )
            h.append( ( txid, txpos, height ) )
            s = s[40:]
        return h


    def block2header(self, b):
        return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'),
                "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":int(b.get('bits'),16), "nonce":b.get('nonce')}


    def get_header(self, height):
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        return self.block2header(b)

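    # The 'blockchain_headers' file in the database directory stores raw
    # 80-byte headers back to back, indexed by block height.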
    def init_headers(self, db_height):
        self.chunk_cache = {}
        self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')

        height = 0
        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80

        if height:
            prev_header = self.read_header(height -1)
            prev_hash = self.hash_header(prev_header)
        else:
            open(self.headers_filename,'wb').close()
            prev_hash = None

        if height != db_height:
            print_log( "catching up missing headers:", height, db_height)

        try:
            for i in range(height, db_height):
                header = self.get_header(i)
                assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if i%1000==0: print_log("headers file:",i)
        except KeyboardInterrupt:
            self.flush_headers()
            sys.exit()

        self.flush_headers()


    def hash_header(self, header):
        return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))


    def read_header(self, block_height):
        if os.path.exists(self.headers_filename):
            f = open(self.headers_filename,'rb')
            f.seek(block_height*80)
            h = f.read(80)
            f.close()
            if len(h) == 80:
                h = header_from_string(h)
                return h

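    # A chunk is one difficulty-retarget period of 2016 consecutive headers
    # (2016*80 bytes), served to clients as hex.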
    def read_chunk(self, index):
        f = open(self.headers_filename,'rb')
        f.seek(index*2016*80)
        chunk = f.read(2016*80)
        f.close()
        return chunk.encode('hex')

    def write_header(self, header, sync=True):
        if not self.headers_data:
            self.headers_offset = header.get('block_height')
        self.headers_data += header_to_string(header).decode('hex')
        if sync or len(self.headers_data) > 40*100:
            self.flush_headers()

    def pop_header(self):
        # we need to do this only if we have not flushed
        if self.headers_data:
            # each serialized header is 80 bytes
            self.headers_data = self.headers_data[:-80]

    def flush_headers(self):
        if not self.headers_data: return
        f = open(self.headers_filename,'rb+')
        f.seek(self.headers_offset*80)
        f.write(self.headers_data)
        f.close()
        self.headers_data = ''


    def get_chunk(self, i):
        # store them on disk; store the current chunk in memory
        chunk = self.chunk_cache.get(i)
        if not chunk:
            chunk = self.read_chunk(i)
            self.chunk_cache[i] = chunk
        return chunk


    def get_transaction(self, txid, block_height=-1, is_coinbase = False):
        raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
        vds = deserialize.BCDataStream()
        vds.write(raw_tx.decode('hex'))
        out = deserialize.parse_Transaction(vds, is_coinbase)
        return out

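    # Return the confirmed history of an address from LevelDB, merged with any
    # unconfirmed mempool transactions; results are cached until invalidated.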
    def get_history(self, addr, cache_only=False):
        with self.cache_lock: hist = self.history_cache.get( addr )
        if hist is not None: return hist
        if cache_only: return -1

        with self.dblock:
            try:
                hist = self.deserialize(self.db.Get(addr))
                is_known = True
            except:
                hist = []
                is_known = False

        # should not be necessary
        hist.sort( key=lambda tup: tup[1])
        # check uniqueness too...

        # add memory pool
        for txid in self.mempool_hist.get(addr,[]):
            hist.append((txid, 0, 0))

        hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known: hist = ['*']

        with self.cache_lock: self.history_cache[addr] = hist
        return hist

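    # The Electrum status of an address is the sha256 of the concatenated
    # "txid:height:" strings of its history (None if never seen, '*' if the
    # address is known but its history is empty).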
    def get_status(self, addr, cache_only=False):
        tx_points = self.get_history(addr, cache_only)
        if cache_only and tx_points == -1: return -1

        if not tx_points: return None
        if tx_points == ['*']: return '*'
        status = ''
        for tx in tx_points:
            status += tx.get('tx_hash') + ':%d:' % tx.get('height')
        return hashlib.sha256( status ).digest().encode('hex')

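    # Build the merkle branch proving that tx_hash is included in the block at
    # the given height, for SPV verification by the client.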
    def get_merkle(self, tx_hash, height):

        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        while len(merkle) != 1:
            if len(merkle)%2: merkle.append( merkle[-1] )
            n = []
            while merkle:
                new_hash = Hash( merkle[0] + merkle[1] )
                if merkle[0] == target_hash:
                    s.append( hash_encode( merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append( hash_encode( merkle[0]))
                    target_hash = new_hash
                n.append( new_hash )
                merkle = merkle[2:]
            merkle = n

        return {"block_height":height, "merkle":s, "pos":tx_pos}

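    # During block import, batch_list holds the pending serialized history per
    # address and batch_txio maps a 36-byte outpoint (txid + output index)
    # back to the address it pays.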
    def add_to_batch(self, addr, tx_hash, tx_pos, tx_height):

        # we do it chronologically, so nothing wrong can happen...
        s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
        self.batch_list[addr] += s

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr

    def remove_from_batch(self, tx_hash, tx_pos):

        txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        try:
            addr = self.batch_txio[txi]
        except:
            #raise BaseException(tx_hash, tx_pos)
            print "WARNING: cannot find address for", (tx_hash, tx_pos)
            return

        serialized_hist = self.batch_list[addr]

        l = len(serialized_hist)/40
        for i in range(l):
            if serialized_hist[40*i:40*i+36] == txi:
                serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
                break
        else:
            raise BaseException("prevout not found", addr, serialized_hist, tx_hash, tx_pos)
        self.batch_list[addr] = serialized_hist

    def deserialize_block(self, block):
        txlist = block.get('tx')
        tx_hashes = []  # ordered txids
        txdict = {}     # deserialized tx
        is_coinbase = True
        for raw_tx in txlist:
            tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
            tx_hashes.append(tx_hash)
            vds = deserialize.BCDataStream()
            vds.write(raw_tx.decode('hex'))
            tx = deserialize.parse_Transaction(vds, is_coinbase)
            txdict[tx_hash] = tx
            is_coinbase = False
        return tx_hashes, txdict

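    # Apply (or, with revert=True, undo) one block: spent outpoints are
    # removed from their address histories, new outputs are appended, and
    # everything is committed to LevelDB in a single WriteBatch.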
    def import_block(self, block, block_hash, block_height, sync, revert=False):

        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address

        inputs_to_read = []
        addr_to_read = []

        # deserialize transactions
        t0 = time.time()
        tx_hashes, txdict = self.deserialize_block(block)

        # read addresses of tx inputs
        t00 = time.time()
        for tx in txdict.values():
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                inputs_to_read.append(txi)

        inputs_to_read.sort()
        for txi in inputs_to_read:
            try:
                addr = self.db.Get(txi)
            except:
                # the input could come from the same block
                continue
            self.batch_txio[txi] = addr
            addr_to_read.append(addr)

        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                addr_to_read.append(x.get('address'))

        addr_to_read.sort()
        for addr in addr_to_read:
            try:
                self.batch_list[addr] = self.db.Get(addr)
            except:
                self.batch_list[addr] = ''

        # process
        t1 = time.time()

        for txid in tx_hashes: # must be ordered
            tx = txdict[txid]
            if not revert:
                for x in tx.get('inputs'):
                    self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
                for x in tx.get('outputs'):
                    self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
            else:
                # undo this transaction: its outputs were added as (txid, index)
                for x in tx.get('outputs'):
                    self.remove_from_batch( txid, x.get('index'))
                for x in tx.get('inputs'):
                    self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)

        # write
        max_len = 0
        max_addr = ''
        t2 = time.time()

        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            l = len(serialized_hist)
            if l > max_len:
                max_len = l
                max_addr = addr

        for txio, addr in self.batch_txio.items():
            batch.Put(txio, addr)
        # delete spent inputs
        for txi in inputs_to_read:
            batch.Delete(txi)
        batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) )

        # actual write
        self.db.Write(batch, sync = sync)

        t3 = time.time()
        if t3 - t0 > 10:
            print_log("block", block_height,
                      "parse:%0.2f "%(t00 - t0),
                      "read:%0.2f "%(t1 - t00),
                      "proc:%.2f "%(t2-t1),
                      "write:%.2f "%(t3-t2),
                      "max:", max_len, max_addr)

        # invalidate cache
        for addr in self.batch_list.keys(): self.update_history_cache(addr)

    def add_request(self, request):
        # see if we can get it from cache. if not, add to queue
        if self.process( request, cache_only = True) == -1:
            self.queue.put(request)

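    # Dispatch one Electrum protocol request. With cache_only=True, a request
    # that cannot be answered from the caches returns -1 so it can be queued.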
    def process(self, request, cache_only = False):
        #print "abe process", request

        message_id = request['id']
        method = request['method']
        params = request.get('params',[])
        result = None
        error = None

        if method == 'blockchain.numblocks.subscribe':
            result = self.height

        elif method == 'blockchain.headers.subscribe':
            result = self.header

        elif method == 'blockchain.address.subscribe':
            try:
                address = params[0]
                result = self.get_status(address, cache_only)
                self.watch_address(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )

        elif method == 'blockchain.address.subscribe2':
            try:
                address = params[0]
                result = self.get_status(address, cache_only)
                self.watch_address(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )

        elif method == 'blockchain.address.get_history2':
            try:
                address = params[0]
                result = self.get_history( address, cache_only )
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )

        elif method == 'blockchain.block.get_header':
            if cache_only:
                result = -1
            else:
                try:
                    height = params[0]
                    result = self.get_header( height )
                except BaseException, e:
                    error = str(e) + ': %d'% height
                    print_log( "error:", error )

        elif method == 'blockchain.block.get_chunk':
            if cache_only:
                result = -1
            else:
                try:
                    index = params[0]
                    result = self.get_chunk( index )
                except BaseException, e:
                    error = str(e) + ': %d'% index
                    print_log( "error:", error)

        elif method == 'blockchain.transaction.broadcast':
            txo = self.bitcoind('sendrawtransaction', params)
            print_log( "sent tx:", txo )
            result = txo

        elif method == 'blockchain.transaction.get_merkle':
            if cache_only:
                result = -1
            else:
                try:
                    tx_hash = params[0]
                    tx_height = params[1]
                    result = self.get_merkle(tx_hash, tx_height)
                except BaseException, e:
                    error = str(e) + ': ' + tx_hash
                    print_log( "error:", error )

        elif method == 'blockchain.transaction.get':
            try:
                tx_hash = params[0]
                height = params[1]
                result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] )
            except BaseException, e:
                error = str(e) + ': ' + tx_hash
                print_log( "error:", error )

        else:
            error = "unknown method:%s"%method

        if cache_only and result == -1: return -1

        if error:
            response = { 'id':message_id, 'error':error }
            self.push_response(response)
        elif result != '':
            response = { 'id':message_id, 'result':result }
            self.push_response(response)

    def watch_address(self, addr):
        if addr not in self.watched_addresses:
            self.watched_addresses.append(addr)


    def last_hash(self):
        return self.block_hashes[-1]

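    # Follow bitcoind block by block until our best hash matches its tip,
    # importing new blocks and reverting our own tip on a reorg.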
    def catch_up(self, sync = True):
        t1 = time.time()

        while not self.shared.stopped():

            # are we done yet?
            info = self.bitcoind('getinfo')
            bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
            if self.last_hash() == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            block_hash = self.bitcoind('getblockhash', [self.height+1])
            block = self.bitcoind('getblock', [block_hash, 1])

            if block.get('previousblockhash') == self.last_hash():

                self.import_block(block, block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(block), sync)

                self.block_hashes.append(block_hash)
                self.block_hashes = self.block_hashes[-10:]

                if (self.height+1)%100 == 0 and not sync:
                    t2 = time.time()
                    print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
                    t1 = t2

            else:
                # revert current block
                print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
                block_hash = self.last_hash()
                block = self.bitcoind('getblock', [block_hash, 1])
                self.height = self.height -1
                self.pop_header()

                self.block_hashes.remove(block_hash)
                self.import_block(block, self.last_hash(), self.height, sync, revert=True)


        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))

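    # Poll bitcoind's mempool and record unconfirmed transactions per address,
    # invalidating the history cache of every address they touch.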
    def memorypool_update(self):

        mempool_hashes = self.bitcoind('getrawmempool')

        for tx_hash in mempool_hashes:
            if tx_hash in self.known_mempool_hashes: continue
            self.known_mempool_hashes.append(tx_hash)

            tx = self.get_transaction(tx_hash)
            if not tx: continue

            for x in tx.get('inputs') + tx.get('outputs'):
                addr = x.get('address')
                hist = self.mempool_hist.get(addr, [])
                if tx_hash not in hist:
                    hist.append( tx_hash )
                    self.mempool_hist[addr] = hist
                    self.update_history_cache(addr)

        self.known_mempool_hashes = mempool_hashes


    def update_history_cache(self, address):
        with self.cache_lock:
            if address in self.history_cache:
                print_log( "cache: invalidating", address )
                self.history_cache.pop(address)

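    # Periodic loop (rescheduled every 10 seconds): catch up with bitcoind,
    # refresh the mempool, and push height/header/address notifications to
    # subscribed clients.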
    def main_iteration(self):

        if self.shared.stopped():
            print_log( "blockchain processor terminating" )
            return

        with self.dblock:
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()

        if self.sent_height != self.height:
            self.sent_height = self.height
            self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })

        if self.sent_header != self.header:
            print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) )
            self.sent_header = self.header
            self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })

        while True:
            try:
                addr = self.address_queue.get(False)
            except:
                break
            if addr in self.watched_addresses:
                status = self.get_status( addr )
                self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })

        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        else:
            print_log( "blockchain processor terminating" )