# stop if an exception occurs other than KeyError
# electrum-server / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import leveldb
5 import os
6 from Queue import Queue
7 import random
8 import sys
9 import time
10 import threading
11 import traceback
12 import urllib
13
14 from backends.bitcoind import deserialize
15 from processor import Processor, print_log
16 from utils import *
17
18
19 class BlockchainProcessor(Processor):
20
    def __init__(self, config, shared):
        """Open the LevelDB store, connect to bitcoind and catch up the chain.

        Blocks until the blockchain is up to date (or `shared` is stopped),
        then schedules main_iteration on a 10 s timer.
        """
        Processor.__init__(self)

        self.shared = shared
        self.config = config
        self.up_to_date = False
        self.watched_addresses = []   # addresses with active subscriptions
        self.history_cache = {}       # addr -> confirmed+mempool history
        self.chunk_cache = {}         # chunk index -> hex chunk of 2016 headers
        self.cache_lock = threading.Lock()
        self.headers_data = ''        # serialized headers not yet flushed to disk

        self.mempool_addresses = {}   # tx_hash -> addresses touched by that tx
        self.mempool_hist = {}        # addr -> list of unconfirmed tx hashes
        self.mempool_hashes = []
        self.mempool_lock = threading.Lock()

        self.address_queue = Queue()
        self.dbpath = config.get('leveldb', 'path')
        self.pruning_limit = config.getint('leveldb', 'pruning_limit')
        self.db_version = 1 # increase this when database needs to be updated

        self.dblock = threading.Lock()
        try:
            self.db = leveldb.LevelDB(self.dbpath)
        except:
            traceback.print_exc(file=sys.stdout)
            self.shared.stop()

        self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
            config.get('bitcoind', 'user'),
            config.get('bitcoind', 'password'),
            config.get('bitcoind', 'host'),
            config.get('bitcoind', 'port'))

        self.height = 0
        self.is_test = False
        self.sent_height = 0
        self.sent_header = None

        try:
            # the 'height' record stores one serialized
            # (last_hash, height, db_version) tuple
            hist = self.deserialize(self.db.Get('height'))
            self.last_hash, self.height, db_version = hist[0]
            print_log("Database version", self.db_version)
            print_log("Blockchain height", self.height)
        except:
            # no usable 'height' record: start from scratch at the genesis block
            traceback.print_exc(file=sys.stdout)
            print_log('initializing database')
            self.height = 0
            self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
            db_version = self.db_version

        # check version
        if self.db_version != db_version:
            print_log("Your database '%s' is deprecated. Please create a new database"%self.dbpath)
            self.shared.stop()
            return

        # catch_up headers
        self.init_headers(self.height)

        # catch up the block database in a background thread, then wait
        threading.Timer(0, lambda: self.catch_up(sync=False)).start()
        while not shared.stopped() and not self.up_to_date:
            try:
                time.sleep(1)
            except:
                print "keyboard interrupt: stopping threads"
                shared.stop()
                sys.exit(0)

        print_log("blockchain is up to date.")

        threading.Timer(10, self.main_iteration).start()
94
95     def bitcoind(self, method, params=[]):
96         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
97         try:
98             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
99         except:
100             traceback.print_exc(file=sys.stdout)
101             self.shared.stop()
102
103         r = loads(respdata)
104         if r['error'] is not None:
105             raise BaseException(r['error'])
106         return r.get('result')
107
108     def serialize(self, h):
109         s = ''
110         for txid, txpos, height in h:
111             s += self.serialize_item(txid, txpos, height)
112         return s
113
114     def serialize_item(self, txid, txpos, height, spent=chr(0)):
115         s = (txid + int_to_hex(txpos, 4) + int_to_hex(height, 3)).decode('hex') + spent 
116         return s
117
118     def deserialize_item(self,s):
119         txid = s[0:32].encode('hex')
120         txpos = int(rev_hex(s[32:36].encode('hex')), 16)
121         height = int(rev_hex(s[36:39].encode('hex')), 16)
122         spent = s[39:40]
123         return (txid, txpos, height, spent)
124
125     def deserialize(self, s):
126         h = []
127         while s:
128             txid, txpos, height, spent = self.deserialize_item(s[0:40])
129             h.append((txid, txpos, height))
130             if spent == chr(1):
131                 txid, txpos, height, spent = self.deserialize_item(s[40:80])
132                 h.append((txid, txpos, height))
133             s = s[80:]
134         return h
135
136     def block2header(self, b):
137         return {
138             "block_height": b.get('height'),
139             "version": b.get('version'),
140             "prev_block_hash": b.get('previousblockhash'),
141             "merkle_root": b.get('merkleroot'),
142             "timestamp": b.get('time'),
143             "bits": int(b.get('bits'), 16),
144             "nonce": b.get('nonce'),
145         }
146
147     def get_header(self, height):
148         block_hash = self.bitcoind('getblockhash', [height])
149         b = self.bitcoind('getblock', [block_hash])
150         return self.block2header(b)
151
    def init_headers(self, db_height):
        """Create/extend the flat headers file up to the database height.

        The file holds one 80-byte header per block.  Missing headers are
        fetched from bitcoind; each one is checked against the previous
        header's hash before being buffered for write.
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            # start a fresh, empty headers file
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                if height > 1:
                    # each header must link to its predecessor
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            # flush what we have so the file stays consistent on ^C
            self.flush_headers()
            sys.exit()

        self.flush_headers()
185
186     def hash_header(self, header):
187         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
188
189     def read_header(self, block_height):
190         if os.path.exists(self.headers_filename):
191             with open(self.headers_filename, 'rb') as f:
192                 f.seek(block_height * 80)
193                 h = f.read(80)
194             if len(h) == 80:
195                 h = header_from_string(h)
196                 return h
197
198     def read_chunk(self, index):
199         with open(self.headers_filename, 'rb') as f:
200             f.seek(index*2016*80)
201             chunk = f.read(2016*80)
202         return chunk.encode('hex')
203
204     def write_header(self, header, sync=True):
205         if not self.headers_data:
206             self.headers_offset = header.get('block_height')
207
208         self.headers_data += header_to_string(header).decode('hex')
209         if sync or len(self.headers_data) > 40*100:
210             self.flush_headers()
211
212         with self.cache_lock:
213             chunk_index = header.get('block_height')/2016
214             if self.chunk_cache.get(chunk_index):
215                 self.chunk_cache.pop(chunk_index)
216
217     def pop_header(self):
218         # we need to do this only if we have not flushed
219         if self.headers_data:
220             self.headers_data = self.headers_data[:-40]
221
222     def flush_headers(self):
223         if not self.headers_data:
224             return
225         with open(self.headers_filename, 'rb+') as f:
226             f.seek(self.headers_offset*80)
227             f.write(self.headers_data)
228         self.headers_data = ''
229
230     def get_chunk(self, i):
231         # store them on disk; store the current chunk in memory
232         with self.cache_lock:
233             chunk = self.chunk_cache.get(i)
234             if not chunk:
235                 chunk = self.read_chunk(i)
236                 self.chunk_cache[i] = chunk
237
238         return chunk
239
240     def get_mempool_transaction(self, txid):
241         try:
242             raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
243         except:
244             return None
245
246         vds = deserialize.BCDataStream()
247         vds.write(raw_tx.decode('hex'))
248
249         return deserialize.parse_Transaction(vds, is_coinbase=False)
250
    def get_history(self, addr, cache_only=False):
        """Return the (confirmed + mempool) history of an address.

        Returns -1 on a cache miss when cache_only is True, and ['*'] for
        an address known to the db whose history is currently empty.
        """
        with self.cache_lock:
            hist = self.history_cache.get(addr)
        if hist is not None:
            return hist
        if cache_only:
            return -1

        with self.dblock:
            try:
                hist = self.deserialize(self.db.Get(addr))
                is_known = True
            except:
                # address not present in the database
                hist = []
                is_known = False

        # sort history, because redeeming transactions are next to the corresponding txout
        hist.sort(key=lambda tup: tup[2])

        # add memory pool (height 0 marks unconfirmed entries)
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr, []):
                hist.append((txid, 0, 0))

        # uniqueness: drop txpos, keep (txid, height) pairs
        hist = set(map(lambda x: (x[0], x[2]), hist))

        # convert to dict
        hist = map(lambda x: {'tx_hash': x[0], 'height': x[1]}, hist)

        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known:
            hist = ['*']

        with self.cache_lock:
            self.history_cache[addr] = hist
        return hist
288
289     def get_status(self, addr, cache_only=False):
290         tx_points = self.get_history(addr, cache_only)
291         if cache_only and tx_points == -1:
292             return -1
293
294         if not tx_points:
295             return None
296         if tx_points == ['*']:
297             return '*'
298         status = ''
299         for tx in tx_points:
300             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
301         return hashlib.sha256(status).digest().encode('hex')
302
    def get_merkle(self, tx_hash, height):
        """Compute the merkle branch for tx_hash in the block at `height`.

        Returns {'block_height', 'merkle': sibling hashes (hex, leaf to
        root), 'pos': index of the tx in the block}.
        """
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        while len(merkle) != 1:
            # odd level: duplicate the last hash
            if len(merkle) % 2:
                merkle.append(merkle[-1])
            n = []
            while merkle:
                new_hash = Hash(merkle[0] + merkle[1])
                # record the sibling of the hash we are tracking, then
                # follow the combined hash up to the next level
                if merkle[0] == target_hash:
                    s.append(hash_encode(merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append(hash_encode(merkle[0]))
                    target_hash = new_hash
                n.append(new_hash)
                merkle = merkle[2:]
            merkle = n

        return {"block_height": height, "merkle": s, "pos": tx_pos}
330
331
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert an output item into addr's serialized history, keeping it
        sorted by height, and record the txout -> addr backlink."""
        # keep it sorted
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        # scan backwards for the last record with a smaller height and
        # insert right after it
        l = len(serialized_hist)/80
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height < tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            # no smaller height exists: prepend
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
354
355
356
357     def revert_add_to_history(self, addr, tx_hash, tx_pos, tx_height):
358
359         serialized_hist = self.batch_list[addr]
360         s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
361         if serialized_hist.find(s) == -1: raise
362         serialized_hist = serialized_hist.replace(s, '')
363         self.batch_list[addr] = serialized_hist
364
365
366
    def prune_history(self, addr, undo):
        """Drop spent records from addr's history while it exceeds
        pruning_limit, appending removed records to undo[addr]."""
        # remove items that have bit set to one
        if undo.get(addr) is None: undo[addr] = []

        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            # stop once the history is below the pruning limit
            if len(serialized_hist)/80 < self.pruning_limit: break
            item = serialized_hist[80*i:80*(i+1)]
            # flag chr(1) on the output half must pair with chr(2) on the input half
            if item[39:40] == chr(1):
                assert item[79:80] == chr(2)
                # NOTE(review): removing record i while iterating a fixed range
                # means the record sliding into slot i is skipped on this pass —
                # confirm this partial pruning is intended
                serialized_hist = serialized_hist[0:80*i] + serialized_hist[80*(i+1):]
                undo[addr].append(item)  # items are ordered
        self.batch_list[addr] = serialized_hist
381
382
    def revert_prune_history(self, addr, undo):
        """Re-insert the pruned records saved in undo[addr], keeping the
        serialized history sorted by height."""
        # restore removed items
        serialized_hist = self.batch_list[addr]

        if undo.get(addr) is not None:
            itemlist = undo.pop(addr)
        else:
            return

        if not itemlist: return

        # walk existing records from newest to oldest, inserting each saved
        # record (newest first) after the first record with a lower height
        l = len(serialized_hist)/80
        tx_item = ''
        for i in range(l-1, -1, -1):
            if tx_item == '':
                if not itemlist:
                    break
                else:
                    tx_item = itemlist.pop(-1) # get the last element
                    tx_height = int(rev_hex(tx_item[36:39].encode('hex')), 16)

            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)

            if item_height < tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + tx_item + serialized_hist[80*(i+1):]
                tx_item = ''

        else:
            # loop exhausted without break: remaining saved records are older
            # than everything in the history, so prepend them
            serialized_hist = ''.join(itemlist) + tx_item + serialized_hist

        self.batch_list[addr] = serialized_hist
415
416
    def set_spent_bit(self, addr, txi, is_spent, txid=None, index=None, height=None):
        """Mark prevout txi in addr's history as spent (storing the spending
        item in the record's second half) or unspent (zeroing it).

        Stops the server and raises BaseException if txi is not found.
        """
        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            item = serialized_hist[80*i:80*(i+1)]
            # first 36 bytes of a record are txid (32) + output index (4)
            if item[0:36] == txi:
                if is_spent:
                    new_item = item[0:39] + chr(1) + self.serialize_item(txid, index, height, chr(2))
                else:
                    new_item = item[0:39] + chr(0) + chr(0)*40
                serialized_hist = serialized_hist[0:80*i] + new_item + serialized_hist[80*(i+1):]
                break
        else:
            # prevout missing: the database is inconsistent, stop everything
            self.shared.stop()
            hist = self.deserialize(serialized_hist)
            raise BaseException("prevout not found", addr, hist, txi.encode('hex'))

        self.batch_list[addr] = serialized_hist
435
436
    def unset_spent_bit(self, addr, txi):
        """Mark prevout txi as unspent again and restore its txio backlink."""
        self.set_spent_bit(addr, txi, False)
        self.batch_txio[txi] = addr
440
441
442     def deserialize_block(self, block):
443         txlist = block.get('tx')
444         tx_hashes = []  # ordered txids
445         txdict = {}     # deserialized tx
446         is_coinbase = True
447         for raw_tx in txlist:
448             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
449             tx_hashes.append(tx_hash)
450             vds = deserialize.BCDataStream()
451             vds.write(raw_tx.decode('hex'))
452             tx = deserialize.parse_Transaction(vds, is_coinbase)
453             txdict[tx_hash] = tx
454             is_coinbase = False
455         return tx_hashes, txdict
456
457     def get_undo_info(self, height):
458         s = self.db.Get("undo%d" % (height % 100))
459         return eval(s)
460
461     def write_undo_info(self, batch, height, undo_info):
462         if self.is_test or height > self.bitcoind_height - 100:
463             batch.Put("undo%d" % (height % 100), repr(undo_info))
464
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply (or, with revert=True, undo) one block's effect on the db.

        Builds per-address history updates and txio backlinks in memory,
        then commits everything in a single LevelDB WriteBatch together
        with the new 'height' record and (on apply) the reorg undo info.
        """
        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address

        block_inputs = []
        block_outputs = []
        addr_to_read = []

        # deserialize transactions
        t0 = time.time()
        tx_hashes, txdict = self.deserialize_block(block)

        t00 = time.time()

        # undo info
        if revert:
            undo_info = self.get_undo_info(block_height)
        else:
            undo_info = {}


        if not revert:
            # read addresses of tx inputs
            for tx in txdict.values():
                for x in tx.get('inputs'):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    block_inputs.append(txi)

            block_inputs.sort()
            for txi in block_inputs:
                try:
                    addr = self.db.Get(txi)
                except KeyError:
                    # the input could come from the same block
                    continue
                except:
                    traceback.print_exc(file=sys.stdout)
                    self.shared.stop()
                    raise

                self.batch_txio[txi] = addr
                addr_to_read.append(addr)

        else:
            # reverting: collect this block's outputs (to delete) and the
            # previous addresses of its inputs (from the undo info)
            for txid, tx in txdict.items():
                for x in tx.get('outputs'):
                    txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
                    block_outputs.append(txo)
                    addr_to_read.append( x.get('address') )

                undo = undo_info.get(txid)
                for i, x in enumerate(tx.get('inputs')):
                    addr = undo['prev_addr'][i]
                    addr_to_read.append(addr)


        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                addr_to_read.append(x.get('address'))

        addr_to_read.sort()
        for addr in addr_to_read:
            try:
                self.batch_list[addr] = self.db.Get(addr)
            except KeyError:
                # address not seen before: start with an empty history
                self.batch_list[addr] = ''
            except:
                traceback.print_exc(file=sys.stdout)
                self.shared.stop()
                raise


        # process
        t1 = time.time()

        # undoing must walk the transactions in reverse order
        if revert:
            tx_hashes = tx_hashes[::-1]


        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:

                undo = { 'prev_addr':[] } # contains the list of pruned items for each address in the tx; also, 'prev_addr' is a list of prev addresses

                prev_addr = []
                for i, x in enumerate(tx.get('inputs')):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    addr = self.batch_txio[txi]

                    # add redeem item to the history.
                    # add it right next to the input txi? this will break history sorting, but it's ok if I neglect tx inputs during search
                    self.set_spent_bit(addr, txi, True, txid, i, block_height)

                    # when I prune, prune a pair
                    self.prune_history(addr, undo)
                    prev_addr.append(addr)

                undo['prev_addr'] = prev_addr

                # here I add only the outputs to history; maybe I want to add inputs too (that's in the other loop)
                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.add_to_history(addr, txid, x.get('index'), block_height)
                    self.prune_history(addr, undo)  # prune here because we increased the length of the history

                undo_info[txid] = undo

            else:

                undo = undo_info.pop(txid)

                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.revert_prune_history(addr, undo)
                    self.revert_add_to_history(addr, txid, x.get('index'), block_height)

                prev_addr = undo.pop('prev_addr')
                for i, x in enumerate(tx.get('inputs')):
                    addr = prev_addr[i]
                    self.revert_prune_history(addr, undo)
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    self.unset_spent_bit(addr, txi)

                # every undo entry must have been consumed
                assert undo == {}

        if revert:
            assert undo_info == {}


        # write
        max_len = 0
        max_addr = ''
        t2 = time.time()

        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            # track the longest history for the log line below
            l = len(serialized_hist)/80
            if l > max_len:
                max_len = l
                max_addr = addr

        if not revert:
            # add new created outputs
            for txio, addr in self.batch_txio.items():
                batch.Put(txio, addr)
            # delete spent inputs
            for txi in block_inputs:
                batch.Delete(txi)
            # add undo info
            self.write_undo_info(batch, block_height, undo_info)
        else:
            # restore spent inputs
            for txio, addr in self.batch_txio.items():
                # print "restoring spent input", repr(txio)
                batch.Put(txio, addr)
            # delete spent outputs
            for txo in block_outputs:
                batch.Delete(txo)

        # add the max
        batch.Put('height', self.serialize([(block_hash, block_height, self.db_version)]))

        # actual write
        self.db.Write(batch, sync=sync)

        t3 = time.time()
        if t3 - t0 > 10 and not sync:
            print_log("block", block_height,
                      "parse:%0.2f " % (t00 - t0),
                      "read:%0.2f " % (t1 - t00),
                      "proc:%.2f " % (t2-t1),
                      "write:%.2f " % (t3-t2),
                      "max:", max_len, max_addr)

        # histories changed: drop cached results for every touched address
        for addr in self.batch_list.keys():
            self.invalidate_cache(addr)
648
649     def add_request(self, request):
650         # see if we can get if from cache. if not, add to queue
651         if self.process(request, cache_only=True) == -1:
652             self.queue.put(request)
653
    def process(self, request, cache_only=False):
        """Dispatch one electrum JSON-RPC request and push the response.

        With cache_only=True, returns -1 (without responding) when the
        answer is not already cached, so add_request can queue the request.
        """
        message_id = request['id']
        method = request['method']
        params = request.get('params', [])
        result = None
        error = None

        if method == 'blockchain.numblocks.subscribe':
            result = self.height

        elif method == 'blockchain.headers.subscribe':
            result = self.header

        elif method == 'blockchain.address.subscribe':
            try:
                address = params[0]
                result = self.get_status(address, cache_only)
                self.watch_address(address)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.unsubscribe':
            try:
                # password-protected maintenance command
                password = params[0]
                address = params[1]
                if password == self.config.get('server', 'password'):
                    self.watched_addresses.remove(address)
                    # print_log('unsubscribed', address)
                    result = "ok"
                else:
                    print_log('incorrect password')
                    result = "authentication error"
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.get_history':
            try:
                address = params[0]
                result = self.get_history(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.block.get_header':
            if cache_only:
                result = -1
            else:
                try:
                    height = params[0]
                    result = self.get_header(height)
                except BaseException, e:
                    error = str(e) + ': %d' % height
                    print_log("error:", error)

        elif method == 'blockchain.block.get_chunk':
            if cache_only:
                result = -1
            else:
                try:
                    index = params[0]
                    result = self.get_chunk(index)
                except BaseException, e:
                    error = str(e) + ': %d' % index
                    print_log("error:", error)

        elif method == 'blockchain.transaction.broadcast':
            try:
                txo = self.bitcoind('sendrawtransaction', params)
                print_log("sent tx:", txo)
                result = txo
            except BaseException, e:
                result = str(e)  # do not send an error
                print_log("error:", result, params)

        elif method == 'blockchain.transaction.get_merkle':
            if cache_only:
                result = -1
            else:
                try:
                    tx_hash = params[0]
                    tx_height = params[1]
                    result = self.get_merkle(tx_hash, tx_height)
                except BaseException, e:
                    error = str(e) + ': ' + repr(params)
                    print_log("get_merkle error:", error)

        elif method == 'blockchain.transaction.get':
            try:
                tx_hash = params[0]
                height = params[1]
                result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
            except BaseException, e:
                error = str(e) + ': ' + repr(params)
                print_log("tx get error:", error)

        else:
            error = "unknown method:%s" % method

        if cache_only and result == -1:
            return -1

        if error:
            self.push_response({'id': message_id, 'error': error})
        elif result != '':
            self.push_response({'id': message_id, 'result': result})
763
764     def watch_address(self, addr):
765         if addr not in self.watched_addresses:
766             self.watched_addresses.append(addr)
767
    def catch_up(self, sync=True):
        """Import blocks from bitcoind until our tip matches bitcoind's.

        Handles reorgs by reverting our tip block whenever the next block
        does not link to last_hash (or randomly, when is_test is set).
        """
        t1 = time.time()

        while not self.shared.stopped():
            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
            next_block = self.bitcoind('getblock', [next_block_hash, 1])

            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100) == 1) if self.is_test else False

            if (next_block.get('previousblockhash') == self.last_hash) and not revert:

                # next block extends our tip: apply it
                self.import_block(next_block, next_block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.last_hash = next_block_hash

                if self.height % 100 == 0 and not sync:
                    t2 = time.time()
                    print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1))
                    t1 = t2

            else:
                # revert current block
                block = self.bitcoind('getblock', [self.last_hash, 1])
                print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                self.pop_header()
                self.flush_headers()

                self.height -= 1

                # read previous header from disk
                self.header = self.read_header(self.height)
                self.last_hash = self.hash_header(self.header)

        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
815
    def memorypool_update(self):
        """Refresh mempool state from bitcoind and rebuild per-address
        unconfirmed histories, invalidating caches for changed addresses."""
        mempool_hashes = self.bitcoind('getrawmempool')

        for tx_hash in mempool_hashes:
            # only parse transactions we have not seen yet
            if tx_hash in self.mempool_hashes:
                continue

            tx = self.get_mempool_transaction(tx_hash)
            if not tx:
                continue

            # addresses the tx spends from (inputs whose prevout is in the db)
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                try:
                    addr = self.db.Get(txi)
                except:
                    # unknown prevout (e.g. spends another mempool tx): skip
                    continue
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            # addresses the tx pays to
            for x in tx.get('outputs'):
                addr = x.get('address')
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            self.mempool_hashes.append(tx_hash)

        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes

        # remove deprecated entries from mempool_addresses
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)

        # rebuild mempool histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                if tx_hash not in h:
                    h.append(tx_hash)
                new_mempool_hist[addr] = h

        # invalidate cache for mempool addresses whose mempool history has changed
        new_mempool_hist_keys = new_mempool_hist.keys()
        self_mempool_hist_keys = self.mempool_hist.keys()

        for addr in new_mempool_hist_keys:
            if addr in self_mempool_hist_keys:
                if self.mempool_hist[addr] != new_mempool_hist[addr]:
                    self.invalidate_cache(addr)
            else:
                self.invalidate_cache(addr)

        # invalidate cache for addresses that are removed from mempool ?
        # this should not be necessary if they go into a block, but they might not
        for addr in self_mempool_hist_keys:
            if addr not in new_mempool_hist_keys:
                self.invalidate_cache(addr)


        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist
884
885     def invalidate_cache(self, address):
886         with self.cache_lock:
887             if address in self.history_cache:
888                 print_log("cache: invalidating", address)
889                 self.history_cache.pop(address)
890
891         if address in self.watched_addresses:
892             self.address_queue.put(address)
893
894     def main_iteration(self):
895         if self.shared.stopped():
896             print_log("blockchain processor terminating")
897             return
898
899         with self.dblock:
900             t1 = time.time()
901             self.catch_up()
902             t2 = time.time()
903
904         self.memorypool_update()
905         t3 = time.time()
906         # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
907
908         if self.sent_height != self.height:
909             self.sent_height = self.height
910             self.push_response({
911                 'id': None,
912                 'method': 'blockchain.numblocks.subscribe',
913                 'params': [self.height],
914             })
915
916         if self.sent_header != self.header:
917             print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
918             self.sent_header = self.header
919             self.push_response({
920                 'id': None,
921                 'method': 'blockchain.headers.subscribe',
922                 'params': [self.header],
923             })
924
925         while True:
926             try:
927                 addr = self.address_queue.get(False)
928             except:
929                 break
930             if addr in self.watched_addresses:
931                 status = self.get_status(addr)
932                 self.push_response({
933                     'id': None,
934                     'method': 'blockchain.address.subscribe',
935                     'params': [addr, status],
936                 })
937
938         if not self.shared.stopped():
939             threading.Timer(10, self.main_iteration).start()
940         else:
941             print_log("blockchain processor terminating")