a1f382268dc5c1edfef64430372b1d58497a337f
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import os
5 from Queue import Queue
6 import random
7 import sys
8 import time
9 import threading
10 import traceback
11 import urllib
12
13 from backends.bitcoind import deserialize
14 from processor import Processor, print_log
15 from utils import *
16
17 from storage import Storage
18
19
20 class BlockchainProcessor(Processor):
21
22     def __init__(self, config, shared):
23         Processor.__init__(self)
24
25         self.mtimes = {} # monitoring
26         self.shared = shared
27         self.config = config
28         self.up_to_date = False
29
30         self.watch_lock = threading.Lock()
31         self.watch_blocks = []
32         self.watch_headers = []
33         self.watched_addresses = {}
34
35         self.history_cache = {}
36         self.chunk_cache = {}
37         self.cache_lock = threading.Lock()
38         self.headers_data = ''
39         self.headers_path = config.get('leveldb', 'path_fulltree')
40
41         self.mempool_values = {}
42         self.mempool_addresses = {}
43         self.mempool_hist = {}
44         self.mempool_hashes = set([])
45         self.mempool_lock = threading.Lock()
46
47         self.address_queue = Queue()
48
49         try:
50             self.test_reorgs = config.getboolean('leveldb', 'test_reorgs')   # simulate random blockchain reorgs
51         except:
52             self.test_reorgs = False
53         self.storage = Storage(config, shared, self.test_reorgs)
54
55         self.dblock = threading.Lock()
56
57         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
58             config.get('bitcoind', 'user'),
59             config.get('bitcoind', 'password'),
60             config.get('bitcoind', 'host'),
61             config.get('bitcoind', 'port'))
62
63         while True:
64             try:
65                 self.bitcoind('getinfo')
66                 break
67             except:
68                 print_log('cannot contact bitcoind...')
69                 time.sleep(5)
70                 continue
71
72         self.sent_height = 0
73         self.sent_header = None
74
75         # catch_up headers
76         self.init_headers(self.storage.height)
77
78         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
79         while not shared.stopped() and not self.up_to_date:
80             try:
81                 time.sleep(1)
82             except:
83                 print "keyboard interrupt: stopping threads"
84                 shared.stop()
85                 sys.exit(0)
86
87         print_log("Blockchain is up to date.")
88         self.memorypool_update()
89         print_log("Memory pool initialized.")
90
91         self.timer = threading.Timer(10, self.main_iteration)
92         self.timer.start()
93
94
95
96     def mtime(self, name):
97         now = time.time()
98         if name != '':
99             delta = now - self.now
100             t = self.mtimes.get(name, 0)
101             self.mtimes[name] = t + delta
102         self.now = now
103
104     def print_mtime(self):
105         s = ''
106         for k, v in self.mtimes.items():
107             s += k+':'+"%.2f"%v+' '
108         print_log(s)
109
110
111     def bitcoind(self, method, params=[]):
112         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
113         try:
114             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
115         except:
116             print_log("error calling bitcoind")
117             traceback.print_exc(file=sys.stdout)
118             self.shared.stop()
119
120         r = loads(respdata)
121         if r['error'] is not None:
122             raise BaseException(r['error'])
123         return r.get('result')
124
125
126     def block2header(self, b):
127         return {
128             "block_height": b.get('height'),
129             "version": b.get('version'),
130             "prev_block_hash": b.get('previousblockhash'),
131             "merkle_root": b.get('merkleroot'),
132             "timestamp": b.get('time'),
133             "bits": int(b.get('bits'), 16),
134             "nonce": b.get('nonce'),
135         }
136
137     def get_header(self, height):
138         block_hash = self.bitcoind('getblockhash', [height])
139         b = self.bitcoind('getblock', [block_hash])
140         return self.block2header(b)
141
142     def init_headers(self, db_height):
143         self.chunk_cache = {}
144         self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')
145
146         if os.path.exists(self.headers_filename):
147             height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
148             if height > 0:
149                 prev_hash = self.hash_header(self.read_header(height))
150             else:
151                 prev_hash = None
152         else:
153             open(self.headers_filename, 'wb').close()
154             prev_hash = None
155             height = -1
156
157         if height < db_height:
158             print_log("catching up missing headers:", height, db_height)
159
160         try:
161             while height < db_height:
162                 height = height + 1
163                 header = self.get_header(height)
164                 if height > 1:
165                     assert prev_hash == header.get('prev_block_hash')
166                 self.write_header(header, sync=False)
167                 prev_hash = self.hash_header(header)
168                 if (height % 1000) == 0:
169                     print_log("headers file:", height)
170         except KeyboardInterrupt:
171             self.flush_headers()
172             sys.exit()
173
174         self.flush_headers()
175
176     def hash_header(self, header):
177         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
178
179     def read_header(self, block_height):
180         if os.path.exists(self.headers_filename):
181             with open(self.headers_filename, 'rb') as f:
182                 f.seek(block_height * 80)
183                 h = f.read(80)
184             if len(h) == 80:
185                 h = header_from_string(h)
186                 return h
187
188     def read_chunk(self, index):
189         with open(self.headers_filename, 'rb') as f:
190             f.seek(index*2016*80)
191             chunk = f.read(2016*80)
192         return chunk.encode('hex')
193
194     def write_header(self, header, sync=True):
195         if not self.headers_data:
196             self.headers_offset = header.get('block_height')
197
198         self.headers_data += header_to_string(header).decode('hex')
199         if sync or len(self.headers_data) > 40*100:
200             self.flush_headers()
201
202         with self.cache_lock:
203             chunk_index = header.get('block_height')/2016
204             if self.chunk_cache.get(chunk_index):
205                 self.chunk_cache.pop(chunk_index)
206
207     def pop_header(self):
208         # we need to do this only if we have not flushed
209         if self.headers_data:
210             self.headers_data = self.headers_data[:-40]
211
212     def flush_headers(self):
213         if not self.headers_data:
214             return
215         with open(self.headers_filename, 'rb+') as f:
216             f.seek(self.headers_offset*80)
217             f.write(self.headers_data)
218         self.headers_data = ''
219
220     def get_chunk(self, i):
221         # store them on disk; store the current chunk in memory
222         with self.cache_lock:
223             chunk = self.chunk_cache.get(i)
224             if not chunk:
225                 chunk = self.read_chunk(i)
226                 self.chunk_cache[i] = chunk
227
228         return chunk
229
230     def get_mempool_transaction(self, txid):
231         try:
232             raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
233         except:
234             return None
235
236         vds = deserialize.BCDataStream()
237         vds.write(raw_tx.decode('hex'))
238         try:
239             return deserialize.parse_Transaction(vds, is_coinbase=False)
240         except:
241             print_log("ERROR: cannot parse", txid)
242             return None
243
244
245     def get_history(self, addr, cache_only=False):
246         with self.cache_lock:
247             hist = self.history_cache.get(addr)
248         if hist is not None:
249             return hist
250         if cache_only:
251             return -1
252
253         with self.dblock:
254             try:
255                 hist = self.storage.get_history(addr)
256                 is_known = True
257             except:
258                 print_log("error get_history")
259                 self.shared.stop()
260                 raise
261             if hist:
262                 is_known = True
263             else:
264                 hist = []
265                 is_known = False
266
267         # add memory pool
268         with self.mempool_lock:
269             for txid, delta in self.mempool_hist.get(addr, []):
270                 hist.append({'tx_hash':txid, 'height':0})
271
272         # add something to distinguish between unused and empty addresses
273         if hist == [] and is_known:
274             hist = ['*']
275
276         with self.cache_lock:
277             self.history_cache[addr] = hist
278         return hist
279
280
281     def get_unconfirmed_value(self, addr):
282         v = 0
283         with self.mempool_lock:
284             for txid, delta in self.mempool_hist.get(addr, []):
285                 v += delta
286         return v
287
288
289     def get_status(self, addr, cache_only=False):
290         tx_points = self.get_history(addr, cache_only)
291         if cache_only and tx_points == -1:
292             return -1
293
294         if not tx_points:
295             return None
296         if tx_points == ['*']:
297             return '*'
298         status = ''
299         for tx in tx_points:
300             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
301         return hashlib.sha256(status).digest().encode('hex')
302
303     def get_merkle(self, tx_hash, height):
304
305         block_hash = self.bitcoind('getblockhash', [height])
306         b = self.bitcoind('getblock', [block_hash])
307         tx_list = b.get('tx')
308         tx_pos = tx_list.index(tx_hash)
309
310         merkle = map(hash_decode, tx_list)
311         target_hash = hash_decode(tx_hash)
312         s = []
313         while len(merkle) != 1:
314             if len(merkle) % 2:
315                 merkle.append(merkle[-1])
316             n = []
317             while merkle:
318                 new_hash = Hash(merkle[0] + merkle[1])
319                 if merkle[0] == target_hash:
320                     s.append(hash_encode(merkle[1]))
321                     target_hash = new_hash
322                 elif merkle[1] == target_hash:
323                     s.append(hash_encode(merkle[0]))
324                     target_hash = new_hash
325                 n.append(new_hash)
326                 merkle = merkle[2:]
327             merkle = n
328
329         return {"block_height": height, "merkle": s, "pos": tx_pos}
330
331
332     def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
333         # keep it sorted
334         s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
335         assert len(s) == 80
336
337         serialized_hist = self.batch_list[addr]
338
339         l = len(serialized_hist)/80
340         for i in range(l-1, -1, -1):
341             item = serialized_hist[80*i:80*(i+1)]
342             item_height = int(rev_hex(item[36:39].encode('hex')), 16)
343             if item_height <= tx_height:
344                 serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
345                 break
346         else:
347             serialized_hist = s + serialized_hist
348
349         self.batch_list[addr] = serialized_hist
350
351         # backlink
352         txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
353         self.batch_txio[txo] = addr
354
355
356
357
358
359
360     def deserialize_block(self, block):
361         txlist = block.get('tx')
362         tx_hashes = []  # ordered txids
363         txdict = {}     # deserialized tx
364         is_coinbase = True
365         for raw_tx in txlist:
366             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
367             vds = deserialize.BCDataStream()
368             vds.write(raw_tx.decode('hex'))
369             try:
370                 tx = deserialize.parse_Transaction(vds, is_coinbase)
371             except:
372                 print_log("ERROR: cannot parse", tx_hash)
373                 continue
374             tx_hashes.append(tx_hash)
375             txdict[tx_hash] = tx
376             is_coinbase = False
377         return tx_hashes, txdict
378
379
380
381     def import_block(self, block, block_hash, block_height, sync, revert=False):
382
383         touched_addr = set([])
384
385         # deserialize transactions
386         tx_hashes, txdict = self.deserialize_block(block)
387
388         # undo info
389         if revert:
390             undo_info = self.storage.get_undo_info(block_height)
391             tx_hashes.reverse()
392         else:
393             undo_info = {}
394
395         for txid in tx_hashes:  # must be ordered
396             tx = txdict[txid]
397             if not revert:
398                 undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
399                 undo_info[txid] = undo
400             else:
401                 undo = undo_info.pop(txid)
402                 self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)
403
404         if revert: 
405             assert undo_info == {}
406
407         # add undo info
408         if not revert:
409             self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)
410
411         # add the max
412         self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))
413
414         for addr in touched_addr:
415             self.invalidate_cache(addr)
416
417         self.storage.update_hashes()
418
419
420     def add_request(self, session, request):
421         # see if we can get if from cache. if not, add to queue
422         if self.process(session, request, cache_only=True) == -1:
423             self.queue.put((session, request))
424
425
426     def do_subscribe(self, method, params, session):
427         with self.watch_lock:
428             if method == 'blockchain.numblocks.subscribe':
429                 if session not in self.watch_blocks:
430                     self.watch_blocks.append(session)
431
432             elif method == 'blockchain.headers.subscribe':
433                 if session not in self.watch_headers:
434                     self.watch_headers.append(session)
435
436             elif method == 'blockchain.address.subscribe':
437                 address = params[0]
438                 l = self.watched_addresses.get(address)
439                 if l is None:
440                     self.watched_addresses[address] = [session]
441                 elif session not in l:
442                     l.append(session)
443
444
445     def do_unsubscribe(self, method, params, session):
446         with self.watch_lock:
447             if method == 'blockchain.numblocks.subscribe':
448                 if session in self.watch_blocks:
449                     self.watch_blocks.remove(session)
450             elif method == 'blockchain.headers.subscribe':
451                 if session in self.watch_headers:
452                     self.watch_headers.remove(session)
453             elif method == "blockchain.address.subscribe":
454                 addr = params[0]
455                 l = self.watched_addresses.get(addr)
456                 if not l:
457                     return
458                 if session in l:
459                     l.remove(session)
460                 if session in l:
461                     print_log("error rc!!")
462                     self.shared.stop()
463                 if l == []:
464                     self.watched_addresses.pop(addr)
465
466
467     def process(self, session, request, cache_only=False):
468         
469         message_id = request['id']
470         method = request['method']
471         params = request.get('params', [])
472         result = None
473         error = None
474
475         if method == 'blockchain.numblocks.subscribe':
476             result = self.storage.height
477
478         elif method == 'blockchain.headers.subscribe':
479             result = self.header
480
481         elif method == 'blockchain.address.subscribe':
482             try:
483                 address = str(params[0])
484                 result = self.get_status(address, cache_only)
485             except BaseException, e:
486                 error = str(e) + ': ' + address
487                 print_log("error:", error)
488
489         elif method == 'blockchain.address.get_history':
490             try:
491                 address = str(params[0])
492                 result = self.get_history(address, cache_only)
493             except BaseException, e:
494                 error = str(e) + ': ' + address
495                 print_log("error:", error)
496
497         elif method == 'blockchain.address.get_mempool':
498             try:
499                 address = str(params[0])
500                 result = self.get_unconfirmed_history(address, cache_only)
501             except BaseException, e:
502                 error = str(e) + ': ' + address
503                 print_log("error:", error)
504
505         elif method == 'blockchain.address.get_balance':
506             try:
507                 address = str(params[0])
508                 confirmed = self.storage.get_balance(address)
509                 unconfirmed = self.get_unconfirmed_value(address)
510                 result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
511             except BaseException, e:
512                 error = str(e) + ': ' + address
513                 print_log("error:", error)
514
515         elif method == 'blockchain.address.get_proof':
516             try:
517                 address = str(params[0])
518                 result = self.storage.get_proof(address)
519             except BaseException, e:
520                 error = str(e) + ': ' + address
521                 print_log("error:", error)
522
523         elif method == 'blockchain.address.listunspent':
524             try:
525                 address = str(params[0])
526                 result = self.storage.listunspent(address)
527             except BaseException, e:
528                 error = str(e) + ': ' + address
529                 print_log("error:", error)
530
531         elif method == 'blockchain.utxo.get_address':
532             try:
533                 txid = str(params[0])
534                 pos = int(params[1])
535                 txi = (txid + int_to_hex(pos, 4)).decode('hex')
536                 result = self.storage.get_address(txi)
537             except BaseException, e:
538                 error = str(e)
539                 print_log("error:", error, params)
540
541         elif method == 'blockchain.block.get_header':
542             if cache_only:
543                 result = -1
544             else:
545                 try:
546                     height = int(params[0])
547                     result = self.get_header(height)
548                 except BaseException, e:
549                     error = str(e) + ': %d' % height
550                     print_log("error:", error)
551
552         elif method == 'blockchain.block.get_chunk':
553             if cache_only:
554                 result = -1
555             else:
556                 try:
557                     index = int(params[0])
558                     result = self.get_chunk(index)
559                 except BaseException, e:
560                     error = str(e) + ': %d' % index
561                     print_log("error:", error)
562
563         elif method == 'blockchain.transaction.broadcast':
564             try:
565                 txo = self.bitcoind('sendrawtransaction', params)
566                 print_log("sent tx:", txo)
567                 result = txo
568             except BaseException, e:
569                 result = str(e)  # do not send an error
570                 print_log("error:", result, params)
571
572         elif method == 'blockchain.transaction.get_merkle':
573             if cache_only:
574                 result = -1
575             else:
576                 try:
577                     tx_hash = params[0]
578                     tx_height = params[1]
579                     result = self.get_merkle(tx_hash, tx_height)
580                 except BaseException, e:
581                     error = str(e) + ': ' + repr(params)
582                     print_log("get_merkle error:", error)
583
584         elif method == 'blockchain.transaction.get':
585             try:
586                 tx_hash = params[0]
587                 result = self.bitcoind('getrawtransaction', [tx_hash, 0])
588             except BaseException, e:
589                 error = str(e) + ': ' + repr(params)
590                 print_log("tx get error:", error)
591
592         else:
593             error = "unknown method:%s" % method
594
595         if cache_only and result == -1:
596             return -1
597
598         if error:
599             self.push_response(session, {'id': message_id, 'error': error})
600         elif result != '':
601             self.push_response(session, {'id': message_id, 'result': result})
602
603
604     def getfullblock(self, block_hash):
605         block = self.bitcoind('getblock', [block_hash])
606
607         rawtxreq = []
608         i = 0
609         for txid in block['tx']:
610             rawtxreq.append({
611                 "method": "getrawtransaction",
612                 "params": [txid],
613                 "id": i,
614             })
615             i += 1
616
617         postdata = dumps(rawtxreq)
618         try:
619             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
620         except:
621             print_log("bitcoind error (getfullblock)")
622             traceback.print_exc(file=sys.stdout)
623             self.shared.stop()
624
625         r = loads(respdata)
626         rawtxdata = []
627         for ir in r:
628             if ir['error'] is not None:
629                 self.shared.stop()
630                 print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
631                 raise BaseException(ir['error'])
632             rawtxdata.append(ir['result'])
633         block['tx'] = rawtxdata
634         return block
635
    def catch_up(self, sync=True):
        """Import or revert blocks until the storage tip matches bitcoind.

        Each pass compares the stored tip hash with bitcoind's best block.
        If they differ, the block following the stored height is fetched:
        when it extends the stored tip it is imported, otherwise (or when a
        test reorg is simulated) the current tip block is reverted. The
        loop ends when the tips match or the shared state is stopped.
        Afterwards self.header is refreshed from bitcoind and extended with
        the current 'utxo_root'; the storage is closed if the shared state
        has been stopped.

        sync -- passed on to import_block() and write_header(); when false,
                a progress line is logged every 1000 blocks.
        """

        # root hash recorded just before the most recent import; checked
        # again if that import gets reverted
        prev_root_hash = None
        while not self.shared.stopped():

            # reset the reference timestamp of the monitoring timers
            self.mtime('')

            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.storage.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.storage.height + 1])
            next_block = self.getfullblock(next_block_hash)
            self.mtime('daemon')

            # fixme: this is unsafe, if we revert when the undo info is not yet written
            # with test_reorgs enabled, revert with probability 1/100
            revert = (random.randint(1, 100) == 1) if self.test_reorgs else False

            if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:

                prev_root_hash = self.storage.get_root_hash()

                # the next block extends our tip: import it and advance
                self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
                self.storage.height = self.storage.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.storage.last_hash = next_block_hash
                self.mtime('import')
            
                # periodic progress report; the timers are reset afterwards
                if self.storage.height % 1000 == 0 and not sync:
                    t_daemon = self.mtimes.get('daemon')
                    t_import = self.mtimes.get('import')
                    print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
                    self.mtimes['daemon'] = 0
                    self.mtimes['import'] = 0

            else:

                # revert current block
                block = self.getfullblock(self.storage.last_hash)
                print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
                self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
                self.pop_header()
                self.flush_headers()

                self.storage.height -= 1

                # read previous header from disk
                self.header = self.read_header(self.storage.height)
                self.storage.last_hash = self.hash_header(self.header)

                # reverting the block imported last must restore the root
                # hash that was recorded before that import
                if prev_root_hash:
                    assert prev_root_hash == self.storage.get_root_hash()
                    prev_root_hash = None


        # publish the header of the current tip together with the root hash
        self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
        self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')

        if self.shared.stopped(): 
            print_log( "closing database" )
            self.storage.close()
703
704
705     def memorypool_update(self):
706         mempool_hashes = set(self.bitcoind('getrawmempool'))
707         touched_addresses = set([])
708
709         # get new transactions
710         new_tx = {}
711         for tx_hash in mempool_hashes:
712             if tx_hash in self.mempool_hashes:
713                 continue
714
715             tx = self.get_mempool_transaction(tx_hash)
716             if not tx:
717                 continue
718
719             new_tx[tx_hash] = tx
720             self.mempool_hashes.add(tx_hash)
721
722         # remove older entries from mempool_hashes
723         self.mempool_hashes = mempool_hashes
724
725
726         # check all tx outputs
727         for tx_hash, tx in new_tx.items():
728             mpa = self.mempool_addresses.get(tx_hash, {})
729             out_values = []
730             for x in tx.get('outputs'):
731                 out_values.append( x['value'] )
732
733                 addr = x.get('address')
734                 if not addr:
735                     continue
736                 v = mpa.get(addr,0)
737                 v += x['value']
738                 mpa[addr] = v
739                 touched_addresses.add(addr)
740
741             self.mempool_addresses[tx_hash] = mpa
742             self.mempool_values[tx_hash] = out_values
743
744         # check all inputs
745         for tx_hash, tx in new_tx.items():
746             mpa = self.mempool_addresses.get(tx_hash, {})
747             for x in tx.get('inputs'):
748                 # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
749                 addr = x.get('address')
750                 if not addr:
751                     continue
752
753                 v = self.mempool_values.get(x.get('prevout_hash'))
754                 if v:
755                     value = v[ x.get('prevout_n')]
756                 else:
757                     txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
758                     value = self.storage.get_utxo_value(addr,txi)
759
760                 v = mpa.get(addr,0)
761                 v -= value
762                 mpa[addr] = v
763                 touched_addresses.add(addr)
764
765             self.mempool_addresses[tx_hash] = mpa
766
767
768         # remove deprecated entries from mempool_addresses
769         for tx_hash, addresses in self.mempool_addresses.items():
770             if tx_hash not in self.mempool_hashes:
771                 self.mempool_addresses.pop(tx_hash)
772                 self.mempool_values.pop(tx_hash)
773                 for addr in addresses:
774                     touched_addresses.add(addr)
775
776         # rebuild mempool histories
777         new_mempool_hist = {}
778         for tx_hash, addresses in self.mempool_addresses.items():
779             for addr, delta in addresses.items():
780                 h = new_mempool_hist.get(addr, [])
781                 if tx_hash not in h:
782                     h.append((tx_hash, delta))
783                 new_mempool_hist[addr] = h
784
785         with self.mempool_lock:
786             self.mempool_hist = new_mempool_hist
787
788         # invalidate cache for touched addresses
789         for addr in touched_addresses:
790             self.invalidate_cache(addr)
791
792
793     def invalidate_cache(self, address):
794         with self.cache_lock:
795             if address in self.history_cache:
796                 print_log("cache: invalidating", address)
797                 self.history_cache.pop(address)
798
799         with self.watch_lock:
800             sessions = self.watched_addresses.get(address)
801
802         if sessions:
803             # TODO: update cache here. if new value equals cached value, do not send notification
804             self.address_queue.put((address,sessions))
805
806     
807     def close(self):
808         self.timer.join()
809         print_log("Closing database...")
810         self.storage.close()
811         print_log("Database is closed")
812
813
814     def main_iteration(self):
815         if self.shared.stopped():
816             print_log("Stopping timer")
817             return
818
819         with self.dblock:
820             t1 = time.time()
821             self.catch_up()
822             t2 = time.time()
823
824         self.memorypool_update()
825
826         if self.sent_height != self.storage.height:
827             self.sent_height = self.storage.height
828             for session in self.watch_blocks:
829                 self.push_response(session, {
830                         'id': None,
831                         'method': 'blockchain.numblocks.subscribe',
832                         'params': [self.storage.height],
833                         })
834
835         if self.sent_header != self.header:
836             print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
837             self.sent_header = self.header
838             for session in self.watch_headers:
839                 self.push_response(session, {
840                         'id': None,
841                         'method': 'blockchain.headers.subscribe',
842                         'params': [self.header],
843                         })
844
845         while True:
846             try:
847                 addr, sessions = self.address_queue.get(False)
848             except:
849                 break
850
851             status = self.get_status(addr)
852             for session in sessions:
853                 self.push_response(session, {
854                         'id': None,
855                         'method': 'blockchain.address.subscribe',
856                         'params': [addr, status],
857                         })
858
859         # next iteration 
860         self.timer = threading.Timer(10, self.main_iteration)
861         self.timer.start()
862