fix and improve clarity
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
index 416b6db..f92f6e4 100644 (file)
@@ -62,8 +62,11 @@ class BlockchainProcessor(Processor):
         self.cache_lock = threading.Lock()
         self.headers_data = ''
 
+        self.mempool_addresses = {}
         self.mempool_hist = {}
-        self.known_mempool_hashes = []
+        self.mempool_hashes = []
+        self.mempool_lock = threading.Lock()
+
         self.address_queue = Queue()
         self.dbpath = config.get('leveldb', 'path')
 
@@ -87,14 +90,13 @@ class BlockchainProcessor(Processor):
 
         try:
             hist = self.deserialize(self.db.Get('0'))
-            hh, self.height, _ = hist[0] 
-            self.block_hashes = [hh]
+            self.last_hash, self.height, _ = hist[0] 
             print_log( "hist", hist )
         except:
             #traceback.print_exc(file=sys.stdout)
             print_log('initializing database')
             self.height = 0
-            self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ]
+            self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
 
         # catch_up headers
         self.init_headers(self.height)
@@ -108,7 +110,7 @@ class BlockchainProcessor(Processor):
                 shared.stop()
                 sys.exit(0)
 
-        print "blockchain is up to date."
+        print_log( "blockchain is up to date." )
 
         threading.Timer(10, self.main_iteration).start()
 
@@ -156,28 +158,29 @@ class BlockchainProcessor(Processor):
         self.chunk_cache = {}
         self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers')
 
-        height = 0
         if os.path.exists(self.headers_filename):
-            height = os.path.getsize(self.headers_filename)/80
-
-        if height:
-            prev_header = self.read_header(height -1)
-            prev_hash = self.hash_header(prev_header)
+            height = os.path.getsize(self.headers_filename)/80 - 1   # the current height
+            if height > 0:
+                prev_hash = self.hash_header(self.read_header(height))
+            else:
+                prev_hash = None
         else:
             open(self.headers_filename,'wb').close()
             prev_hash = None
+            height = -1
 
-        if height != db_height:
+        if height < db_height:
             print_log( "catching up missing headers:", height, db_height)
 
-        s = ''
         try:
-            for i in range(height, db_height):
-                header = self.get_header(i)
-                assert prev_hash == header.get('prev_block_hash')
+            while height != db_height:
+                height = height + 1
+                header = self.get_header(height)
+                if height>1: 
+                    assert prev_hash == header.get('prev_block_hash')
                 self.write_header(header, sync=False)
                 prev_hash = self.hash_header(header)
-                if i%1000==0: print_log("headers file:",i)
+                if height%1000==0: print_log("headers file:",height)
         except KeyboardInterrupt:
             self.flush_headers()
             sys.exit()
@@ -211,6 +214,7 @@ class BlockchainProcessor(Processor):
     def write_header(self, header, sync=True):
         if not self.headers_data:
             self.headers_offset = header.get('block_height')
+
         self.headers_data += header_to_string(header).decode('hex')
         if sync or len(self.headers_data) > 40*100:
             self.flush_headers()
@@ -264,8 +268,9 @@ class BlockchainProcessor(Processor):
         # check uniqueness too...
 
         # add memory pool
-        for txid in self.mempool_hist.get(addr,[]):
-            hist.append((txid, 0))
+        with self.mempool_lock:
+            for txid in self.mempool_hist.get(addr,[]):
+                hist.append((txid, 0, 0))
 
         hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
         # add something to distinguish between unused and empty addresses
@@ -445,7 +450,7 @@ class BlockchainProcessor(Processor):
         self.db.Write(batch, sync = sync)
 
         t3 = time.time()
-        if t3 - t0 > 10: 
+        if t3 - t0 > 10 and not sync: 
             print_log("block", block_height, 
                       "parse:%0.2f "%(t00 - t0), 
                       "read:%0.2f "%(t1 - t00), 
@@ -453,8 +458,7 @@ class BlockchainProcessor(Processor):
                       "write:%.2f "%(t3-t2), 
                       "max:", max_len, max_addr)
 
-        # invalidate cache
-        for addr in self.batch_list.keys(): self.update_history_cache(addr)
+        for addr in self.batch_list.keys(): self.invalidate_cache(addr)
 
 
 
@@ -529,7 +533,7 @@ class BlockchainProcessor(Processor):
                     print_log( "error:", error)
 
         elif method == 'blockchain.transaction.broadcast':
-            txo = self.bitcoind('sendrawtransaction', params[0])
+            txo = self.bitcoind('sendrawtransaction', params)
             print_log( "sent tx:", txo )
             result = txo 
 
@@ -573,11 +577,16 @@ class BlockchainProcessor(Processor):
 
 
 
-    def last_hash(self):
-        return self.block_hashes[-1]
+    def catch_up(self, sync = True):
+        #        
+        #                     -------> F ------> G -------> H
+        #                    /
+        #                   /
+        #        A ------> B --------> C ------> E
+        #        
+        #        we always compare the hash in the headers file to the hash returned by bitcoind
 
 
-    def catch_up(self, sync = True):
         t1 = time.time()
 
         while not self.shared.stopped():
@@ -586,45 +595,44 @@ class BlockchainProcessor(Processor):
             info = self.bitcoind('getinfo')
             bitcoind_height = info.get('blocks')
             bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height])
-            if self.last_hash() == bitcoind_block_hash: 
+            if self.last_hash == bitcoind_block_hash: 
                 self.up_to_date = True
                 break
 
             # not done..
             self.up_to_date = False
-            block_hash = self.bitcoind('getblockhash', [self.height+1])
-            block = self.bitcoind('getblock', [block_hash, 1])
+            next_block_hash = self.bitcoind('getblockhash', [self.height+1])
+            next_block = self.bitcoind('getblock', [block_hash, 1])
 
-            if block.get('previousblockhash') == self.last_hash():
+            if next_block.get('previousblockhash') == self.last_hash:
 
-                self.import_block(block, block_hash, self.height+1, sync)
+                self.import_block(next_block, next_block_hash, self.height+1, sync)
                 self.height = self.height + 1
-                self.write_header(self.block2header(block), sync)
-
-                self.block_hashes.append(block_hash)
-                self.block_hashes = self.block_hashes[-10:]
+                self.write_header(self.block2header(next_block), sync)
+                self.last_hash = next_block_hash
 
                 if (self.height+1)%100 == 0 and not sync: 
                     t2 = time.time()
                     print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) )
                     t1 = t2
-
                     
             else:
                 # revert current block
-                print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() )
-                block_hash = self.last_hash()
-                block = self.bitcoind('getblock', [block_hash, 1])
-                self.height = self.height -1
+                block = self.bitcoind('getblock', [self.last_hash, 1])
+                print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash )
+                self.import_block(block, self.last_hash, self.height, revert=True)
                 self.pop_header()
 
-                self.block_hashes.remove(block_hash)
-                self.import_block(block, self.last_hash(), self.height, revert=True)
+                self.height = self.height -1
+
+                # read previous header from disk
+                self.header = self.read_header(self.height) 
+                self.last_hash = self.hash_header(self.header)
         
 
-        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()]))
+        self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
+
 
-        
 
             
     def memorypool_update(self):
@@ -632,29 +640,69 @@ class BlockchainProcessor(Processor):
         mempool_hashes = self.bitcoind('getrawmempool')
 
         for tx_hash in mempool_hashes:
-            if tx_hash in self.known_mempool_hashes: continue
-            self.known_mempool_hashes.append(tx_hash)
+            if tx_hash in self.mempool_hashes: continue
 
             tx = self.get_transaction(tx_hash)
             if not tx: continue
 
-            for x in tx.get('inputs') + tx.get('outputs'):
+            for x in tx.get('inputs'):
+                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+                try:
+                    addr = self.db.Get(txi)    
+                except:
+                    continue
+                l = self.mempool_addresses.get(tx_hash, [])
+                if addr not in l: 
+                    l.append( addr )
+                    self.mempool_addresses[tx_hash] = l
+
+            for x in tx.get('outputs'):
                 addr = x.get('address')
-                hist = self.mempool_hist.get(addr, [])
-                if tx_hash not in hist: 
-                    hist.append( tx_hash )
-                    self.mempool_hist[addr] = hist
-                    self.update_history_cache(addr)
+                l = self.mempool_addresses.get(tx_hash, [])
+                if addr not in l: 
+                    l.append( addr )
+                    self.mempool_addresses[tx_hash] = l
+
+            self.mempool_hashes.append(tx_hash)
+
+        # remove older entries from mempool_hashes
+        self.mempool_hashes = mempool_hashes
+
+        # remove deprecated entries from mempool_addresses
+        for tx_hash, addresses in self.mempool_addresses.items():
+            if tx_hash not in self.mempool_hashes:
+                self.mempool_addresses.pop(tx_hash)
+
+        # rebuild histories
+        new_mempool_hist = {}
+        for tx_hash, addresses in self.mempool_addresses.items():
+            for addr in addresses:
+                h = new_mempool_hist.get(addr, [])
+                if tx_hash not in h: 
+                    h.append( tx_hash )
+                new_mempool_hist[addr] = h
+
+        for addr in new_mempool_hist.keys():
+            if addr in self.mempool_hist.keys():
+                if self.mempool_hist[addr] != new_mempool_hist[addr]: 
+                    self.invalidate_cache(addr)
+            else:
+                self.invalidate_cache(addr)
+
+        with self.mempool_lock:
+            self.mempool_hist = new_mempool_hist
 
-        self.known_mempool_hashes = mempool_hashes
 
 
-    def update_history_cache(self, address):
+    def invalidate_cache(self, address):
         with self.cache_lock:
             if self.history_cache.has_key(address):
                 print_log( "cache: invalidating", address )
                 self.history_cache.pop(address)
 
+        if address in self.watched_addresses:
+            self.address_queue.put(address)
+
 
 
     def main_iteration(self):
@@ -669,6 +717,9 @@ class BlockchainProcessor(Processor):
             t2 = time.time()
 
         self.memorypool_update()
+        t3 = time.time()
+        # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
+
 
         if self.sent_height != self.height:
             self.sent_height = self.height
@@ -687,6 +738,7 @@ class BlockchainProcessor(Processor):
             if addr in self.watched_addresses:
                 status = self.get_status( addr )
                 self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] })
+                self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] })
 
 
         if not self.shared.stopped():