Merge branch 'master' of github.com:spesmilo/electrum-server
[electrum-server.git] / processor.py
index 0bdb994..a1b0851 100644 (file)
@@ -44,15 +44,21 @@ class Processor(threading.Thread):
         #print "response", response
         self.dispatcher.request_dispatcher.push_response(session, response)
 
+    def close(self):
+        pass
+
     def run(self):
         while not self.shared.stopped():
-            request, session = self.queue.get(10000000000)
+            try:
+                request, session = self.queue.get(True, timeout=1)
+            except:
+                continue
             try:
                 self.process(request, session)
             except:
                 traceback.print_exc(file=sys.stdout)
 
-        print_log("processor terminating")
+        self.close()
 
 
 class Dispatcher:
@@ -145,8 +151,8 @@ class RequestDispatcher(threading.Thread):
         p.add_request(session, request)
 
         if method in ['server.version']:
-            session.version = params[0]
             try:
+                session.version = params[0]
                 session.protocol_version = float(params[1])
             except:
                 pass
@@ -235,38 +241,18 @@ class Session:
 
     def subscribe_to_service(self, method, params):
         with self.lock:
+            if self._stopped:
+                return
             if (method, params) not in self.subscriptions:
                 self.subscriptions.append((method,params))
+        self.bp.do_subscribe(method, params, self)
 
 
     def stop_subscriptions(self):
-        bp = self.bp
-
         with self.lock:
             s = self.subscriptions[:]
-
         for method, params in s:
-            with bp.watch_lock:
-                if method == 'blockchain.numblocks.subscribe':
-                    if self in bp.watch_blocks:
-                        bp.watch_blocks.remove(self)
-                elif method == 'blockchain.headers.subscribe':
-                    if self in bp.watch_headers:
-                        bp.watch_headers.remove(self)
-                elif method == "blockchain.address.subscribe":
-                    addr = params[0]
-                    l = bp.watched_addresses.get(addr)
-                    if not l:
-                        continue
-                    if self in l:
-                        l.remove(self)
-                    if self in l:
-                        print "error rc!!"
-                        bp.shared.stop()
-
-                    if l == []:
-                        bp.watched_addresses.pop(addr)
-
+            self.bp.do_unsubscribe(method, params, self)
         with self.lock:
             self.subscriptions = []