Don't remove duplicates in touched_addresses; deduplicating there is wasteful.
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
1 import ast
2 import hashlib
3 from json import dumps, loads
4 import leveldb
5 import os
6 from Queue import Queue
7 import random
8 import sys
9 import time
10 import threading
11 import traceback
12 import urllib
13
14 from backends.bitcoind import deserialize
15 from processor import Processor, print_log
16 from utils import *
17
18
19 class BlockchainProcessor(Processor):
20
21     def __init__(self, config, shared):
22         Processor.__init__(self)
23
24         self.shared = shared
25         self.config = config
26         self.up_to_date = False
27         self.watched_addresses = []
28         self.history_cache = {}
29         self.chunk_cache = {}
30         self.cache_lock = threading.Lock()
31         self.headers_data = ''
32
33         self.mempool_addresses = {}
34         self.mempool_hist = {}
35         self.mempool_hashes = []
36         self.mempool_lock = threading.Lock()
37
38         self.address_queue = Queue()
39         self.dbpath = config.get('leveldb', 'path')
40         self.pruning_limit = config.getint('leveldb', 'pruning_limit')
41         self.db_version = 1 # increase this when database needs to be updated
42
43         self.dblock = threading.Lock()
44         try:
45             self.db = leveldb.LevelDB(self.dbpath)
46         except:
47             traceback.print_exc(file=sys.stdout)
48             self.shared.stop()
49
50         self.bitcoind_url = 'http://%s:%s@%s:%s/' % (
51             config.get('bitcoind', 'user'),
52             config.get('bitcoind', 'password'),
53             config.get('bitcoind', 'host'),
54             config.get('bitcoind', 'port'))
55
56         self.height = 0
57         self.is_test = False
58         self.sent_height = 0
59         self.sent_header = None
60
61         try:
62             hist = self.deserialize(self.db.Get('height'))
63             self.last_hash, self.height, db_version = hist[0]
64             print_log("Database version", self.db_version)
65             print_log("Blockchain height", self.height)
66         except:
67             traceback.print_exc(file=sys.stdout)
68             print_log('initializing database')
69             self.height = 0
70             self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
71             db_version = self.db_version
72
73         # check version
74         if self.db_version != db_version:
75             print_log("Your database '%s' is deprecated. Please create a new database"%self.dbpath)
76             self.shared.stop()
77             return
78
79         # catch_up headers
80         self.init_headers(self.height)
81
82         threading.Timer(0, lambda: self.catch_up(sync=False)).start()
83         while not shared.stopped() and not self.up_to_date:
84             try:
85                 time.sleep(1)
86             except:
87                 print "keyboard interrupt: stopping threads"
88                 shared.stop()
89                 sys.exit(0)
90
91         print_log("Blockchain is up to date.")
92         self.memorypool_update()
93         print_log("Memory pool initialized.")
94
95         threading.Timer(10, self.main_iteration).start()
96
97     def bitcoind(self, method, params=[]):
98         postdata = dumps({"method": method, 'params': params, 'id': 'jsonrpc'})
99         try:
100             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
101         except:
102             traceback.print_exc(file=sys.stdout)
103             self.shared.stop()
104
105         r = loads(respdata)
106         if r['error'] is not None:
107             raise BaseException(r['error'])
108         return r.get('result')
109
110     def serialize(self, h):
111         s = ''
112         for txid, txpos, height in h:
113             s += self.serialize_item(txid, txpos, height)
114         return s
115
116     def serialize_item(self, txid, txpos, height, spent=chr(0)):
117         s = (txid + int_to_hex(txpos, 4) + int_to_hex(height, 3)).decode('hex') + spent 
118         return s
119
120     def deserialize_item(self,s):
121         txid = s[0:32].encode('hex')
122         txpos = int(rev_hex(s[32:36].encode('hex')), 16)
123         height = int(rev_hex(s[36:39].encode('hex')), 16)
124         spent = s[39:40]
125         return (txid, txpos, height, spent)
126
127     def deserialize(self, s):
128         h = []
129         while s:
130             txid, txpos, height, spent = self.deserialize_item(s[0:40])
131             h.append((txid, txpos, height))
132             if spent == chr(1):
133                 txid, txpos, height, spent = self.deserialize_item(s[40:80])
134                 h.append((txid, txpos, height))
135             s = s[80:]
136         return h
137
138     def block2header(self, b):
139         return {
140             "block_height": b.get('height'),
141             "version": b.get('version'),
142             "prev_block_hash": b.get('previousblockhash'),
143             "merkle_root": b.get('merkleroot'),
144             "timestamp": b.get('time'),
145             "bits": int(b.get('bits'), 16),
146             "nonce": b.get('nonce'),
147         }
148
149     def get_header(self, height):
150         block_hash = self.bitcoind('getblockhash', [height])
151         b = self.bitcoind('getblock', [block_hash])
152         return self.block2header(b)
153
    def init_headers(self, db_height):
        """Ensure the on-disk headers file covers every block up to
        db_height, fetching missing headers from bitcoind.

        db_height: height already indexed in the leveldb database.
        """
        self.chunk_cache = {}
        self.headers_filename = os.path.join(self.dbpath, 'blockchain_headers')

        if os.path.exists(self.headers_filename):
            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
            if height > 0:
                prev_hash = self.hash_header(self.read_header(height))
            else:
                prev_hash = None
        else:
            # start from an empty headers file
            open(self.headers_filename, 'wb').close()
            prev_hash = None
            height = -1

        if height < db_height:
            print_log("catching up missing headers:", height, db_height)

        try:
            while height < db_height:
                height = height + 1
                header = self.get_header(height)
                if height > 1:
                    # every header must link to the previous one
                    assert prev_hash == header.get('prev_block_hash')
                self.write_header(header, sync=False)
                prev_hash = self.hash_header(header)
                if (height % 1000) == 0:
                    print_log("headers file:", height)
        except KeyboardInterrupt:
            # persist what we have before exiting
            self.flush_headers()
            sys.exit()

        self.flush_headers()
187
188     def hash_header(self, header):
189         return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
190
191     def read_header(self, block_height):
192         if os.path.exists(self.headers_filename):
193             with open(self.headers_filename, 'rb') as f:
194                 f.seek(block_height * 80)
195                 h = f.read(80)
196             if len(h) == 80:
197                 h = header_from_string(h)
198                 return h
199
200     def read_chunk(self, index):
201         with open(self.headers_filename, 'rb') as f:
202             f.seek(index*2016*80)
203             chunk = f.read(2016*80)
204         return chunk.encode('hex')
205
206     def write_header(self, header, sync=True):
207         if not self.headers_data:
208             self.headers_offset = header.get('block_height')
209
210         self.headers_data += header_to_string(header).decode('hex')
211         if sync or len(self.headers_data) > 40*100:
212             self.flush_headers()
213
214         with self.cache_lock:
215             chunk_index = header.get('block_height')/2016
216             if self.chunk_cache.get(chunk_index):
217                 self.chunk_cache.pop(chunk_index)
218
219     def pop_header(self):
220         # we need to do this only if we have not flushed
221         if self.headers_data:
222             self.headers_data = self.headers_data[:-40]
223
224     def flush_headers(self):
225         if not self.headers_data:
226             return
227         with open(self.headers_filename, 'rb+') as f:
228             f.seek(self.headers_offset*80)
229             f.write(self.headers_data)
230         self.headers_data = ''
231
232     def get_chunk(self, i):
233         # store them on disk; store the current chunk in memory
234         with self.cache_lock:
235             chunk = self.chunk_cache.get(i)
236             if not chunk:
237                 chunk = self.read_chunk(i)
238                 self.chunk_cache[i] = chunk
239
240         return chunk
241
242     def get_mempool_transaction(self, txid):
243         try:
244             raw_tx = self.bitcoind('getrawtransaction', [txid, 0, -1])
245         except:
246             return None
247
248         vds = deserialize.BCDataStream()
249         vds.write(raw_tx.decode('hex'))
250
251         return deserialize.parse_Transaction(vds, is_coinbase=False)
252
    def get_history(self, addr, cache_only=False):
        """Return the confirmed + mempool history of an address as a list
        of {'tx_hash', 'height'} dicts.

        Returns -1 on a cache miss when cache_only; ['*'] marks a known
        address whose history is currently empty (e.g. fully pruned),
        distinguishing it from a never-seen address (empty list).
        """
        with self.cache_lock:
            hist = self.history_cache.get(addr)
        if hist is not None:
            return hist
        if cache_only:
            return -1

        with self.dblock:
            try:
                hist = self.deserialize(self.db.Get(addr))
                is_known = True
            except:
                # address not present in the database
                hist = []
                is_known = False

        # sort history, because redeeming transactions are next to the corresponding txout
        hist.sort(key=lambda tup: tup[2])

        # add memory pool (unconfirmed entries get height 0)
        with self.mempool_lock:
            for txid in self.mempool_hist.get(addr, []):
                hist.append((txid, 0, 0))

        # uniqueness
        # NOTE(review): this set() pass discards the sorted order
        # established above -- confirm callers do not rely on ordering
        hist = set(map(lambda x: (x[0], x[2]), hist))

        # convert to dict
        hist = map(lambda x: {'tx_hash': x[0], 'height': x[1]}, hist)

        # add something to distinguish between unused and empty addresses
        if hist == [] and is_known:
            hist = ['*']

        with self.cache_lock:
            self.history_cache[addr] = hist
        return hist
290
291     def get_status(self, addr, cache_only=False):
292         tx_points = self.get_history(addr, cache_only)
293         if cache_only and tx_points == -1:
294             return -1
295
296         if not tx_points:
297             return None
298         if tx_points == ['*']:
299             return '*'
300         status = ''
301         for tx in tx_points:
302             status += tx.get('tx_hash') + ':%d:' % tx.get('height')
303         return hashlib.sha256(status).digest().encode('hex')
304
    def get_merkle(self, tx_hash, height):
        """Compute the merkle branch of tx_hash within the block at
        `height`, as served by blockchain.transaction.get_merkle.

        Returns {'block_height', 'merkle' (sibling hashes, hex, bottom-up),
        'pos' (index of the tx in the block)}. Raises ValueError when
        tx_hash is not in the block (list.index).
        """

        block_hash = self.bitcoind('getblockhash', [height])
        b = self.bitcoind('getblock', [block_hash])
        tx_list = b.get('tx')
        tx_pos = tx_list.index(tx_hash)

        merkle = map(hash_decode, tx_list)
        target_hash = hash_decode(tx_hash)
        s = []
        while len(merkle) != 1:
            if len(merkle) % 2:
                # duplicate the last hash on odd-sized levels
                merkle.append(merkle[-1])
            n = []
            while merkle:
                new_hash = Hash(merkle[0] + merkle[1])
                if merkle[0] == target_hash:
                    # record the sibling and follow our hash up one level
                    s.append(hash_encode(merkle[1]))
                    target_hash = new_hash
                elif merkle[1] == target_hash:
                    s.append(hash_encode(merkle[0]))
                    target_hash = new_hash
                n.append(new_hash)
                merkle = merkle[2:]
            merkle = n

        return {"block_height": height, "merkle": s, "pos": tx_pos}
332
333
    def add_to_history(self, addr, tx_hash, tx_pos, tx_height):
        """Insert output (tx_hash, tx_pos, tx_height) into addr's batched
        history, keeping records ordered by height, and record the
        txout -> addr backlink in batch_txio."""
        # keep it sorted
        s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
        assert len(s) == 80

        serialized_hist = self.batch_list[addr]

        # walk backwards to find the last record with height <= tx_height
        l = len(serialized_hist)/80
        for i in range(l-1, -1, -1):
            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)
            if item_height <= tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + s + serialized_hist[80*(i+1):]
                break
        else:
            # no earlier record found: insert at the front
            serialized_hist = s + serialized_hist

        self.batch_list[addr] = serialized_hist

        # backlink
        txo = (tx_hash + int_to_hex(tx_pos, 4)).decode('hex')
        self.batch_txio[txo] = addr
356
357
358
359     def revert_add_to_history(self, addr, tx_hash, tx_pos, tx_height):
360
361         serialized_hist = self.batch_list[addr]
362         s = self.serialize_item(tx_hash, tx_pos, tx_height) + 40*chr(0)
363         if serialized_hist.find(s) == -1: raise
364         serialized_hist = serialized_hist.replace(s, '')
365         self.batch_list[addr] = serialized_hist
366
367
368
    def prune_history(self, addr, undo):
        """Drop spent records (spent flag chr(1)) from addr's batched
        history while it exceeds the pruning limit, appending each removed
        80-byte record to undo[addr] so the prune can be reverted."""
        # remove items that have bit set to one
        if undo.get(addr) is None: undo[addr] = []

        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            # stop once the history fits within the pruning limit
            if len(serialized_hist)/80 < self.pruning_limit: break
            # NOTE(review): after a removal the record that slid into slot
            # i is not re-examined on this pass -- confirm this one-pass
            # behavior is intended
            item = serialized_hist[80*i:80*(i+1)] 
            if item[39:40] == chr(1):
                # a spent item is always paired with its redeeming item
                assert item[79:80] == chr(2)
                serialized_hist = serialized_hist[0:80*i] + serialized_hist[80*(i+1):]
                undo[addr].append(item)  # items are ordered
        self.batch_list[addr] = serialized_hist
383
384
    def revert_prune_history(self, addr, undo):
        """Re-insert into addr's batched history the records removed by
        prune_history, restoring height order (undo[addr] holds them in
        the order they were pruned)."""
        # restore removed items
        serialized_hist = self.batch_list[addr]

        if undo.get(addr) is not None: 
            itemlist = undo.pop(addr)
        else:
            return 

        if not itemlist: return

        l = len(serialized_hist)/80
        tx_item = ''
        # walk the existing history backwards, inserting each restored
        # record after the first existing record of lower height
        for i in range(l-1, -1, -1):
            if tx_item == '':
                if not itemlist: 
                    break
                else:
                    tx_item = itemlist.pop(-1) # get the last element
                    tx_height = int(rev_hex(tx_item[36:39].encode('hex')), 16)
            
            item = serialized_hist[80*i:80*(i+1)]
            item_height = int(rev_hex(item[36:39].encode('hex')), 16)

            if item_height < tx_height:
                serialized_hist = serialized_hist[0:80*(i+1)] + tx_item + serialized_hist[80*(i+1):]
                tx_item = ''

        else:
            # scanned the whole history without a break: whatever is left
            # is older than every existing record, so prepend it
            serialized_hist = ''.join(itemlist) + tx_item + serialized_hist

        self.batch_list[addr] = serialized_hist
417
418
    def set_spent_bit(self, addr, txi, is_spent, txid=None, index=None, height=None):
        """Mark (or clear) the spent flag of prevout txi in addr's batched
        history.

        When is_spent, the 40-byte tail of the matching record is filled
        with the redeeming item (txid, index, height, flag chr(2));
        otherwise the flag and tail are zeroed. Stops the server and
        raises BaseException if txi is not found in the history.
        """
        serialized_hist = self.batch_list[addr]
        l = len(serialized_hist)/80
        for i in range(l):
            item = serialized_hist[80*i:80*(i+1)]
            if item[0:36] == txi:
                if is_spent:
                    new_item = item[0:39] + chr(1) + self.serialize_item(txid, index, height, chr(2))
                else:
                    new_item = item[0:39] + chr(0) + chr(0)*40 
                serialized_hist = serialized_hist[0:80*i] + new_item + serialized_hist[80*(i+1):]
                break
        else:
            # a missing prevout means the database is inconsistent
            self.shared.stop()
            hist = self.deserialize(serialized_hist)
            raise BaseException("prevout not found", addr, hist, txi.encode('hex'))

        self.batch_list[addr] = serialized_hist
437
438
    def unset_spent_bit(self, addr, txi):
        # Revert helper: clear the spent flag on txi in addr's history,
        # then restore the txi -> addr backlink so the prevout can be
        # re-written to the database.
        self.set_spent_bit(addr, txi, False)
        self.batch_txio[txi] = addr
442
443
444     def deserialize_block(self, block):
445         txlist = block.get('tx')
446         tx_hashes = []  # ordered txids
447         txdict = {}     # deserialized tx
448         is_coinbase = True
449         for raw_tx in txlist:
450             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
451             tx_hashes.append(tx_hash)
452             vds = deserialize.BCDataStream()
453             vds.write(raw_tx.decode('hex'))
454             tx = deserialize.parse_Transaction(vds, is_coinbase)
455             txdict[tx_hash] = tx
456             is_coinbase = False
457         return tx_hashes, txdict
458
459     def get_undo_info(self, height):
460         s = self.db.Get("undo%d" % (height % 100))
461         return eval(s)
462
463     def write_undo_info(self, batch, height, undo_info):
464         if self.is_test or height > self.bitcoind_height - 100:
465             batch.Put("undo%d" % (height % 100), repr(undo_info))
466
    def import_block(self, block, block_hash, block_height, sync, revert=False):
        """Apply one block's transactions to the address-history database,
        or undo them when revert=True (reorg).

        block: getblock result with raw transactions.
        sync: passed through to the leveldb batch write.
        All mutations are staged in batch_list / batch_txio and committed
        in a single WriteBatch at the end.
        """
        self.batch_list = {}  # address -> history
        self.batch_txio = {}  # transaction i/o -> address

        block_inputs = []
        block_outputs = []
        addr_to_read = []

        # deserialize transactions
        t0 = time.time()
        tx_hashes, txdict = self.deserialize_block(block)

        t00 = time.time()

        # undo info
        if revert:
            undo_info = self.get_undo_info(block_height)
        else:
            undo_info = {}


        if not revert:
            # read addresses of tx inputs
            for tx in txdict.values():
                for x in tx.get('inputs'):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    block_inputs.append(txi)

            block_inputs.sort()
            for txi in block_inputs:
                try:
                    addr = self.db.Get(txi)
                except KeyError:
                    # the input could come from the same block
                    continue
                except:
                    traceback.print_exc(file=sys.stdout)
                    self.shared.stop()
                    raise

                self.batch_txio[txi] = addr
                addr_to_read.append(addr)

        else:
            # reverting: collect this block's outputs (to delete) and the
            # previous addresses of its inputs (from the stored undo info)
            for txid, tx in txdict.items():
                for x in tx.get('outputs'):
                    txo = (txid + int_to_hex(x.get('index'), 4)).decode('hex')
                    block_outputs.append(txo)
                    addr_to_read.append( x.get('address') )

                undo = undo_info.get(txid)
                for i, x in enumerate(tx.get('inputs')):
                    addr = undo['prev_addr'][i]
                    addr_to_read.append(addr)




        # read histories of addresses
        for txid, tx in txdict.items():
            for x in tx.get('outputs'):
                addr_to_read.append(x.get('address'))

        addr_to_read.sort()
        for addr in addr_to_read:
            try:
                self.batch_list[addr] = self.db.Get(addr)
            except KeyError:
                # address not seen before: start with an empty history
                self.batch_list[addr] = ''
            except:
                traceback.print_exc(file=sys.stdout)
                self.shared.stop()
                raise


        # process
        t1 = time.time()

        if revert:
            # undo transactions in reverse order of application
            tx_hashes = tx_hashes[::-1]


        for txid in tx_hashes:  # must be ordered
            tx = txdict[txid]
            if not revert:

                undo = { 'prev_addr':[] } # contains the list of pruned items for each address in the tx; also, 'prev_addr' is a list of prev addresses
                
                prev_addr = []
                for i, x in enumerate(tx.get('inputs')):
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    addr = self.batch_txio[txi]

                    # add redeem item to the history.
                    # add it right next to the input txi? this will break history sorting, but it's ok if I neglect tx inputs during search
                    self.set_spent_bit(addr, txi, True, txid, i, block_height)

                    # when I prune, prune a pair
                    self.prune_history(addr, undo)
                    prev_addr.append(addr)

                undo['prev_addr'] = prev_addr 

                # here I add only the outputs to history; maybe I want to add inputs too (that's in the other loop)
                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.add_to_history(addr, txid, x.get('index'), block_height)
                    self.prune_history(addr, undo)  # prune here because we increased the length of the history

                undo_info[txid] = undo

            else:

                undo = undo_info.pop(txid)

                for x in tx.get('outputs'):
                    addr = x.get('address')
                    self.revert_prune_history(addr, undo)
                    self.revert_add_to_history(addr, txid, x.get('index'), block_height)

                prev_addr = undo.pop('prev_addr')
                for i, x in enumerate(tx.get('inputs')):
                    addr = prev_addr[i]
                    self.revert_prune_history(addr, undo)
                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
                    self.unset_spent_bit(addr, txi)

                # every recorded undo entry must have been consumed
                assert undo == {}

        if revert: 
            assert undo_info == {}


        # write
        max_len = 0
        max_addr = ''
        t2 = time.time()

        batch = leveldb.WriteBatch()
        for addr, serialized_hist in self.batch_list.items():
            batch.Put(addr, serialized_hist)
            # track the longest history for the timing log below
            l = len(serialized_hist)/80
            if l > max_len:
                max_len = l
                max_addr = addr

        if not revert:
            # add new created outputs
            for txio, addr in self.batch_txio.items():
                batch.Put(txio, addr)
            # delete spent inputs
            for txi in block_inputs:
                batch.Delete(txi)
            # add undo info
            self.write_undo_info(batch, block_height, undo_info)
        else:
            # restore spent inputs
            for txio, addr in self.batch_txio.items():
                # print "restoring spent input", repr(txio)
                batch.Put(txio, addr)
            # delete spent outputs
            for txo in block_outputs:
                batch.Delete(txo)

        # add the max
        batch.Put('height', self.serialize([(block_hash, block_height, self.db_version)]))

        # actual write
        self.db.Write(batch, sync=sync)

        t3 = time.time()
        if t3 - t0 > 10 and not sync:
            print_log("block", block_height,
                      "parse:%0.2f " % (t00 - t0),
                      "read:%0.2f " % (t1 - t00),
                      "proc:%.2f " % (t2-t1),
                      "write:%.2f " % (t3-t2),
                      "max:", max_len, max_addr)

        # drop cached histories/statuses for every touched address
        for addr in self.batch_list.keys():
            self.invalidate_cache(addr)
650
651     def add_request(self, request):
652         # see if we can get if from cache. if not, add to queue
653         if self.process(request, cache_only=True) == -1:
654             self.queue.put(request)
655
656     def process(self, request, cache_only=False):
657         #print "abe process", request
658
659         message_id = request['id']
660         method = request['method']
661         params = request.get('params', [])
662         result = None
663         error = None
664
665         if method == 'blockchain.numblocks.subscribe':
666             result = self.height
667
668         elif method == 'blockchain.headers.subscribe':
669             result = self.header
670
671         elif method == 'blockchain.address.subscribe':
672             try:
673                 address = params[0]
674                 result = self.get_status(address, cache_only)
675                 self.watch_address(address)
676             except BaseException, e:
677                 error = str(e) + ': ' + address
678                 print_log("error:", error)
679
680         elif method == 'blockchain.address.unsubscribe':
681             try:
682                 password = params[0]
683                 address = params[1]
684                 if password == self.config.get('server', 'password'):
685                     self.watched_addresses.remove(address)
686                     # print_log('unsubscribed', address)
687                     result = "ok"
688                 else:
689                     print_log('incorrect password')
690                     result = "authentication error"
691             except BaseException, e:
692                 error = str(e) + ': ' + address
693                 print_log("error:", error)
694
695         elif method == 'blockchain.address.get_history':
696             try:
697                 address = params[0]
698                 result = self.get_history(address, cache_only)
699             except BaseException, e:
700                 error = str(e) + ': ' + address
701                 print_log("error:", error)
702
703         elif method == 'blockchain.block.get_header':
704             if cache_only:
705                 result = -1
706             else:
707                 try:
708                     height = params[0]
709                     result = self.get_header(height)
710                 except BaseException, e:
711                     error = str(e) + ': %d' % height
712                     print_log("error:", error)
713
714         elif method == 'blockchain.block.get_chunk':
715             if cache_only:
716                 result = -1
717             else:
718                 try:
719                     index = params[0]
720                     result = self.get_chunk(index)
721                 except BaseException, e:
722                     error = str(e) + ': %d' % index
723                     print_log("error:", error)
724
725         elif method == 'blockchain.transaction.broadcast':
726             try:
727                 txo = self.bitcoind('sendrawtransaction', params)
728                 print_log("sent tx:", txo)
729                 result = txo
730             except BaseException, e:
731                 result = str(e)  # do not send an error
732                 print_log("error:", result, params)
733
734         elif method == 'blockchain.transaction.get_merkle':
735             if cache_only:
736                 result = -1
737             else:
738                 try:
739                     tx_hash = params[0]
740                     tx_height = params[1]
741                     result = self.get_merkle(tx_hash, tx_height)
742                 except BaseException, e:
743                     error = str(e) + ': ' + repr(params)
744                     print_log("get_merkle error:", error)
745
746         elif method == 'blockchain.transaction.get':
747             try:
748                 tx_hash = params[0]
749                 height = params[1]
750                 result = self.bitcoind('getrawtransaction', [tx_hash, 0, height])
751             except BaseException, e:
752                 error = str(e) + ': ' + repr(params)
753                 print_log("tx get error:", error)
754
755         else:
756             error = "unknown method:%s" % method
757
758         if cache_only and result == -1:
759             return -1
760
761         if error:
762             self.push_response({'id': message_id, 'error': error})
763         elif result != '':
764             self.push_response({'id': message_id, 'result': result})
765
766     def watch_address(self, addr):
767         if addr not in self.watched_addresses:
768             self.watched_addresses.append(addr)
769
    def catch_up(self, sync=True):
        """Import blocks from bitcoind until our tip matches bitcoind's,
        reverting our own tip on reorgs; sets self.up_to_date when done.

        sync: passed through to import_block / write_header (synchronous
        database writes).
        """
        t1 = time.time()

        while not self.shared.stopped():
            # are we done yet?
            info = self.bitcoind('getinfo')
            self.bitcoind_height = info.get('blocks')
            bitcoind_block_hash = self.bitcoind('getblockhash', [self.bitcoind_height])
            if self.last_hash == bitcoind_block_hash:
                self.up_to_date = True
                break

            # not done..
            self.up_to_date = False
            next_block_hash = self.bitcoind('getblockhash', [self.height + 1])
            next_block = self.bitcoind('getblock', [next_block_hash, 1])

            # fixme: this is unsafe, if we revert when the undo info is not yet written
            revert = (random.randint(1, 100) == 1) if self.is_test else False

            if (next_block.get('previousblockhash') == self.last_hash) and not revert:

                # the next block extends our chain: import it
                self.import_block(next_block, next_block_hash, self.height+1, sync)
                self.height = self.height + 1
                self.write_header(self.block2header(next_block), sync)
                self.last_hash = next_block_hash

                if self.height % 100 == 0 and not sync:
                    t2 = time.time()
                    print_log("catch_up: block %d (%.3fs)" % (self.height, t2 - t1))
                    t1 = t2

            else:
                # revert current block
                block = self.bitcoind('getblock', [self.last_hash, 1])
                print_log("blockchain reorg", self.height, block.get('previousblockhash'), self.last_hash)
                self.import_block(block, self.last_hash, self.height, sync, revert=True)
                self.pop_header()
                self.flush_headers()

                self.height -= 1

                # read previous header from disk
                self.header = self.read_header(self.height)
                self.last_hash = self.hash_header(self.header)

        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
817
818     def memorypool_update(self):
819         mempool_hashes = self.bitcoind('getrawmempool')
820
821         touched_addresses = []
822         for tx_hash in mempool_hashes:
823             if tx_hash in self.mempool_hashes:
824                 continue
825
826             tx = self.get_mempool_transaction(tx_hash)
827             if not tx:
828                 continue
829
830             for x in tx.get('inputs'):
831                 txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
832                 try:
833                     addr = self.db.Get(txi)
834                 except:
835                     tx_prev = self.get_mempool_transaction(x.get('prevout_hash'))
836                     try:
837                         addr = tx_prev['outputs'][x.get('prevout_n')]['address']
838                         if not addr: continue
839                     except:
840                         continue
841                 l = self.mempool_addresses.get(tx_hash, [])
842                 if addr not in l:
843                     l.append(addr)
844                     self.mempool_addresses[tx_hash] = l
845                     #if addr not in touched_addresses: 
846                     touched_addresses.append(addr)
847
848             for x in tx.get('outputs'):
849                 addr = x.get('address')
850                 l = self.mempool_addresses.get(tx_hash, [])
851                 if addr not in l:
852                     l.append(addr)
853                     self.mempool_addresses[tx_hash] = l
854                     #if addr not in touched_addresses: 
855                     touched_addresses.append(addr)
856
857             self.mempool_hashes.append(tx_hash)
858
859         # remove older entries from mempool_hashes
860         self.mempool_hashes = mempool_hashes
861
862         # remove deprecated entries from mempool_addresses
863         for tx_hash, addresses in self.mempool_addresses.items():
864             if tx_hash not in self.mempool_hashes:
865                 self.mempool_addresses.pop(tx_hash)
866                 for addr in addresses:
867                     #if addr not in touched_addresses: 
868                     touched_addresses.append(addr)
869
870         # rebuild mempool histories
871         new_mempool_hist = {}
872         for tx_hash, addresses in self.mempool_addresses.items():
873             for addr in addresses:
874                 h = new_mempool_hist.get(addr, [])
875                 if tx_hash not in h:
876                     h.append(tx_hash)
877                 new_mempool_hist[addr] = h
878
879         with self.mempool_lock:
880             self.mempool_hist = new_mempool_hist
881
882         # invalidate cache for touched addresses
883         for addr in touched_addresses:
884             self.invalidate_cache(addr)
885
886
887     def invalidate_cache(self, address):
888         with self.cache_lock:
889             if address in self.history_cache:
890                 print_log("cache: invalidating", address)
891                 self.history_cache.pop(address)
892
893         if address in self.watched_addresses:
894             # TODO: update cache here. if new value equals cached value, do not send notification
895             self.address_queue.put(address)
896
897     def main_iteration(self):
898         if self.shared.stopped():
899             print_log("blockchain processor terminating")
900             return
901
902         with self.dblock:
903             t1 = time.time()
904             self.catch_up()
905             t2 = time.time()
906
907         self.memorypool_update()
908
909         if self.sent_height != self.height:
910             self.sent_height = self.height
911             self.push_response({
912                 'id': None,
913                 'method': 'blockchain.numblocks.subscribe',
914                 'params': [self.height],
915             })
916
917         if self.sent_header != self.header:
918             print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
919             self.sent_header = self.header
920             self.push_response({
921                 'id': None,
922                 'method': 'blockchain.headers.subscribe',
923                 'params': [self.header],
924             })
925
926         while True:
927             try:
928                 addr = self.address_queue.get(False)
929             except:
930                 break
931             if addr in self.watched_addresses:
932                 status = self.get_status(addr)
933                 self.push_response({
934                     'id': None,
935                     'method': 'blockchain.address.subscribe',
936                     'params': [addr, status],
937                 })
938
939         if not self.shared.stopped():
940             threading.Timer(10, self.main_iteration).start()
941         else:
942             print_log("blockchain processor terminating")