# semi-pruning server
# electrum-server: backends/bitcoind/blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import leveldb
5 import os
6 from Queue import Queue
7 import random
8 import sys
9 import time
10 import threading
11 import traceback
12 import urllib
13
14 from backends.bitcoind import deserialize
15 from processor import Processor, print_log
16 from utils import *
17
18
19 class BlockchainProcessor(Processor):
20
21     def __init__(self, config, shared):
22         Processor.__init__(self)
23
24         self.shared = shared
25         self.config = config
26         self.up_to_date = False
27         self.watched_addresses = []
28         self.history_cache = {}
29         self.chunk_cache = {}
30         self.cache_lock = threading.Lock()
31         self.headers_data = ''
32
33         self.mempool_addresses = {}
34         self.mempool_hist = {}
35         self.mempool_hashes = []
36         self.mempool_lock = threading.Lock()
37
38         self.address_queue = Queue()
39         self.dbpath = config.get('leveldb', 'path')
40         self.pruning_limit = config.getint('leveldb', 'pruning_limit')
41         self.db_version = 1 # increase this when database needs to be updated
42
43         self.dblock = threading.Lock()
44         try:
45             self.db = leveldb.LevelDB(self.dbpath)
46         except:
47             traceback.print_exc(file=sys.stdout)
48             self.shared.stop()
49
50         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
51             config.get('bitcoind', 'user'),
52             config.get('bitcoind', 'password'),
53             config.get('bitcoind', 'host'),
54             config.get('bitcoind', 'port'))
55
56         self.height = 0
57         self.is_test = False
58         self.sent_height = 0
59         self.sent_header = None
60
61         try:
62             hist = self.deserialize(self.db.Get('height'))
63             self.last_hash, self.height, db_version = hist[0]
64             print_log("Database version", self.db_version)
65             print_log("Blockchain height", self.height)
66         except:
67             traceback.print_exc(file=sys.stdout)
68             print_log('initializing database')
69             self.height = 0
70             self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
71
72         # check version
73         if self.db_version != db_version:
74             print_log("Your database '%s' is deprecated. Please create a new database"%self.dbpath)
75             self.shared.stop()
76             return
77
78         # catch_up headers
79         self.init_headers(self.height)
80
81         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
82         while not shared.stopped() and not self.up_to_date:
83             try:
84                 time.sleep(1)
85             except:
86                 print "keyboard interrupt: stopping threads"
87                 shared.stop()
88                 sys.exit(0)
89
90         print_log("blockchain is up to date.")
91
92         threading.Timer(10, self.main_iteration).start()
93
94     def bitcoind(self, method, params=[]):
95         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
96         try:
97             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
98         except:
99             traceback.print_exc(file=sys.stdout)
100             self.shared.stop()
101
102         r = loads(respdata)
103         if r['error'] is not None:
104             raise BaseException(r['error'])
105         return r.get('result')
106
107     def serialize(self, h):
108         s = ''
109         for txid, txpos, height in h:
110             s += self.serialize_item(txid, txpos, height)
111         return s
112
113     def serialize_item(self, txid, txpos, height, spent=chr(0)):
114         s = (txid + int_to_hex(txpos, 4) + int_to_hex(height, 3)).decode('hex') + spent 
115         return s
116
117     def deserialize_item(self,s):
118         txid = s[0:32].encode('hex')
119         txpos = int(rev_hex(s[32:36].encode('hex')), 16)
120         height = int(rev_hex(s[36:39].encode('hex')), 16)
121         spent = s[39:40]
122         return (txid, txpos, height, spent)
123
124     def deserialize(self, s):
125         h = []
126         while s:
127             txid, txpos, height, spent = self.deserialize_item(s[0:40])
128             h.append((txid, txpos, height))
129             if spent == chr(1):
130                 txid, txpos, height, spent = self.deserialize_item(s[40:80])
131                 h.append((txid, txpos, height))
132             s = s[80:]
133         return h
134
135     def block2header(self, b):
136         return {
137             "block_height": b.get('height'),
138             "version": b.get('version'),
139             "prev_block_hash": b.get('previousblockhash'),
140             "merkle_root": b.get('merkleroot'),
141             "timestamp": b.get('time'),
142             "bits": int(b.get('bits'), 16),
143             "nonce": b.get('nonce'),
144         }
145
146     def get_header(self, height):
147         block_hash = self.bitcoind('getblockhash', [height])
148         b = self.bitcoind('getblock', [block_hash])
149         return self.block2header(b)
150
    def init_headers(self, db_height):
        """Ensure the flat headers file covers every block up to *db_height*,
        fetching any missing headers from bitcoind."""
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            # create an empty headers file so flush_headers can open it 'rb+'
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                if height > 1:
                    # every fetched header must chain onto the previous one
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            # keep what we downloaded so the next run can resume
            self.flush_headers()
            sys.exit()

        self.flush_headers()
184
185     def hash_header(self, header):
186         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
187
188     def read_header(self, block_height):
189         if os.path.exists(self.headers_filename):
190             with open(self.headers_filename, 'rb') as f:
191                 f.seek(block_height * 80)
192                 h = f.read(80)
193             if len(h) == 80:
194                 h = header_from_string(h)
195                 return h
196
197     def read_chunk(self, index):
198         with open(self.headers_filename, 'rb') as f:
199             f.seek(index*2016*80)
200             chunk = f.read(2016*80)
201         return chunk.encode('hex')
202
203     def write_header(self, header, sync=True):
204         if not self.headers_data:
205             self.headers_offset = header.get('block_height')
206
207         self.headers_data += header_to_string(header).decode('hex')
208         if sync or len(self.headers_data) > 40*100:
209             self.flush_headers()
210
211         with self.cache_lock:
212             chunk_index = header.get('block_height')/2016
213             if self.chunk_cache.get(chunk_index):
214                 self.chunk_cache.pop(chunk_index)
215
216     def pop_header(self):
217         # we need to do this only if we have not flushed
218         if self.headers_data:
219             self.headers_data = self.headers_data[:-40]
220
221     def flush_headers(self):
222         if not self.headers_data:
223             return
224         with open(self.headers_filename, 'rb+') as f:
225             f.seek(self.headers_offset*80)
226             f.write(self.headers_data)
227         self.headers_data = ''
228
229     def get_chunk(self, i):
230         # store them on disk; store the current chunk in memory
231         with self.cache_lock:
232             chunk = self.chunk_cache.get(i)
233             if not chunk:
234                 chunk = self.read_chunk(i)
235                 self.chunk_cache[i] = chunk
236
237         return chunk
238
239     def get_mempool_transaction(self, txid):
240         try:
241             raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
242         except:
243             return None
244
245         vds = deserialize.BCDataStream()
246         vds.write(raw_tx.decode('hex'))
247
248         return deserialize.parse_Transaction(vds, is_coinbase=False)
249
250     def get_history(self, addr, cache_only=False):
251         with self.cache_lock:
252             hist = self.history_cache.get(addr)
253         if hist is not None:
254             return hist
255         if cache_only:
256             return -1
257
258         with self.dblock:
259             try:
260                 hist = self.deserialize(self.db.Get(addr))
261                 is_known = True
262             except:
263                 hist = []
264                 is_known = False
265
266         # sort history, because redeeming transactions are next to the corresponding txout
267         hist.sort(key=lambda tup: tup[2])
268
269         # uniqueness
270         hist = set(map(lambda x: (x[0], x[2]), hist))
271
272         # add memory pool
273         with self.mempool_lock:
274             for txid in self.mempool_hist.get(addr, []):
275                 hist.append((txid, 0, 0))
276
277         # convert to dict
278         hist = map(lambda x: {'tx_hash': x[0], 'height': x[1]}, hist)
279
280         # add something to distinguish between unused and empty addresses
281         if hist == [] and is_known:
282             hist = ['*']
283
284         with self.cache_lock:
285             self.history_cache[addr] = hist
286         return hist
287
288     def get_status(self, addr, cache_only=False):
289         tx_points = self.get_history(addr, cache_only)
290         if cache_only and tx_points == -1:
291             return -1
292
293         if not tx_points:
294             return None
295         if tx_points == ['*']:
296             return '*'
297         status = ''
298         for tx in tx_points:
299             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
300         return hashlib.sha256(status).digest().encode('hex')
301
302     def get_merkle(self, tx_hash, height):
303
304         block_hash = self.bitcoind('getblockhash', [height])
305         b = self.bitcoind('getblock', [block_hash])
306         tx_list = b.get('tx')
307         tx_pos = tx_list.index(tx_hash)
308
309         merkle = map(hash_decode, tx_list)
310         target_hash = hash_decode(tx_hash)
311         s = []
312         while len(merkle) != 1:
313             if len(merkle) % 2:
314                 merkle.append(merkle[-1])
315             n = []
316             while merkle:
317                 new_hash = Hash(merkle[0] + merkle[1])
318                 if merkle[0] == target_hash:
319                     s.append(hash_encode(merkle[1]))
320                     target_hash = new_hash
321                 elif merkle[1] == target_hash:
322                     s.append(hash_encode(merkle[0]))
323                     target_hash = new_hash
324                 n.append(new_hash)
325                 merkle = merkle[2:]
326             merkle = n
327
328         return {"block_height": height, "merkle": s, "pos": tx_pos}
329
330
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert output (tx_hash, tx_pos, tx_height) into *addr*'s batched
        history, keeping it sorted by height, and record the txout -> address
        backlink in batch_txio."""
        # keep it sorted
        # each slot is 80 bytes: the 40-byte item plus 40 zero bytes reserved
        # for the redeeming transaction (filled in later by set_spent_bit)
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        # scan from the tail: new items usually have the highest height
        l = len(serialized_hist)/80
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height < tx_height:
                # insert right after the first item with a lower height
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            # no lower item found: the new item goes first
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
353
354
355
356     def revert_add_to_history(self, addr, tx_hash, tx_pos, tx_height):
357
358         serialized_hist = self.batch_list[addr]
359         s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
360         if serialized_hist.find(s) == -1: raise
361         serialized_hist = serialized_hist.replace(s, '')
362         self.batch_list[addr] = serialized_hist
363
364
365
    def prune_history(self, addr, undo):
        """Drop fully-spent 80-byte slots from *addr*'s batched history until
        at most pruning_limit slots remain; removed slots are appended to
        undo[addr] so the pruning can be reverted later."""
        # remove items that have bit set to one
        if undo.get(addr) is None: undo[addr] = []

        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            if len(serialized_hist)/80 < self.pruning_limit: break
            item = serialized_hist[80*i:80*(i+1)]
            # spent flag set: the second half must hold the redeeming tx
            if item[39:40] == chr(1):
                assert item[79:80] == chr(2)
                # NOTE(review): removing slot i shifts the remainder left while
                # i keeps advancing, so the slot right after a pruned one is
                # skipped this pass; it is pruned on a later call -- confirm
                # this is intended before changing
                serialized_hist = serialized_hist[0:80*i] + serialized_hist[80*(i+1):]
                undo[addr].append(item)  # items are ordered
        self.batch_list[addr] = serialized_hist
380
381
    def revert_prune_history(self, addr, undo):
        """Re-insert the slots recorded in undo[addr] into *addr*'s batched
        history, restoring height order (inverse of prune_history)."""
        # restore removed items
        serialized_hist = self.batch_list[addr]

        if undo.get(addr) is not None:
            itemlist = undo.pop(addr)
        else:
            return

        if not itemlist: return

        # walk the existing history from the tail, merging the saved items
        # (highest first) back in after the first slot of lower height
        l = len(serialized_hist)/80
        tx_item = ''
        for i in range(l-1, -1, -1):
            if tx_item == '':
                if not itemlist:
                    break
                else:
                    tx_item = itemlist.pop(-1) # get the last element
                    tx_height = int(rev_hex(tx_item[36:39].encode('hex')), 16)

            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)

            if item_height < tx_height:
                # insert the restored slot right after the lower item
                serialized_hist = serialized_hist[0:80*(i+1)] + tx_item + serialized_hist[80*(i+1):]
                tx_item = ''

        else:
            # ran off the front: whatever is left belongs before the history
            serialized_hist = ''.join(itemlist) + tx_item + serialized_hist

        self.batch_list[addr] = serialized_hist
414
415
    def set_spent_bit(self, addr, txi, is_spent, txid=None, index=None, height=None):
        """Mark or unmark outpoint *txi* (36 bytes: txid + position) in
        *addr*'s batched history.

        When spending, the redeeming item (txid, index, height) is written
        into the second half of the 80-byte slot with flag chr(2); when
        unspending, that half is zeroed out.

        Raises BaseException if the outpoint is not in the history.
        """
        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            item = serialized_hist[80*i:80*(i+1)]
            if item[0:36] == txi:
                if is_spent:
                    # 39 bytes kept + flag + 40-byte redeeming item = 80
                    new_item = item[0:39] + chr(1) + self.serialize_item(txid, index, height, chr(2))
                else:
                    new_item = item[0:39] + chr(0) + chr(0)*40
                serialized_hist = serialized_hist[0:80*i] + new_item + serialized_hist[80*(i+1):]
                break
        else:
            hist = self.deserialize(serialized_hist)
            raise BaseException("prevout not found", addr, hist, txi.encode('hex'))

        self.batch_list[addr] = serialized_hist
433
434
435     def unset_spent_bit(self, addr, txi):
436         self.set_spent_bit(addr, txi, False)
437         self.batch_txio[txi] = addr
438
439
440     def deserialize_block(self, block):
441         txlist = block.get('tx')
442         tx_hashes = []  # ordered txids
443         txdict = {}     # deserialized tx
444         is_coinbase = True
445         for raw_tx in txlist:
446             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
447             tx_hashes.append(tx_hash)
448             vds = deserialize.BCDataStream()
449             vds.write(raw_tx.decode('hex'))
450             tx = deserialize.parse_Transaction(vds, is_coinbase)
451             txdict[tx_hash] = tx
452             is_coinbase = False
453         return tx_hashes, txdict
454
455     def get_undo_info(self, height):
456         s = self.db.Get("undo%d" % (height % 100))
457         return eval(s)
458
459     def write_undo_info(self, batch, height, undo_info):
460         if self.is_test or height > self.bitcoind_height - 100:
461             batch.Put("undo%d" % (height % 100), repr(undo_info))
462
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply (or, with revert=True, undo) one block's effect on the
        address-history index, then write the whole change set to LevelDB
        in a single batch.

        block -- bitcoind block dict with raw transactions in 'tx'
        block_hash, block_height -- identity of the block
        sync -- passed through to LevelDB Write
        revert -- undo the block instead of applying it (reorg path)
        """

        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address

        block_inputs = []
        block_outputs = []
        addr_to_read = []

        # deserialize transactions
        t0 = time.time()
        tx_hashes, txdict = self.deserialize_block(block)

        t00 = time.time()

        # undo info
        if revert:
            undo_info = self.get_undo_info(block_height)
        else:
            undo_info = {}


        if not revert:
            # read addresses of tx inputs
            for tx in txdict.values():
                for x in tx.get('inputs'):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    block_inputs.append(txi)

            block_inputs.sort()
            for txi in block_inputs:
                try:
                    addr = self.db.Get(txi)
                except:
                    # print "addr not in db", txi.encode('hex')
                    # the input could come from the same block
                    continue
                self.batch_txio[txi] = addr
                addr_to_read.append(addr)

        else:
            # reverting: collect the block's own outputs (to delete) and the
            # previous addresses recorded in the undo info
            for txid, tx in txdict.items():
                for x in tx.get('outputs'):
                    txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
                    block_outputs.append(txo)
                    addr_to_read.append( x.get('address') )

                undo = undo_info.get(txid)
                for i, x in enumerate(tx.get('inputs')):
                    addr = undo['prev_addr'][i]
                    addr_to_read.append(addr)


        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                addr_to_read.append(x.get('address'))

        addr_to_read.sort()
        for addr in addr_to_read:
            try:
                self.batch_list[addr] = self.db.Get(addr)
            except:
                # unknown address: start from an empty history
                self.batch_list[addr] = ''


        # process
        t1 = time.time()

        if revert:
            # undo must be applied in reverse transaction order
            tx_hashes = tx_hashes[::-1]


        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:

                undo = { 'prev_addr':[] } # contains the list of pruned items for each address in the tx; also, 'prev_addr' is a list of prev addresses

                prev_addr = []
                for i, x in enumerate(tx.get('inputs')):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    addr = self.batch_txio[txi]

                    # add redeem item to the history.
                    # add it right next to the input txi? this will break history sorting, but it's ok if I neglect tx inputs during search
                    self.set_spent_bit(addr, txi, True, txid, i, block_height)

                    # when I prune, prune a pair
                    self.prune_history(addr, undo)
                    prev_addr.append(addr)

                undo['prev_addr'] = prev_addr

                # here I add only the outputs to history; maybe I want to add inputs too (that's in the other loop)
                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.add_to_history(addr, txid, x.get('index'), block_height)
                    self.prune_history(addr, undo)  # prune here because we increased the length of the history

                undo_info[txid] = undo

            else:

                undo = undo_info.pop(txid)

                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.revert_prune_history(addr, undo)
                    self.revert_add_to_history(addr, txid, x.get('index'), block_height)

                prev_addr = undo.pop('prev_addr')
                for i, x in enumerate(tx.get('inputs')):
                    addr = prev_addr[i]
                    self.revert_prune_history(addr, undo)
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    self.unset_spent_bit(addr, txi)

                # every recorded undo item must have been consumed
                assert undo == {}

        if revert:
            assert undo_info == {}


        # write
        max_len = 0
        max_addr = ''
        t2 = time.time()

        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            # track the longest history, for the log line below
            l = len(serialized_hist)/80
            if l > max_len:
                max_len = l
                max_addr = addr

        if not revert:
            # add new created outputs
            for txio, addr in self.batch_txio.items():
                batch.Put(txio, addr)
            # delete spent inputs
            for txi in block_inputs:
                batch.Delete(txi)
            # add undo info
            self.write_undo_info(batch, block_height, undo_info)
        else:
            # restore spent inputs
            for txio, addr in self.batch_txio.items():
                # print "restoring spent input", repr(txio)
                batch.Put(txio, addr)
            # delete spent outputs
            for txo in block_outputs:
                batch.Delete(txo)

        # add the max
        batch.Put('height', self.serialize([(block_hash, block_height, self.db_version)]))

        # actual write
        self.db.Write(batch, sync=sync)

        t3 = time.time()
        if t3 - t0 > 10 and not sync:
            print_log("block", block_height,
                      "parse:%0.2f " % (t00 - t0),
                      "read:%0.2f " % (t1 - t00),
                      "proc:%.2f " % (t2-t1),
                      "write:%.2f " % (t3-t2),
                      "max:", max_len, max_addr)

        for addr in self.batch_list.keys():
            self.invalidate_cache(addr)
638
639     def add_request(self, request):
640         # see if we can get if from cache. if not, add to queue
641         if self.process(request, cache_only=True) == -1:
642             self.queue.put(request)
643
644     def process(self, request, cache_only=False):
645         #print "abe process", request
646
647         message_id = request['id']
648         method = request['method']
649         params = request.get('params', [])
650         result = None
651         error = None
652
653         if method == 'blockchain.numblocks.subscribe':
654             result = self.height
655
656         elif method == 'blockchain.headers.subscribe':
657             result = self.header
658
659         elif method == 'blockchain.address.subscribe':
660             try:
661                 address = params[0]
662                 result = self.get_status(address, cache_only)
663                 self.watch_address(address)
664             except BaseException, e:
665                 error = str(e) + ': ' + address
666                 print_log("error:", error)
667
668         elif method == 'blockchain.address.unsubscribe':
669             try:
670                 password = params[0]
671                 address = params[1]
672                 if password == self.config.get('server', 'password'):
673                     self.watched_addresses.remove(address)
674                     # print_log('unsubscribed', address)
675                     result = "ok"
676                 else:
677                     print_log('incorrect password')
678                     result = "authentication error"
679             except BaseException, e:
680                 error = str(e) + ': ' + address
681                 print_log("error:", error)
682
683         elif method == 'blockchain.address.get_history':
684             try:
685                 address = params[0]
686                 result = self.get_history(address, cache_only)
687             except BaseException, e:
688                 error = str(e) + ': ' + address
689                 print_log("error:", error)
690
691         elif method == 'blockchain.block.get_header':
692             if cache_only:
693                 result = -1
694             else:
695                 try:
696                     height = params[0]
697                     result = self.get_header(height)
698                 except BaseException, e:
699                     error = str(e) + ': %d' % height
700                     print_log("error:", error)
701
702         elif method == 'blockchain.block.get_chunk':
703             if cache_only:
704                 result = -1
705             else:
706                 try:
707                     index = params[0]
708                     result = self.get_chunk(index)
709                 except BaseException, e:
710                     error = str(e) + ': %d' % index
711                     print_log("error:", error)
712
713         elif method == 'blockchain.transaction.broadcast':
714             try:
715                 txo = self.bitcoind('sendrawtransaction', params)
716                 print_log("sent tx:", txo)
717                 result = txo
718             except BaseException, e:
719                 result = str(e)  # do not send an error
720                 print_log("error:", result, params)
721
722         elif method == 'blockchain.transaction.get_merkle':
723             if cache_only:
724                 result = -1
725             else:
726                 try:
727                     tx_hash = params[0]
728                     tx_height = params[1]
729                     result = self.get_merkle(tx_hash, tx_height)
730                 except BaseException, e:
731                     error = str(e) + ': ' + repr(params)
732                     print_log("get_merkle error:", error)
733
734         elif method == 'blockchain.transaction.get':
735             try:
736                 tx_hash = params[0]
737                 height = params[1]
738                 result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
739             except BaseException, e:
740                 error = str(e) + ': ' + repr(params)
741                 print_log("tx get error:", error)
742
743         else:
744             error = "unknown method:%s" % method
745
746         if cache_only and result == -1:
747             return -1
748
749         if error:
750             self.push_response({'id': message_id, 'error': error})
751         elif result != '':
752             self.push_response({'id': message_id, 'result': result})
753
754     def watch_address(self, addr):
755         if addr not in self.watched_addresses:
756             self.watched_addresses.append(addr)
757
    def catch_up(self, sync=True):
        """Import blocks until the index matches bitcoind's tip, reverting
        blocks on a reorg until the chains reconnect."""
        t1 = time.time()

        while not self.shared.stopped():
            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
            next_block = self.bitcoind('getblock', [next_block_hash, 1])

            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100) == 1) if self.is_test else False

            if (next_block.get('previousblockhash') == self.last_hash) and not revert:

                # the next block extends our tip: import it
                self.import_block(next_block, next_block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.last_hash = next_block_hash

                if self.height % 100 == 0 and not sync:
                    t2 = time.time()
                    print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1))
                    t1 = t2

            else:
                # revert current block
                block = self.bitcoind('getblock', [self.last_hash, 1])
                print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                self.pop_header()
                self.flush_headers()

                self.height -= 1

                # read previous header from disk
                self.header = self.read_header(self.height)
                self.last_hash = self.hash_header(self.header)

        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
805
    def memorypool_update(self):
        """Refresh the mempool view: map new mempool transactions to the
        addresses they touch, drop entries that left the mempool, rebuild
        the per-address mempool histories, and invalidate affected caches."""
        mempool_hashes = self.bitcoind('getrawmempool')

        for tx_hash in mempool_hashes:
            # skip transactions already processed on a previous pass
            if tx_hash in self.mempool_hashes:
                continue

            tx = self.get_mempool_transaction(tx_hash)
            if not tx:
                continue

            # addresses spent by this tx (prevouts must already be in the db)
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                try:
                    addr = self.db.Get(txi)
                except:
                    continue
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            # addresses funded by this tx
            for x in tx.get('outputs'):
                addr = x.get('address')
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            self.mempool_hashes.append(tx_hash)

        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes

        # remove deprecated entries from mempool_addresses
        # (items() returns a list here, so popping while looping is safe)
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)

        # rebuild mempool histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                if tx_hash not in h:
                    h.append(tx_hash)
                new_mempool_hist[addr] = h

        # invalidate cache for mempool addresses whose mempool history has changed
        for addr in new_mempool_hist.keys():
            if addr in self.mempool_hist.keys():
                if self.mempool_hist[addr] != new_mempool_hist[addr]:
                    self.invalidate_cache(addr)
            else:
                self.invalidate_cache(addr)

        # invalidate cache for addresses that are removed from mempool ?
        # this should not be necessary if they go into a block, but they might not
        for addr in self.mempool_hist.keys():
            if addr not in new_mempool_hist.keys():
                self.invalidate_cache(addr)


        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist
871
872     def invalidate_cache(self, address):
873         with self.cache_lock:
874             if address in self.history_cache:
875                 print_log("cache: invalidating", address)
876                 self.history_cache.pop(address)
877
878         if address in self.watched_addresses:
879             self.address_queue.put(address)
880
    def main_iteration(self):
        """One periodic step: catch up with bitcoind, refresh the mempool,
        push height/header/address notifications, then reschedule itself in
        10 seconds unless shutdown was requested."""
        if self.shared.stopped():
            print_log("blockchain processor terminating")
            return

        with self.dblock:
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()
        t3 = time.time()
        # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)

        # notify subscribers of a new chain height
        if self.sent_height != self.height:
            self.sent_height = self.height
            self.push_response({
                'id': None,
                'method': 'blockchain.numblocks.subscribe',
                'params': [self.height],
            })

        # notify subscribers of a new tip header
        if self.sent_header != self.header:
            print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
            self.sent_header = self.header
            self.push_response({
                'id': None,
                'method': 'blockchain.headers.subscribe',
                'params': [self.header],
            })

        # drain the queue of addresses whose status may have changed
        while True:
            try:
                addr = self.address_queue.get(False)
            except:
                break
            if addr in self.watched_addresses:
                status = self.get_status(addr)
                self.push_response({
                    'id': None,
                    'method': 'blockchain.address.subscribe',
                    'params': [addr, status],
                })

        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        else:
            print_log("blockchain processor terminating")