use set instead of list in memorypool_update
[electrum-server.git] / processor.py
index 9210fd6..0bdb994 100644 (file)
@@ -80,11 +80,9 @@ class RequestDispatcher(threading.Thread):
         self.daemon = True
         self.request_queue = queue.Queue()
         self.response_queue = queue.Queue()
-        self.internal_ids = {}
-        self.internal_id = 1
         self.lock = threading.Lock()
         self.idlock = threading.Lock()
-        self.sessions = []
+        self.sessions = {}
         self.processors = {}
 
     def push_response(self, session, item):
@@ -100,24 +98,16 @@ class RequestDispatcher(threading.Thread):
         return self.request_queue.get()
 
     def get_session_by_address(self, address):
-        for x in self.sessions:
+        for x in self.sessions.values():
             if x.address == address:
                 return x
 
-    def get_session_id(self, internal_id):
-        with self.idlock:
-            return self.internal_ids.pop(internal_id)
-
-    def store_session_id(self, session, msgid):
-        with self.idlock:
-            self.internal_ids[self.internal_id] = session, msgid
-            r = self.internal_id
-            self.internal_id += 1
-            return r
-
     def run(self):
         if self.shared is None:
             raise TypeError("self.shared not set in Processor")
+
+        lastgc = 0 
+
         while not self.shared.stopped():
             session, request = self.pop_request()
             try:
@@ -125,6 +115,10 @@ class RequestDispatcher(threading.Thread):
             except:
                 traceback.print_exc(file=sys.stdout)
 
+            if time.time() - lastgc > 60.0:
+                self.collect_garbage()
+                lastgc = time.time()
+
         self.stop()
 
     def stop(self):
@@ -160,44 +154,32 @@ class RequestDispatcher(threading.Thread):
 
     def get_sessions(self):
         with self.lock:
-            r = self.sessions[:]
+            r = self.sessions.values()
         return r
 
     def add_session(self, session):
+        key = session.key()
         with self.lock:
-            self.sessions.append(session)
+            self.sessions[key] = session
 
-    def collect_garbage(self):
-        # Deep copy entire sessions list and blank it
-        # This is done to minimize lock contention
+    def remove_session(self, session):
+        key = session.key()
         with self.lock:
-            sessions = self.sessions[:]
-
-        active_sessions = []
+            self.sessions.pop(key)
 
+    def collect_garbage(self):
         now = time.time()
-        for session in sessions:
-            if (now - session.time) > 1000:
+        for session in self.sessions.values():
+            if (now - session.time) > session.timeout:
                 session.stop()
 
-        bp = self.processors['blockchain']
-
-        for session in sessions:
-            if not session.stopped():
-                # If session is still alive then re-add it back
-                # to our internal register
-                active_sessions.append(session)
-            else:
-                session.stop_subscriptions(bp)
-
-        with self.lock:
-            self.sessions = active_sessions[:]
-
 
 
 class Session:
 
-    def __init__(self):
+    def __init__(self, dispatcher):
+        self.dispatcher = dispatcher
+        self.bp = self.dispatcher.processors['blockchain']
         self._stopped = False
         self.lock = threading.Lock()
         self.subscriptions = []
@@ -209,6 +191,10 @@ class Session:
         threading.Timer(2, self.info).start()
 
 
+    def key(self):
+        return self.name + self.address
+
+
     # Debugging method. Doesn't need to be threadsafe.
     def info(self):
         for sub in self.subscriptions:
@@ -227,6 +213,21 @@ class Session:
                       "%3d" % len(self.subscriptions),
                       self.version)
 
+    def stop(self):
+        with self.lock:
+            if self._stopped:
+                return
+            self._stopped = True
+
+        self.shutdown()
+        self.dispatcher.remove_session(self)
+        self.stop_subscriptions()
+
+
+    def shutdown(self):
+        pass
+
+
     def stopped(self):
         with self.lock:
             return self._stopped
@@ -238,7 +239,9 @@ class Session:
                 self.subscriptions.append((method,params))
 
 
-    def stop_subscriptions(self, bp):
+    def stop_subscriptions(self):
+        bp = self.bp
+
         with self.lock:
             s = self.subscriptions[:]
 
@@ -257,6 +260,10 @@ class Session:
                         continue
                     if self in l:
                         l.remove(self)
+                    if self in l:
+                        print "error rc!!"
+                        bp.shared.stop()
+
                     if l == []:
                         bp.watched_addresses.pop(addr)