X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Fbitcoind%2Fblockchain_processor.py;h=f92f6e4134a61259967d8a240f6ac51582b2960e;hb=44c124628b5a24b4a3aea549f5dfd8b1bd23f3c4;hp=78ef690bf43747404c04a836602a0845769407b5;hpb=fcf7076e578cffcd672a6bcf23d0459fc076fef5;p=electrum-server.git diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 78ef690..f92f6e4 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -90,14 +90,13 @@ class BlockchainProcessor(Processor): try: hist = self.deserialize(self.db.Get('0')) - hh, self.height, _ = hist[0] - self.block_hashes = [hh] + self.last_hash, self.height, _ = hist[0] print_log( "hist", hist ) except: #traceback.print_exc(file=sys.stdout) print_log('initializing database') self.height = 0 - self.block_hashes = [ '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' ] + self.last_hash = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' # catch_up headers self.init_headers(self.height) @@ -111,7 +110,7 @@ class BlockchainProcessor(Processor): shared.stop() sys.exit(0) - print "blockchain is up to date." + print_log( "blockchain is up to date." ) threading.Timer(10, self.main_iteration).start() @@ -159,28 +158,29 @@ class BlockchainProcessor(Processor): self.chunk_cache = {} self.headers_filename = os.path.join( self.dbpath, 'blockchain_headers') - height = 0 if os.path.exists(self.headers_filename): - height = os.path.getsize(self.headers_filename)/80 - - if height: - prev_header = self.read_header(height -1) - prev_hash = self.hash_header(prev_header) + height = os.path.getsize(self.headers_filename)/80 - 1 # the current height + if height > 0: + prev_hash = self.hash_header(self.read_header(height)) + else: + prev_hash = None else: open(self.headers_filename,'wb').close() prev_hash = None + height = -1 - if height != db_height: + if height < db_height: print_log( "catching up missing headers:", height, db_height) - s = '' try: - for i in range(height, db_height): - header = self.get_header(i) - assert prev_hash == header.get('prev_block_hash') + while height != db_height: + height = height + 1 + header = self.get_header(height) + if height>1: + assert prev_hash == header.get('prev_block_hash') self.write_header(header, sync=False) prev_hash = self.hash_header(header) - if i%1000==0: print_log("headers file:",i) + if height%1000==0: print_log("headers file:",height) except KeyboardInterrupt: self.flush_headers() sys.exit() @@ -214,6 +214,7 @@ class BlockchainProcessor(Processor): def write_header(self, header, sync=True): if not self.headers_data: self.headers_offset = header.get('block_height') + self.headers_data += header_to_string(header).decode('hex') if sync or len(self.headers_data) > 40*100: self.flush_headers() @@ -449,7 +450,7 @@ class BlockchainProcessor(Processor): self.db.Write(batch, sync = sync) t3 = time.time() - if t3 - t0 > 10: + if t3 - t0 > 10 and not sync: print_log("block", block_height, "parse:%0.2f "%(t00 - t0), "read:%0.2f "%(t1 - t00), @@ -576,11 +577,16 @@ class BlockchainProcessor(Processor): - def last_hash(self): - return self.block_hashes[-1] + def catch_up(self, sync = True): + # + # -------> F ------> G -------> H + # / + # / + # A ------> B --------> C ------> E + # + # we always compare the hash in the headers file to the hash returned by bitcoind - def catch_up(self, sync = True): t1 = time.time() while not self.shared.stopped(): @@ -589,45 +595,44 @@ class BlockchainProcessor(Processor): info = self.bitcoind('getinfo') bitcoind_height = info.get('blocks') bitcoind_block_hash = self.bitcoind('getblockhash', [bitcoind_height]) - if self.last_hash() == bitcoind_block_hash: + if self.last_hash == bitcoind_block_hash: self.up_to_date = True break # not done.. self.up_to_date = False - block_hash = self.bitcoind('getblockhash', [self.height+1]) - block = self.bitcoind('getblock', [block_hash, 1]) + next_block_hash = self.bitcoind('getblockhash', [self.height+1]) + next_block = self.bitcoind('getblock', [block_hash, 1]) - if block.get('previousblockhash') == self.last_hash(): + if next_block.get('previousblockhash') == self.last_hash: - self.import_block(block, block_hash, self.height+1, sync) + self.import_block(next_block, next_block_hash, self.height+1, sync) self.height = self.height + 1 - self.write_header(self.block2header(block), sync) - - self.block_hashes.append(block_hash) - self.block_hashes = self.block_hashes[-10:] + self.write_header(self.block2header(next_block), sync) + self.last_hash = next_block_hash if (self.height+1)%100 == 0 and not sync: t2 = time.time() print_log( "catch_up: block %d (%.3fs)"%( self.height, t2 - t1 ) ) t1 = t2 - else: # revert current block - print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash() ) - block_hash = self.last_hash() - block = self.bitcoind('getblock', [block_hash, 1]) - self.height = self.height -1 + block = self.bitcoind('getblock', [self.last_hash, 1]) + print_log( "bc2: reorg", self.height, block.get('previousblockhash'), self.last_hash ) + self.import_block(block, self.last_hash, self.height, revert=True) self.pop_header() - self.block_hashes.remove(block_hash) - self.import_block(block, self.last_hash(), self.height, revert=True) + self.height = self.height -1 + + # read previous header from disk + self.header = self.read_header(self.height) + self.last_hash = self.hash_header(self.header) - self.header = self.block2header(self.bitcoind('getblock', [self.last_hash()])) + self.header = self.block2header(self.bitcoind('getblock', [self.last_hash])) + - def memorypool_update(self): @@ -669,16 +674,23 @@ class BlockchainProcessor(Processor): self.mempool_addresses.pop(tx_hash) # rebuild histories - with self.mempool_lock: - self.mempool_hist = {} - for tx_hash, addresses in self.mempool_addresses.items(): - for addr in addresses: - h = self.mempool_hist.get(addr, []) - if tx_hash not in h: - h.append( tx_hash ) - self.mempool_hist[addr] = h - self.invalidate_cache(addr) + new_mempool_hist = {} + for tx_hash, addresses in self.mempool_addresses.items(): + for addr in addresses: + h = new_mempool_hist.get(addr, []) + if tx_hash not in h: + h.append( tx_hash ) + new_mempool_hist[addr] = h + + for addr in new_mempool_hist.keys(): + if addr in self.mempool_hist.keys(): + if self.mempool_hist[addr] != new_mempool_hist[addr]: + self.invalidate_cache(addr) + else: + self.invalidate_cache(addr) + with self.mempool_lock: + self.mempool_hist = new_mempool_hist @@ -688,6 +700,9 @@ class BlockchainProcessor(Processor): print_log( "cache: invalidating", address ) self.history_cache.pop(address) + if address in self.watched_addresses: + self.address_queue.put(address) + def main_iteration(self): @@ -703,7 +718,7 @@ class BlockchainProcessor(Processor): self.memorypool_update() t3 = time.time() - print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2) + # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2) if self.sent_height != self.height: @@ -723,6 +738,7 @@ class BlockchainProcessor(Processor): if addr in self.watched_addresses: status = self.get_status( addr ) self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] }) + self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] }) if not self.shared.stopped():