Merge branch 'master' of github.com:spesmilo/electrum-server
[electrum-server.git] / processor.py
index 4094580..b94231b 100644 (file)
@@ -82,7 +82,7 @@ class RequestDispatcher(threading.Thread):
         self.response_queue = queue.Queue()
         self.lock = threading.Lock()
         self.idlock = threading.Lock()
-        self.sessions = []
+        self.sessions = {}
         self.processors = {}
 
     def push_response(self, session, item):
@@ -98,13 +98,16 @@ class RequestDispatcher(threading.Thread):
         return self.request_queue.get()
 
     def get_session_by_address(self, address):
-        for x in self.sessions:
+        for x in self.sessions.values():
             if x.address == address:
                 return x
 
     def run(self):
         if self.shared is None:
             raise TypeError("self.shared not set in Processor")
+
+        lastgc = 0 
+
         while not self.shared.stopped():
             session, request = self.pop_request()
             try:
@@ -112,6 +115,10 @@ class RequestDispatcher(threading.Thread):
             except:
                 traceback.print_exc(file=sys.stdout)
 
+            if time.time() - lastgc > 60.0:
+                self.collect_garbage()
+                lastgc = time.time()
+
         self.stop()
 
     def stop(self):
@@ -147,44 +154,32 @@ class RequestDispatcher(threading.Thread):
 
     def get_sessions(self):
         with self.lock:
-            r = self.sessions[:]
+            r = self.sessions.values()
         return r
 
     def add_session(self, session):
+        key = session.key()
         with self.lock:
-            self.sessions.append(session)
+            self.sessions[key] = session
 
-    def collect_garbage(self):
-        # Deep copy entire sessions list and blank it
-        # This is done to minimize lock contention
+    def remove_session(self, session):
+        key = session.key()
         with self.lock:
-            sessions = self.sessions[:]
-
-        active_sessions = []
+            self.sessions.pop(key)
 
+    def collect_garbage(self):
         now = time.time()
-        for session in sessions:
-            if (now - session.time) > 1000:
+        for session in self.sessions.values():
+            if (now - session.time) > session.timeout:
                 session.stop()
 
-        bp = self.processors['blockchain']
-
-        for session in sessions:
-            if not session.stopped():
-                # If session is still alive then re-add it back
-                # to our internal register
-                active_sessions.append(session)
-            else:
-                session.stop_subscriptions(bp)
-
-        with self.lock:
-            self.sessions = active_sessions[:]
-
 
 
 class Session:
 
-    def __init__(self):
+    def __init__(self, dispatcher):
+        self.dispatcher = dispatcher
+        self.bp = self.dispatcher.processors['blockchain']
         self._stopped = False
         self.lock = threading.Lock()
         self.subscriptions = []
@@ -196,6 +191,10 @@ class Session:
         threading.Timer(2, self.info).start()
 
 
+    def key(self):
+        return self.name + self.address
+
+
     # Debugging method. Doesn't need to be threadsafe.
     def info(self):
         for sub in self.subscriptions:
@@ -214,6 +213,21 @@ class Session:
                       "%3d" % len(self.subscriptions),
                       self.version)
 
+    def stop(self):
+        with self.lock:
+            if self._stopped:
+                return
+            self._stopped = True
+
+        self.shutdown()
+        self.dispatcher.remove_session(self)
+        self.stop_subscriptions()
+
+
+    def shutdown(self):
+        pass
+
+
     def stopped(self):
         with self.lock:
             return self._stopped
@@ -221,32 +235,18 @@ class Session:
 
     def subscribe_to_service(self, method, params):
         with self.lock:
+            if self._stopped:
+                return
             if (method, params) not in self.subscriptions:
                 self.subscriptions.append((method,params))
+        self.bp.do_subscribe(method, params, self)
 
 
-    def stop_subscriptions(self, bp):
+    def stop_subscriptions(self):
         with self.lock:
             s = self.subscriptions[:]
-
         for method, params in s:
-            with bp.watch_lock:
-                if method == 'blockchain.numblocks.subscribe':
-                    if self in bp.watch_blocks:
-                        bp.watch_blocks.remove(self)
-                elif method == 'blockchain.headers.subscribe':
-                    if self in bp.watch_headers:
-                        bp.watch_headers.remove(self)
-                elif method == "blockchain.address.subscribe":
-                    addr = params[0]
-                    l = bp.watched_addresses.get(addr)
-                    if not l:
-                        continue
-                    if self in l:
-                        l.remove(self)
-                    if l == []:
-                        bp.watched_addresses.pop(addr)
-
+            self.bp.do_unsubscribe(method, params, self)
         with self.lock:
             self.subscriptions = []