X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=transports%2Fstratum_tcp.py;h=5fb26a552684fe995ce93f4e920f3fea6021ca3c;hb=1189cf8382d936a035e399f9afdd95080b7d087e;hp=88ec0cfebea3660e4565ef1315562d0e16a3c4ed;hpb=247a7b2add3cdffe0f60553a2b2a07db9ad4b576;p=electrum-server.git diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index 88ec0cf..5fb26a5 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -13,6 +13,7 @@ class TcpSession(Session): def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile): Session.__init__(self) + self.use_ssl = use_ssl if use_ssl: import ssl self._connection = ssl.wrap_socket( @@ -20,12 +21,18 @@ class TcpSession(Session): server_side=True, certfile=ssl_certfile, keyfile=ssl_keyfile, - ssl_version=ssl.PROTOCOL_SSLv23) + ssl_version=ssl.PROTOCOL_SSLv23, + do_handshake_on_connect=False) else: self._connection = connection self.address = address[0] self.name = "TCP " if not use_ssl else "SSL " + self.response_queue = queue.Queue() + + def do_handshake(self): + if self.use_ssl: + self._connection.do_handshake() def connection(self): if self.stopped(): @@ -49,17 +56,26 @@ class TcpSession(Session): self._stopped = True def send_response(self, response): - data = json.dumps(response) + "\n" - # Possible race condition here by having session - # close connection? - # I assume Python connections are thread safe interfaces - try: - connection = self.connection() - while data: - l = connection.send(data) - data = data[l:] - except: - self.stop() + self.response_queue.put(response) + + +class TcpClientResponder(threading.Thread): + + def __init__(self, session): + self.session = session + threading.Thread.__init__(self) + + def run(self): + while not self.session.stopped(): + response = self.session.response_queue.get() + data = json.dumps(response) + "\n" + try: + while data: + l = self.session.connection().send(data) + data = data[l:] + except: + self.session.stop() + class TcpClientRequestor(threading.Thread): @@ -72,6 +88,11 @@ class TcpClientRequestor(threading.Thread): threading.Thread.__init__(self) def run(self): + try: + self.session.do_handshake() + except: + return + while not self.shared.stopped(): if not self.update(): break @@ -143,10 +164,7 @@ class TcpServer(threading.Thread): self.ssl_certfile = ssl_certfile def run(self): - if self.use_ssl: - print_log("TCP/SSL server started.") - else: - print_log("TCP server started.") + print_log( ("SSL" if self.use_ssl else "TCP") + " server started on port %d"%self.port) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) @@ -154,6 +172,7 @@ class TcpServer(threading.Thread): while not self.shared.stopped(): + #if self.use_ssl: print_log("SSL: socket listening") try: connection, address = sock.accept() except: @@ -161,6 +180,7 @@ class TcpServer(threading.Thread): time.sleep(0.1) continue + #if self.use_ssl: print_log("SSL: new session", address) try: session = TcpSession(connection, address, use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile) except BaseException, e: @@ -174,3 +194,5 @@ class TcpServer(threading.Thread): self.dispatcher.collect_garbage() client_req = TcpClientRequestor(self.dispatcher, session) client_req.start() + responder = TcpClientResponder(session) + responder.start()