self.cache_lock = threading.Lock()
self.headers_data = ''
+ self.mempool_addresses = {}
self.mempool_hist = {}
- self.known_mempool_hashes = []
+ self.mempool_hashes = []
+ self.mempool_lock = threading.Lock()
+
self.address_queue = Queue()
self.dbpath = config.get('leveldb', 'path')
# check uniqueness too...
# add memory pool
- for txid in self.mempool_hist.get(addr,[]):
- hist.append((txid, 0))
+ with self.mempool_lock:
+ for txid in self.mempool_hist.get(addr,[]):
+ hist.append((txid, 0, 0))
hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
# add something to distinguish between unused and empty addresses
"write:%.2f "%(t3-t2),
"max:", max_len, max_addr)
- # invalidate cache
- for addr in self.batch_list.keys(): self.update_history_cache(addr)
+ for addr in self.batch_list.keys(): self.invalidate_cache(addr)
print_log( "error:", error)
elif method == 'blockchain.transaction.broadcast':
- txo = self.bitcoind('sendrawtransaction', params[0])
+ txo = self.bitcoind('sendrawtransaction', params)
print_log( "sent tx:", txo )
result = txo
mempool_hashes = self.bitcoind('getrawmempool')
for tx_hash in mempool_hashes:
- if tx_hash in self.known_mempool_hashes: continue
- self.known_mempool_hashes.append(tx_hash)
+ if tx_hash in self.mempool_hashes: continue
tx = self.get_transaction(tx_hash)
if not tx: continue
- for x in tx.get('inputs') + tx.get('outputs'):
+ for x in tx.get('inputs'):
+ txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+ try:
+ addr = self.db.Get(txi)
+ except:
+ continue
+ l = self.mempool_addresses.get(tx_hash, [])
+ if addr not in l:
+ l.append( addr )
+ self.mempool_addresses[tx_hash] = l
+
+ for x in tx.get('outputs'):
addr = x.get('address')
- hist = self.mempool_hist.get(addr, [])
- if tx_hash not in hist:
- hist.append( tx_hash )
- self.mempool_hist[addr] = hist
- self.update_history_cache(addr)
+ l = self.mempool_addresses.get(tx_hash, [])
+ if addr not in l:
+ l.append( addr )
+ self.mempool_addresses[tx_hash] = l
+
+ self.mempool_hashes.append(tx_hash)
- self.known_mempool_hashes = mempool_hashes
+ # remove older entries from mempool_hashes
+ self.mempool_hashes = mempool_hashes
+ # remove deprecated entries from mempool_addresses
+ for tx_hash, addresses in self.mempool_addresses.items():
+ if tx_hash not in self.mempool_hashes:
+ self.mempool_addresses.pop(tx_hash)
- def update_history_cache(self, address):
+ # rebuild histories
+ with self.mempool_lock:
+ self.mempool_hist = {}
+ for tx_hash, addresses in self.mempool_addresses.items():
+ for addr in addresses:
+ h = self.mempool_hist.get(addr, [])
+ if tx_hash not in h:
+ h.append( tx_hash )
+ self.mempool_hist[addr] = h
+ self.invalidate_cache(addr)
+
+
+
+
+ def invalidate_cache(self, address):
with self.cache_lock:
if self.history_cache.has_key(address):
print_log( "cache: invalidating", address )
t2 = time.time()
self.memorypool_update()
+ t3 = time.time()
+ print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
+
if self.sent_height != self.height:
self.sent_height = self.height