various bug fixes
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 from json import dumps, loads
2 import leveldb, urllib
3 import deserialize
4 import ast, time, threading, hashlib
5 from Queue import Queue
6 import traceback, sys
7
8
9
10 Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
11 hash_encode = lambda x: x[::-1].encode('hex')
12 hash_decode = lambda x: x.decode('hex')[::-1]
13
14
15
16 def rev_hex(s):
17     return s.decode('hex')[::-1].encode('hex')
18
19
20 def int_to_hex(i, length=1):
21     s = hex(i)[2:].rstrip('L')
22     s = "0"*(2*length - len(s)) + s
23     return rev_hex(s)
24
25
26 from processor import Processor, print_log
27
28 class BlockchainProcessor(Processor):
29
30     def __init__(self, config, shared):
31         Processor.__init__(self)
32
33         self.shared = shared
34         self.up_to_date = False
35         self.watched_addresses = []
36         self.history_cache = {}
37         self.chunk_cache = {}
38         self.cache_lock = threading.Lock()
39
40         self.mempool_hist = {}
41         self.known_mempool_hashes = []
42         self.address_queue = Queue()
43
44         self.dblock = threading.Lock()
45         try:
46             self.db = leveldb.LevelDB(config.get('leveldb', 'path'))
47         except:
48             traceback.print_exc(file=sys.stdout)
49             self.shared.stop()
50
51         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
52             config.get('bitcoind','user'),
53             config.get('bitcoind','password'),
54             config.get('bitcoind','host'),
55             config.get('bitcoind','port'))
56
57         self.height = 0
58         self.sent_height = 0
59         self.sent_header = None
60
61         try:
62             hist = self.deserialize(self.db.Get('0'))
63             hh, self.height, _ = hist[0] 
64             self.block_hashes = [hh]
65             print_log( "hist", hist )
66         except:
67             #traceback.print_exc(file=sys.stdout)
68             print_log('initializing database')
69             self.height = 0
70             self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
71
72         # catch_up first
73         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
74         while not shared.stopped() and not self.up_to_date:
75             try:
76                 time.sleep(1)
77             except:
78                 print "keyboard interrupt: stopping threads"
79                 shared.stop()
80                 sys.exit(0)
81
82         print "blockchain is up to date."
83         threading.Timer(10, self.main_iteration).start()
84
85
86
87     def bitcoind(self, method, params=[]):
88         postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
89         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
90         r = loads(respdata)
91         if r['error'] != None:
92             raise BaseException(r['error'])
93         return r.get('result')
94     
95
96     def serialize(self, h):
97         s = ''
98         for txid, txpos, height in h:
99             s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
100         return s.decode('hex')
101
102
103     def deserialize(self, s):
104         h = []
105         while s:
106             txid = s[0:32].encode('hex')
107             txpos = int( rev_hex( s[32:36].encode('hex') ), 16 )
108             height = int( rev_hex( s[36:40].encode('hex') ), 16 )
109             h.append( ( txid, txpos, height ) )
110             s = s[40:]
111         return h
112
113
114     def block2header(self, b):
115         return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'), 
116                 "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":int(b.get('bits'),16), "nonce":b.get('nonce')}
117
118
119     def get_header(self, height):
120         block_hash = self.bitcoind('getblockhash', [height])
121         b = self.bitcoind('getblock', [block_hash])
122         return self.block2header(b)
123     
124
125     def get_chunk(self):
126         # store them on disk; store the current chunk in memory
127         pass
128
129
130     def get_transaction(self, txid, block_height=-1, is_coinbase = False):
131         raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
132         vds = deserialize.BCDataStream()
133         vds.write(raw_tx.decode('hex'))
134         out = deserialize.parse_Transaction(vds, is_coinbase)
135         return out
136
137
138     def get_history(self, addr, cache_only=False):
139         with self.cache_lock: hist = self.history_cache.get( addr )
140         if hist is not None: return hist
141         if cache_only: return -1
142
143         with self.dblock:
144             try:
145                 hist = self.deserialize(self.db.Get(addr))
146                 is_known = True
147             except: 
148                 hist = []
149                 is_known = False
150
151         # should not be necessary
152         hist.sort( key=lambda tup: tup[1])
153         # check uniqueness too...
154
155         # add memory pool
156         for txid in self.mempool_hist.get(addr,[]):
157             hist.append((txid, 0))
158
159         hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
160         # add something to distinguish between unused and empty addresses
161         if hist == [] and is_known: hist = ['*']
162
163         with self.cache_lock: self.history_cache[addr] = hist
164         return hist
165
166
167     def get_status(self, addr, cache_only=False):
168         tx_points = self.get_history(addr, cache_only)
169         if cache_only and tx_points == -1: return -1
170
171         if not tx_points: return None
172         if tx_points == ['*']: return '*'
173         status = ''
174         for tx in tx_points:
175             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
176         return hashlib.sha256( status ).digest().encode('hex')
177
178
179     def get_merkle(self, tx_hash, height):
180
181         block_hash = self.bitcoind('getblockhash', [height])
182         b = self.bitcoind('getblock', [block_hash])
183         tx_list = b.get('tx')
184         tx_pos = tx_list.index(tx_hash)
185         
186         merkle = map(hash_decode, tx_list)
187         target_hash = hash_decode(tx_hash)
188         s = []
189         while len(merkle) != 1:
190             if len(merkle)%2: merkle.append( merkle[-1] )
191             n = []
192             while merkle:
193                 new_hash = Hash( merkle[0] + merkle[1] )
194                 if merkle[0] == target_hash:
195                     s.append( hash_encode( merkle[1]))
196                     target_hash = new_hash
197                 elif merkle[1] == target_hash:
198                     s.append( hash_encode( merkle[0]))
199                     target_hash = new_hash
200                 n.append( new_hash )
201                 merkle = merkle[2:]
202             merkle = n
203
204         return {"block_height":height, "merkle":s, "pos":tx_pos}
205
206         
207     def add_to_batch(self, addr, tx_hash, tx_pos, tx_height):
208
209         # we do it chronologically, so nothing wrong can happen...
210         s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
211         self.batch_list[addr] += s
212
213         # backlink
214         txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
215         self.batch_txio[txo] = addr
216
217
218     def remove_from_batch(self, tx_hash, tx_pos):
219                     
220         txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
221         try:
222             addr = self.batch_txio[txi]
223         except:
224             #raise BaseException(tx_hash, tx_pos)
225             print "WARNING: cannot find address for", (tx_hash, tx_pos)
226             return
227
228         serialized_hist = self.batch_list[addr]
229
230         l = len(serialized_hist)/40
231         for i in range(l):
232             if serialized_hist[40*i:40*i+36] == txi:
233                 serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
234                 break
235         else:
236             raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
237         self.batch_list[addr] = serialized_hist
238
239
240     def deserialize_block(self, block):
241         txlist = block.get('tx')
242         tx_hashes = []  # ordered txids
243         txdict = {}     # deserialized tx
244         is_coinbase = True
245         for raw_tx in txlist:
246             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
247             tx_hashes.append(tx_hash)
248             vds = deserialize.BCDataStream()
249             vds.write(raw_tx.decode('hex'))
250             tx = deserialize.parse_Transaction(vds, is_coinbase)
251             txdict[tx_hash] = tx
252             is_coinbase = False
253         return tx_hashes, txdict
254
255
256     def import_block(self, block, block_hash, block_height, sync, revert=False):
257
258         self.batch_list = {}  # address -> history
259         self.batch_txio = {}  # transaction i/o -> address
260
261         inputs_to_read = []
262         addr_to_read = []
263
264         # deserialize transactions
265         t0 = time.time()
266         tx_hashes, txdict = self.deserialize_block(block)
267
268         # read addresses of tx inputs
269         t00 = time.time()
270         for tx in txdict.values():
271             for x in tx.get('inputs'):
272                 txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
273                 inputs_to_read.append(txi)
274
275         inputs_to_read.sort()
276         for txi in inputs_to_read:
277             try:
278                 addr = self.db.Get(txi)    
279             except:
280                 # the input could come from the same block
281                 continue
282             self.batch_txio[txi] = addr
283             addr_to_read.append(addr)
284
285         # read histories of addresses
286         for txid, tx in txdict.items():
287             for x in tx.get('outputs'):
288                 addr_to_read.append(x.get('address'))
289
290         addr_to_read.sort()
291         for addr in addr_to_read:
292             try:
293                 self.batch_list[addr] = self.db.Get(addr)
294             except: 
295                 self.batch_list[addr] = ''
296               
297         # process
298         t1 = time.time()
299
300         for txid in tx_hashes: # must be ordered
301             tx = txdict[txid]
302             if not revert:
303                 for x in tx.get('inputs'):
304                     self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
305                 for x in tx.get('outputs'):
306                     self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
307             else:
308                 for x in tx.get('outputs'):
309                     self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
310                 for x in tx.get('inputs'):
311                     self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
312
313         # write
314         max_len = 0
315         max_addr = ''
316         t2 = time.time()
317
318         batch = leveldb.WriteBatch()
319         for addr, serialized_hist in self.batch_list.items():
320             batch.Put(addr, serialized_hist)
321             l = len(serialized_hist)
322             if l > max_len:
323                 max_len = l
324                 max_addr = addr
325
326         for txio, addr in self.batch_txio.items():
327             batch.Put(txio, addr)
328         # delete spent inputs
329         for txi in inputs_to_read:
330             batch.Delete(txi)
331         batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) )
332
333         # actual write
334         self.db.Write(batch, sync = sync)
335
336         t3 = time.time()
337         if t3 - t0 > 10: 
338             print_log("block", block_height, 
339                       "parse:%0.2f "%(t00 - t0), 
340                       "read:%0.2f "%(t1 - t00), 
341                       "proc:%.2f "%(t2-t1), 
342                       "write:%.2f "%(t3-t2), 
343                       "max:", max_len, max_addr)
344
345         # invalidate cache
346         for addr in self.batch_list.keys(): self.update_history_cache(addr)
347
348
349
350     def add_request(self, request):
351         # see if we can get if from cache. if not, add to queue
352         if self.process( request, cache_only = True) == -1:
353             self.queue.put(request)
354
355
356
357     def process(self, request, cache_only = False):
358         #print "abe process", request
359
360         message_id = request['id']
361         method = request['method']
362         params = request.get('params',[])
363         result = None
364         error = None
365
366         if method == 'blockchain.numblocks.subscribe':
367             result = self.height
368
369         elif method == 'blockchain.headers.subscribe':
370             result = self.header
371
372         elif method == 'blockchain.address.subscribe':
373             try:
374                 address = params[0]
375                 result = self.get_status(address, cache_only)
376                 self.watch_address(address)
377             except BaseException, e:
378                 error = str(e) + ': ' + address
379                 print_log( "error:", error )
380
381         elif method == 'blockchain.address.subscribe2':
382             try:
383                 address = params[0]
384                 result = self.get_status(address, cache_only)
385                 self.watch_address(address)
386             except BaseException, e:
387                 error = str(e) + ': ' + address
388                 print_log( "error:", error )
389
390         elif method == 'blockchain.address.get_history2':
391             try:
392                 address = params[0]
393                 result = self.get_history( address, cache_only )
394             except BaseException, e:
395                 error = str(e) + ': ' + address
396                 print_log( "error:", error )
397
398         elif method == 'blockchain.block.get_header':
399             if cache_only: 
400                 result = -1
401             else:
402                 try:
403                     height = params[0]
404                     result = self.get_header( height ) 
405                 except BaseException, e:
406                     error = str(e) + ': %d'% height
407                     print_log( "error:", error )
408                     
409         elif method == 'blockchain.block.get_chunk':
410             if cache_only:
411                 result = -1
412             else:
413                 try:
414                     index = params[0]
415                     result = self.get_chunk( index ) 
416                 except BaseException, e:
417                     error = str(e) + ': %d'% index
418                     print_log( "error:", error)
419
420         elif method == 'blockchain.transaction.broadcast':
421             txo = self.bitcoind('sendrawtransaction', params[0])
422             print_log( "sent tx:", txo )
423             result = txo 
424
425         elif method == 'blockchain.transaction.get_merkle':
426             if cache_only:
427                 result = -1
428             else:
429                 try:
430                     tx_hash = params[0]
431                     tx_height = params[1]
432                     result = self.get_merkle(tx_hash, tx_height) 
433                 except BaseException, e:
434                     error = str(e) + ': ' + tx_hash
435                     print_log( "error:", error )
436                     
437         elif method == 'blockchain.transaction.get':
438             try:
439                 tx_hash = params[0]
440                 height = params[1]
441                 result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] ) 
442             except BaseException, e:
443                 error = str(e) + ': ' + tx_hash
444                 print_log( "error:", error )
445
446         else:
447             error = "unknown method:%s"%method
448
449         if cache_only and result == -1: return -1
450
451         if error:
452             response = { 'id':message_id, 'error':error }
453             self.push_response(response)
454         elif result != '':
455             response = { 'id':message_id, 'result':result }
456             self.push_response(response)
457
458
459     def watch_address(self, addr):
460         if addr not in self.watched_addresses:
461             self.watched_addresses.append(addr)
462
463
464
465     def last_hash(self):
466         return self.block_hashes[-1]
467
468
469     def catch_up(self, sync = True):
470         t1 = time.time()
471
472         while not self.shared.stopped():
473
474             # are we done yet?
475             info = self.bitcoind('getinfo')
476             bitcoind_height = info.get('blocks')
477             bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
478             if self.last_hash() == bitcoind_block_hash: 
479                 self.up_to_date = True
480                 break
481
482             # not done..
483             self.up_to_date = False
484             block_hash = self.bitcoind('getblockhash', [self.height+1])
485             block = self.bitcoind('getblock', [block_hash, 1])
486
487             if block.get('previousblockhash') == self.last_hash():
488
489                 self.import_block(block, block_hash, self.height+1, sync)
490                 self.height = self.height + 1
491                 self.block_hashes.append(block_hash)
492                 self.block_hashes = self.block_hashes[-10:]
493
494                 if (self.height+1)%100 == 0 and not sync: 
495                     t2 = time.time()
496                     print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
497                     t1 = t2
498
499                     
500             else:
501                 # revert current block
502                 print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
503                 block_hash = self.last_hash()
504                 block = self.bitcoind('getblock', [block_hash, 1])
505                 self.height = self.height -1
506                 self.block_hashes.remove(block_hash)
507                 self.import_block(block, self.last_hash(), self.height, revert=True)
508         
509
510         self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
511
512         
513
514             
515     def memorypool_update(self):
516
517         mempool_hashes = self.bitcoind('getrawmempool')
518
519         for tx_hash in mempool_hashes:
520             if tx_hash in self.known_mempool_hashes: continue
521             self.known_mempool_hashes.append(tx_hash)
522
523             tx = self.get_transaction(tx_hash)
524             if not tx: continue
525
526             for x in tx.get('inputs') + tx.get('outputs'):
527                 addr = x.get('address')
528                 hist = self.mempool_hist.get(addr, [])
529                 if tx_hash not in hist: 
530                     hist.append( tx_hash )
531                     self.mempool_hist[addr] = hist
532                     self.update_history_cache(addr)
533
534         self.known_mempool_hashes = mempool_hashes
535
536
537     def update_history_cache(self, address):
538         with self.cache_lock:
539             if self.history_cache.has_key(address):
540                 print_log( "cache: invalidating", address )
541                 self.history_cache.pop(address)
542
543
544
545     def main_iteration(self):
546
547         if self.shared.stopped(): 
548             print_log( "blockchain processor terminating" )
549             return
550
551         with self.dblock:
552             t1 = time.time()
553             self.catch_up()
554             t2 = time.time()
555
556         self.memorypool_update()
557
558         if self.sent_height != self.height:
559             self.sent_height = self.height
560             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })
561
562         if self.sent_header != self.header:
563             print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) )
564             self.sent_header = self.header
565             self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })
566
567         while True:
568             try:
569                 addr = self.address_queue.get(False)
570             except:
571                 break
572             if addr in self.watched_addresses:
573                 status = self.get_status( addr )
574                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
575
576
577         if not self.shared.stopped(): 
578             threading.Timer(10, self.main_iteration).start()
579         else:
580             print_log( "blockchain processor terminating" )
581
582
583
584