From fcf7076e578cffcd672a6bcf23d0459fc076fef5 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Thu, 15 Nov 2012 11:17:22 +0400 Subject: [PATCH] fix memory pool --- backends/bitcoind/blockchain_processor.py | 66 ++++++++++++++++++++++------- 1 files changed, 51 insertions(+), 15 deletions(-) diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 9218fba..78ef690 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -62,8 +62,11 @@ class BlockchainProcessor(Processor): self.cache_lock = threading.Lock() self.headers_data = '' + self.mempool_addresses = {} self.mempool_hist = {} - self.known_mempool_hashes = [] + self.mempool_hashes = [] + self.mempool_lock = threading.Lock() + self.address_queue = Queue() self.dbpath = config.get('leveldb', 'path') @@ -264,8 +267,9 @@ class BlockchainProcessor(Processor): # check uniqueness too... # add memory pool - for txid in self.mempool_hist.get(addr,[]): - hist.append((txid, 0, 0)) + with self.mempool_lock: + for txid in self.mempool_hist.get(addr,[]): + hist.append((txid, 0, 0)) hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist) # add something to distinguish between unused and empty addresses @@ -453,8 +457,7 @@ class BlockchainProcessor(Processor): "write:%.2f "%(t3-t2), "max:", max_len, max_addr) - # invalidate cache - for addr in self.batch_list.keys(): self.update_history_cache(addr) + for addr in self.batch_list.keys(): self.invalidate_cache(addr) @@ -632,24 +635,54 @@ class BlockchainProcessor(Processor): mempool_hashes = self.bitcoind('getrawmempool') for tx_hash in mempool_hashes: - if tx_hash in self.known_mempool_hashes: continue - self.known_mempool_hashes.append(tx_hash) + if tx_hash in self.mempool_hashes: continue tx = self.get_transaction(tx_hash) if not tx: continue - for x in tx.get('inputs') + tx.get('outputs'): + for x in tx.get('inputs'): + txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') + try: + addr = self.db.Get(txi) + except: + continue + l = self.mempool_addresses.get(tx_hash, []) + if addr not in l: + l.append( addr ) + self.mempool_addresses[tx_hash] = l + + for x in tx.get('outputs'): addr = x.get('address') - hist = self.mempool_hist.get(addr, []) - if tx_hash not in hist: - hist.append( tx_hash ) - self.mempool_hist[addr] = hist - self.update_history_cache(addr) + l = self.mempool_addresses.get(tx_hash, []) + if addr not in l: + l.append( addr ) + self.mempool_addresses[tx_hash] = l + + self.mempool_hashes.append(tx_hash) - self.known_mempool_hashes = mempool_hashes + # remove older entries from mempool_hashes + self.mempool_hashes = mempool_hashes + # remove deprecated entries from mempool_addresses + for tx_hash, addresses in self.mempool_addresses.items(): + if tx_hash not in self.mempool_hashes: + self.mempool_addresses.pop(tx_hash) - def update_history_cache(self, address): + # rebuild histories + with self.mempool_lock: + self.mempool_hist = {} + for tx_hash, addresses in self.mempool_addresses.items(): + for addr in addresses: + h = self.mempool_hist.get(addr, []) + if tx_hash not in h: + h.append( tx_hash ) + self.mempool_hist[addr] = h + self.invalidate_cache(addr) + + + + + def invalidate_cache(self, address): with self.cache_lock: if self.history_cache.has_key(address): print_log( "cache: invalidating", address ) @@ -669,6 +702,9 @@ class BlockchainProcessor(Processor): t2 = time.time() self.memorypool_update() + t3 = time.time() + print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2) + if self.sent_height != self.height: self.sent_height = self.height -- 1.7.1