fix for blockchain reorgs
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 from json import dumps, loads
2 import leveldb, urllib
3 import deserialize
4 import ast, time, threading, hashlib
5 from Queue import Queue
6 import traceback, sys, os
7
8
9
def Hash(x):
    """Double SHA-256, the hash used for Bitcoin blocks and transactions."""
    return hashlib.sha256(hashlib.sha256(x).digest()).digest()


def hash_encode(x):
    """Binary hash -> hex string in the byte-reversed (RPC) convention."""
    return x[::-1].encode('hex')


def hash_decode(x):
    """Hex string in the byte-reversed (RPC) convention -> binary hash."""
    return x.decode('hex')[::-1]
13
14
15
def rev_hex(s):
    """Reverse the byte order of a hex string (e.g. 'abcd' -> 'cdab')."""
    raw = s.decode('hex')
    return raw[::-1].encode('hex')
18
19
def int_to_hex(i, length=1):
    """Encode integer *i* as a little-endian hex string of *length* bytes."""
    digits = hex(i)[2:].rstrip('L')  # Python 2 longs carry a trailing 'L'
    padded = '0' * (2 * length - len(digits)) + digits
    return rev_hex(padded)
24
def header_to_string(res):
    """Serialize a header dict into the 160-hex-char (80-byte) block header."""
    prev = res.get('prev_block_hash')
    if prev is None:
        prev = '0' * 64  # the genesis block has no predecessor
    fields = [
        int_to_hex(res.get('version'), 4),
        rev_hex(prev),
        rev_hex(res.get('merkle_root')),
        int_to_hex(int(res.get('timestamp')), 4),
        int_to_hex(int(res.get('bits')), 4),
        int_to_hex(int(res.get('nonce')), 4),
    ]
    return ''.join(fields)
35
def header_from_string( s):
    """Parse an 80-byte serialized block header into a dict.

    Inverse of header_to_string(); offsets follow the Bitcoin header
    layout: version, prev hash, merkle root, timestamp, bits, nonce.
    """
    # int(..., 16) replaces the original eval('0x' + ...): identical result
    # for valid hex, but cannot execute arbitrary expressions.
    hex_to_int = lambda s: int(s[::-1].encode('hex'), 16)
    h = {}
    h['version'] = hex_to_int(s[0:4])
    h['prev_block_hash'] = hash_encode(s[4:36])
    h['merkle_root'] = hash_encode(s[36:68])
    h['timestamp'] = hex_to_int(s[68:72])
    h['bits'] = hex_to_int(s[72:76])
    h['nonce'] = hex_to_int(s[76:80])
    return h
46
47
48
49
50 from processor import Processor, print_log
51
52 class BlockchainProcessor(Processor):
53
    def __init__(self, config, shared):
        """Open the leveldb store, sync with bitcoind, and start polling.

        Blocks until the chain is caught up (or *shared* is stopped),
        then schedules main_iteration() on a 10s timer.
        """
        Processor.__init__(self)

        self.shared = shared
        self.up_to_date = False
        self.watched_addresses = []
        self.history_cache = {}
        self.chunk_cache = {}
        self.cache_lock = threading.Lock()
        self.headers_data = ''   # headers buffered but not yet flushed to disk

        self.mempool_addresses = {}   # tx_hash -> addresses it touches
        self.mempool_hist = {}        # address -> mempool tx hashes
        self.mempool_hashes = []
        self.mempool_lock = threading.Lock()

        self.address_queue = Queue()
        self.dbpath = config.get('leveldb', 'path')

        self.dblock = threading.Lock()
        try:
            self.db = leveldb.LevelDB(self.dbpath)
        except:
            traceback.print_exc(file=sys.stdout)
            self.shared.stop()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind','user'),
            config.get('bitcoind','password'),
            config.get('bitcoind','host'),
            config.get('bitcoind','port'))

        self.height = 0
        self.sent_height = 0
        self.sent_header = None


        try:
            # the 'height' db key stores one serialized (last_hash, height, 0)
            hist = self.deserialize(self.db.Get('height'))
            self.last_hash, self.height, _ = hist[0] 
            print_log( "hist", hist )
        except:
            #traceback.print_exc(file=sys.stdout)
            print_log('initializing database')
            self.height = 0
            # hash of the Bitcoin genesis block
            self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'

        # catch_up headers
        self.init_headers(self.height)

        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print_log( "blockchain is up to date." )

        threading.Timer(10, self.main_iteration).start()
116
117
118
119     def bitcoind(self, method, params=[]):
120         postdata = dumps({"method": method, 'params': params, 'id':'jsonrpc'})
121         respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
122         r = loads(respdata)
123         if r['error'] != None:
124             raise BaseException(r['error'])
125         return r.get('result')
126     
127
128     def serialize(self, h):
129         s = ''
130         for txid, txpos, height in h:
131             s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
132         return s.decode('hex')
133
134
135     def deserialize(self, s):
136         h = []
137         while s:
138             txid = s[0:32].encode('hex')
139             txpos = int( rev_hex( s[32:36].encode('hex') ), 16 )
140             height = int( rev_hex( s[36:40].encode('hex') ), 16 )
141             h.append( ( txid, txpos, height ) )
142             s = s[40:]
143         return h
144
145
146     def block2header(self, b):
147         return {"block_height":b.get('height'), "version":b.get('version'), "prev_block_hash":b.get('previousblockhash'), 
148                 "merkle_root":b.get('merkleroot'), "timestamp":b.get('time'), "bits":int(b.get('bits'),16), "nonce":b.get('nonce')}
149
150
151     def get_header(self, height):
152         block_hash = self.bitcoind('getblockhash', [height])
153         b = self.bitcoind('getblock', [block_hash])
154         return self.block2header(b)
155     
156
    def init_headers(self, db_height):
        """Bring the flat headers file up to the database height *db_height*.

        The file stores one 80-byte serialized header per block, indexed
        by height; clients read it in chunks via get_chunk().
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            # height implied by the file size (80 bytes per header)
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            # create an empty headers file and start from scratch
            open(self.headers_filename,'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log( "catching up missing headers:", height, db_height)

        try:
            while height != db_height:
                height = height + 1
                header = self.get_header(height)
                if height>1: 
                    # each fetched header must link to the one we just wrote
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if height%1000==0: print_log("headers file:",height)
        except KeyboardInterrupt:
            # persist whatever is buffered before exiting
            self.flush_headers()
            sys.exit()

        self.flush_headers()
189
190
191     def hash_header(self, header):
192         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
193
194
195     def read_header(self, block_height):
196         if os.path.exists(self.headers_filename):
197             f = open(self.headers_filename,'rb')
198             f.seek(block_height*80)
199             h = f.read(80)
200             f.close()
201             if len(h) == 80:
202                 h = header_from_string(h)
203                 return h
204
205
206     def read_chunk(self, index):
207         f = open(self.headers_filename,'rb')
208         f.seek(index*2016*80)
209         chunk = f.read(2016*80)
210         f.close()
211         return chunk.encode('hex')
212
213
    def write_header(self, header, sync=True):
        """Buffer *header* for the headers file; flush when asked or full.

        headers_offset remembers the height of the first buffered header
        so flush_headers() knows where to seek.
        """
        if not self.headers_data:
            self.headers_offset = header.get('block_height')

        # each serialized header is 80 bytes
        self.headers_data += header_to_string(header).decode('hex')
        # NOTE(review): the threshold compares bytes against 40*100, but
        # headers are 80 bytes each -- presumably 80*100 was intended;
        # harmless (flushes earlier than planned). Confirm.
        if sync or len(self.headers_data) > 40*100:
            self.flush_headers()
221
222     def pop_header(self):
223         # we need to do this only if we have not flushed
224         if self.headers_data:
225             self.headers_data = self.headers_data[:-40]
226
227     def flush_headers(self):
228         if not self.headers_data: return
229         f = open(self.headers_filename,'rb+')
230         f.seek(self.headers_offset*80)
231         f.write(self.headers_data)
232         f.close()
233         self.headers_data = ''
234
235
236     def get_chunk(self, i):
237         # store them on disk; store the current chunk in memory
238         chunk = self.chunk_cache.get(i)
239         if not chunk:
240             chunk = self.read_chunk(i)
241             self.chunk_cache[i] = chunk
242         return chunk
243
244
245     def get_transaction(self, txid, block_height=-1, is_coinbase = False):
246         raw_tx = self.bitcoind('getrawtransaction', [txid, 0, block_height])
247         vds = deserialize.BCDataStream()
248         vds.write(raw_tx.decode('hex'))
249         out = deserialize.parse_Transaction(vds, is_coinbase)
250         return out
251
252
    def get_history(self, addr, cache_only=False):
        """Return the confirmed + mempool history of *addr*.

        Each entry is {'tx_hash':..., 'height':...} (height 0 for mempool
        txs).  Returns ['*'] for a known-but-empty address, and -1 when
        cache_only is set and the address is not cached.
        """
        with self.cache_lock: hist = self.history_cache.get( addr )
        if hist is not None: return hist
        if cache_only: return -1

        # read the serialized history from leveldb under the db lock
        with self.dblock:
            try:
                hist = self.deserialize(self.db.Get(addr))
                is_known = True
            except: 
                hist = []
                is_known = False

        # should not be necessary
        hist.sort( key=lambda tup: tup[1])
        # check uniqueness too...

        # add memory pool
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr,[]):
                hist.append((txid, 0, 0))

        hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known: hist = ['*']

        with self.cache_lock: self.history_cache[addr] = hist
        return hist
281
282
283     def get_status(self, addr, cache_only=False):
284         tx_points = self.get_history(addr, cache_only)
285         if cache_only and tx_points == -1: return -1
286
287         if not tx_points: return None
288         if tx_points == ['*']: return '*'
289         status = ''
290         for tx in tx_points:
291             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
292         return hashlib.sha256( status ).digest().encode('hex')
293
294
    def get_merkle(self, tx_hash, height):
        """Compute the merkle branch of *tx_hash* in the block at *height*.

        Returns {'block_height', 'merkle' (sibling hashes from leaf to
        root), 'pos' (index of the tx in the block)}.
        """
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)
        
        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        while len(merkle) != 1:
            # odd level: duplicate the last hash, as Bitcoin's tree does
            if len(merkle)%2: merkle.append( merkle[-1] )
            n = []
            while merkle:
                new_hash = Hash( merkle[0] + merkle[1] )
                # record the sibling whenever the pair contains our target,
                # then track the combined hash up to the next level
                if merkle[0] == target_hash:
                    s.append( hash_encode( merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append( hash_encode( merkle[0]))
                    target_hash = new_hash
                n.append( new_hash )
                merkle = merkle[2:]
            merkle = n

        return {"block_height":height, "merkle":s, "pos":tx_pos}
321
322         
323
324
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert (tx_hash, tx_pos, tx_height) into *addr*'s batched history.

        The serialized history stays ordered by height; also records the
        output -> address backlink in batch_txio.
        """
        # keep it sorted
        s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')

        serialized_hist = self.batch_list[addr] 

        # scan the 40-byte records from the end for the last entry with a
        # smaller height, and insert right after it
        l = len(serialized_hist)/40
        for i in range(l-1, -1, -1):
            item = serialized_hist[40*i:40*(i+1)]
            item_height = int( rev_hex( item[36:40].encode('hex') ), 16 )
            if item_height < tx_height:
                serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
                break
        else:
            # no smaller height found: prepend
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
347
348
349     def remove_from_history(self, tx_hash, tx_pos):
350                     
351         txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
352         try:
353             addr = self.batch_txio[txi]
354         except:
355             raise BaseException(tx_hash, tx_pos)
356             print "WARNING: cannot find address for", (tx_hash, tx_pos)
357             return
358
359         serialized_hist = self.batch_list[addr]
360
361         l = len(serialized_hist)/40
362         for i in range(l):
363             if serialized_hist[40*i:40*i+36] == txi:
364                 serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
365                 break
366         else:
367             raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)
368         self.batch_list[addr] = serialized_hist
369
370
371     def deserialize_block(self, block):
372         txlist = block.get('tx')
373         tx_hashes = []  # ordered txids
374         txdict = {}     # deserialized tx
375         is_coinbase = True
376         for raw_tx in txlist:
377             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
378             tx_hashes.append(tx_hash)
379             vds = deserialize.BCDataStream()
380             vds.write(raw_tx.decode('hex'))
381             tx = deserialize.parse_Transaction(vds, is_coinbase)
382             txdict[tx_hash] = tx
383             is_coinbase = False
384         return tx_hashes, txdict
385
386
387     def import_block(self, block, block_hash, block_height, sync, revert=False):
388
389         self.batch_list = {}  # address -> history
390         self.batch_txio = {}  # transaction i/o -> address
391
392         inputs_to_read = []
393         addr_to_read = []
394
395         # deserialize transactions
396         t0 = time.time()
397         tx_hashes, txdict = self.deserialize_block(block)
398
399         t00 = time.time()
400
401         if revert:
402             # read addresses of tx outputs
403             for tx_hash, tx in txdict.items():
404                 for x in tx.get('outputs'):
405                     txo = (tx_hash + int_to_hex(x.get('index'), 4)).decode('hex')
406                 self.batch_txio[txo] = x.get('address')
407         else:
408             # read addresses of tx inputs
409             for tx in txdict.values():
410                 for x in tx.get('inputs'):
411                     txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
412                     inputs_to_read.append(txi)
413
414             inputs_to_read.sort()
415             for txi in inputs_to_read:
416                 try:
417                     addr = self.db.Get(txi)
418                 except:
419                     # the input could come from the same block
420                     continue
421                 self.batch_txio[txi] = addr
422                 addr_to_read.append(addr)
423
424
425         # read histories of addresses
426         for txid, tx in txdict.items():
427             for x in tx.get('outputs'):
428                 addr_to_read.append(x.get('address'))
429
430         addr_to_read.sort()
431         for addr in addr_to_read:
432             try:
433                 self.batch_list[addr] = self.db.Get(addr)
434             except: 
435                 self.batch_list[addr] = ''
436               
437         # process
438         t1 = time.time()
439
440         for txid in tx_hashes: # must be ordered
441             tx = txdict[txid]
442             if not revert:
443                 for x in tx.get('inputs'):
444                     self.remove_from_history( x.get('prevout_hash'), x.get('prevout_n'))
445                 for x in tx.get('outputs'):
446                     self.add_to_history( x.get('address'), txid, x.get('index'), block_height)
447             else:
448                 for x in tx.get('outputs'):
449                     self.remove_from_history( txid, x.get('index'))
450
451                 for x in tx.get('inputs'):
452                     prevout_height = self.db.Get(x['prevout_hash'].decode('hex'))
453                     try:
454                         # note: this will fail if the block containing txi is part of the reorg and has been orphaned by bitcoind
455                         txi = self.get_transaction(x.get('prevout_hash'), prevout_height ) 
456                     except:
457                         # so, if if it fails, we need to read the block containing txi
458                         prevout_block_hash = self.db.Get('%d'%prevout_height)
459                         prevout_block = self.bitcoind('getblock', [prevout_block_hash, 1])
460                         for txc in prevout_block['tx']:
461                             if hash_encode(Hash(txc)) == prevout_hash: 
462                                 raw_txi = txc
463                                 break
464                         else: 
465                             raise BaseException('txi not found')
466
467                         vds = deserialize.BCDataStream()
468                         vds.write(raw_txi.decode('hex'))
469                         txi = deserialize.parse_Transaction(vds, False)
470
471                     print "txi", txi
472                     output = txi.get('outputs')[x.get('prevout_n')]
473                     prevout_addr = output['address']
474                     self.batch_list[prevout_addr] = self.db.Get(prevout_addr)
475                     # no longer chronological..
476                     self.add_to_history( prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
477                     print "new hist", self.deserialize(self.batch_list[prevout_addr])
478
479         # write
480         max_len = 0
481         max_addr = ''
482         t2 = time.time()
483
484         batch = leveldb.WriteBatch()
485         for addr, serialized_hist in self.batch_list.items():
486             batch.Put(addr, serialized_hist)
487             l = len(serialized_hist)
488             if l > max_len:
489                 max_len = l
490                 max_addr = addr
491
492         for txio, addr in self.batch_txio.items():
493             batch.Put(txio, addr)
494         # delete spent inputs
495         for txi in inputs_to_read:
496             batch.Delete(txi)
497
498         # add tx -> height
499         for txid in tx_hashes:
500             batch.Put(txid.decode('hex'), "%d"%block_height)
501         # add height -> block_hash
502         batch.Put("%d"%block_height, block_hash)
503         # add the max
504         batch.Put('height', self.serialize( [(block_hash, block_height, 0)] ) )
505
506         # actual write
507         self.db.Write(batch, sync = sync)
508
509         t3 = time.time()
510         if t3 - t0 > 10 and not sync: 
511             print_log("block", block_height, 
512                       "parse:%0.2f "%(t00 - t0), 
513                       "read:%0.2f "%(t1 - t00), 
514                       "proc:%.2f "%(t2-t1), 
515                       "write:%.2f "%(t3-t2), 
516                       "max:", max_len, max_addr)
517
518         for addr in self.batch_list.keys(): self.invalidate_cache(addr)
519
520
521
522     def add_request(self, request):
523         # see if we can get if from cache. if not, add to queue
524         if self.process( request, cache_only = True) == -1:
525             self.queue.put(request)
526
527
528
    def process(self, request, cache_only = False):
        """Dispatch one electrum JSON-RPC *request* and push the response.

        With cache_only=True, returns -1 (pushing nothing) when the answer
        is not already cached; used by add_request() to decide whether a
        request must be queued.
        """
        #print "abe process", request

        message_id = request['id']
        method = request['method']
        params = request.get('params',[])
        result = None
        error = None

        if method == 'blockchain.numblocks.subscribe':
            result = self.height

        elif method == 'blockchain.headers.subscribe':
            result = self.header

        elif method == 'blockchain.address.subscribe':
            try:
                address = params[0]
                result = self.get_status(address, cache_only)
                self.watch_address(address)
            except BaseException, e:
                # NOTE(review): if params is empty, 'address' is unbound
                # here and this line itself raises -- confirm clients
                # always send params
                error = str(e) + ': ' + address
                print_log( "error:", error )

        elif method == 'blockchain.address.subscribe2':
            try:
                address = params[0]
                result = self.get_status(address, cache_only)
                self.watch_address(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )

        elif method == 'blockchain.address.get_history2':
            try:
                address = params[0]
                result = self.get_history( address, cache_only )
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log( "error:", error )

        elif method == 'blockchain.block.get_header':
            if cache_only: 
                result = -1
            else:
                try:
                    height = params[0]
                    result = self.get_header( height ) 
                except BaseException, e:
                    error = str(e) + ': %d'% height
                    print_log( "error:", error )
                    
        elif method == 'blockchain.block.get_chunk':
            if cache_only:
                result = -1
            else:
                try:
                    index = params[0]
                    result = self.get_chunk( index ) 
                except BaseException, e:
                    error = str(e) + ': %d'% index
                    print_log( "error:", error)

        elif method == 'blockchain.transaction.broadcast':
            txo = self.bitcoind('sendrawtransaction', params)
            print_log( "sent tx:", txo )
            result = txo 

        elif method == 'blockchain.transaction.get_merkle':
            if cache_only:
                result = -1
            else:
                try:
                    tx_hash = params[0]
                    tx_height = params[1]
                    result = self.get_merkle(tx_hash, tx_height) 
                except BaseException, e:
                    error = str(e) + ': ' + tx_hash
                    print_log( "error:", error )
                    
        elif method == 'blockchain.transaction.get':
            try:
                tx_hash = params[0]
                height = params[1]
                result = self.bitcoind('getrawtransaction', [tx_hash, 0, height] ) 
            except BaseException, e:
                error = str(e) + ': ' + tx_hash
                print_log( "error:", error )

        else:
            error = "unknown method:%s"%method

        if cache_only and result == -1: return -1

        if error:
            response = { 'id':message_id, 'error':error }
            self.push_response(response)
        elif result != '':
            response = { 'id':message_id, 'result':result }
            self.push_response(response)
629
630
631     def watch_address(self, addr):
632         if addr not in self.watched_addresses:
633             self.watched_addresses.append(addr)
634
635
636
    def catch_up(self, sync = True):
        """Import blocks from bitcoind until our tip matches its tip.

        When bitcoind switched to another branch, the current block is
        reverted (import_block with revert=True) and the headers file is
        rolled back one block per loop iteration until the chains
        reconnect.  Finally self.header is set to the new tip's header.
        """
        #   a reorg in bitcoind id not synchronous with my database
        #        
        #                     -------> F ------> G -------> H
        #                    /
        #                   /
        #        A ------> B --------> C ------> E
        #        
        #        we always compare the hash in the headers file to the hash returned by bitcoind


        t1 = time.time()

        while not self.shared.stopped():

            # are we done yet?
            info = self.bitcoind('getinfo')
            bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
            if self.last_hash == bitcoind_block_hash: 
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.height+1])
            next_block = self.bitcoind('getblock', [next_block_hash, 1])

            if next_block.get('previousblockhash') == self.last_hash:
                # the next block extends our chain: import it
                self.import_block(next_block, next_block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.last_hash = next_block_hash

                if (self.height)%100 == 0 and not sync: 
                    t2 = time.time()
                    print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
                    t1 = t2
                    
            else:
                # revert current block
                block = self.bitcoind('getblock', [self.last_hash, 1])
                print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash )
                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                self.pop_header()

                self.height = self.height -1

                # read previous header from disk
                self.header = self.read_header(self.height) 
                self.last_hash = self.hash_header(self.header)
        

        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
692
693
694
695             
    def memorypool_update(self):
        """Refresh mempool state from bitcoind and invalidate stale caches.

        Rebuilds mempool_addresses (tx -> addresses it touches) and
        mempool_hist (address -> mempool txs), invalidating the history
        cache of every address whose mempool entries changed.
        """
        mempool_hashes = self.bitcoind('getrawmempool')

        for tx_hash in mempool_hashes:
            if tx_hash in self.mempool_hashes: continue

            tx = self.get_transaction(tx_hash)
            if not tx: continue

            # inputs: look up the spent output's address in the db
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                try:
                    addr = self.db.Get(txi)    
                except:
                    continue
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l: 
                    l.append( addr )
                    self.mempool_addresses[tx_hash] = l

            # outputs carry their address directly
            for x in tx.get('outputs'):
                addr = x.get('address')
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l: 
                    l.append( addr )
                    self.mempool_addresses[tx_hash] = l

            self.mempool_hashes.append(tx_hash)

        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes

        # remove deprecated entries from mempool_addresses
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)

        # rebuild histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                if tx_hash not in h: 
                    h.append( tx_hash )
                new_mempool_hist[addr] = h

        # invalidate the cache of any address whose mempool history changed
        for addr in new_mempool_hist.keys():
            if addr in self.mempool_hist.keys():
                if self.mempool_hist[addr] != new_mempool_hist[addr]: 
                    self.invalidate_cache(addr)
            else:
                self.invalidate_cache(addr)

        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist
752
753
754
755     def invalidate_cache(self, address):
756         with self.cache_lock:
757             if self.history_cache.has_key(address):
758                 print_log( "cache: invalidating", address )
759                 self.history_cache.pop(address)
760
761         if address in self.watched_addresses:
762             self.address_queue.put(address)
763
764
765
    def main_iteration(self):
        """Periodic tick: catch up with bitcoind, refresh mempool, notify.

        Pushes numblocks/headers subscription updates when the tip
        changed, drains the address notification queue filled by
        invalidate_cache(), then reschedules itself every 10 seconds
        until shared is stopped.
        """
        if self.shared.stopped(): 
            print_log( "blockchain processor terminating" )
            return

        with self.dblock:
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()
        t3 = time.time()
        # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)


        if self.sent_height != self.height:
            self.sent_height = self.height
            self.push_response({ 'id': None, 'method':'blockchain.numblocks.subscribe', 'params':[self.height] })

        if self.sent_header != self.header:
            print_log( "blockchain: %d (%.3fs)"%( self.height, t2 - t1 ) )
            self.sent_header = self.header
            self.push_response({ 'id': None, 'method':'blockchain.headers.subscribe', 'params':[self.header] })

        # drain the queue of addresses touched since the last iteration
        while True:
            try:
                addr = self.address_queue.get(False)
            except:
                break
            if addr in self.watched_addresses:
                status = self.get_status( addr )
                self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
                self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] })


        if not self.shared.stopped(): 
            threading.Timer(10, self.main_iteration).start()
        else:
            print_log( "blockchain processor terminating" )
806
807
808
809