X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=blobdiff_plain;f=processor.py;h=0bdb994564060388d346e1fe46b09bfd6b61b7d7;hp=40945808413a0ca3c351eeb076be23012809d738;hb=4ce69b7ea24ead59ebbcc7ed335ea9762ae3724b;hpb=5433b68b797e3ef31d46add539952ceb2aaa7662 diff --git a/processor.py b/processor.py index 4094580..0bdb994 100644 --- a/processor.py +++ b/processor.py @@ -82,7 +82,7 @@ class RequestDispatcher(threading.Thread): self.response_queue = queue.Queue() self.lock = threading.Lock() self.idlock = threading.Lock() - self.sessions = [] + self.sessions = {} self.processors = {} def push_response(self, session, item): @@ -98,13 +98,16 @@ class RequestDispatcher(threading.Thread): return self.request_queue.get() def get_session_by_address(self, address): - for x in self.sessions: + for x in self.sessions.values(): if x.address == address: return x def run(self): if self.shared is None: raise TypeError("self.shared not set in Processor") + + lastgc = 0 + while not self.shared.stopped(): session, request = self.pop_request() try: @@ -112,6 +115,10 @@ class RequestDispatcher(threading.Thread): except: traceback.print_exc(file=sys.stdout) + if time.time() - lastgc > 60.0: + self.collect_garbage() + lastgc = time.time() + self.stop() def stop(self): @@ -147,44 +154,32 @@ class RequestDispatcher(threading.Thread): def get_sessions(self): with self.lock: - r = self.sessions[:] + r = self.sessions.values() return r def add_session(self, session): + key = session.key() with self.lock: - self.sessions.append(session) + self.sessions[key] = session - def collect_garbage(self): - # Deep copy entire sessions list and blank it - # This is done to minimize lock contention + def remove_session(self, session): + key = session.key() with self.lock: - sessions = self.sessions[:] - - active_sessions = [] + self.sessions.pop(key) + def collect_garbage(self): now = time.time() - for session in sessions: - if (now - session.time) > 1000: + for session in self.sessions.values(): + if (now - session.time) > session.timeout: session.stop() - bp = self.processors['blockchain'] - - for session in sessions: - if not session.stopped(): - # If session is still alive then re-add it back - # to our internal register - active_sessions.append(session) - else: - session.stop_subscriptions(bp) - - with self.lock: - self.sessions = active_sessions[:] - class Session: - def __init__(self): + def __init__(self, dispatcher): + self.dispatcher = dispatcher + self.bp = self.dispatcher.processors['blockchain'] self._stopped = False self.lock = threading.Lock() self.subscriptions = [] @@ -196,6 +191,10 @@ class Session: threading.Timer(2, self.info).start() + def key(self): + return self.name + self.address + + # Debugging method. Doesn't need to be threadsafe. def info(self): for sub in self.subscriptions: @@ -214,6 +213,21 @@ class Session: "%3d" % len(self.subscriptions), self.version) + def stop(self): + with self.lock: + if self._stopped: + return + self._stopped = True + + self.shutdown() + self.dispatcher.remove_session(self) + self.stop_subscriptions() + + + def shutdown(self): + pass + + def stopped(self): with self.lock: return self._stopped @@ -225,7 +239,9 @@ class Session: self.subscriptions.append((method,params)) - def stop_subscriptions(self, bp): + def stop_subscriptions(self): + bp = self.bp + with self.lock: s = self.subscriptions[:] @@ -244,6 +260,10 @@ class Session: continue if self in l: l.remove(self) + if self in l: + print "error rc!!" + bp.shared.stop() + if l == []: bp.watched_addresses.pop(addr)