bitcoind + levelDB processor integration
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 from json import dumps, loads
2 import leveldb, urllib
3 import deserialize
4 import ast, time, threading, hashlib
5 from Queue import Queue
6 import traceback, sys
7
8
9
10 Hash = lambda x: hashlib.sha256(hashlib.sha256(x).digest()).digest()
11 hash_encode = lambda x: x[::-1].encode('hex')
12 hash_decode = lambda x: x.decode('hex')[::-1]
13
14
15
16 def rev_hex(s):
17     return s.decode('hex')[::-1].encode('hex')
18
19
20 def int_to_hex(i, length=1):
21     s = hex(i)[2:].rstrip('L')
22     s = "0"*(2*length - len(s)) + s
23     return rev_hex(s)
24
25
26 from processor import Processor, print_log
27
28 class BlockchainProcessor(Processor):
29
30     def __init__(self, config, shared):
31         Processor.__init__(self)
32
33         self.shared = shared
34         self.up_to_date = False
35         self.watched_addresses = []
36         self.history_cache = {}
37         self.chunk_cache = {}
38         self.cache_lock = threading.Lock()
39
40         self.mempool_hist = {}
41         self.known_mempool_hashes = []
42         self.address_queue = Queue()
43
44         self.dblock = threading.Lock()
45         try:
46             self.db = leveldb.LevelDB(config.get('leveldb', 'path'))
47         except:
48             traceback.print_exc(file=sys.stdout)
49             self.shared.stop()
50
51         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
52             config.get('bitcoind','user'),
53             config.get('bitcoind','password'),
54             config.get('bitcoind','host'),
55             config.get('bitcoind','port'))
56
57         self.height = 0
58         self.sent_height = 0
59         self.sent_header = None
60
61         try:
62             hist = self.deserialize(self.db.Get('0'))
63             hh, self.height, _ = hist[0] 
64             self.block_hashes = [hh]
65             print_log( "hist", hist )
66         except:
67             #traceback.print_exc(file=sys.stdout)
68             print_log('initializing database')
69             self.height = 0
70             self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
71
72         # catch_up first
73         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
74         while not shared.stopped() and not self.up_to_date:
75             try:
76                 time.sleep(1)
77             except:
78                 print "keyboard interrupt: stopping threads"
79                 shared.stop()
80                 sys.exit(0)
81
82
83
84     def bitcoind(self, method, params=[]):
85         postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
86         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
87         r = loads(respdata)
88         if r['error'] != None:
89             raise BaseException(r['error'])
90         return r.get('result')
91     
92
93     def serialize(self, h):
94         s = ''
95         for txid, txpos, height in h:
96             s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
97         return s.decode('hex')
98
99
100     def deserialize(self, s):
101         h = []
102         while s:
103             txid = s[0:32].encode('hex')
104             txpos = int( rev_hex( s[32:36].encode('hex') ), 16 )
105             height = int( rev_hex( s[36:40].encode('hex') ), 16 )
106             h.append( ( txid, txpos, height ) )
107             s = s[40:]
108         return h
109
110
111     def block2header(self, b):
112         return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'), 
113                 "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":b.get('bits'), "nonce":b.get('nonce')}
114
115
116     def get_header(self, height):
117         block_hash = self.bitcoind('getblockhash', [height])
118         b = self.bitcoind('getblock', [block_hash])
119         return self.block2header(b)
120     
121
122     def get_chunk(self):
123         # store them on disk; store the current chunk in memory
124         pass
125
126
127     def get_transaction(self, txid, block_height=-1, is_coinbase = False):
128         t0 = time.time()
129         raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
130         t1 = time.time()
131         vds = deserialize.BCDataStream()
132         vds.write(raw_tx.decode('hex'))
133         out = deserialize.parse_Transaction(vds, is_coinbase)
134         t2 = time.time()
135         return out, t1 - t0, t2 - t1
136
137
138     def get_history(self, addr, cache_only=False):
139         with self.cache_lock: hist = self.history_cache.get( addr )
140         if hist is not None: return hist
141         if cache_only: return -1
142
143         with self.dblock:
144             try:
145                 hist = self.deserialize(self.db.Get(addr))
146             except: 
147                 hist = []
148
149         # should not be necessary
150         hist.sort( key=lambda tup: tup[1])
151         # check uniqueness too...
152
153         # add memory pool
154         for txid in self.mempool_hist.get(addr,[]):
155             hist.append((txid, 0))
156
157         hist = map(lambda x: {'tx_hash':x[0], 'height':x[1]}, hist)
158         with self.cache_lock: self.history_cache[addr] = hist
159         return hist
160
161
162     def get_status(self, addr, cache_only=False):
163         tx_points = self.get_history(addr, cache_only)
164         if cache_only and tx_points == -1: return -1
165
166         if not tx_points: return None
167         status = ''
168         for tx in tx_points:
169             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
170         return hashlib.sha256( status ).digest().encode('hex')
171
172
173     def get_merkle(self, target_hash, height):
174
175         block_hash = self.bitcoind('getblockhash', [height])
176         b = self.bitcoind('getblock', [block_hash])
177         merkle = b.get('tx')
178
179         s = []
180         while len(merkle) != 1:
181             if len(merkle)%2: merkle.append( merkle[-1] )
182             n = []
183             while merkle:
184                 new_hash = Hash( merkle[0] + merkle[1] )
185                 if merkle[0] == target_hash:
186                     s.append( merkle[1])
187                     target_hash = new_hash
188                 elif merkle[1] == target_hash:
189                     s.append( merkle[0])
190                     target_hash = new_hash
191                 n.append( new_hash )
192                 merkle = merkle[2:]
193             merkle = n
194
195         return {"block_height":height, "merkle":s, "pos":tx_pos}
196
197         
198     def add_to_batch(self, addr, tx_hash, tx_pos, tx_height):
199
200         # we do it chronologically, so nothing wrong can happen...
201         s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')
202         self.batch_list[addr] += s
203
204         # backlink
205         txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
206         self.batch_txio[txo] = addr
207
208
209     def remove_from_batch(self, tx_hash, tx_pos):
210                     
211         txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
212         try:
213             addr = self.batch_txio[txi]
214         except:
215             #raise BaseException(tx_hash, tx_pos)
216             print "WARNING: cannot find address for", (tx_hash, tx_pos)
217             return
218
219         serialized_hist = self.batch_list[addr]
220
221         l = len(serialized_hist)/40
222         for i in range(l):
223             if serialized_hist[40*i:40*i+36] == txi:
224                 serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
225                 break
226         else:
227             raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
228         self.batch_list[addr] = serialized_hist
229
230
231     def deserialize_block(self, block):
232         txlist = block.get('tx')
233         tx_hashes = []  # ordered txids
234         txdict = {}     # deserialized tx
235         is_coinbase = True
236         for raw_tx in txlist:
237             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
238             tx_hashes.append(tx_hash)
239             vds = deserialize.BCDataStream()
240             vds.write(raw_tx.decode('hex'))
241             tx = deserialize.parse_Transaction(vds, is_coinbase)
242             txdict[tx_hash] = tx
243             is_coinbase = False
244         return tx_hashes, txdict
245
246
247     def import_block(self, block, block_hash, block_height, sync, revert=False):
248
249         self.batch_list = {}  # address -> history
250         self.batch_txio = {}  # transaction i/o -> address
251
252         inputs_to_read = []
253         addr_to_read = []
254
255         # deserialize transactions
256         t0 = time.time()
257         tx_hashes, txdict = self.deserialize_block(block)
258
259         # read addresses of tx inputs
260         t00 = time.time()
261         for tx in txdict.values():
262             for x in tx.get('inputs'):
263                 txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
264                 inputs_to_read.append(txi)
265
266         inputs_to_read.sort()
267         for txi in inputs_to_read:
268             try:
269                 addr = self.db.Get(txi)    
270             except:
271                 # the input could come from the same block
272                 continue
273             self.batch_txio[txi] = addr
274             addr_to_read.append(addr)
275
276         # read histories of addresses
277         for txid, tx in txdict.items():
278             for x in tx.get('outputs'):
279                 addr_to_read.append(x.get('address'))
280
281         addr_to_read.sort()
282         for addr in addr_to_read:
283             try:
284                 self.batch_list[addr] = self.db.Get(addr)
285             except: 
286                 self.batch_list[addr] = ''
287               
288         # process
289         t1 = time.time()
290
291         for txid in tx_hashes: # must be ordered
292             tx = txdict[txid]
293             if not revert:
294                 for x in tx.get('inputs'):
295                     self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
296                 for x in tx.get('outputs'):
297                     self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
298             else:
299                 for x in tx.get('outputs'):
300                     self.remove_from_batch( x.get('prevout_hash'), x.get('prevout_n'))
301                 for x in tx.get('inputs'):
302                     self.add_to_batch( x.get('address'), txid, x.get('index'), block_height)
303
304         # write
305         max_len = 0
306         max_addr = ''
307         t2 = time.time()
308
309         batch = leveldb.WriteBatch()
310         for addr, serialized_hist in self.batch_list.items():
311             batch.Put(addr, serialized_hist)
312             l = len(serialized_hist)
313             if l > max_len:
314                 max_len = l
315                 max_addr = addr
316
317         for txio, addr in self.batch_txio.items():
318             batch.Put(txio, addr)
319         # delete spent inputs
320         for txi in inputs_to_read:
321             batch.Delete(txi)
322         batch.Put('0', self.serialize( [(block_hash, block_height, 0)] ) )
323
324         # actual write
325         self.db.Write(batch, sync = sync)
326
327         t3 = time.time()
328         if t3 - t0 > 10: 
329             print_log("block", block_height, 
330                       "parse:%0.2f "%(t00 - t0), 
331                       "read:%0.2f "%(t1 - t00), 
332                       "proc:%.2f "%(t2-t1), 
333                       "write:%.2f "%(t3-t2), 
334                       "max:", max_len, max_addr)
335
336         # invalidate cache
337         for addr in self.batch_list.keys(): self.update_history_cache(addr)
338
339
340
341     def add_request(self, request):
342         # see if we can get if from cache. if not, add to queue
343         if self.process( request, cache_only = True) == -1:
344             self.queue.put(request)
345
346
347
348     def process(self, request, cache_only = False):
349         #print "abe process", request
350
351         message_id = request['id']
352         method = request['method']
353         params = request.get('params',[])
354         result = None
355         error = None
356
357         if method == 'blockchain.numblocks.subscribe':
358             result = self.height
359
360         elif method == 'blockchain.headers.subscribe':
361             result = self.header
362
363         elif method == 'blockchain.address.subscribe':
364             try:
365                 address = params[0]
366                 result = self.get_status(address, cache_only)
367                 self.watch_address(address)
368             except BaseException, e:
369                 error = str(e) + ': ' + address
370                 print_log( "error:", error )
371
372         elif method == 'blockchain.address.subscribe2':
373             try:
374                 address = params[0]
375                 result = self.get_status2(address, cache_only)
376                 self.watch_address(address)
377             except BaseException, e:
378                 error = str(e) + ': ' + address
379                 print_log( "error:", error )
380
381         elif method == 'blockchain.address.get_history':
382             try:
383                 address = params[0]
384                 result = self.get_history( address, cache_only )
385             except BaseException, e:
386                 error = str(e) + ': ' + address
387                 print_log( "error:", error )
388
389         elif method == 'blockchain.block.get_header':
390             if cache_only: 
391                 result = -1
392             else:
393                 try:
394                     height = params[0]
395                     result = self.get_header( height ) 
396                 except BaseException, e:
397                     error = str(e) + ': %d'% height
398                     print_log( "error:", error )
399                     
400         elif method == 'blockchain.block.get_chunk':
401             if cache_only:
402                 result = -1
403             else:
404                 try:
405                     index = params[0]
406                     result = self.get_chunk( index ) 
407                 except BaseException, e:
408                     error = str(e) + ': %d'% index
409                     print_log( "error:", error)
410
411         elif method == 'blockchain.transaction.broadcast':
412             txo = self.bitcoind('sendrawtransaction', params[0])
413             print_log( "sent tx:", txo )
414             result = txo 
415
416         elif method == 'blockchain.transaction.get_merkle':
417             if cache_only:
418                 result = -1
419             else:
420                 try:
421                     tx_hash = params[0]
422                     tx_height = params[1]
423                     result = self.get_merkle(tx_hash, tx_height) 
424                 except BaseException, e:
425                     error = str(e) + ': ' + tx_hash
426                     print_log( "error:", error )
427                     
428         elif method == 'blockchain.transaction.get':
429             try:
430                 tx_hash = params[0]
431                 height = params[1]
432                 result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] ) 
433             except BaseException, e:
434                 error = str(e) + ': ' + tx_hash
435                 print_log( "error:", error )
436
437         else:
438             error = "unknown method:%s"%method
439
440         if cache_only and result == -1: return -1
441
442         if error:
443             response = { 'id':message_id, 'error':error }
444             self.push_response(response)
445         elif result != '':
446             response = { 'id':message_id, 'result':result }
447             self.push_response(response)
448
449
450     def watch_address(self, addr):
451         if addr not in self.watched_addresses:
452             self.watched_addresses.append(addr)
453
454
455
456     def last_hash(self):
457         return self.block_hashes[-1]
458
459
460     def catch_up(self, sync = True):
461         t1 = time.time()
462
463         while not self.shared.stopped():
464
465             # are we done yet?
466             info = self.bitcoind('getinfo')
467             bitcoind_height = info.get('blocks')
468             bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
469             if self.last_hash() == bitcoind_block_hash: 
470                 self.up_to_date = True
471                 break
472
473             # not done..
474             self.up_to_date = False
475             block_hash = self.bitcoind('getblockhash', [self.height+1])
476             block = self.bitcoind('getblock', [block_hash, 1])
477
478             if block.get('previousblockhash') == self.last_hash():
479
480                 self.import_block(block, block_hash, self.height+1, sync)
481
482                 if (self.height+1)%100 == 0 and not sync: 
483                     t2 = time.time()
484                     print_log( "catch_up: block %d (%.3fs)"%( self.height+1, t2 - t1 ) )
485                     t1 = t2
486
487                 self.height = self.height + 1
488                 self.block_hashes.append(block_hash)
489                 self.block_hashes = self.block_hashes[-10:]
490                     
491             else:
492                 # revert current block
493                 print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
494                 block_hash = self.last_hash()
495                 block = self.bitcoind('getblock', [block_hash, 1])
496                 self.height = self.height -1
497                 self.block_hashes.remove(block_hash)
498                 self.import_block(block, self.last_hash(), self.height, revert=True)
499         
500
501         self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
502
503         
504
505             
506     def memorypool_update(self):
507
508         mempool_hashes = self.bitcoind('getrawmempool')
509
510         for tx_hash in mempool_hashes:
511             if tx_hash in self.known_mempool_hashes: continue
512             self.known_mempool_hashes.append(tx_hash)
513
514             tx = self.get_transaction(tx_hash)
515             if not tx: continue
516
517             for x in tx.get('inputs') + tx.get('outputs'):
518                 addr = x.get('address')
519                 hist = self.mempool_hist.get(addr, [])
520                 if tx_hash not in hist: 
521                     hist.append( tx_hash )
522                     self.mempool_hist[addr] = hist
523                     self.update_history_cache(addr)
524
525         self.known_mempool_hashes = mempool_hashes
526
527
528     def update_history_cache(self, address):
529         with self.cache_lock:
530             if self.history_cache.has_key(address):
531                 print_log( "cache: invalidating", address )
532                 self.history_cache.pop(address)
533
534
535
536     def main_iteration(self):
537
538         if self.shared.stopped(): 
539             print_log( "blockchain processor terminating" )
540             return
541
542         with self.dblock:
543             t1 = time.time()
544             self.catch_up()
545             t2 = time.time()
546             print_log( "blockchain: %d (%.3fs)"%( self.height+1, t2 - t1 ) )
547         self.memorypool_update()
548
549         if self.sent_height != self.height:
550             self.sent_height = self.height
551             self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })
552
553         if self.sent_header != self.header:
554             self.sent_header = self.header
555             self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })
556
557         while True:
558             try:
559                 addr = self.address_queue.get(False)
560             except:
561                 break
562             if addr in self.watched_addresses:
563                 status = self.get_status( addr )
564                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
565
566
567         if not self.shared.stopped(): 
568             threading.Timer(10, self.main_iteration).start()
569         else:
570             print_log( "blockchain processor terminating" )
571
572
573
574