new database format, that handles p2sh addresses
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import leveldb
5 import os
6 from Queue import Queue
7 import random
8 import sys
9 import time
10 import threading
11 import traceback
12 import urllib
13
14 from backends.bitcoind import deserialize
15 from processor import Processor, print_log
16 from utils import *
17
18
19 class BlockchainProcessor(Processor):
20
    def __init__(self, config, shared):
        """Blockchain indexer for electrum-server.

        Keeps a LevelDB address->history index and a flat 80-byte-per-entry
        headers file in sync with a local bitcoind reached over JSON-RPC.
        Blocks until the chain is caught up, then schedules main_iteration.
        """
        Processor.__init__(self)

        self.shared = shared
        self.config = config
        self.up_to_date = False
        self.watched_addresses = []        # addresses with active subscriptions
        self.history_cache = {}            # addr -> history list (see get_history)
        self.chunk_cache = {}              # chunk index -> hex chunk (see get_chunk)
        self.cache_lock = threading.Lock()
        self.headers_data = ''             # binary headers buffered, not yet flushed

        self.mempool_addresses = {}        # tx_hash -> [addresses touched]
        self.mempool_hist = {}             # addr -> [unconfirmed tx hashes]
        self.mempool_hashes = []
        self.mempool_lock = threading.Lock()

        self.address_queue = Queue()       # addresses whose status must be re-notified
        self.dbpath = config.get('leveldb', 'path')

        self.dblock = threading.Lock()
        try:
            self.db = leveldb.LevelDB(self.dbpath)
        except:
            traceback.print_exc(file=sys.stdout)
            self.shared.stop()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind', 'user'),
            config.get('bitcoind', 'password'),
            config.get('bitcoind', 'host'),
            config.get('bitcoind', 'port'))

        self.height = 0
        self.is_test = False
        self.sent_height = 0               # last height pushed to subscribers
        self.sent_header = None            # last header pushed to subscribers

        # Detect the pre-p2sh database format: old databases were keyed by
        # hash160, so if this well-known address resolves as a raw hash160
        # key the database must be rebuilt.
        try:
            hash_160 = bc_address_to_hash_160("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa")
            self.db.Get(hash_160)
            print_log("Your database '%s' is deprecated. Please create a new database"%self.dbpath)
            self.shared.stop()
            return
        except:
            pass

        # The 'height' record stores one serialized triple: (last_hash, height, 0).
        try:
            hist = self.deserialize(self.db.Get('height'))
            self.last_hash, self.height, _ = hist[0]
            print_log("hist", hist)
        except:
            #traceback.print_exc(file=sys.stdout)
            print_log('initializing database')
            self.height = 0
            # mainnet genesis block hash
            self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'

        # catch_up headers
        self.init_headers(self.height)

        # Sync in a background thread; poll until done or interrupted.
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print_log("blockchain is up to date.")

        threading.Timer(10, self.main_iteration).start()
93
94     def bitcoind(self, method, params=[]):
95         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
96         try:
97             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
98         except:
99             traceback.print_exc(file=sys.stdout)
100             self.shared.stop()
101
102         r = loads(respdata)
103         if r['error'] is not None:
104             raise BaseException(r['error'])
105         return r.get('result')
106
107     def serialize(self, h):
108         s = ''
109         for txid, txpos, height in h:
110             s += txid + int_to_hex(txpos, 4) + int_to_hex(height, 4)
111         return s.decode('hex')
112
113     def deserialize(self, s):
114         h = []
115         while s:
116             txid = s[0:32].encode('hex')
117             txpos = int(rev_hex(s[32:36].encode('hex')), 16)
118             height = int(rev_hex(s[36:40].encode('hex')), 16)
119             h.append((txid, txpos, height))
120             s = s[40:]
121         return h
122
123     def block2header(self, b):
124         return {
125             "block_height": b.get('height'),
126             "version": b.get('version'),
127             "prev_block_hash": b.get('previousblockhash'),
128             "merkle_root": b.get('merkleroot'),
129             "timestamp": b.get('time'),
130             "bits": int(b.get('bits'), 16),
131             "nonce": b.get('nonce'),
132         }
133
134     def get_header(self, height):
135         block_hash = self.bitcoind('getblockhash', [height])
136         b = self.bitcoind('getblock', [block_hash])
137         return self.block2header(b)
138
    def init_headers(self, db_height):
        """Bring the flat headers file up to the database height `db_height`,
        fetching missing headers from bitcoind.

        The file stores raw 80-byte headers back to back, so the height of
        the last stored header is filesize/80 - 1.
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            # create an empty headers file
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                if height > 1:
                    # each header must chain to the previous one we wrote
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            # persist whatever was fetched before exiting
            self.flush_headers()
            sys.exit()

        self.flush_headers()
172
173     def hash_header(self, header):
174         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
175
176     def read_header(self, block_height):
177         if os.path.exists(self.headers_filename):
178             with open(self.headers_filename, 'rb') as f:
179                 f.seek(block_height * 80)
180                 h = f.read(80)
181             if len(h) == 80:
182                 h = header_from_string(h)
183                 return h
184
185     def read_chunk(self, index):
186         with open(self.headers_filename, 'rb') as f:
187             f.seek(index*2016*80)
188             chunk = f.read(2016*80)
189         return chunk.encode('hex')
190
191     def write_header(self, header, sync=True):
192         if not self.headers_data:
193             self.headers_offset = header.get('block_height')
194
195         self.headers_data += header_to_string(header).decode('hex')
196         if sync or len(self.headers_data) > 40*100:
197             self.flush_headers()
198
199         with self.cache_lock:
200             chunk_index = header.get('block_height')/2016
201             if self.chunk_cache.get(chunk_index):
202                 self.chunk_cache.pop(chunk_index)
203
204     def pop_header(self):
205         # we need to do this only if we have not flushed
206         if self.headers_data:
207             self.headers_data = self.headers_data[:-40]
208
209     def flush_headers(self):
210         if not self.headers_data:
211             return
212         with open(self.headers_filename, 'rb+') as f:
213             f.seek(self.headers_offset*80)
214             f.write(self.headers_data)
215         self.headers_data = ''
216
217     def get_chunk(self, i):
218         # store them on disk; store the current chunk in memory
219         with self.cache_lock:
220             chunk = self.chunk_cache.get(i)
221             if not chunk:
222                 chunk = self.read_chunk(i)
223                 self.chunk_cache[i] = chunk
224
225         return chunk
226
227     def get_mempool_transaction(self, txid):
228         try:
229             raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
230         except:
231             return None
232
233         vds = deserialize.BCDataStream()
234         vds.write(raw_tx.decode('hex'))
235
236         return deserialize.parse_Transaction(vds, is_coinbase=False)
237
    def get_history(self, addr, cache_only=False):
        """Return the confirmed + mempool history of `addr` as a list of
        {'tx_hash', 'height'} dicts (mempool entries have height 0).

        Returns ['*'] for an address known to the index but with an empty
        history, and -1 when cache_only is set and the answer is not cached.
        """
        with self.cache_lock:
            hist = self.history_cache.get(addr)
        if hist is not None:
            return hist
        if cache_only:
            return -1

        # read the confirmed history from the database
        with self.dblock:
            try:
                hist = self.deserialize(self.db.Get(addr))
                is_known = True
            except:
                # missing key: address never seen by the index
                hist = []
                is_known = False

        # should not be necessary
        hist.sort(key=lambda tup: tup[2])
        # check uniqueness too...

        # add memory pool
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr, []):
                hist.append((txid, 0, 0))

        hist = map(lambda x: {'tx_hash': x[0], 'height': x[2]}, hist)
        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known:
            hist = ['*']

        with self.cache_lock:
            self.history_cache[addr] = hist
        return hist
271
272     def get_status(self, addr, cache_only=False):
273         tx_points = self.get_history(addr, cache_only)
274         if cache_only and tx_points == -1:
275             return -1
276
277         if not tx_points:
278             return None
279         if tx_points == ['*']:
280             return '*'
281         status = ''
282         for tx in tx_points:
283             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
284         return hashlib.sha256(status).digest().encode('hex')
285
    def get_merkle(self, tx_hash, height):
        """Compute the merkle branch proving that `tx_hash` is included in
        the block at `height`.

        Returns {'block_height', 'merkle': sibling hashes from leaf to root,
        'pos': index of the tx within the block}.
        Raises ValueError (from list.index) if the tx is not in the block.
        """
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        # standard bitcoin merkle tree: pairwise-hash each level, duplicating
        # the last element when a level has an odd number of nodes
        while len(merkle) != 1:
            if len(merkle) % 2:
                merkle.append(merkle[-1])
            n = []
            while merkle:
                new_hash = Hash(merkle[0] + merkle[1])
                # record the sibling of the node on our path to the root
                if merkle[0] == target_hash:
                    s.append(hash_encode(merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append(hash_encode(merkle[0]))
                    target_hash = new_hash
                n.append(new_hash)
                merkle = merkle[2:]
            merkle = n

        return {"block_height": height, "merkle": s, "pos": tx_pos}
313
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert output tx_hash:tx_pos (at tx_height) into the pending
        serialized history of `addr` in self.batch_list, keeping entries
        ordered by height, and record the outpoint->addr backlink in
        self.batch_txio. Used only inside import_block."""
        # keep it sorted
        s = (tx_hash + int_to_hex(tx_pos, 4) + int_to_hex(tx_height, 4)).decode('hex')

        serialized_hist = self.batch_list[addr]

        # scan backwards for the last 40-byte entry with a lower height and
        # splice the new entry in right after it
        l = len(serialized_hist)/40
        for i in range(l-1, -1, -1):
            item = serialized_hist[40*i:40*(i+1)]
            item_height = int(rev_hex(item[36:40].encode('hex')), 16)
            if item_height < tx_height:
                serialized_hist = serialized_hist[0:40*(i+1)] + s + serialized_hist[40*(i+1):]
                break
        else:
            # no entry with a lower height: this is the oldest, prepend it
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
335
    def remove_from_history(self, addr, tx_hash, tx_pos):
        """Remove the spent outpoint tx_hash:tx_pos from the pending history
        of `addr`; when addr is None it is resolved via self.batch_txio.

        Returns (height of the removed entry, resolved address).
        Raises BaseException when the outpoint or its history entry is
        unknown. Used only inside import_block.
        """
        txi = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')

        if addr is None:
            try:
                addr = self.batch_txio[txi]
            except:
                raise BaseException(tx_hash, tx_pos)

        serialized_hist = self.batch_list[addr]

        # entries are 40 bytes: 32-byte txid + 4-byte pos + 4-byte height;
        # the first 36 bytes identify the outpoint
        l = len(serialized_hist)/40
        for i in range(l):
            item = serialized_hist[40*i:40*(i+1)]
            if item[0:36] == txi:
                height = int(rev_hex(item[36:40].encode('hex')), 16)
                serialized_hist = serialized_hist[0:40*i] + serialized_hist[40*(i+1):]
                break
        else:
            hist = self.deserialize(serialized_hist)
            raise BaseException("prevout not found", addr, hist, tx_hash, tx_pos)

        self.batch_list[addr] = serialized_hist
        return height, addr
360
361     def deserialize_block(self, block):
362         txlist = block.get('tx')
363         tx_hashes = []  # ordered txids
364         txdict = {}     # deserialized tx
365         is_coinbase = True
366         for raw_tx in txlist:
367             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
368             tx_hashes.append(tx_hash)
369             vds = deserialize.BCDataStream()
370             vds.write(raw_tx.decode('hex'))
371             tx = deserialize.parse_Transaction(vds, is_coinbase)
372             txdict[tx_hash] = tx
373             is_coinbase = False
374         return tx_hashes, txdict
375
376     def get_undo_info(self, height):
377         s = self.db.Get("undo%d" % (height % 100))
378         return eval(s)
379
380     def write_undo_info(self, batch, height, undo_info):
381         if self.is_test or height > self.bitcoind_height - 100:
382             batch.Put("undo%d" % (height % 100), repr(undo_info))
383
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply one block's transactions to the address->history index, or
        undo them when revert=True, committing everything in a single
        LevelDB write batch.

        Working state: self.batch_list (addr -> serialized history) and
        self.batch_txio (outpoint -> addr). The 'height' record is updated
        to (block_hash, block_height) as part of the same batch.
        """
        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address

        block_inputs = []
        block_outputs = []
        addr_to_read = []

        # deserialize transactions
        t0 = time.time()
        tx_hashes, txdict = self.deserialize_block(block)

        t00 = time.time()

        if not revert:
            # read addresses of tx inputs
            for tx in txdict.values():
                for x in tx.get('inputs'):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    block_inputs.append(txi)

            # sorted key order — presumably for leveldb read locality
            block_inputs.sort()
            for txi in block_inputs:
                try:
                    addr = self.db.Get(txi)
                except:
                    # the input could come from the same block
                    continue
                self.batch_txio[txi] = addr
                addr_to_read.append(addr)

        else:
            # reverting: collect this block's outpoints so they can be deleted
            for txid, tx in txdict.items():
                for x in tx.get('outputs'):
                    txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
                    block_outputs.append(txo)

        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                addr_to_read.append(x.get('address'))

        addr_to_read.sort()
        for addr in addr_to_read:
            try:
                self.batch_list[addr] = self.db.Get(addr)
            except:
                # unknown address: start with an empty serialized history
                self.batch_list[addr] = ''

        if revert:
            undo_info = self.get_undo_info(block_height)
            # print "undo", block_height, undo_info
        else:
            undo_info = {}

        # process
        t1 = time.time()

        if revert:
            # undo transactions in reverse order of application
            tx_hashes = tx_hashes[::-1]
        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:

                # spend inputs, remembering what was removed for undo
                undo = []
                for x in tx.get('inputs'):
                    prevout_height, prevout_addr = self.remove_from_history(None, x.get('prevout_hash'), x.get('prevout_n'))
                    undo.append((prevout_height, prevout_addr))
                undo_info[txid] = undo

                # record outputs at this block height
                for x in tx.get('outputs'):
                    self.add_to_history(x.get('address'), txid, x.get('index'), block_height)

            else:
                # remove this block's outputs from their address histories
                for x in tx.get('outputs'):
                    self.remove_from_history(x.get('address'), txid, x.get('index'))

                # restore the entries removed when the block was applied
                i = 0
                for x in tx.get('inputs'):
                    prevout_height, prevout_addr = undo_info.get(txid)[i]
                    i += 1

                    # read the history into batch list
                    if self.batch_list.get(prevout_addr) is None:
                        self.batch_list[prevout_addr] = self.db.Get(prevout_addr)

                    # re-add them to the history
                    self.add_to_history(prevout_addr, x.get('prevout_hash'), x.get('prevout_n'), prevout_height)
                    # print_log("new hist for", prevout_addr, self.deserialize(self.batch_list[prevout_addr]) )

        # write
        max_len = 0
        max_addr = ''
        t2 = time.time()

        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            # track the largest history, reported in the timing log below
            l = len(serialized_hist)
            if l > max_len:
                max_len = l
                max_addr = addr

        if not revert:
            # add new created outputs
            for txio, addr in self.batch_txio.items():
                batch.Put(txio, addr)
            # delete spent inputs
            for txi in block_inputs:
                batch.Delete(txi)
            # add undo info
            self.write_undo_info(batch, block_height, undo_info)
        else:
            # restore spent inputs
            for txio, addr in self.batch_txio.items():
                batch.Put(txio, addr)
            # delete spent outputs
            for txo in block_outputs:
                batch.Delete(txo)

        # add the max
        batch.Put('height', self.serialize([(block_hash, block_height, 0)]))

        # actual write
        self.db.Write(batch, sync=sync)

        t3 = time.time()
        if t3 - t0 > 10 and not sync:
            print_log("block", block_height,
                      "parse:%0.2f " % (t00 - t0),
                      "read:%0.2f " % (t1 - t00),
                      "proc:%.2f " % (t2-t1),
                      "write:%.2f " % (t3-t2),
                      "max:", max_len, max_addr)

        # every touched address may have a stale cached history/status
        for addr in self.batch_list.keys():
            self.invalidate_cache(addr)
522
523     def add_request(self, request):
524         # see if we can get if from cache. if not, add to queue
525         if self.process(request, cache_only=True) == -1:
526             self.queue.put(request)
527
528     def process(self, request, cache_only=False):
529         #print "abe process", request
530
531         message_id = request['id']
532         method = request['method']
533         params = request.get('params', [])
534         result = None
535         error = None
536
537         if method == 'blockchain.numblocks.subscribe':
538             result = self.height
539
540         elif method == 'blockchain.headers.subscribe':
541             result = self.header
542
543         elif method == 'blockchain.address.subscribe':
544             try:
545                 address = params[0]
546                 result = self.get_status(address, cache_only)
547                 self.watch_address(address)
548             except BaseException, e:
549                 error = str(e) + ': ' + address
550                 print_log("error:", error)
551
552         elif method == 'blockchain.address.unsubscribe':
553             try:
554                 password = params[0]
555                 address = params[1]
556                 if password == self.config.get('server', 'password'):
557                     self.watched_addresses.remove(address)
558                     # print_log('unsubscribed', address)
559                     result = "ok"
560                 else:
561                     print_log('incorrect password')
562                     result = "authentication error"
563             except BaseException, e:
564                 error = str(e) + ': ' + address
565                 print_log("error:", error)
566
567         elif method == 'blockchain.address.get_history':
568             try:
569                 address = params[0]
570                 result = self.get_history(address, cache_only)
571             except BaseException, e:
572                 error = str(e) + ': ' + address
573                 print_log("error:", error)
574
575         elif method == 'blockchain.block.get_header':
576             if cache_only:
577                 result = -1
578             else:
579                 try:
580                     height = params[0]
581                     result = self.get_header(height)
582                 except BaseException, e:
583                     error = str(e) + ': %d' % height
584                     print_log("error:", error)
585
586         elif method == 'blockchain.block.get_chunk':
587             if cache_only:
588                 result = -1
589             else:
590                 try:
591                     index = params[0]
592                     result = self.get_chunk(index)
593                 except BaseException, e:
594                     error = str(e) + ': %d' % index
595                     print_log("error:", error)
596
597         elif method == 'blockchain.transaction.broadcast':
598             try:
599                 txo = self.bitcoind('sendrawtransaction', params)
600                 print_log("sent tx:", txo)
601                 result = txo
602             except BaseException, e:
603                 result = str(e)  # do not send an error
604                 print_log("error:", result, params)
605
606         elif method == 'blockchain.transaction.get_merkle':
607             if cache_only:
608                 result = -1
609             else:
610                 try:
611                     tx_hash = params[0]
612                     tx_height = params[1]
613                     result = self.get_merkle(tx_hash, tx_height)
614                 except BaseException, e:
615                     error = str(e) + ': ' + repr(params)
616                     print_log("get_merkle error:", error)
617
618         elif method == 'blockchain.transaction.get':
619             try:
620                 tx_hash = params[0]
621                 height = params[1]
622                 result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
623             except BaseException, e:
624                 error = str(e) + ': ' + repr(params)
625                 print_log("tx get error:", error)
626
627         else:
628             error = "unknown method:%s" % method
629
630         if cache_only and result == -1:
631             return -1
632
633         if error:
634             self.push_response({'id': message_id, 'error': error})
635         elif result != '':
636             self.push_response({'id': message_id, 'result': result})
637
638     def watch_address(self, addr):
639         if addr not in self.watched_addresses:
640             self.watched_addresses.append(addr)
641
    def catch_up(self, sync=True):
        """Import blocks until our tip matches bitcoind's, reverting local
        blocks on reorgs. Sets self.up_to_date and leaves self.header at the
        current tip.

        With sync=False database writes are asynchronous (used for the
        initial catch-up).
        """
        t1 = time.time()

        while not self.shared.stopped():
            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
            next_block = self.bitcoind('getblock', [next_block_hash, 1])

            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100) == 1) if self.is_test else False

            # the next block extends our tip: import it
            if (next_block.get('previousblockhash') == self.last_hash) and not revert:

                self.import_block(next_block, next_block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.last_hash = next_block_hash

                if self.height % 100 == 0 and not sync:
                    t2 = time.time()
                    print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1))
                    t1 = t2

            else:
                # revert current block
                block = self.bitcoind('getblock', [self.last_hash, 1])
                print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                self.pop_header()
                self.flush_headers()

                self.height -= 1

                # read previous header from disk
                self.header = self.read_header(self.height)
                self.last_hash = self.hash_header(self.header)

        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
689
    def memorypool_update(self):
        """Refresh mempool bookkeeping from bitcoind's getrawmempool.

        Maintains mempool_hashes, mempool_addresses (tx -> addresses) and
        mempool_hist (addr -> txs), and invalidates the history cache for
        every address whose mempool history changed.
        """
        mempool_hashes = self.bitcoind('getrawmempool')

        for tx_hash in mempool_hashes:
            if tx_hash in self.mempool_hashes:
                # already processed in a previous pass
                continue

            tx = self.get_mempool_transaction(tx_hash)
            if not tx:
                continue

            # addresses spent from: resolve each input outpoint via the db
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                try:
                    addr = self.db.Get(txi)
                except:
                    # outpoint not in the index — presumably it spends an
                    # unconfirmed output; skip it
                    continue
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            # addresses paid to
            for x in tx.get('outputs'):
                addr = x.get('address')
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            self.mempool_hashes.append(tx_hash)

        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes

        # remove deprecated entries from mempool_addresses
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)

        # rebuild mempool histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                if tx_hash not in h:
                    h.append(tx_hash)
                new_mempool_hist[addr] = h

        # invalidate cache for mempool addresses whose mempool history has changed
        for addr in new_mempool_hist.keys():
            if addr in self.mempool_hist.keys():
                if self.mempool_hist[addr] != new_mempool_hist[addr]:
                    self.invalidate_cache(addr)
            else:
                self.invalidate_cache(addr)

        # invalidate cache for addresses that are removed from mempool ?
        # this should not be necessary if they go into a block, but they might not
        for addr in self.mempool_hist.keys():
            if addr not in new_mempool_hist.keys():
                self.invalidate_cache(addr)

        # swap in the new history under the lock read by get_history
        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist
755
756     def invalidate_cache(self, address):
757         with self.cache_lock:
758             if address in self.history_cache:
759                 print_log("cache: invalidating", address)
760                 self.history_cache.pop(address)
761
762         if address in self.watched_addresses:
763             self.address_queue.put(address)
764
    def main_iteration(self):
        """Periodic tick, rescheduled every 10 seconds: catch up with
        bitcoind, refresh the mempool, and push numblocks / headers /
        address notifications to subscribers."""
        if self.shared.stopped():
            print_log("blockchain processor terminating")
            return

        with self.dblock:
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()
        t3 = time.time()
        # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)

        # notify height subscribers only when the height actually changed
        if self.sent_height != self.height:
            self.sent_height = self.height
            self.push_response({
                'id': None,
                'method': 'blockchain.numblocks.subscribe',
                'params': [self.height],
            })

        # notify header subscribers only when the tip header changed
        if self.sent_header != self.header:
            print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
            self.sent_header = self.header
            self.push_response({
                'id': None,
                'method': 'blockchain.headers.subscribe',
                'params': [self.header],
            })

        # drain the queue of addresses invalidated since the last iteration
        while True:
            try:
                addr = self.address_queue.get(False)
            except:
                # queue empty
                break
            if addr in self.watched_addresses:
                status = self.get_status(addr)
                self.push_response({
                    'id': None,
                    'method': 'blockchain.address.subscribe',
                    'params': [addr, status],
                })

        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        else:
            print_log("blockchain processor terminating")