3d93fd89acc54c9c4e6b74ab1ba4f36e4dc0b77c
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import os
5 from Queue import Queue
6 import random
7 import sys
8 import time
9 import threading
10 import traceback
11 import urllib
12 from ltc_scrypt import getPoWHash
13 from backends.bitcoind import deserialize
14 from processor import Processor, print_log
15 from utils import *
16
17 from storage import Storage
18
19
20 class BlockchainProcessor(Processor):
21
22     def __init__(self, config, shared):
23         Processor.__init__(self)
24
25         self.mtimes = {} # monitoring
26         self.shared = shared
27         self.config = config
28         self.up_to_date = False
29
30         self.watch_lock = threading.Lock()
31         self.watch_blocks = []
32         self.watch_headers = []
33         self.watched_addresses = {}
34
35         self.history_cache = {}
36         self.chunk_cache = {}
37         self.cache_lock = threading.Lock()
38         self.headers_data = ''
39         self.headers_path = config.get('leveldb', 'path_fulltree')
40
41         self.mempool_values = {}
42         self.mempool_addresses = {}
43         self.mempool_hist = {}
44         self.mempool_hashes = set([])
45         self.mempool_lock = threading.Lock()
46
47         self.address_queue = Queue()
48
49         try:
50             self.test_reorgs = config.getboolean('leveldb', 'test_reorgs')   # simulate random blockchain reorgs
51         except:
52             self.test_reorgs = False
53         self.storage = Storage(config, shared, self.test_reorgs)
54
55         self.dblock = threading.Lock()
56
57         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
58             config.get('bitcoind', 'user'),
59             config.get('bitcoind', 'password'),
60             config.get('bitcoind', 'host'),
61             config.get('bitcoind', 'port'))
62
63         while True:
64             try:
65                 self.bitcoind('getinfo')
66                 break
67             except:
68                 print_log('cannot contact novacoind...')
69                 time.sleep(5)
70                 continue
71
72         self.sent_height = 0
73         self.sent_header = None
74
75         # catch_up headers
76         self.init_headers(self.storage.height)
77
78         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
79         while not shared.stopped() and not self.up_to_date:
80             try:
81                 time.sleep(1)
82             except:
83                 print "keyboard interrupt: stopping threads"
84                 shared.stop()
85                 sys.exit(0)
86
87         print_log("Blockchain is up to date.")
88         self.memorypool_update()
89         print_log("Memory pool initialized.")
90
91         self.timer = threading.Timer(10, self.main_iteration)
92         self.timer.start()
93
94
95
96     def mtime(self, name):
97         now = time.time()
98         if name != '':
99             delta = now - self.now
100             t = self.mtimes.get(name, 0)
101             self.mtimes[name] = t + delta
102         self.now = now
103
104     def print_mtime(self):
105         s = ''
106         for k, v in self.mtimes.items():
107             s += k+':'+"%.2f"%v+' '
108         print_log(s)
109
110
111     def bitcoind(self, method, params=[]):
112         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
113         try:
114             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
115         except:
116             print_log("error calling novacoind")
117             traceback.print_exc(file=sys.stdout)
118             self.shared.stop()
119
120         r = loads(respdata)
121         if r['error'] is not None:
122             raise BaseException(r['error'])
123         return r.get('result')
124
125
126     def block2header(self, b):
127         return {
128             "block_height": b.get('height'),
129             "version": b.get('version'),
130             "prev_block_hash": b.get('previousblockhash'),
131             "merkle_root": b.get('merkleroot'),
132             "timestamp": b.get('time'),
133             "bits": int(b.get('bits'), 16),
134             "nonce": b.get('nonce'),
135         }
136
137     def get_header(self, height):
138         b = self.bitcoind('getblockbynumber', [height])
139         return self.block2header(b)
140
141     def init_headers(self, db_height):
142         self.chunk_cache = {}
143         self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')
144
145         if os.path.exists(self.headers_filename):
146             height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
147             if height > 0:
148                 prev_hash = self.hash_header(self.read_header(height))
149             else:
150                 prev_hash = None
151         else:
152             open(self.headers_filename, 'wb').close()
153             prev_hash = None
154             height = -1
155
156         if height < db_height:
157             print_log("catching up missing headers:", height, db_height)
158
159         try:
160             while height < db_height:
161                 height = height + 1
162                 header = self.get_header(height)
163                 if height > 1:
164                     assert prev_hash == header.get('prev_block_hash')
165                 self.write_header(header, sync=False)
166                 prev_hash = self.hash_header(header)
167                 if (height % 1000) == 0:
168                     print_log("headers file:", height)
169         except KeyboardInterrupt:
170             self.flush_headers()
171             sys.exit()
172
173         self.flush_headers()
174
175     def hash_header(self, header):
176         return rev_hex(getPoWHash(header_to_string(header).decode('hex')).encode('hex'))
177
178     def read_header(self, block_height):
179         if os.path.exists(self.headers_filename):
180             with open(self.headers_filename, 'rb') as f:
181                 f.seek(block_height * 80)
182                 h = f.read(80)
183             if len(h) == 80:
184                 h = header_from_string(h)
185                 return h
186
187     def read_chunk(self, index):
188         with open(self.headers_filename, 'rb') as f:
189             f.seek(index*2016*80)
190             chunk = f.read(2016*80)
191         return chunk.encode('hex')
192
193     def write_header(self, header, sync=True):
194         if not self.headers_data:
195             self.headers_offset = header.get('block_height')
196
197         self.headers_data += header_to_string(header).decode('hex')
198         if sync or len(self.headers_data) > 40*100:
199             self.flush_headers()
200
201         with self.cache_lock:
202             chunk_index = header.get('block_height')/2016
203             if self.chunk_cache.get(chunk_index):
204                 self.chunk_cache.pop(chunk_index)
205
206     def pop_header(self):
207         # we need to do this only if we have not flushed
208         if self.headers_data:
209             self.headers_data = self.headers_data[:-40]
210
211     def flush_headers(self):
212         if not self.headers_data:
213             return
214         with open(self.headers_filename, 'rb+') as f:
215             f.seek(self.headers_offset*80)
216             f.write(self.headers_data)
217         self.headers_data = ''
218
219     def get_chunk(self, i):
220         # store them on disk; store the current chunk in memory
221         with self.cache_lock:
222             chunk = self.chunk_cache.get(i)
223             if not chunk:
224                 chunk = self.read_chunk(i)
225                 self.chunk_cache[i] = chunk
226
227         return chunk
228
229     def get_mempool_transaction(self, txid):
230         try:
231             raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
232         except:
233             return None
234
235         vds = deserialize.BCDataStream()
236         vds.write(raw_tx.decode('hex'))
237         try:
238             return deserialize.parse_Transaction(vds, is_coinbase=False)
239         except:
240             print_log("ERROR: cannot parse", txid)
241             return None
242
243
244     def get_history(self, addr, cache_only=False):
245         with self.cache_lock:
246             hist = self.history_cache.get(addr)
247         if hist is not None:
248             return hist
249         if cache_only:
250             return -1
251
252         with self.dblock:
253             try:
254                 hist = self.storage.get_history(addr)
255                 is_known = True
256             except:
257                 print_log("error get_history")
258                 traceback.print_exc(file=sys.stdout)
259                 raise
260             if hist:
261                 is_known = True
262             else:
263                 hist = []
264                 is_known = False
265
266         # add memory pool
267         with self.mempool_lock:
268             for txid, delta in self.mempool_hist.get(addr, []):
269                 hist.append({'tx_hash':txid, 'height':0})
270
271         # add something to distinguish between unused and empty addresses
272         if hist == [] and is_known:
273             hist = ['*']
274
275         with self.cache_lock:
276             self.history_cache[addr] = hist
277         return hist
278
279
280     def get_unconfirmed_value(self, addr):
281         v = 0
282         with self.mempool_lock:
283             for txid, delta in self.mempool_hist.get(addr, []):
284                 v += delta
285         return v
286
287
288     def get_status(self, addr, cache_only=False):
289         tx_points = self.get_history(addr, cache_only)
290         if cache_only and tx_points == -1:
291             return -1
292
293         if not tx_points:
294             return None
295         if tx_points == ['*']:
296             return '*'
297         status = ''
298         for tx in tx_points:
299             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
300         return hashlib.sha256(status).digest().encode('hex')
301
302     def get_merkle(self, tx_hash, height):
303
304         b = self.bitcoind('getblockbynumber', [height])
305         tx_list = b.get('tx')
306         tx_pos = tx_list.index(tx_hash)
307
308         merkle = map(hash_decode, tx_list)
309         target_hash = hash_decode(tx_hash)
310         s = []
311         while len(merkle) != 1:
312             if len(merkle) % 2:
313                 merkle.append(merkle[-1])
314             n = []
315             while merkle:
316                 new_hash = Hash(merkle[0] + merkle[1])
317                 if merkle[0] == target_hash:
318                     s.append(hash_encode(merkle[1]))
319                     target_hash = new_hash
320                 elif merkle[1] == target_hash:
321                     s.append(hash_encode(merkle[0]))
322                     target_hash = new_hash
323                 n.append(new_hash)
324                 merkle = merkle[2:]
325             merkle = n
326
327         return {"block_height": height, "merkle": s, "pos": tx_pos}
328
329
330     def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
331         # keep it sorted
332         s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
333         assert len(s) == 80
334
335         serialized_hist = self.batch_list[addr]
336
337         l = len(serialized_hist)/80
338         for i in range(l-1, -1, -1):
339             item = serialized_hist[80*i:80*(i+1)]
340             item_height = int(rev_hex(item[36:39].encode('hex')), 16)
341             if item_height <= tx_height:
342                 serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
343                 break
344         else:
345             serialized_hist = s + serialized_hist
346
347         self.batch_list[addr] = serialized_hist
348
349         # backlink
350         txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
351         self.batch_txio[txo] = addr
352
353
354
355
356
357
358     def deserialize_block(self, block):
359         txlist = block.get('tx')
360
361         tx_hashes = []  # ordered txids
362         txdict = {}     # deserialized tx
363
364         for i, raw_tx in enumerate(txlist):
365             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
366             vds = deserialize.BCDataStream()
367             vds.write(raw_tx.decode('hex'))
368             try:
369                 tx = deserialize.parse_Transaction(vds, i == 0) # first transaction is always coinbase
370             except:
371                 print_log("ERROR: cannot parse", tx_hash)
372                 continue
373             tx_hashes.append(tx_hash)
374             txdict[tx_hash] = tx
375         return tx_hashes, txdict
376
377
378
379     def import_block(self, block, block_hash, block_height, sync, revert=False):
380
381         touched_addr = set([])
382
383         # deserialize transactions
384         tx_hashes, txdict = self.deserialize_block(block)
385
386         # undo info
387         if revert:
388             undo_info = self.storage.get_undo_info(block_height)
389             tx_hashes.reverse()
390         else:
391             undo_info = {}
392
393         for txid in tx_hashes:  # must be ordered
394             tx = txdict[txid]
395             if not revert:
396                 undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
397                 undo_info[txid] = undo
398             else:
399                 undo = undo_info.pop(txid)
400                 self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)
401
402         if revert: 
403             assert undo_info == {}
404
405         # add undo info
406         if not revert:
407             self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)
408
409         # add the max
410         self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))
411
412         for addr in touched_addr:
413             self.invalidate_cache(addr)
414
415         self.storage.update_hashes()
416
417
418     def add_request(self, session, request):
419         # see if we can get if from cache. if not, add to queue
420         if self.process(session, request, cache_only=True) == -1:
421             self.queue.put((session, request))
422
423
424     def do_subscribe(self, method, params, session):
425         with self.watch_lock:
426             if method == 'blockchain.numblocks.subscribe':
427                 if session not in self.watch_blocks:
428                     self.watch_blocks.append(session)
429
430             elif method == 'blockchain.headers.subscribe':
431                 if session not in self.watch_headers:
432                     self.watch_headers.append(session)
433
434             elif method == 'blockchain.address.subscribe':
435                 address = params[0]
436                 l = self.watched_addresses.get(address)
437                 if l is None:
438                     self.watched_addresses[address] = [session]
439                 elif session not in l:
440                     l.append(session)
441
442
443     def do_unsubscribe(self, method, params, session):
444         with self.watch_lock:
445             if method == 'blockchain.numblocks.subscribe':
446                 if session in self.watch_blocks:
447                     self.watch_blocks.remove(session)
448             elif method == 'blockchain.headers.subscribe':
449                 if session in self.watch_headers:
450                     self.watch_headers.remove(session)
451             elif method == "blockchain.address.subscribe":
452                 addr = params[0]
453                 l = self.watched_addresses.get(addr)
454                 if not l:
455                     return
456                 if session in l:
457                     l.remove(session)
458                 if session in l:
459                     print_log("error rc!!")
460                     self.shared.stop()
461                 if l == []:
462                     self.watched_addresses.pop(addr)
463
464
465     def process(self, session, request, cache_only=False):
466         
467         message_id = request['id']
468         method = request['method']
469         params = request.get('params', [])
470         result = None
471         error = None
472
473         if method == 'blockchain.numblocks.subscribe':
474             result = self.storage.height
475
476         elif method == 'blockchain.headers.subscribe':
477             result = self.header
478
479         elif method == 'blockchain.address.subscribe':
480             try:
481                 address = str(params[0])
482                 result = self.get_status(address, cache_only)
483             except BaseException, e:
484                 error = str(e) + ': ' + address
485                 print_log("error:", error)
486
487         elif method == 'blockchain.address.get_history':
488             try:
489                 address = str(params[0])
490                 result = self.get_history(address, cache_only)
491             except BaseException, e:
492                 error = str(e) + ': ' + address
493                 print_log("error:", error)
494
495         elif method == 'blockchain.address.get_mempool':
496             try:
497                 address = str(params[0])
498                 result = self.get_unconfirmed_history(address, cache_only)
499             except BaseException, e:
500                 error = str(e) + ': ' + address
501                 print_log("error:", error)
502
503         elif method == 'blockchain.address.get_balance':
504             try:
505                 address = str(params[0])
506                 confirmed = self.storage.get_balance(address)
507                 unconfirmed = self.get_unconfirmed_value(address)
508                 result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
509             except BaseException, e:
510                 error = str(e) + ': ' + address
511                 print_log("error:", error)
512
513         elif method == 'blockchain.address.get_proof':
514             try:
515                 address = str(params[0])
516                 result = self.storage.get_proof(address)
517             except BaseException, e:
518                 error = str(e) + ': ' + address
519                 print_log("error:", error)
520
521         elif method == 'blockchain.address.listunspent':
522             try:
523                 address = str(params[0])
524                 result = self.storage.listunspent(address)
525             except BaseException, e:
526                 error = str(e) + ': ' + address
527                 print_log("error:", error)
528
529         elif method == 'blockchain.utxo.get_address':
530             try:
531                 txid = str(params[0])
532                 pos = int(params[1])
533                 txi = (txid + int_to_hex(pos, 4)).decode('hex')
534                 result = self.storage.get_address(txi)
535             except BaseException, e:
536                 error = str(e)
537                 print_log("error:", error, params)
538
539         elif method == 'blockchain.block.get_header':
540             if cache_only:
541                 result = -1
542             else:
543                 try:
544                     height = int(params[0])
545                     result = self.get_header(height)
546                 except BaseException, e:
547                     error = str(e) + ': %d' % height
548                     print_log("error:", error)
549
550         elif method == 'blockchain.block.get_chunk':
551             if cache_only:
552                 result = -1
553             else:
554                 try:
555                     index = int(params[0])
556                     result = self.get_chunk(index)
557                 except BaseException, e:
558                     error = str(e) + ': %d' % index
559                     print_log("error:", error)
560
561         elif method == 'blockchain.transaction.broadcast':
562             try:
563                 txo = self.bitcoind('sendrawtransaction', params)
564                 print_log("sent tx:", txo)
565                 result = txo
566             except BaseException, e:
567                 result = str(e)  # do not send an error
568                 print_log("error:", result, params)
569
570         elif method == 'blockchain.transaction.get_merkle':
571             if cache_only:
572                 result = -1
573             else:
574                 try:
575                     tx_hash = params[0]
576                     tx_height = params[1]
577                     result = self.get_merkle(tx_hash, tx_height)
578                 except BaseException, e:
579                     error = str(e) + ': ' + repr(params)
580                     print_log("get_merkle error:", error)
581
582         elif method == 'blockchain.transaction.get':
583             try:
584                 tx_hash = params[0]
585                 result = self.bitcoind('getrawtransaction', [tx_hash, 0])
586             except BaseException, e:
587                 error = str(e) + ': ' + repr(params)
588                 print_log("tx get error:", error)
589
590         else:
591             error = "unknown method:%s" % method
592
593         if cache_only and result == -1:
594             return -1
595
596         if error:
597             self.push_response(session, {'id': message_id, 'error': error})
598         elif result != '':
599             self.push_response(session, {'id': message_id, 'result': result})
600
601     def catch_up(self, sync=True):
602
603         prev_root_hash = None
604         while not self.shared.stopped():
605
606             self.mtime('')
607
608             # are we done yet?
609             info = self.bitcoind('getinfo')
610             self.bitcoind_height = info.get('blocks')
611             bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
612             if self.storage.last_hash == bitcoind_block_hash:
613                 self.up_to_date = True
614                 break
615
616             # not done..
617             self.up_to_date = False
618             next_block = self.bitcoind('getblockbynumber', [self.storage.height + 1, True])
619             next_block_hash = next_block.get('hash')
620             self.mtime('daemon')
621
622             # fixme: this is unsafe, if we revert when the undo info is not yet written
623             revert = (random.randint(1, 100) == 1) if self.test_reorgs else False
624
625             if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
626
627                 prev_root_hash = self.storage.get_root_hash()
628
629                 self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
630                 self.storage.height = self.storage.height + 1
631                 self.write_header(self.block2header(next_block), sync)
632                 self.storage.last_hash = next_block_hash
633                 self.mtime('import')
634             
635                 if self.storage.height % 1000 == 0 and not sync:
636                     t_daemon = self.mtimes.get('daemon')
637                     t_import = self.mtimes.get('import')
638                     print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
639                     self.mtimes['daemon'] = 0
640                     self.mtimes['import'] = 0
641
642             else:
643
644                 # revert current block
645                 block = self.bitcoind('getblock', [self.storage.last_hash, True])
646                 print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
647                 self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
648                 self.pop_header()
649                 self.flush_headers()
650
651                 self.storage.height -= 1
652
653                 # read previous header from disk
654                 self.header = self.read_header(self.storage.height)
655                 self.storage.last_hash = self.hash_header(self.header)
656
657                 if prev_root_hash:
658                     assert prev_root_hash == self.storage.get_root_hash()
659                     prev_root_hash = None
660
661
662         self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
663         self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
664
665         if self.shared.stopped(): 
666             print_log( "closing database" )
667             self.storage.close()
668
669
670     def memorypool_update(self):
671         mempool_hashes = set(self.bitcoind('getrawmempool'))
672         touched_addresses = set([])
673
674         # get new transactions
675         new_tx = {}
676         for tx_hash in mempool_hashes:
677             if tx_hash in self.mempool_hashes:
678                 continue
679
680             tx = self.get_mempool_transaction(tx_hash)
681             if not tx:
682                 continue
683
684             new_tx[tx_hash] = tx
685             self.mempool_hashes.add(tx_hash)
686
687         # remove older entries from mempool_hashes
688         self.mempool_hashes = mempool_hashes
689
690
691         # check all tx outputs
692         for tx_hash, tx in new_tx.items():
693             mpa = self.mempool_addresses.get(tx_hash, {})
694             out_values = []
695             for x in tx.get('outputs'):
696                 out_values.append( x['value'] )
697
698                 addr = x.get('address')
699                 if not addr:
700                     continue
701                 v = mpa.get(addr,0)
702                 v += x['value']
703                 mpa[addr] = v
704                 touched_addresses.add(addr)
705
706             self.mempool_addresses[tx_hash] = mpa
707             self.mempool_values[tx_hash] = out_values
708
709         # check all inputs
710         for tx_hash, tx in new_tx.items():
711             mpa = self.mempool_addresses.get(tx_hash, {})
712             for x in tx.get('inputs'):
713                 # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
714                 addr = x.get('address')
715                 if not addr:
716                     continue
717
718                 v = self.mempool_values.get(x.get('prevout_hash'))
719                 if v:
720                     value = v[ x.get('prevout_n')]
721                 else:
722                     txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
723                     try:
724                         value = self.storage.get_utxo_value(addr,txi)
725                     except:
726                         print_log("utxo not in database; postponing mempool update")
727                         return
728
729                 v = mpa.get(addr,0)
730                 v -= value
731                 mpa[addr] = v
732                 touched_addresses.add(addr)
733
734             self.mempool_addresses[tx_hash] = mpa
735
736
737         # remove deprecated entries from mempool_addresses
738         for tx_hash, addresses in self.mempool_addresses.items():
739             if tx_hash not in self.mempool_hashes:
740                 self.mempool_addresses.pop(tx_hash)
741                 self.mempool_values.pop(tx_hash)
742                 for addr in addresses:
743                     touched_addresses.add(addr)
744
745         # rebuild mempool histories
746         new_mempool_hist = {}
747         for tx_hash, addresses in self.mempool_addresses.items():
748             for addr, delta in addresses.items():
749                 h = new_mempool_hist.get(addr, [])
750                 if tx_hash not in h:
751                     h.append((tx_hash, delta))
752                 new_mempool_hist[addr] = h
753
754         with self.mempool_lock:
755             self.mempool_hist = new_mempool_hist
756
757         # invalidate cache for touched addresses
758         for addr in touched_addresses:
759             self.invalidate_cache(addr)
760
761
762     def invalidate_cache(self, address):
763         with self.cache_lock:
764             if address in self.history_cache:
765                 print_log("cache: invalidating", address)
766                 self.history_cache.pop(address)
767
768         with self.watch_lock:
769             sessions = self.watched_addresses.get(address)
770
771         if sessions:
772             # TODO: update cache here. if new value equals cached value, do not send notification
773             self.address_queue.put((address,sessions))
774
775     
776     def close(self):
777         self.timer.join()
778         print_log("Closing database...")
779         self.storage.close()
780         print_log("Database is closed")
781
782
783     def main_iteration(self):
784         if self.shared.stopped():
785             print_log("Stopping timer")
786             return
787
788         with self.dblock:
789             t1 = time.time()
790             self.catch_up()
791             t2 = time.time()
792
793         self.memorypool_update()
794
795         if self.sent_height != self.storage.height:
796             self.sent_height = self.storage.height
797             for session in self.watch_blocks:
798                 self.push_response(session, {
799                         'id': None,
800                         'method': 'blockchain.numblocks.subscribe',
801                         'params': [self.storage.height],
802                         })
803
804         if self.sent_header != self.header:
805             print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
806             self.sent_header = self.header
807             for session in self.watch_headers:
808                 self.push_response(session, {
809                         'id': None,
810                         'method': 'blockchain.headers.subscribe',
811                         'params': [self.header],
812                         })
813
814         while True:
815             try:
816                 addr, sessions = self.address_queue.get(False)
817             except:
818                 break
819
820             status = self.get_status(addr)
821             for session in sessions:
822                 self.push_response(session, {
823                         'id': None,
824                         'method': 'blockchain.address.subscribe',
825                         'params': [addr, status],
826                         })
827
828         # next iteration 
829         self.timer = threading.Timer(10, self.main_iteration)
830         self.timer.start()
831