add responder thread to session
[electrum-server.git] / transports / stratum_tcp.py
index 6f29636..2a9463f 100644 (file)
@@ -28,6 +28,7 @@ class TcpSession(Session):
 
         self.address = address[0]
         self.name = "TCP " if not use_ssl else "SSL "
+        self.response_queue = queue.Queue()
 
     def do_handshake(self):
         if self.use_ssl:
@@ -55,17 +56,27 @@ class TcpSession(Session):
             self._stopped = True
 
     def send_response(self, response):
-        data = json.dumps(response) + "\n"
-        # Possible race condition here by having session
-        # close connection?
-        # I assume Python connections are thread safe interfaces
-        try:
-            connection = self.connection()
-            while data:
-                l = connection.send(data)
-                data = data[l:]
-        except:
-            self.stop()
+        self.response_queue.put(response)
+
+
+class TcpClientResponder(threading.Thread):
+
+    def __init__(self, session):
+        self.session = session
+        self.connection = self.session.connection()
+        threading.Thread.__init__(self)
+
+    def run(self):
+        while not self.session.stopped():
+            response = self.session.response_queue.get()
+            data = json.dumps(response) + "\n"
+            try:
+                while data:
+                    l = self.connection.send(data)
+                    data = data[l:]
+            except:
+                self.session.stop()
+
 
 
 class TcpClientRequestor(threading.Thread):
@@ -162,6 +173,7 @@ class TcpServer(threading.Thread):
 
         while not self.shared.stopped():
 
+            #if self.use_ssl: print_log("SSL: socket listening")
             try:
                 connection, address = sock.accept()
             except:
@@ -169,6 +181,7 @@ class TcpServer(threading.Thread):
                 time.sleep(0.1)
                 continue
 
+            #if self.use_ssl: print_log("SSL: new session", address)
             try:
                 session = TcpSession(connection, address, use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile)
             except BaseException, e:
@@ -182,3 +195,5 @@ class TcpServer(threading.Thread):
             self.dispatcher.collect_garbage()
             client_req = TcpClientRequestor(self.dispatcher, session)
             client_req.start()
+            responder = TcpClientResponder(session)
+            responder.start()