def pop_request(self):
return self.request_queue.get()
+ def collect_garbage(self):
+ # Deep copy entire sessions list and blank it
+ # This is done to minimise lock contention
+ with self.lock:
+ sessions = self.sessions[:]
+ self.sessions = []
+ for session in sessions:
+ if not session.stopped():
+ # If session is still alive then re-add it back
+ # to our internal register
+ self.add_session(session)
+
def run(self):
if self.shared is None:
raise TypeError("self.shared not set in Processor")
while not self.shared.stopped():
- # Deep copy entire sessions list and blank it
- # This is done to minimise lock contention
- with self.lock:
- sessions = self.sessions[:]
- self.sessions = []
- for session in sessions:
- if not session.stopped():
- # If session is still alive then re-add it back
- # to our internal register
- self.add_session(session)
-
+ self.collect_garbage()
session, request = self.pop_request()
self.process(session, request)
# When ready, you call
# self.push_response(session,response)
+ def update_from_blocknum(self,block_number):
+ for session in self.sessions:
+ if not session.stopped():
+ if session.numblocks_sub is not None:
+ response = { 'id':session.numblocks_sub, 'result':block_number }
+ self.push_response(session,response)
+
+ def update_from_address(self,addr):
+ for session in self.sessions:
+ if not session.stopped():
+ m = session.addresses_sub.get(addr)
+ if m:
+ status = self.get_status( addr )
+ message_id, last_status = m
+ if status != last_status:
+ session.subscribe_to_address(addr,message_id, status)
+ response = { 'id':message_id, 'result':status }
+ self.push_response(session,response)
+
+ def get_status(self,addr):
+ # return status of an address
+ # return store.get_status(addr)
+ pass
+
+
class Session:
def __init__(self, connection, address):