fb68f255b496b1c54a40bbefb3b7f799f6dfd2bc
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import os
5 from Queue import Queue
6 import random
7 import sys
8 import time
9 import threading
10 import traceback
11 import urllib
12 from ltc_scrypt import getPoWHash
13 from backends.bitcoind import deserialize
14 from processor import Processor, print_log
15 from utils import *
16
17 from storage import Storage
18
19
20 class BlockchainProcessor(Processor):
21
22     def __init__(self, config, shared):
23         Processor.__init__(self)
24
25         self.mtimes = {} # monitoring
26         self.shared = shared
27         self.config = config
28         self.up_to_date = False
29
30         self.watch_lock = threading.Lock()
31         self.watch_blocks = []
32         self.watch_headers = []
33         self.watched_addresses = {}
34
35         self.history_cache = {}
36         self.chunk_cache = {}
37         self.cache_lock = threading.Lock()
38         self.headers_data = ''
39         self.headers_path = config.get('leveldb', 'path_fulltree')
40
41         self.mempool_values = {}
42         self.mempool_addresses = {}
43         self.mempool_hist = {}
44         self.mempool_hashes = set([])
45         self.mempool_lock = threading.Lock()
46
47         self.address_queue = Queue()
48
49         try:
50             self.test_reorgs = config.getboolean('leveldb', 'test_reorgs')   # simulate random blockchain reorgs
51         except:
52             self.test_reorgs = False
53         self.storage = Storage(config, shared, self.test_reorgs)
54
55         self.dblock = threading.Lock()
56
57         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
58             config.get('bitcoind', 'user'),
59             config.get('bitcoind', 'password'),
60             config.get('bitcoind', 'host'),
61             config.get('bitcoind', 'port'))
62
63         while True:
64             try:
65                 self.bitcoind('getinfo')
66                 break
67             except:
68                 print_log('cannot contact novacoind...')
69                 time.sleep(5)
70                 continue
71
72         self.sent_height = 0
73         self.sent_header = None
74
75         # catch_up headers
76         self.init_headers(self.storage.height)
77
78         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
79         while not shared.stopped() and not self.up_to_date:
80             try:
81                 time.sleep(1)
82             except:
83                 print "keyboard interrupt: stopping threads"
84                 shared.stop()
85                 sys.exit(0)
86
87         print_log("Blockchain is up to date.")
88         self.memorypool_update()
89         print_log("Memory pool initialized.")
90
91         self.timer = threading.Timer(10, self.main_iteration)
92         self.timer.start()
93
94
95
96     def mtime(self, name):
97         now = time.time()
98         if name != '':
99             delta = now - self.now
100             t = self.mtimes.get(name, 0)
101             self.mtimes[name] = t + delta
102         self.now = now
103
104     def print_mtime(self):
105         s = ''
106         for k, v in self.mtimes.items():
107             s += k+':'+"%.2f"%v+' '
108         print_log(s)
109
110
111     def bitcoind(self, method, params=[]):
112         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
113         try:
114             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
115         except:
116             print_log("error calling novacoind")
117             traceback.print_exc(file=sys.stdout)
118             self.shared.stop()
119
120         r = loads(respdata)
121         if r['error'] is not None:
122             raise BaseException(r['error'])
123         return r.get('result')
124
125
126     def block2header(self, b):
127         return {
128             "block_height": b.get('height'),
129             "version": b.get('version'),
130             "prev_block_hash": b.get('previousblockhash'),
131             "merkle_root": b.get('merkleroot'),
132             "timestamp": b.get('time'),
133             "bits": int(b.get('bits'), 16),
134             "nonce": b.get('nonce'),
135         }
136
137     def get_header(self, height):
138         b = self.bitcoind('getblockbynumber', [height])
139         return self.block2header(b)
140
141     def init_headers(self, db_height):
142         self.chunk_cache = {}
143         self.headers_filename = os.path.join(self.headers_path, 'blockchain_headers')
144
145         if os.path.exists(self.headers_filename):
146             height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
147             if height > 0:
148                 prev_hash = self.hash_header(self.read_header(height))
149             else:
150                 prev_hash = None
151         else:
152             open(self.headers_filename, 'wb').close()
153             prev_hash = None
154             height = -1
155
156         if height < db_height:
157             print_log("catching up missing headers:", height, db_height)
158
159         try:
160             while height < db_height:
161                 height = height + 1
162                 header = self.get_header(height)
163                 if height > 1:
164                     assert prev_hash == header.get('prev_block_hash')
165                 self.write_header(header, sync=False)
166                 prev_hash = self.hash_header(header)
167                 if (height % 1000) == 0:
168                     print_log("headers file:", height)
169         except KeyboardInterrupt:
170             self.flush_headers()
171             sys.exit()
172
173         self.flush_headers()
174
175     def hash_header(self, header):
176         return rev_hex(getPoWHash(header_to_string(header).decode('hex')).encode('hex'))
177
178     def read_header(self, block_height):
179         if os.path.exists(self.headers_filename):
180             with open(self.headers_filename, 'rb') as f:
181                 f.seek(block_height * 80)
182                 h = f.read(80)
183             if len(h) == 80:
184                 h = header_from_string(h)
185                 return h
186
187     def read_chunk(self, index):
188         with open(self.headers_filename, 'rb') as f:
189             f.seek(index*2016*80)
190             chunk = f.read(2016*80)
191         return chunk.encode('hex')
192
193     def write_header(self, header, sync=True):
194         if not self.headers_data:
195             self.headers_offset = header.get('block_height')
196
197         self.headers_data += header_to_string(header).decode('hex')
198         if sync or len(self.headers_data) > 40*100:
199             self.flush_headers()
200
201         with self.cache_lock:
202             chunk_index = header.get('block_height')/2016
203             if self.chunk_cache.get(chunk_index):
204                 self.chunk_cache.pop(chunk_index)
205
206     def pop_header(self):
207         # we need to do this only if we have not flushed
208         if self.headers_data:
209             self.headers_data = self.headers_data[:-40]
210
211     def flush_headers(self):
212         if not self.headers_data:
213             return
214         with open(self.headers_filename, 'rb+') as f:
215             f.seek(self.headers_offset*80)
216             f.write(self.headers_data)
217         self.headers_data = ''
218
219     def get_chunk(self, i):
220         # store them on disk; store the current chunk in memory
221         with self.cache_lock:
222             chunk = self.chunk_cache.get(i)
223             if not chunk:
224                 chunk = self.read_chunk(i)
225                 self.chunk_cache[i] = chunk
226
227         return chunk
228
229     def get_mempool_transaction(self, txid):
230         try:
231             raw_tx = self.bitcoind('getrawtransaction', [txid, 0])
232         except:
233             return None
234
235         vds = deserialize.BCDataStream()
236         vds.write(raw_tx.decode('hex'))
237         try:
238             return deserialize.parse_Transaction(vds, is_coinbase=False, is_coinstake=False)
239         except:
240             print_log("ERROR: cannot parse", txid)
241             return None
242
243
244     def get_history(self, addr, cache_only=False):
245         with self.cache_lock:
246             hist = self.history_cache.get(addr)
247         if hist is not None:
248             return hist
249         if cache_only:
250             return -1
251
252         with self.dblock:
253             try:
254                 hist = self.storage.get_history(addr)
255                 is_known = True
256             except:
257                 print_log("error get_history")
258                 traceback.print_exc(file=sys.stdout)
259                 raise
260             if hist:
261                 is_known = True
262             else:
263                 hist = []
264                 is_known = False
265
266         # add memory pool
267         with self.mempool_lock:
268             for txid, delta in self.mempool_hist.get(addr, []):
269                 hist.append({'tx_hash':txid, 'height':0})
270
271         # add something to distinguish between unused and empty addresses
272         if hist == [] and is_known:
273             hist = ['*']
274
275         with self.cache_lock:
276             self.history_cache[addr] = hist
277         return hist
278
279
280     def get_unconfirmed_value(self, addr):
281         v = 0
282         with self.mempool_lock:
283             for txid, delta in self.mempool_hist.get(addr, []):
284                 v += delta
285         return v
286
287
288     def get_status(self, addr, cache_only=False):
289         tx_points = self.get_history(addr, cache_only)
290         if cache_only and tx_points == -1:
291             return -1
292
293         if not tx_points:
294             return None
295         if tx_points == ['*']:
296             return '*'
297         status = ''
298         for tx in tx_points:
299             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
300         return hashlib.sha256(status).digest().encode('hex')
301
302     def get_merkle(self, tx_hash, height):
303
304         b = self.bitcoind('getblockbynumber', [height])
305         tx_list = b.get('tx')
306         tx_pos = tx_list.index(tx_hash)
307
308         merkle = map(hash_decode, tx_list)
309         target_hash = hash_decode(tx_hash)
310         s = []
311         while len(merkle) != 1:
312             if len(merkle) % 2:
313                 merkle.append(merkle[-1])
314             n = []
315             while merkle:
316                 new_hash = Hash(merkle[0] + merkle[1])
317                 if merkle[0] == target_hash:
318                     s.append(hash_encode(merkle[1]))
319                     target_hash = new_hash
320                 elif merkle[1] == target_hash:
321                     s.append(hash_encode(merkle[0]))
322                     target_hash = new_hash
323                 n.append(new_hash)
324                 merkle = merkle[2:]
325             merkle = n
326
327         return {"block_height": height, "merkle": s, "pos": tx_pos}
328
329
330     def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
331         # keep it sorted
332         s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
333         assert len(s) == 80
334
335         serialized_hist = self.batch_list[addr]
336
337         l = len(serialized_hist)/80
338         for i in range(l-1, -1, -1):
339             item = serialized_hist[80*i:80*(i+1)]
340             item_height = int(rev_hex(item[36:39].encode('hex')), 16)
341             if item_height <= tx_height:
342                 serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
343                 break
344         else:
345             serialized_hist = s + serialized_hist
346
347         self.batch_list[addr] = serialized_hist
348
349         # backlink
350         txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
351         self.batch_txio[txo] = addr
352
353
354
355
356
357
358     def deserialize_block(self, block):
359         is_stake_block = False
360         txlist = block.get('tx')
361         if "proof-of-stake" in block.get('flags'): # scan block flags list for
362             is_stake_block = True                  #    "proof-of-stake" substring
363
364         tx_hashes = []  # ordered txids
365         txdict = {}     # deserialized tx
366
367         for i in xrange(len(txlist)):
368             if is_stake_block and i == 0: # skip coinbase for
369                 continue                  #     stake block
370             tx_hash = hash_encode(Hash(txlist[i].decode('hex')))
371             vds = deserialize.BCDataStream()
372             vds.write(txlist[i].decode('hex'))
373             try:
374                 tx = deserialize.parse_Transaction(vds, i == 0, is_stake_block and i == 1) # first transaction is always coinbase
375             except:                                                                        # second transaction is coinstake if we have a stake block
376                 print_log("ERROR: cannot parse", tx_hash)
377                 continue
378             tx_hashes.append(tx_hash)
379             txdict[tx_hash] = tx
380         return tx_hashes, txdict
381
382
383
384     def import_block(self, block, block_hash, block_height, sync, revert=False):
385
386         touched_addr = set([])
387
388         # deserialize transactions
389         tx_hashes, txdict = self.deserialize_block(block)
390
391         # undo info
392         if revert:
393             undo_info = self.storage.get_undo_info(block_height)
394             tx_hashes.reverse()
395         else:
396             undo_info = {}
397
398         for txid in tx_hashes:  # must be ordered
399             tx = txdict[txid]
400             if not revert:
401                 undo = self.storage.import_transaction(txid, tx, block_height, touched_addr)
402                 undo_info[txid] = undo
403             else:
404                 undo = undo_info.pop(txid)
405                 self.storage.revert_transaction(txid, tx, block_height, touched_addr, undo)
406
407         if revert: 
408             assert undo_info == {}
409
410         # add undo info
411         if not revert:
412             self.storage.write_undo_info(block_height, self.bitcoind_height, undo_info)
413
414         # add the max
415         self.storage.db_undo.put('height', repr( (block_hash, block_height, self.storage.db_version) ))
416
417         for addr in touched_addr:
418             self.invalidate_cache(addr)
419
420         self.storage.update_hashes()
421
422
423     def add_request(self, session, request):
424         # see if we can get if from cache. if not, add to queue
425         if self.process(session, request, cache_only=True) == -1:
426             self.queue.put((session, request))
427
428
429     def do_subscribe(self, method, params, session):
430         with self.watch_lock:
431             if method == 'blockchain.numblocks.subscribe':
432                 if session not in self.watch_blocks:
433                     self.watch_blocks.append(session)
434
435             elif method == 'blockchain.headers.subscribe':
436                 if session not in self.watch_headers:
437                     self.watch_headers.append(session)
438
439             elif method == 'blockchain.address.subscribe':
440                 address = params[0]
441                 l = self.watched_addresses.get(address)
442                 if l is None:
443                     self.watched_addresses[address] = [session]
444                 elif session not in l:
445                     l.append(session)
446
447
448     def do_unsubscribe(self, method, params, session):
449         with self.watch_lock:
450             if method == 'blockchain.numblocks.subscribe':
451                 if session in self.watch_blocks:
452                     self.watch_blocks.remove(session)
453             elif method == 'blockchain.headers.subscribe':
454                 if session in self.watch_headers:
455                     self.watch_headers.remove(session)
456             elif method == "blockchain.address.subscribe":
457                 addr = params[0]
458                 l = self.watched_addresses.get(addr)
459                 if not l:
460                     return
461                 if session in l:
462                     l.remove(session)
463                 if session in l:
464                     print_log("error rc!!")
465                     self.shared.stop()
466                 if l == []:
467                     self.watched_addresses.pop(addr)
468
469
470     def process(self, session, request, cache_only=False):
471         
472         message_id = request['id']
473         method = request['method']
474         params = request.get('params', [])
475         result = None
476         error = None
477
478         if method == 'blockchain.numblocks.subscribe':
479             result = self.storage.height
480
481         elif method == 'blockchain.headers.subscribe':
482             result = self.header
483
484         elif method == 'blockchain.address.subscribe':
485             try:
486                 address = str(params[0])
487                 result = self.get_status(address, cache_only)
488             except BaseException, e:
489                 error = str(e) + ': ' + address
490                 print_log("error:", error)
491
492         elif method == 'blockchain.address.get_history':
493             try:
494                 address = str(params[0])
495                 result = self.get_history(address, cache_only)
496             except BaseException, e:
497                 error = str(e) + ': ' + address
498                 print_log("error:", error)
499
500         elif method == 'blockchain.address.get_mempool':
501             try:
502                 address = str(params[0])
503                 result = self.get_unconfirmed_history(address, cache_only)
504             except BaseException, e:
505                 error = str(e) + ': ' + address
506                 print_log("error:", error)
507
508         elif method == 'blockchain.address.get_balance':
509             try:
510                 address = str(params[0])
511                 confirmed = self.storage.get_balance(address)
512                 unconfirmed = self.get_unconfirmed_value(address)
513                 result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
514             except BaseException, e:
515                 error = str(e) + ': ' + address
516                 print_log("error:", error)
517
518         elif method == 'blockchain.address.get_proof':
519             try:
520                 address = str(params[0])
521                 result = self.storage.get_proof(address)
522             except BaseException, e:
523                 error = str(e) + ': ' + address
524                 print_log("error:", error)
525
526         elif method == 'blockchain.address.listunspent':
527             try:
528                 address = str(params[0])
529                 result = self.storage.listunspent(address)
530             except BaseException, e:
531                 error = str(e) + ': ' + address
532                 print_log("error:", error)
533
534         elif method == 'blockchain.utxo.get_address':
535             try:
536                 txid = str(params[0])
537                 pos = int(params[1])
538                 txi = (txid + int_to_hex(pos, 4)).decode('hex')
539                 result = self.storage.get_address(txi)
540             except BaseException, e:
541                 error = str(e)
542                 print_log("error:", error, params)
543
544         elif method == 'blockchain.block.get_header':
545             if cache_only:
546                 result = -1
547             else:
548                 try:
549                     height = int(params[0])
550                     result = self.get_header(height)
551                 except BaseException, e:
552                     error = str(e) + ': %d' % height
553                     print_log("error:", error)
554
555         elif method == 'blockchain.block.get_chunk':
556             if cache_only:
557                 result = -1
558             else:
559                 try:
560                     index = int(params[0])
561                     result = self.get_chunk(index)
562                 except BaseException, e:
563                     error = str(e) + ': %d' % index
564                     print_log("error:", error)
565
566         elif method == 'blockchain.transaction.broadcast':
567             try:
568                 txo = self.bitcoind('sendrawtransaction', params)
569                 print_log("sent tx:", txo)
570                 result = txo
571             except BaseException, e:
572                 result = str(e)  # do not send an error
573                 print_log("error:", result, params)
574
575         elif method == 'blockchain.transaction.get_merkle':
576             if cache_only:
577                 result = -1
578             else:
579                 try:
580                     tx_hash = params[0]
581                     tx_height = params[1]
582                     result = self.get_merkle(tx_hash, tx_height)
583                 except BaseException, e:
584                     error = str(e) + ': ' + repr(params)
585                     print_log("get_merkle error:", error)
586
587         elif method == 'blockchain.transaction.get':
588             try:
589                 tx_hash = params[0]
590                 result = self.bitcoind('getrawtransaction', [tx_hash, 0])
591             except BaseException, e:
592                 error = str(e) + ': ' + repr(params)
593                 print_log("tx get error:", error)
594
595         else:
596             error = "unknown method:%s" % method
597
598         if cache_only and result == -1:
599             return -1
600
601         if error:
602             self.push_response(session, {'id': message_id, 'error': error})
603         elif result != '':
604             self.push_response(session, {'id': message_id, 'result': result})
605
606     def catch_up(self, sync=True):
607
608         prev_root_hash = None
609         while not self.shared.stopped():
610
611             self.mtime('')
612
613             # are we done yet?
614             info = self.bitcoind('getinfo')
615             self.bitcoind_height = info.get('blocks')
616             bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
617             if self.storage.last_hash == bitcoind_block_hash:
618                 self.up_to_date = True
619                 break
620
621             # not done..
622             self.up_to_date = False
623             next_block = self.bitcoind('getblockbynumber', [self.storage.height + 1, True])
624             next_block_hash = next_block.get('hash')
625             self.mtime('daemon')
626
627             # fixme: this is unsafe, if we revert when the undo info is not yet written
628             revert = (random.randint(1, 100) == 1) if self.test_reorgs else False
629
630             if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
631
632                 prev_root_hash = self.storage.get_root_hash()
633
634                 self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
635                 self.storage.height = self.storage.height + 1
636                 self.write_header(self.block2header(next_block), sync)
637                 self.storage.last_hash = next_block_hash
638                 self.mtime('import')
639             
640                 if self.storage.height % 1000 == 0 and not sync:
641                     t_daemon = self.mtimes.get('daemon')
642                     t_import = self.mtimes.get('import')
643                     print_log("catch_up: block %d (%.3fs %.3fs)" % (self.storage.height, t_daemon, t_import), self.storage.get_root_hash().encode('hex'))
644                     self.mtimes['daemon'] = 0
645                     self.mtimes['import'] = 0
646
647             else:
648
649                 # revert current block
650                 block = self.bitcoind('getblock', [self.storage.last_hash, True])
651                 print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
652                 self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
653                 self.pop_header()
654                 self.flush_headers()
655
656                 self.storage.height -= 1
657
658                 # read previous header from disk
659                 self.header = self.read_header(self.storage.height)
660                 self.storage.last_hash = self.hash_header(self.header)
661
662                 if prev_root_hash:
663                     assert prev_root_hash == self.storage.get_root_hash()
664                     prev_root_hash = None
665
666
667         self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
668         self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
669
670         if self.shared.stopped(): 
671             print_log( "closing database" )
672             self.storage.close()
673
674
675     def memorypool_update(self):
676         mempool_hashes = set(self.bitcoind('getrawmempool'))
677         touched_addresses = set([])
678
679         # get new transactions
680         new_tx = {}
681         for tx_hash in mempool_hashes:
682             if tx_hash in self.mempool_hashes:
683                 continue
684
685             tx = self.get_mempool_transaction(tx_hash)
686             if not tx:
687                 continue
688
689             new_tx[tx_hash] = tx
690             self.mempool_hashes.add(tx_hash)
691
692         # remove older entries from mempool_hashes
693         self.mempool_hashes = mempool_hashes
694
695
696         # check all tx outputs
697         for tx_hash, tx in new_tx.items():
698             mpa = self.mempool_addresses.get(tx_hash, {})
699             out_values = []
700             for x in tx.get('outputs'):
701                 out_values.append( x['value'] )
702
703                 addr = x.get('address')
704                 if not addr:
705                     continue
706                 v = mpa.get(addr,0)
707                 v += x['value']
708                 mpa[addr] = v
709                 touched_addresses.add(addr)
710
711             self.mempool_addresses[tx_hash] = mpa
712             self.mempool_values[tx_hash] = out_values
713
714         # check all inputs
715         for tx_hash, tx in new_tx.items():
716             mpa = self.mempool_addresses.get(tx_hash, {})
717             for x in tx.get('inputs'):
718                 # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
719                 addr = x.get('address')
720                 if not addr:
721                     continue
722
723                 v = self.mempool_values.get(x.get('prevout_hash'))
724                 if v:
725                     value = v[ x.get('prevout_n')]
726                 else:
727                     txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
728                     try:
729                         value = self.storage.get_utxo_value(addr,txi)
730                     except:
731                         print_log("utxo not in database; postponing mempool update")
732                         return
733
734                 v = mpa.get(addr,0)
735                 v -= value
736                 mpa[addr] = v
737                 touched_addresses.add(addr)
738
739             self.mempool_addresses[tx_hash] = mpa
740
741
742         # remove deprecated entries from mempool_addresses
743         for tx_hash, addresses in self.mempool_addresses.items():
744             if tx_hash not in self.mempool_hashes:
745                 self.mempool_addresses.pop(tx_hash)
746                 self.mempool_values.pop(tx_hash)
747                 for addr in addresses:
748                     touched_addresses.add(addr)
749
750         # rebuild mempool histories
751         new_mempool_hist = {}
752         for tx_hash, addresses in self.mempool_addresses.items():
753             for addr, delta in addresses.items():
754                 h = new_mempool_hist.get(addr, [])
755                 if tx_hash not in h:
756                     h.append((tx_hash, delta))
757                 new_mempool_hist[addr] = h
758
759         with self.mempool_lock:
760             self.mempool_hist = new_mempool_hist
761
762         # invalidate cache for touched addresses
763         for addr in touched_addresses:
764             self.invalidate_cache(addr)
765
766
767     def invalidate_cache(self, address):
768         with self.cache_lock:
769             if address in self.history_cache:
770                 print_log("cache: invalidating", address)
771                 self.history_cache.pop(address)
772
773         with self.watch_lock:
774             sessions = self.watched_addresses.get(address)
775
776         if sessions:
777             # TODO: update cache here. if new value equals cached value, do not send notification
778             self.address_queue.put((address,sessions))
779
780     
781     def close(self):
782         self.timer.join()
783         print_log("Closing database...")
784         self.storage.close()
785         print_log("Database is closed")
786
787
788     def main_iteration(self):
789         if self.shared.stopped():
790             print_log("Stopping timer")
791             return
792
793         with self.dblock:
794             t1 = time.time()
795             self.catch_up()
796             t2 = time.time()
797
798         self.memorypool_update()
799
800         if self.sent_height != self.storage.height:
801             self.sent_height = self.storage.height
802             for session in self.watch_blocks:
803                 self.push_response(session, {
804                         'id': None,
805                         'method': 'blockchain.numblocks.subscribe',
806                         'params': [self.storage.height],
807                         })
808
809         if self.sent_header != self.header:
810             print_log("blockchain: %d (%.3fs)" % (self.storage.height, t2 - t1))
811             self.sent_header = self.header
812             for session in self.watch_headers:
813                 self.push_response(session, {
814                         'id': None,
815                         'method': 'blockchain.headers.subscribe',
816                         'params': [self.header],
817                         })
818
819         while True:
820             try:
821                 addr, sessions = self.address_queue.get(False)
822             except:
823                 break
824
825             status = self.get_status(addr)
826             for session in sessions:
827                 self.push_response(session, {
828                         'id': None,
829                         'method': 'blockchain.address.subscribe',
830                         'params': [addr, status],
831                         })
832
833         # next iteration 
834         self.timer = threading.Timer(10, self.main_iteration)
835         self.timer.start()
836