X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=backends%2Fbitcoind%2Fblockchain_processor.py;h=65af69807c5a0c1677b9c8f82f96dfb9a81556c9;hb=f7e00a67fa936e9c63c465d5ad7bfd7fb4eaa664;hp=416b6dbd73a93f3995952a5ddfebd009720454d8;hpb=eba756b57a9b31522d4a22e6caaa19a4aed67fdf;p=electrum-server.git diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 416b6db..65af698 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -62,8 +62,11 @@ class BlockchainProcessor(Processor): self.cache_lock = threading.Lock() self.headers_data = '' + self.mempool_addresses = {} self.mempool_hist = {} - self.known_mempool_hashes = [] + self.mempool_hashes = [] + self.mempool_lock = threading.Lock() + self.address_queue = Queue() self.dbpath = config.get('leveldb', 'path') @@ -264,8 +267,9 @@ class BlockchainProcessor(Processor): # check uniqueness too... # add memory pool - for txid in self.mempool_hist.get(addr,[]): - hist.append((txid, 0)) + with self.mempool_lock: + for txid in self.mempool_hist.get(addr,[]): + hist.append((txid, 0, 0)) hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist) # add something to distinguish between unused and empty addresses @@ -453,8 +457,7 @@ class BlockchainProcessor(Processor): "write:%.2f "%(t3-t2), "max:", max_len, max_addr) - # invalidate cache - for addr in self.batch_list.keys(): self.update_history_cache(addr) + for addr in self.batch_list.keys(): self.invalidate_cache(addr) @@ -529,7 +532,7 @@ class BlockchainProcessor(Processor): print_log( "error:", error) elif method == 'blockchain.transaction.broadcast': - txo = self.bitcoind('sendrawtransaction', params[0]) + txo = self.bitcoind('sendrawtransaction', params) print_log( "sent tx:", txo ) result = txo @@ -632,24 +635,54 @@ class BlockchainProcessor(Processor): mempool_hashes = self.bitcoind('getrawmempool') for tx_hash in mempool_hashes: - if tx_hash in self.known_mempool_hashes: continue - self.known_mempool_hashes.append(tx_hash) + if tx_hash in self.mempool_hashes: continue tx = self.get_transaction(tx_hash) if not tx: continue - for x in tx.get('inputs') + tx.get('outputs'): + for x in tx.get('inputs'): + txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex') + try: + addr = self.db.Get(txi) + except: + continue + l = self.mempool_addresses.get(tx_hash, []) + if addr not in l: + l.append( addr ) + self.mempool_addresses[tx_hash] = l + + for x in tx.get('outputs'): addr = x.get('address') - hist = self.mempool_hist.get(addr, []) - if tx_hash not in hist: - hist.append( tx_hash ) - self.mempool_hist[addr] = hist - self.update_history_cache(addr) + l = self.mempool_addresses.get(tx_hash, []) + if addr not in l: + l.append( addr ) + self.mempool_addresses[tx_hash] = l + + self.mempool_hashes.append(tx_hash) - self.known_mempool_hashes = mempool_hashes + # remove older entries from mempool_hashes + self.mempool_hashes = mempool_hashes + # remove deprecated entries from mempool_addresses + for tx_hash, addresses in self.mempool_addresses.items(): + if tx_hash not in self.mempool_hashes: + self.mempool_addresses.pop(tx_hash) - def update_history_cache(self, address): + # rebuild histories + with self.mempool_lock: + self.mempool_hist = {} + for tx_hash, addresses in self.mempool_addresses.items(): + for addr in addresses: + h = self.mempool_hist.get(addr, []) + if tx_hash not in h: + h.append( tx_hash ) + self.mempool_hist[addr] = h + self.invalidate_cache(addr) + + + + + def invalidate_cache(self, address): with self.cache_lock: if self.history_cache.has_key(address): print_log( "cache: invalidating", address ) @@ -669,6 +702,9 @@ class BlockchainProcessor(Processor): t2 = time.time() self.memorypool_update() + t3 = time.time() + # print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2) + if self.sent_height != self.height: self.sent_height = self.height