# fix
# [electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import leveldb
5 import os
6 from Queue import Queue
7 import random
8 import sys
9 import time
10 import threading
11 import traceback
12 import urllib
13
14 from backends.bitcoind import deserialize
15 from processor import Processor, print_log
16 from utils import *
17
18
19 class BlockchainProcessor(Processor):
20
21     def __init__(self, config, shared):
22         Processor.__init__(self)
23
24         self.shared = shared
25         self.config = config
26         self.up_to_date = False
27         self.watched_addresses = []
28         self.history_cache = {}
29         self.chunk_cache = {}
30         self.cache_lock = threading.Lock()
31         self.headers_data = ''
32
33         self.mempool_addresses = {}
34         self.mempool_hist = {}
35         self.mempool_hashes = []
36         self.mempool_lock = threading.Lock()
37
38         self.address_queue = Queue()
39         self.dbpath = config.get('leveldb', 'path')
40         self.pruning_limit = config.getint('leveldb', 'pruning_limit')
41         self.db_version = 1 # increase this when database needs to be updated
42
43         self.dblock = threading.Lock()
44         try:
45             self.db = leveldb.LevelDB(self.dbpath)
46         except:
47             traceback.print_exc(file=sys.stdout)
48             self.shared.stop()
49
50         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
51             config.get('bitcoind', 'user'),
52             config.get('bitcoind', 'password'),
53             config.get('bitcoind', 'host'),
54             config.get('bitcoind', 'port'))
55
56         self.height = 0
57         self.is_test = False
58         self.sent_height = 0
59         self.sent_header = None
60
61         try:
62             hist = self.deserialize(self.db.Get('height'))
63             self.last_hash, self.height, db_version = hist[0]
64             print_log("Database version", self.db_version)
65             print_log("Blockchain height", self.height)
66         except:
67             traceback.print_exc(file=sys.stdout)
68             print_log('initializing database')
69             self.height = 0
70             self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
71             db_version = self.db_version
72
73         # check version
74         if self.db_version != db_version:
75             print_log("Your database '%s' is deprecated. Please create a new database"%self.dbpath)
76             self.shared.stop()
77             return
78
79         # catch_up headers
80         self.init_headers(self.height)
81
82         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
83         while not shared.stopped() and not self.up_to_date:
84             try:
85                 time.sleep(1)
86             except:
87                 print "keyboard interrupt: stopping threads"
88                 shared.stop()
89                 sys.exit(0)
90
91         print_log("blockchain is up to date.")
92
93         threading.Timer(10, self.main_iteration).start()
94
95     def bitcoind(self, method, params=[]):
96         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
97         try:
98             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
99         except:
100             traceback.print_exc(file=sys.stdout)
101             self.shared.stop()
102
103         r = loads(respdata)
104         if r['error'] is not None:
105             raise BaseException(r['error'])
106         return r.get('result')
107
108     def serialize(self, h):
109         s = ''
110         for txid, txpos, height in h:
111             s += self.serialize_item(txid, txpos, height)
112         return s
113
114     def serialize_item(self, txid, txpos, height, spent=chr(0)):
115         s = (txid + int_to_hex(txpos, 4) + int_to_hex(height, 3)).decode('hex') + spent 
116         return s
117
118     def deserialize_item(self,s):
119         txid = s[0:32].encode('hex')
120         txpos = int(rev_hex(s[32:36].encode('hex')), 16)
121         height = int(rev_hex(s[36:39].encode('hex')), 16)
122         spent = s[39:40]
123         return (txid, txpos, height, spent)
124
125     def deserialize(self, s):
126         h = []
127         while s:
128             txid, txpos, height, spent = self.deserialize_item(s[0:40])
129             h.append((txid, txpos, height))
130             if spent == chr(1):
131                 txid, txpos, height, spent = self.deserialize_item(s[40:80])
132                 h.append((txid, txpos, height))
133             s = s[80:]
134         return h
135
136     def block2header(self, b):
137         return {
138             "block_height": b.get('height'),
139             "version": b.get('version'),
140             "prev_block_hash": b.get('previousblockhash'),
141             "merkle_root": b.get('merkleroot'),
142             "timestamp": b.get('time'),
143             "bits": int(b.get('bits'), 16),
144             "nonce": b.get('nonce'),
145         }
146
147     def get_header(self, height):
148         block_hash = self.bitcoind('getblockhash', [height])
149         b = self.bitcoind('getblock', [block_hash])
150         return self.block2header(b)
151
    def init_headers(self, db_height):
        """Create or extend the flat headers file up to the database height.

        db_height: block height recorded in the leveldb database. Any
        headers missing from the file are fetched from bitcoind, chained
        via prev_block_hash checks, and written out.
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            # no headers file yet: create an empty one
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                if height > 1:
                    # each header must link to the one we just wrote
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            # persist what we have before exiting
            self.flush_headers()
            sys.exit()

        self.flush_headers()
185
186     def hash_header(self, header):
187         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
188
189     def read_header(self, block_height):
190         if os.path.exists(self.headers_filename):
191             with open(self.headers_filename, 'rb') as f:
192                 f.seek(block_height * 80)
193                 h = f.read(80)
194             if len(h) == 80:
195                 h = header_from_string(h)
196                 return h
197
198     def read_chunk(self, index):
199         with open(self.headers_filename, 'rb') as f:
200             f.seek(index*2016*80)
201             chunk = f.read(2016*80)
202         return chunk.encode('hex')
203
    def write_header(self, header, sync=True):
        """Buffer a header for writing; flush when sync=True or the buffer fills.

        Also drops the cached hex chunk containing this header so that
        get_chunk re-reads fresh data from disk.
        """
        if not self.headers_data:
            # remember where the buffered run of headers starts in the file
            self.headers_offset = header.get('block_height')

        self.headers_data += header_to_string(header).decode('hex')
        if sync or len(self.headers_data) > 40*100:
            self.flush_headers()

        with self.cache_lock:
            # invalidate the chunk that contains this header (integer division)
            chunk_index = header.get('block_height')/2016
            if self.chunk_cache.get(chunk_index):
                self.chunk_cache.pop(chunk_index)
216
217     def pop_header(self):
218         # we need to do this only if we have not flushed
219         if self.headers_data:
220             self.headers_data = self.headers_data[:-40]
221
222     def flush_headers(self):
223         if not self.headers_data:
224             return
225         with open(self.headers_filename, 'rb+') as f:
226             f.seek(self.headers_offset*80)
227             f.write(self.headers_data)
228         self.headers_data = ''
229
230     def get_chunk(self, i):
231         # store them on disk; store the current chunk in memory
232         with self.cache_lock:
233             chunk = self.chunk_cache.get(i)
234             if not chunk:
235                 chunk = self.read_chunk(i)
236                 self.chunk_cache[i] = chunk
237
238         return chunk
239
240     def get_mempool_transaction(self, txid):
241         try:
242             raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
243         except:
244             return None
245
246         vds = deserialize.BCDataStream()
247         vds.write(raw_tx.decode('hex'))
248
249         return deserialize.parse_Transaction(vds, is_coinbase=False)
250
    def get_history(self, addr, cache_only=False):
        """Return the confirmed + mempool history of an address.

        Returns a list of {'tx_hash', 'height'} dicts, ['*'] for a known
        address whose history was fully pruned, the cached value when
        available, or -1 when cache_only is set and nothing is cached.
        Mempool transactions are reported with height 0.
        """
        with self.cache_lock:
            hist = self.history_cache.get(addr)
        if hist is not None:
            return hist
        if cache_only:
            return -1

        with self.dblock:
            try:
                hist = self.deserialize(self.db.Get(addr))
                is_known = True
            except:
                # address not present in the database
                hist = []
                is_known = False

        # sort history, because redeeming transactions are next to the corresponding txout
        hist.sort(key=lambda tup: tup[2])

        # add memory pool
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr, []):
                hist.append((txid, 0, 0))

        # uniqueness
        # NOTE(review): the set conversion discards the sort order established
        # above — confirm clients tolerate an unordered history list
        hist = set(map(lambda x: (x[0], x[2]), hist))

        # convert to dict
        hist = map(lambda x: {'tx_hash': x[0], 'height': x[1]}, hist)

        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known:
            hist = ['*']

        with self.cache_lock:
            self.history_cache[addr] = hist
        return hist
288
289     def get_status(self, addr, cache_only=False):
290         tx_points = self.get_history(addr, cache_only)
291         if cache_only and tx_points == -1:
292             return -1
293
294         if not tx_points:
295             return None
296         if tx_points == ['*']:
297             return '*'
298         status = ''
299         for tx in tx_points:
300             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
301         return hashlib.sha256(status).digest().encode('hex')
302
    def get_merkle(self, tx_hash, height):
        """Compute the Merkle branch proving tx_hash is in the block at `height`.

        Returns {'block_height', 'merkle': list of sibling hashes (hex, leaf
        to root), 'pos': index of the transaction within the block}.
        Raises ValueError (via list.index) if tx_hash is not in the block.
        """
        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        while len(merkle) != 1:
            # duplicate the last hash when a level has an odd number of nodes
            if len(merkle) % 2:
                merkle.append(merkle[-1])
            n = []
            while merkle:
                new_hash = Hash(merkle[0] + merkle[1])
                # record the sibling of the node on the path we are following
                if merkle[0] == target_hash:
                    s.append(hash_encode(merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append(hash_encode(merkle[0]))
                    target_hash = new_hash
                n.append(new_hash)
                merkle = merkle[2:]
            merkle = n

        return {"block_height": height, "merkle": s, "pos": tx_pos}
330
331
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert a new txout into addr's serialized history, kept sorted by height.

        Also records a backlink from the binary txout key to the address so
        that later blocks can resolve the inputs that spend it.
        """
        # keep it sorted
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        # scan from the end: a new item usually belongs near the tip
        l = len(serialized_hist)/80
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height < tx_height:
                # insert just after the first older item found
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            # no older item exists: insert at the very beginning
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
354
355
356
357     def revert_add_to_history(self, addr, tx_hash, tx_pos, tx_height):
358
359         serialized_hist = self.batch_list[addr]
360         s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
361         if serialized_hist.find(s) == -1: raise
362         serialized_hist = serialized_hist.replace(s, '')
363         self.batch_list[addr] = serialized_hist
364
365
366
    def prune_history(self, addr, undo):
        """Prune spent items from addr's history down to the pruning limit.

        Each removed 80-byte item is appended to undo[addr] so the pruning
        can be reverted during a blockchain reorg.
        """
        # remove items that have bit set to one
        if undo.get(addr) is None: undo[addr] = []

        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            # stop as soon as the history is short enough
            if len(serialized_hist)/80 < self.pruning_limit: break
            item = serialized_hist[80*i:80*(i+1)]
            # spent flag (byte 39) set: the item and its redeeming half (flag chr(2)
            # at byte 79) can be dropped together
            if item[39:40] == chr(1):
                assert item[79:80] == chr(2)
                serialized_hist = serialized_hist[0:80*i] + serialized_hist[80*(i+1):]
                undo[addr].append(item)  # items are ordered
                # NOTE(review): the string shrinks while i keeps advancing, so the
                # item that slides into slot i is skipped this pass — confirm intended
        self.batch_list[addr] = serialized_hist
381
382
    def revert_prune_history(self, addr, undo):
        """Re-insert items previously pruned from addr's history.

        Consumes undo[addr] (removing the key from undo); items are merged
        back by height, newest first, preserving the sorted history.
        """
        # restore removed items
        serialized_hist = self.batch_list[addr]

        if undo.get(addr) is not None:
            itemlist = undo.pop(addr)
        else:
            return

        if not itemlist: return

        l = len(serialized_hist)/80
        tx_item = ''
        for i in range(l-1, -1, -1):
            if tx_item == '':
                if not itemlist:
                    break
                else:
                    tx_item = itemlist.pop(-1) # get the last element
                    tx_height = int(rev_hex(tx_item[36:39].encode('hex')), 16)

            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)

            if item_height < tx_height:
                # insert the restored item right after the first older entry
                serialized_hist = serialized_hist[0:80*(i+1)] + tx_item + serialized_hist[80*(i+1):]
                tx_item = ''

        else:
            # loop exhausted without break: everything left is older than the
            # whole history, so prepend it
            serialized_hist = ''.join(itemlist) + tx_item + serialized_hist

        self.batch_list[addr] = serialized_hist
415
416
    def set_spent_bit(self, addr, txi, is_spent, txid=None, index=None, height=None):
        """Mark the txout `txi` in addr's history as spent (or unspent).

        When is_spent is true, the redeeming transaction (txid, index,
        height) is stored in the second half of the 80-byte item with flag
        chr(2); otherwise the flag and redeeming half are zeroed.
        Raises BaseException if txi is not found in the history.
        """
        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            item = serialized_hist[80*i:80*(i+1)]
            # first 36 bytes are the binary txid+index key
            if item[0:36] == txi:
                if is_spent:
                    new_item = item[0:39] + chr(1) + self.serialize_item(txid, index, height, chr(2))
                else:
                    new_item = item[0:39] + chr(0) + chr(0)*40
                serialized_hist = serialized_hist[0:80*i] + new_item + serialized_hist[80*(i+1):]
                break
        else:
            # txi was never added to this history: fatal inconsistency
            hist = self.deserialize(serialized_hist)
            raise BaseException("prevout not found", addr, hist, txi.encode('hex'))

        self.batch_list[addr] = serialized_hist
434
435
436     def unset_spent_bit(self, addr, txi):
437         self.set_spent_bit(addr, txi, False)
438         self.batch_txio[txi] = addr
439
440
441     def deserialize_block(self, block):
442         txlist = block.get('tx')
443         tx_hashes = []  # ordered txids
444         txdict = {}     # deserialized tx
445         is_coinbase = True
446         for raw_tx in txlist:
447             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
448             tx_hashes.append(tx_hash)
449             vds = deserialize.BCDataStream()
450             vds.write(raw_tx.decode('hex'))
451             tx = deserialize.parse_Transaction(vds, is_coinbase)
452             txdict[tx_hash] = tx
453             is_coinbase = False
454         return tx_hashes, txdict
455
456     def get_undo_info(self, height):
457         s = self.db.Get("undo%d" % (height % 100))
458         return eval(s)
459
460     def write_undo_info(self, batch, height, undo_info):
461         if self.is_test or height > self.bitcoind_height - 100:
462             batch.Put("undo%d" % (height % 100), repr(undo_info))
463
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply a block's transactions to address histories, or undo them.

        block: deserialized 'getblock' result containing raw tx strings.
        revert: when True, roll the block back using its stored undo info.
        All changes are committed in a single leveldb WriteBatch, and the
        history cache is invalidated for every touched address.
        """
        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address

        block_inputs = []
        block_outputs = []
        addr_to_read = []

        # deserialize transactions
        t0 = time.time()
        tx_hashes, txdict = self.deserialize_block(block)

        t00 = time.time()

        # undo info
        if revert:
            undo_info = self.get_undo_info(block_height)
        else:
            undo_info = {}


        if not revert:
            # read addresses of tx inputs
            for tx in txdict.values():
                for x in tx.get('inputs'):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    block_inputs.append(txi)

            block_inputs.sort()
            for txi in block_inputs:
                try:
                    addr = self.db.Get(txi)
                except:
                    # print "addr not in db", txi.encode('hex')
                    # the input could come from the same block
                    continue
                self.batch_txio[txi] = addr
                addr_to_read.append(addr)

        else:
            # reverting: collect this block's outputs for deletion and the
            # input addresses recorded in the undo info
            for txid, tx in txdict.items():
                for x in tx.get('outputs'):
                    txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
                    block_outputs.append(txo)
                    addr_to_read.append( x.get('address') )

                undo = undo_info.get(txid)
                for i, x in enumerate(tx.get('inputs')):
                    addr = undo['prev_addr'][i]
                    addr_to_read.append(addr)




        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                addr_to_read.append(x.get('address'))

        addr_to_read.sort()
        for addr in addr_to_read:
            try:
                self.batch_list[addr] = self.db.Get(addr)
            except:
                # address has no history in the database yet
                self.batch_list[addr] = ''


        # process
        t1 = time.time()

        if revert:
            # undo transactions in reverse order
            tx_hashes = tx_hashes[::-1]


        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:

                undo = { 'prev_addr':[] } # contains the list of pruned items for each address in the tx; also, 'prev_addr' is a list of prev addresses

                prev_addr = []
                for i, x in enumerate(tx.get('inputs')):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    addr = self.batch_txio[txi]

                    # add redeem item to the history.
                    # add it right next to the input txi? this will break history sorting, but it's ok if I neglect tx inputs during search
                    self.set_spent_bit(addr, txi, True, txid, i, block_height)

                    # when I prune, prune a pair
                    self.prune_history(addr, undo)
                    prev_addr.append(addr)

                undo['prev_addr'] = prev_addr

                # here I add only the outputs to history; maybe I want to add inputs too (that's in the other loop)
                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.add_to_history(addr, txid, x.get('index'), block_height)
                    self.prune_history(addr, undo)  # prune here because we increased the length of the history

                undo_info[txid] = undo

            else:

                undo = undo_info.pop(txid)

                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.revert_prune_history(addr, undo)
                    self.revert_add_to_history(addr, txid, x.get('index'), block_height)

                prev_addr = undo.pop('prev_addr')
                for i, x in enumerate(tx.get('inputs')):
                    addr = prev_addr[i]
                    self.revert_prune_history(addr, undo)
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    self.unset_spent_bit(addr, txi)

                # every undo entry must have been consumed
                assert undo == {}

        if revert:
            assert undo_info == {}


        # write
        max_len = 0
        max_addr = ''
        t2 = time.time()

        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            # track the longest history for the timing log below
            l = len(serialized_hist)/80
            if l > max_len:
                max_len = l
                max_addr = addr

        if not revert:
            # add new created outputs
            for txio, addr in self.batch_txio.items():
                batch.Put(txio, addr)
            # delete spent inputs
            for txi in block_inputs:
                batch.Delete(txi)
            # add undo info
            self.write_undo_info(batch, block_height, undo_info)
        else:
            # restore spent inputs
            for txio, addr in self.batch_txio.items():
                # print "restoring spent input", repr(txio)
                batch.Put(txio, addr)
            # delete spent outputs
            for txo in block_outputs:
                batch.Delete(txo)

        # add the max
        batch.Put('height', self.serialize([(block_hash, block_height, self.db_version)]))

        # actual write
        self.db.Write(batch, sync=sync)

        t3 = time.time()
        if t3 - t0 > 10 and not sync:
            print_log("block", block_height,
                      "parse:%0.2f " % (t00 - t0),
                      "read:%0.2f " % (t1 - t00),
                      "proc:%.2f " % (t2-t1),
                      "write:%.2f " % (t3-t2),
                      "max:", max_len, max_addr)

        for addr in self.batch_list.keys():
            self.invalidate_cache(addr)
639
640     def add_request(self, request):
641         # see if we can get if from cache. if not, add to queue
642         if self.process(request, cache_only=True) == -1:
643             self.queue.put(request)
644
    def process(self, request, cache_only=False):
        """Dispatch a single Electrum JSON-RPC request and push the response.

        request: dict with 'id', 'method' and optional 'params'.
        Returns -1 when cache_only is set and the answer is not cached;
        otherwise pushes a result or error response to the session.
        """
        #print "abe process", request

        message_id = request['id']
        method = request['method']
        params = request.get('params', [])
        result = None
        error = None

        if method == 'blockchain.numblocks.subscribe':
            result = self.height

        elif method == 'blockchain.headers.subscribe':
            result = self.header

        elif method == 'blockchain.address.subscribe':
            try:
                address = params[0]
                result = self.get_status(address, cache_only)
                self.watch_address(address)
            except BaseException, e:
                # NOTE(review): if params[0] itself raised, `address` is unbound here
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.unsubscribe':
            try:
                password = params[0]
                address = params[1]
                # unsubscribing is password-protected (server section of config)
                if password == self.config.get('server', 'password'):
                    self.watched_addresses.remove(address)
                    # print_log('unsubscribed', address)
                    result = "ok"
                else:
                    print_log('incorrect password')
                    result = "authentication error"
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.address.get_history':
            try:
                address = params[0]
                result = self.get_history(address, cache_only)
            except BaseException, e:
                error = str(e) + ': ' + address
                print_log("error:", error)

        elif method == 'blockchain.block.get_header':
            if cache_only:
                result = -1
            else:
                try:
                    height = params[0]
                    result = self.get_header(height)
                except BaseException, e:
                    error = str(e) + ': %d' % height
                    print_log("error:", error)

        elif method == 'blockchain.block.get_chunk':
            if cache_only:
                result = -1
            else:
                try:
                    index = params[0]
                    result = self.get_chunk(index)
                except BaseException, e:
                    error = str(e) + ': %d' % index
                    print_log("error:", error)

        elif method == 'blockchain.transaction.broadcast':
            try:
                txo = self.bitcoind('sendrawtransaction', params)
                print_log("sent tx:", txo)
                result = txo
            except BaseException, e:
                result = str(e)  # do not send an error
                print_log("error:", result, params)

        elif method == 'blockchain.transaction.get_merkle':
            if cache_only:
                result = -1
            else:
                try:
                    tx_hash = params[0]
                    tx_height = params[1]
                    result = self.get_merkle(tx_hash, tx_height)
                except BaseException, e:
                    error = str(e) + ': ' + repr(params)
                    print_log("get_merkle error:", error)

        elif method == 'blockchain.transaction.get':
            try:
                tx_hash = params[0]
                height = params[1]
                result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
            except BaseException, e:
                error = str(e) + ': ' + repr(params)
                print_log("tx get error:", error)

        else:
            error = "unknown method:%s" % method

        if cache_only and result == -1:
            return -1

        if error:
            self.push_response({'id': message_id, 'error': error})
        elif result != '':
            self.push_response({'id': message_id, 'result': result})
754
755     def watch_address(self, addr):
756         if addr not in self.watched_addresses:
757             self.watched_addresses.append(addr)
758
    def catch_up(self, sync=True):
        """Import blocks until our tip matches bitcoind's, handling reorgs.

        sync: passed through to leveldb writes; when False, imports are
        asynchronous and progress is logged every 100 blocks.
        Sets self.up_to_date and refreshes self.header when done.
        """
        t1 = time.time()

        while not self.shared.stopped():
            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
            next_block = self.bitcoind('getblock', [next_block_hash, 1])

            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100) == 1) if self.is_test else False

            if (next_block.get('previousblockhash') == self.last_hash) and not revert:

                # next block extends our chain: import it
                self.import_block(next_block, next_block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.last_hash = next_block_hash

                if self.height % 100 == 0 and not sync:
                    t2 = time.time()
                    print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1))
                    t1 = t2

            else:
                # revert current block
                block = self.bitcoind('getblock', [self.last_hash, 1])
                print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                self.pop_header()
                self.flush_headers()

                self.height -= 1

                # read previous header from disk
                self.header = self.read_header(self.height)
                self.last_hash = self.hash_header(self.header)

        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
806
    def memorypool_update(self):
        """Synchronize mempool state with bitcoind and rebuild per-address histories.

        Fetches the current raw mempool, records which addresses each new
        transaction touches, drops transactions that left the mempool, and
        invalidates the history cache for every address whose mempool
        history changed.
        """
        mempool_hashes = self.bitcoind('getrawmempool')

        for tx_hash in mempool_hashes:
            # skip transactions we have already processed
            if tx_hash in self.mempool_hashes:
                continue

            tx = self.get_mempool_transaction(tx_hash)
            if not tx:
                continue

            # record the addresses whose outputs this tx spends
            for x in tx.get('inputs'):
                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                try:
                    addr = self.db.Get(txi)
                except:
                    # prevout unknown to the database (e.g. also unconfirmed)
                    continue
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            # record the addresses this tx pays to
            for x in tx.get('outputs'):
                addr = x.get('address')
                l = self.mempool_addresses.get(tx_hash, [])
                if addr not in l:
                    l.append(addr)
                    self.mempool_addresses[tx_hash] = l

            self.mempool_hashes.append(tx_hash)

        # remove older entries from mempool_hashes
        self.mempool_hashes = mempool_hashes

        # remove deprecated entries from mempool_addresses
        for tx_hash, addresses in self.mempool_addresses.items():
            if tx_hash not in self.mempool_hashes:
                self.mempool_addresses.pop(tx_hash)

        # rebuild mempool histories
        new_mempool_hist = {}
        for tx_hash, addresses in self.mempool_addresses.items():
            for addr in addresses:
                h = new_mempool_hist.get(addr, [])
                if tx_hash not in h:
                    h.append(tx_hash)
                new_mempool_hist[addr] = h

        # invalidate cache for mempool addresses whose mempool history has changed
        new_mempool_hist_keys = new_mempool_hist.keys()
        self_mempool_hist_keys = self.mempool_hist.keys()

        for addr in new_mempool_hist_keys:
            if addr in self_mempool_hist_keys:
                if self.mempool_hist[addr] != new_mempool_hist[addr]:
                    self.invalidate_cache(addr)
            else:
                self.invalidate_cache(addr)

        # invalidate cache for addresses that are removed from mempool ?
        # this should not be necessary if they go into a block, but they might not
        for addr in self_mempool_hist_keys:
            if addr not in new_mempool_hist_keys:
                self.invalidate_cache(addr)


        with self.mempool_lock:
            self.mempool_hist = new_mempool_hist
875
876     def invalidate_cache(self, address):
877         with self.cache_lock:
878             if address in self.history_cache:
879                 print_log("cache: invalidating", address)
880                 self.history_cache.pop(address)
881
882         if address in self.watched_addresses:
883             self.address_queue.put(address)
884
    def main_iteration(self):
        """Periodic worker: catch up the chain, refresh the mempool, notify clients.

        Pushes numblocks/headers subscription updates when the tip changed,
        drains the address notification queue, and re-arms itself on a
        10-second timer until shutdown is requested.
        """
        if self.shared.stopped():
            print_log("blockchain processor terminating")
            return

        with self.dblock:
            t1 = time.time()
            self.catch_up()
            t2 = time.time()

        self.memorypool_update()
        t3 = time.time()
        # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)

        # broadcast the new height to numblocks subscribers
        if self.sent_height != self.height:
            self.sent_height = self.height
            self.push_response({
                'id': None,
                'method': 'blockchain.numblocks.subscribe',
                'params': [self.height],
            })

        # broadcast the new tip header to headers subscribers
        if self.sent_header != self.header:
            print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
            self.sent_header = self.header
            self.push_response({
                'id': None,
                'method': 'blockchain.headers.subscribe',
                'params': [self.header],
            })

        # drain the queue of addresses whose status changed
        while True:
            try:
                addr = self.address_queue.get(False)
            except:
                # queue empty
                break
            if addr in self.watched_addresses:
                status = self.get_status(addr)
                self.push_response({
                    'id': None,
                    'method': 'blockchain.address.subscribe',
                    'params': [addr, status],
                })

        if not self.shared.stopped():
            threading.Timer(10, self.main_iteration).start()
        else:
            print_log("blockchain processor terminating")