error = None
if method == 'blockchain.numblocks.subscribe':
- if session not in self.watch_headers:
- with self.watch_lock:
+ with self.watch_lock:
+ if session not in self.watch_blocks:
self.watch_blocks.append(session)
result = self.height
elif method == 'blockchain.headers.subscribe':
- if session not in self.watch_headers:
- with self.watch_lock:
+ with self.watch_lock:
+ if session not in self.watch_headers:
self.watch_headers.append(session)
result = self.header
print_log("cache: invalidating", address)
self.history_cache.pop(address)
- if address in self.watched_addresses:
+ with self.watch_lock:
+ sessions = self.watched_addresses.get(address)
+
+ if sessions:
# TODO: update cache here. if new value equals cached value, do not send notification
- self.address_queue.put(address)
+ self.address_queue.put((address,sessions))
def main_iteration(self):
if self.shared.stopped():
while True:
try:
- addr = self.address_queue.get(False)
+ addr, sessions = self.address_queue.get(False)
except:
break
status = self.get_status(addr)
- for session in self.watched_addresses[addr]:
+ for session in sessions:
self.push_response(session, {
'id': None,
'method': 'blockchain.address.subscribe',