self.internal_ids = {}
self.internal_id = 1
self.lock = threading.Lock()
+ self.idlock = threading.Lock()
self.sessions = []
self.processors = {}
return x
def get_session_id(self, internal_id):
- with self.lock:
+ with self.idlock:
return self.internal_ids.pop(internal_id)
def store_session_id(self, session, msgid):
- with self.lock:
+ with self.idlock:
self.internal_ids[self.internal_id] = session, msgid
r = self.internal_id
self.internal_id += 1
suffix = method.split('.')[-1]
if session is not None:
- is_new = session.protocol_version >= 0.5
if suffix == 'subscribe':
session.subscribe_to_service(method, params)
except:
pass
- #if session.protocol_version < 0.6:
- # print_log("stopping session from old client", session.protocol_version)
- # session.stop()
def get_sessions(self):
with self.lock:
def collect_garbage(self):
# Deep copy entire sessions list and blank it
- # This is done to minimise lock contention
+ # This is done to minimize lock contention
with self.lock:
sessions = self.sessions[:]
- self.sessions = []
+
+ active_sessions = []
for session in sessions:
if not session.stopped():
# If session is still alive then re-add it back
# to our internal register
- self.add_session(session)
+ active_sessions.append(session)
+
+ with self.lock:
+ self.sessions = active_sessions[:]
+
class Session: