X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=blobdiff_plain;f=processor.py;h=a1b0851cb3e2147bca241982a8204fabbc053fe9;hp=40945808413a0ca3c351eeb076be23012809d738;hb=a126aaa670bcc2e40691c4cf91c601b8812150b3;hpb=5433b68b797e3ef31d46add539952ceb2aaa7662 diff --git a/processor.py b/processor.py index 4094580..a1b0851 100644 --- a/processor.py +++ b/processor.py @@ -44,15 +44,21 @@ class Processor(threading.Thread): #print "response", response self.dispatcher.request_dispatcher.push_response(session, response) + def close(self): + pass + def run(self): while not self.shared.stopped(): - request, session = self.queue.get(10000000000) + try: + request, session = self.queue.get(True, timeout=1) + except: + continue try: self.process(request, session) except: traceback.print_exc(file=sys.stdout) - print_log("processor terminating") + self.close() class Dispatcher: @@ -82,7 +88,7 @@ class RequestDispatcher(threading.Thread): self.response_queue = queue.Queue() self.lock = threading.Lock() self.idlock = threading.Lock() - self.sessions = [] + self.sessions = {} self.processors = {} def push_response(self, session, item): @@ -98,13 +104,16 @@ class RequestDispatcher(threading.Thread): return self.request_queue.get() def get_session_by_address(self, address): - for x in self.sessions: + for x in self.sessions.values(): if x.address == address: return x def run(self): if self.shared is None: raise TypeError("self.shared not set in Processor") + + lastgc = 0 + while not self.shared.stopped(): session, request = self.pop_request() try: @@ -112,6 +121,10 @@ class RequestDispatcher(threading.Thread): except: traceback.print_exc(file=sys.stdout) + if time.time() - lastgc > 60.0: + self.collect_garbage() + lastgc = time.time() + self.stop() def stop(self): @@ -138,8 +151,8 @@ class RequestDispatcher(threading.Thread): p.add_request(session, request) if method in ['server.version']: - session.version = params[0] try: + session.version = params[0] session.protocol_version = float(params[1]) except: pass @@ -147,44 +160,32 @@ class RequestDispatcher(threading.Thread): def get_sessions(self): with self.lock: - r = self.sessions[:] + r = self.sessions.values() return r def add_session(self, session): + key = session.key() with self.lock: - self.sessions.append(session) + self.sessions[key] = session - def collect_garbage(self): - # Deep copy entire sessions list and blank it - # This is done to minimize lock contention + def remove_session(self, session): + key = session.key() with self.lock: - sessions = self.sessions[:] - - active_sessions = [] + self.sessions.pop(key) + def collect_garbage(self): now = time.time() - for session in sessions: - if (now - session.time) > 1000: + for session in self.sessions.values(): + if (now - session.time) > session.timeout: session.stop() - bp = self.processors['blockchain'] - - for session in sessions: - if not session.stopped(): - # If session is still alive then re-add it back - # to our internal register - active_sessions.append(session) - else: - session.stop_subscriptions(bp) - - with self.lock: - self.sessions = active_sessions[:] - class Session: - def __init__(self): + def __init__(self, dispatcher): + self.dispatcher = dispatcher + self.bp = self.dispatcher.processors['blockchain'] self._stopped = False self.lock = threading.Lock() self.subscriptions = [] @@ -196,6 +197,10 @@ class Session: threading.Timer(2, self.info).start() + def key(self): + return self.name + self.address + + # Debugging method. Doesn't need to be threadsafe. def info(self): for sub in self.subscriptions: @@ -214,6 +219,21 @@ class Session: "%3d" % len(self.subscriptions), self.version) + def stop(self): + with self.lock: + if self._stopped: + return + self._stopped = True + + self.shutdown() + self.dispatcher.remove_session(self) + self.stop_subscriptions() + + + def shutdown(self): + pass + + def stopped(self): with self.lock: return self._stopped @@ -221,32 +241,18 @@ class Session: def subscribe_to_service(self, method, params): with self.lock: + if self._stopped: + return if (method, params) not in self.subscriptions: self.subscriptions.append((method,params)) + self.bp.do_subscribe(method, params, self) - def stop_subscriptions(self, bp): + def stop_subscriptions(self): with self.lock: s = self.subscriptions[:] - for method, params in s: - with bp.watch_lock: - if method == 'blockchain.numblocks.subscribe': - if self in bp.watch_blocks: - bp.watch_blocks.remove(self) - elif method == 'blockchain.headers.subscribe': - if self in bp.watch_headers: - bp.watch_headers.remove(self) - elif method == "blockchain.address.subscribe": - addr = params[0] - l = bp.watched_addresses.get(addr) - if not l: - continue - if self in l: - l.remove(self) - if l == []: - bp.watched_addresses.pop(addr) - + self.bp.do_unsubscribe(method, params, self) with self.lock: self.subscriptions = []