Handle exception during reorg
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import os
5 from Queue import Queue
6 import random
7 import sys
8 import time
9 import threading
10 import traceback
11 import urllib
12 from ltc_scrypt import getPoWHash
13 from backends.bitcoind import deserialize
14 from processor import Processor, print_log
15 from utils import *
16
17 from storage import Storage
18
19
class BlockchainProcessor(Processor):
    """Electrum server backend indexing a novacoind blockchain.

    Bridges the bitcoind-style JSON-RPC daemon and Electrum client
    sessions: keeps an on-disk headers file and the Storage database in
    sync with the daemon, mirrors the memory pool, and answers or queues
    ``blockchain.*`` requests.
    """

    def __init__(self, config, shared):
        Processor.__init__(self)

        self.mtimes = {} # monitoring: cumulative seconds per phase (see mtime())
        self.shared = shared
        self.config = config
        self.up_to_date = False

        # subscription bookkeeping; guarded by watch_lock
        self.watch_lock = threading.Lock()
        self.watch_blocks = []        # sessions subscribed to numblocks
        self.watch_headers = []       # sessions subscribed to headers
        self.watched_addresses = {}   # address -> list of subscribed sessions

        # request/answer caches; guarded by cache_lock
        self.history_cache = {}
        self.chunk_cache = {}
        self.cache_lock = threading.Lock()
        self.headers_data = ''        # unflushed binary header records
        self.headers_path = config.get('leveldb', 'path_fulltree')

        # mempool state; mempool_hist is guarded by mempool_lock
        self.mempool_values = {}      # txid -> list of output values
        self.mempool_addresses = {}   # txid -> {address: value delta}
        self.mempool_hist = {}        # address -> [(txid, delta)]
        self.mempool_hashes = set([])
        self.mempool_lock = threading.Lock()

        # (address, sessions) pairs awaiting notification (drained in main_iteration)
        self.address_queue = Queue()

        try:
            self.test_reorgs = config.getboolean('leveldb', 'test_reorgs')   # simulate random blockchain reorgs
        except:
            self.test_reorgs = False
        self.storage = Storage(config, shared, self.test_reorgs)

        # serializes database access between catch_up and request handlers
        self.dblock = threading.Lock()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind', 'user'),
            config.get('bitcoind', 'password'),
            config.get('bitcoind', 'host'),
            config.get('bitcoind', 'port'))

        # block until the daemon answers; the server is useless without it
        while True:
            try:
                self.bitcoind('getinfo')
                break
            except:
                print_log('cannot contact novacoind...')
                time.sleep(5)
                continue

        self.sent_height = 0     # last height pushed to numblocks subscribers
        self.sent_header = None  # last header pushed to headers subscribers

        # catch_up headers
        self.init_headers(self.storage.height)

        # run the initial catch_up in a background thread and wait for it here
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print_log("Blockchain is up to date.")
        self.memorypool_update()
        print_log("Memory pool initialized.")

        # periodic maintenance; main_iteration re-arms this timer itself
        self.timer = threading.Timer(10, self.main_iteration)
        self.timer.start()
93
94
95
96     def mtime(self, name):
97         now = time.time()
98         if name != '':
99             delta = now - self.now
100             t = self.mtimes.get(name, 0)
101             self.mtimes[name] = t + delta
102         self.now = now
103
104     def print_mtime(self):
105         s = ''
106         for k, v in self.mtimes.items():
107             s += k+':'+"%.2f"%v+' '
108         print_log(s)
109
110
111     def bitcoind(self, method, params=[]):
112         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
113         try:
114             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
115         except:
116             print_log("error calling novacoind")
117             traceback.print_exc(file=sys.stdout)
118             self.shared.stop()
119
120         r = loads(respdata)
121         if r['error'] is not None:
122             raise BaseException(r['error'])
123         return r.get('result')
124
125
126     def block2header(self, b):
127         return {
128             "block_height": b.get('height'),
129             "version": b.get('version'),
130             "prev_block_hash": b.get('previousblockhash'),
131             "merkle_root": b.get('merkleroot'),
132             "timestamp": b.get('time'),
133             "bits": int(b.get('bits'), 16),
134             "nonce": b.get('nonce'),
135         }
136
137     def get_header(self, height):
138         b = self.bitcoind('getblockbynumber', [height])
139         return self.block2header(b)
140
    def init_headers(self, db_height):
        """Bring the on-disk headers file up to the database height *db_height*.

        Creates or extends 'blockchain_headers' (80 bytes per record),
        walking back two blocks whenever a previous-hash mismatch shows the
        last header on file is orphaned.
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            # no headers file yet: create an empty one and start from scratch
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height += 1
                header = self.get_header(height)
                if height > 1:
                    if prev_hash != header.get('prev_block_hash'):
                        # The prev_hash block is orphaned, go back
                        print_log("reorganizing, a block in file is orphaned:", prev_hash)
                        # Go to the parent of the orphaned block
                        height -= 2
                        prev_hash = self.hash_header(self.read_header(height))
                        continue

                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            # flush what we have so the file stays consistent on Ctrl-C
            self.flush_headers()
            sys.exit()

        self.flush_headers()
181
182     def hash_header(self, header):
183         return rev_hex(getPoWHash(header_to_string(header).decode('hex')).encode('hex'))
184
185     def read_header(self, block_height):
186         if os.path.exists(self.headers_filename):
187             with open(self.headers_filename, 'rb') as f:
188                 f.seek(block_height * 80)
189                 h = f.read(80)
190             if len(h) == 80:
191                 h = header_from_string(h)
192                 return h
193
194     def read_chunk(self, index):
195         with open(self.headers_filename, 'rb') as f:
196             f.seek(index*2016*80)
197             chunk = f.read(2016*80)
198         return chunk.encode('hex')
199
    def write_header(self, header, sync=True):
        """Append *header* to the in-memory buffer, flushing to disk when
        *sync* is set or the buffer has grown large.

        Also drops the chunk cache entry covering the header's height.
        """
        if not self.headers_data:
            # remember where the buffered run starts so flush_headers can seek
            self.headers_offset = header.get('block_height')

        self.headers_data += header_to_string(header).decode('hex')
        # NOTE(review): records are 80 bytes (see read_header/flush_headers),
        # so this threshold flushes roughly every 50 buffered headers
        if sync or len(self.headers_data) > 40*100:
            self.flush_headers()

        with self.cache_lock:
            chunk_index = header.get('block_height')/2016
            if self.chunk_cache.get(chunk_index):
                self.chunk_cache.pop(chunk_index)
212
213     def pop_header(self):
214         # we need to do this only if we have not flushed
215         if self.headers_data:
216             self.headers_data = self.headers_data[:-40]
217
218     def flush_headers(self):
219         if not self.headers_data:
220             return
221         with open(self.headers_filename, 'rb+') as f:
222             f.seek(self.headers_offset*80)
223             f.write(self.headers_data)
224         self.headers_data = ''
225
226     def get_chunk(self, i):
227         # store them on disk; store the current chunk in memory
228         with self.cache_lock:
229             chunk = self.chunk_cache.get(i)
230             if not chunk:
231                 chunk = self.read_chunk(i)
232                 self.chunk_cache[i] = chunk
233
234         return chunk
235
236     def get_mempool_transaction(self, txid):
237         try:
238             raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
239         except:
240             return None
241
242         vds = deserialize.BCDataStream()
243         vds.write(raw_tx.decode('hex'))
244         try:
245             return deserialize.parse_Transaction(vds, is_coinbase=False)
246         except:
247             print_log("ERROR: cannot parse", txid)
248             return None
249
250
    def get_history(self, addr, cache_only=False):
        """Return the confirmed + mempool history for *addr*.

        Result is a list of {'tx_hash', 'height'} dicts (height 0 marks
        mempool entries), ['*'] for a known address with empty history, or
        -1 when *cache_only* is set and the history is not cached.
        """
        with self.cache_lock:
            hist = self.history_cache.get(addr)
        if hist is not None:
            return hist
        if cache_only:
            return -1

        # storage access is serialized with catch_up via dblock
        with self.dblock:
            try:
                hist = self.storage.get_history(addr)
                is_known = True
            except:
                print_log("error get_history")
                traceback.print_exc(file=sys.stdout)
                raise
            if hist:
                is_known = True
            else:
                hist = []
                is_known = False

        # add memory pool
        with self.mempool_lock:
            for txid, delta in self.mempool_hist.get(addr, []):
                hist.append({'tx_hash':txid, 'height':0})

        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known:
            hist = ['*']

        with self.cache_lock:
            self.history_cache[addr] = hist
        return hist
285
286
287     def get_unconfirmed_value(self, addr):
288         v = 0
289         with self.mempool_lock:
290             for txid, delta in self.mempool_hist.get(addr, []):
291                 v += delta
292         return v
293
294
295     def get_status(self, addr, cache_only=False):
296         tx_points = self.get_history(addr, cache_only)
297         if cache_only and tx_points == -1:
298             return -1
299
300         if not tx_points:
301             return None
302         if tx_points == ['*']:
303             return '*'
304         status = ''
305         for tx in tx_points:
306             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
307         return hashlib.sha256(status).digest().encode('hex')
308
    def get_merkle(self, tx_hash, height):
        """Return the merkle branch proving *tx_hash* is in the block at *height*.

        Result: {'block_height', 'merkle': sibling hashes from leaf level to
        root, 'pos': index of the tx within the block}.
        """
        b = self.bitcoind('getblockbynumber', [height])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        while len(merkle) != 1:
            # duplicate the last hash when the level has an odd count
            if len(merkle) % 2:
                merkle.append(merkle[-1])
            n = []
            while merkle:
                new_hash = Hash(merkle[0] + merkle[1])
                # record the sibling whenever this pair contains our target,
                # then follow the combined hash up to the next level
                if merkle[0] == target_hash:
                    s.append(hash_encode(merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append(hash_encode(merkle[0]))
                    target_hash = new_hash
                n.append(new_hash)
                merkle = merkle[2:]
            merkle = n

        return {"block_height": height, "merkle": s, "pos": tx_pos}
335
336
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert one 80-byte history record for *addr*, keeping the
        serialized history sorted by block height.

        NOTE(review): batch_list/batch_txio/serialize_item are not defined
        in this class as shown; this looks like a leftover of an older
        batch-based import path -- confirm whether it is still reachable.
        """
        # keep it sorted
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        l = len(serialized_hist)/80
        # scan from the newest record backwards for the insertion point
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            # bytes 36..38 of a record hold the little-endian block height
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height <= tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            # older than every record on file: prepend
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink from the serialized outpoint to the address
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
359
360
361
362
363
364
365     def deserialize_block(self, block):
366         txlist = block.get('tx')
367
368         tx_hashes = []  # ordered txids
369         txdict = {}     # deserialized tx
370
371         for i, raw_tx in enumerate(txlist):
372             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
373             vds = deserialize.BCDataStream()
374             vds.write(raw_tx.decode('hex'))
375             try:
376                 tx = deserialize.parse_Transaction(vds, i == 0) # first transaction is always coinbase
377             except:
378                 print_log("ERROR: cannot parse", tx_hash)
379                 continue
380             tx_hashes.append(tx_hash)
381             txdict[tx_hash] = tx
382         return tx_hashes, txdict
383
384
385
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply (or, with revert=True, undo) all transactions of *block*.

        Updates storage, records undo information for later reverts, and
        invalidates the history cache of every touched address.
        """
        touched_addr = set([])

        # deserialize transactions
        tx_hashes, txdict = self.deserialize_block(block)

        # undo info
        if revert:
            undo_info = self.storage.get_undo_info(block_height)
            # undo transactions in reverse of their import order
            tx_hashes.reverse()
        else:
            undo_info = {}

        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:
                undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
                undo_info[txid] = undo
            else:
                undo = undo_info.pop(txid)
                self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)

        if revert:
            # every recorded undo entry must have been consumed
            assert undo_info == {}

        # add undo info
        if not revert:
            self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)

        # add the max
        self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))

        for addr in touched_addr:
            self.invalidate_cache(addr)

        self.storage.update_hashes()
423
424
425     def add_request(self, session, request):
426         # see if we can get if from cache. if not, add to queue
427         if self.process(session, request, cache_only=True) == -1:
428             self.queue.put((session, request))
429
430
431     def do_subscribe(self, method, params, session):
432         with self.watch_lock:
433             if method == 'blockchain.numblocks.subscribe':
434                 if session not in self.watch_blocks:
435                     self.watch_blocks.append(session)
436
437             elif method == 'blockchain.headers.subscribe':
438                 if session not in self.watch_headers:
439                     self.watch_headers.append(session)
440
441             elif method == 'blockchain.address.subscribe':
442                 address = params[0]
443                 l = self.watched_addresses.get(address)
444                 if l is None:
445                     self.watched_addresses[address] = [session]
446                 elif session not in l:
447                     l.append(session)
448
449
450     def do_unsubscribe(self, method, params, session):
451         with self.watch_lock:
452             if method == 'blockchain.numblocks.subscribe':
453                 if session in self.watch_blocks:
454                     self.watch_blocks.remove(session)
455             elif method == 'blockchain.headers.subscribe':
456                 if session in self.watch_headers:
457                     self.watch_headers.remove(session)
458             elif method == "blockchain.address.subscribe":
459                 addr = params[0]
460                 l = self.watched_addresses.get(addr)
461                 if not l:
462                     return
463                 if session in l:
464                     l.remove(session)
465                 if session in l:
466                     print_log("error rc!!")
467                     self.shared.stop()
468                 if l == []:
469                     self.watched_addresses.pop(addr)
470
471
    def process(self, session, request, cache_only=False):
        """Dispatch one Electrum JSON-RPC *request* and push the response to *session*.

        With cache_only=True, a return of -1 (and no response pushed) means
        the answer was not cached and the request should be queued.
        """
        message_id = request['id']
        method = request['method']
        params = request.get('params', [])
        result = None
        error = None

        if method == 'blockchain.numblocks.subscribe':
            result = self.storage.height

        elif method == 'blockchain.headers.subscribe':
            result = self.header

        elif method == 'blockchain.address.subscribe':
            try:
                address = str(params[0])
                result = self.get_status(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.get_history':
            try:
                address = str(params[0])
                result = self.get_history(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.get_mempool':
            try:
                address = str(params[0])
                # NOTE(review): get_unconfirmed_history is not defined in this
                # class as shown; unless provided elsewhere, this branch always
                # reports an AttributeError to the client -- confirm
                result = self.get_unconfirmed_history(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.get_balance':
            try:
                address = str(params[0])
                confirmed = self.storage.get_balance(address)
                unconfirmed = self.get_unconfirmed_value(address)
                result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.get_proof':
            try:
                address = str(params[0])
                result = self.storage.get_proof(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.listunspent':
            try:
                address = str(params[0])
                result = self.storage.listunspent(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.utxo.get_address':
            try:
                txid = str(params[0])
                pos = int(params[1])
                txi = (txid + int_to_hex(pos, 4)).decode('hex')
                result = self.storage.get_address(txi)
            except BaseException, e:
                error = str(e)
                print_log("error:", error, params)

        elif method == 'blockchain.block.get_header':
            if cache_only:
                result = -1
            else:
                try:
                    height = int(params[0])
                    result = self.get_header(height)
                except BaseException, e:
                    error = str(e) + ': %d' % height
                    print_log("error:", error)

        elif method == 'blockchain.block.get_chunk':
            if cache_only:
                result = -1
            else:
                try:
                    index = int(params[0])
                    result = self.get_chunk(index)
                except BaseException, e:
                    error = str(e) + ': %d' % index
                    print_log("error:", error)

        elif method == 'blockchain.transaction.broadcast':
            try:
                txo = self.bitcoind('sendrawtransaction', params)
                print_log("sent tx:", txo)
                result = txo
            except BaseException, e:
                result = str(e)  # do not send an error
                print_log("error:", result, params)

        elif method == 'blockchain.transaction.get_merkle':
            if cache_only:
                result = -1
            else:
                try:
                    tx_hash = params[0]
                    tx_height = params[1]
                    result = self.get_merkle(tx_hash, tx_height)
                except BaseException, e:
                    error = str(e) + ': ' + repr(params)
                    print_log("get_merkle error:", error)

        elif method == 'blockchain.transaction.get':
            try:
                tx_hash = params[0]
                result = self.bitcoind('getrawtransaction', [tx_hash, 0])
            except BaseException, e:
                error = str(e) + ': ' + repr(params)
                print_log("tx get error:", error)

        else:
            error = "unknown method:%s" % method

        if cache_only and result == -1:
            return -1

        if error:
            self.push_response(session, {'id': message_id, 'error': error})
        elif result != '':
            # an empty-string result is deliberately not sent back
            self.push_response(session, {'id': message_id, 'result': result})
607
608     def catch_up(self, sync=True):
609
610         prev_root_hash = None
611         while not self.shared.stopped():
612
613             self.mtime('')
614
615             # are we done yet?
616             info = self.bitcoind('getinfo')
617             self.bitcoind_height = info.get('blocks')
618             bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
619             if self.storage.last_hash == bitcoind_block_hash:
620                 self.up_to_date = True
621                 break
622
623             # fixme: this is unsafe, if we revert when the undo info is not yet written
624             revert = (random.randint(1, 100) == 1) if self.test_reorgs else False
625
626             # not done..
627             self.up_to_date = False
628             try:
629                 next_block = self.bitcoind('getblockbynumber', [self.storage.height + 1, True])
630                 next_block_hash = next_block.get('hash')
631             except BaseException, e:
632                 revert = True
633                 next_block = next_block.get(self.storage.last_hash)
634
635             self.mtime('daemon')
636
637             if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
638
639                 prev_root_hash = self.storage.get_root_hash()
640
641                 self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
642                 self.storage.height = self.storage.height + 1
643                 self.write_header(self.block2header(next_block), sync)
644                 self.storage.last_hash = next_block_hash
645                 self.mtime('import')
646             
647                 if self.storage.height % 1000 == 0 and not sync:
648                     t_daemon = self.mtimes.get('daemon')
649                     t_import = self.mtimes.get('import')
650                     print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
651                     self.mtimes['daemon'] = 0
652                     self.mtimes['import'] = 0
653
654             else:
655
656                 # revert current block
657                 block = self.bitcoind('getblock', [self.storage.last_hash, True])
658                 print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
659                 self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
660                 self.pop_header()
661                 self.flush_headers()
662
663                 self.storage.height -= 1
664
665                 # read previous header from disk
666                 self.header = self.read_header(self.storage.height)
667                 self.storage.last_hash = self.hash_header(self.header)
668
669                 if prev_root_hash:
670                     assert prev_root_hash == self.storage.get_root_hash()
671                     prev_root_hash = None
672
673
674         self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
675         self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
676
677         if self.shared.stopped(): 
678             print_log( "closing database" )
679             self.storage.close()
680
681
    def memorypool_update(self):
        """Synchronize the in-memory mempool indexes with the daemon.

        Refreshes mempool_hashes/mempool_addresses/mempool_values, rebuilds
        the per-address mempool_hist, and invalidates caches for every
        address whose pending balance changed. Returns early (to retry on
        the next timer tick) when a spent output is not indexed yet.
        """
        mempool_hashes = set(self.bitcoind('getrawmempool'))
        touched_addresses = set([])

        # get new transactions
        new_tx = {}
        for tx_hash in mempool_hashes:
            if tx_hash in self.mempool_hashes:
                continue

            tx = self.get_mempool_transaction(tx_hash)
            if not tx:
                continue

            new_tx[tx_hash] = tx
            self.mempool_hashes.add(tx_hash)

        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes


        # check all tx outputs: credit each output address
        for tx_hash, tx in new_tx.items():
            mpa = self.mempool_addresses.get(tx_hash, {})
            out_values = []
            for x in tx.get('outputs'):
                out_values.append( x['value'] )

                addr = x.get('address')
                if not addr:
                    continue
                v = mpa.get(addr,0)
                v += x['value']
                mpa[addr] = v
                touched_addresses.add(addr)

            self.mempool_addresses[tx_hash] = mpa
            self.mempool_values[tx_hash] = out_values

        # check all inputs: debit each spending address
        for tx_hash, tx in new_tx.items():
            mpa = self.mempool_addresses.get(tx_hash, {})
            for x in tx.get('inputs'):
                # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
                addr = x.get('address')
                if not addr:
                    continue

                # prefer the value from another mempool tx; fall back to the UTXO db
                v = self.mempool_values.get(x.get('prevout_hash'))
                if v:
                    value = v[ x.get('prevout_n')]
                else:
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    try:
                        value = self.storage.get_utxo_value(addr,txi)
                    except:
                        # parent output not indexed yet; retry next tick
                        print_log("utxo not in database; postponing mempool update")
                        return

                v = mpa.get(addr,0)
                v -= value
                mpa[addr] = v
                touched_addresses.add(addr)

            self.mempool_addresses[tx_hash] = mpa


        # remove deprecated entries from mempool_addresses
        # (safe while mutating: Python 2 items() returns a list copy)
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)
                self.mempool_values.pop(tx_hash)
                for addr in addresses:
                    touched_addresses.add(addr)

        # rebuild mempool histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr, delta in addresses.items():
                h = new_mempool_hist.get(addr, [])
                # NOTE(review): h holds (txid, delta) tuples, so this
                # membership test on the bare txid never matches -- confirm
                # whether it was meant to deduplicate
                if tx_hash not in h:
                    h.append((tx_hash, delta))
                new_mempool_hist[addr] = h

        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist

        # invalidate cache for touched addresses
        for addr in touched_addresses:
            self.invalidate_cache(addr)
772
773
774     def invalidate_cache(self, address):
775         with self.cache_lock:
776             if address in self.history_cache:
777                 print_log("cache: invalidating", address)
778                 self.history_cache.pop(address)
779
780         with self.watch_lock:
781             sessions = self.watched_addresses.get(address)
782
783         if sessions:
784             # TODO: update cache here. if new value equals cached value, do not send notification
785             self.address_queue.put((address,sessions))
786
787     
    def close(self):
        """Shut down: wait for the maintenance timer, then close the storage."""
        self.timer.join()
        print_log("Closing database...")
        self.storage.close()
        print_log("Database is closed")
793
794
    def main_iteration(self):
        """Periodic maintenance tick.

        Catches up with the daemon, refreshes the mempool, pushes
        block/header/address notifications to subscribers, and re-arms
        itself every 10 seconds until the shared stop flag is set.
        """
        if self.shared.stopped():
            print_log("Stopping timer")
            return

        # dblock serializes storage access with the request handlers
        with self.dblock:
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()

        # notify numblocks subscribers of a new height
        if self.sent_height != self.storage.height:
            self.sent_height = self.storage.height
            for session in self.watch_blocks:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.numblocks.subscribe',
                        'params': [self.storage.height],
                        })

        # notify header subscribers of a new tip header
        if self.sent_header != self.header:
            print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
            self.sent_header = self.header
            for session in self.watch_headers:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.headers.subscribe',
                        'params': [self.header],
                        })

        # drain queued per-address notifications
        while True:
            try:
                addr, sessions = self.address_queue.get(False)
            except:
                break

            status = self.get_status(addr)
            for session in sessions:
                self.push_response(session, {
                        'id': None,
                        'method': 'blockchain.address.subscribe',
                        'params': [addr, status],
                        })

        # next iteration 
        self.timer = threading.Timer(10, self.main_iteration)
        self.timer.start()
843