self.mempool_addresses = {}
self.mempool_hist = {}
- self.mempool_hashes = []
+ self.mempool_hashes = set([])
self.mempool_lock = threading.Lock()
self.address_queue = Queue()
self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
+
def memorypool_update(self):
- mempool_hashes = self.bitcoind('getrawmempool')
+ mempool_hashes = set(self.bitcoind('getrawmempool'))
+ touched_addresses = set([])
- touched_addresses = []
for tx_hash in mempool_hashes:
if tx_hash in self.mempool_hashes:
continue
addr = x.get('address')
if addr and addr not in mpa:
mpa.append(addr)
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
for x in tx.get('outputs'):
addr = x.get('address')
if addr and addr not in mpa:
mpa.append(addr)
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
self.mempool_addresses[tx_hash] = mpa
- self.mempool_hashes.append(tx_hash)
+ self.mempool_hashes.add(tx_hash)
# remove older entries from mempool_hashes
self.mempool_hashes = mempool_hashes
if tx_hash not in self.mempool_hashes:
self.mempool_addresses.pop(tx_hash)
for addr in addresses:
- touched_addresses.append(addr)
+ touched_addresses.add(addr)
# rebuild mempool histories
new_mempool_hist = {}
print_log("cache: invalidating", address)
self.history_cache.pop(address)
- if address in self.watched_addresses:
+ with self.watch_lock:
+ sessions = self.watched_addresses.get(address)
+
+ if sessions:
# TODO: update cache here. if new value equals cached value, do not send notification
- self.address_queue.put(address)
+ self.address_queue.put((address,sessions))
def main_iteration(self):
if self.shared.stopped():
while True:
try:
- addr = self.address_queue.get(False)
+ addr, sessions = self.address_queue.get(False)
except:
break
status = self.get_status(addr)
- for session in self.watched_addresses[addr]:
+ for session in sessions:
self.push_response(session, {
'id': None,
'method': 'blockchain.address.subscribe',