def pop_request(self):
return self.request_queue.get()
+ def collect_garbage(self):
+ # Deep copy entire sessions list and blank it
+ # This is done to minimise lock contention
+ with self.lock:
+ sessions = self.sessions[:]
+ self.sessions = []
+ for session in sessions:
+ if not session.stopped():
+ # If session is still alive then re-add it back
+ # to our internal register
+ self.add_session(session)
+
def run(self):
if self.shared is None:
raise TypeError("self.shared not set in Processor")
while not self.shared.stopped():
- # Deep copy entire sessions list and blank it
- # This is done to minimise lock contention
- with self.lock:
- sessions = self.sessions[:]
- self.sessions = []
- for session in sessions:
- if not session.stopped():
- # If session is still alive then re-add it back
- # to our internal register
- self.add_session(session)
-
+ self.collect_garbage()
session, request = self.pop_request()
self.process(session, request)
def update_from_blocknum(self,block_number):
for session in self.sessions:
- if session.numblocks_sub is not None:
- response = { 'id':session.numblocks_sub, 'result':block_number }
- self.push_response(session,response)
+ if not session.stopped():
+ if session.numblocks_sub is not None:
+ response = { 'id':session.numblocks_sub, 'result':block_number }
+ self.push_response(session,response)
def update_from_address(self,addr):
for session in self.sessions:
- m = session.addresses_sub.get(addr)
- if m:
- status = self.get_status( addr )
- message_id, last_status = m
- if status != last_status:
- session.subscribe_to_address(message_id, status)
- response = { 'id':message_id, 'result':status }
- self.push_response(session,response)
+ if not session.stopped():
+ m = session.addresses_sub.get(addr)
+ if m:
+ status = self.get_status( addr )
+ message_id, last_status = m
+ if status != last_status:
+ session.subscribe_to_address(addr,message_id, status)
+ response = { 'id':message_id, 'result':status }
+ self.push_response(session,response)
def get_status(self,addr):
# return status of an address