X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=transports%2Fstratum_tcp.py;h=c843f770a572d4e0d97d69362cb2fe1dd8f5327e;hb=a6c7750da047d13465a040edfa5fa4a9d3d28328;hp=ce39acad751f9571b0713615bbb05f159d27e098;hpb=8c0a7d9c52070a81870bbc8c2043612e72ac3aa8;p=electrum-server.git diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index ce39aca..c843f77 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -4,15 +4,25 @@ import threading import time import Queue as queue -from processor import Session, Dispatcher, Shared +from processor import Session, Dispatcher, print_log class TcpSession(Session): - def __init__(self, connection, address): - self._connection = connection - self.address = address + def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile): Session.__init__(self) - print "New session", address + if use_ssl: + import ssl + self._connection = ssl.wrap_socket( + connection, + server_side=True, + certfile=ssl_certfile, + keyfile=ssl_keyfile, + ssl_version=ssl.PROTOCOL_SSLv23) + else: + self._connection = connection + + self.address = address[0] + self.name = "TCP" def connection(self): if self.stopped(): @@ -22,18 +32,20 @@ class TcpSession(Session): def stop(self): self._connection.close() - print "Terminating connection:", self.address[0] + #print "Terminating connection:", self.address with self.lock: self._stopped = True def send_response(self, response): - raw_response = json.dumps(response) + data = json.dumps(response) + "\n" # Possible race condition here by having session # close connection? # I assume Python connections are thread safe interfaces try: connection = self.connection() - connection.send(raw_response + "\n") + while data: + l = connection.send(data) + data = data[l:] except: self.stop() @@ -41,9 +53,9 @@ class TcpSession(Session): class TcpClientRequestor(threading.Thread): - def __init__(self, shared, processor, session): - self.shared = shared - self.processor = processor + def __init__(self, dispatcher, session): + self.shared = dispatcher.shared + self.dispatcher = dispatcher self.message = "" self.session = session threading.Thread.__init__(self) @@ -53,6 +65,8 @@ class TcpClientRequestor(threading.Thread): if not self.update(): break + self.session.time = time.time() + while self.parse(): pass @@ -68,7 +82,7 @@ class TcpClientRequestor(threading.Thread): def receive(self): try: - return self.session.connection().recv(1024) + return self.session.connection().recv(2048) except: return '' @@ -86,7 +100,7 @@ class TcpClientRequestor(threading.Thread): try: command = json.loads(raw_command) except: - self.processor.push_response({"error": "bad JSON", "request": raw_command}) + self.dispatcher.push_response({"error": "bad JSON", "request": raw_command}) return True try: @@ -96,36 +110,39 @@ class TcpClientRequestor(threading.Thread): method = command['method'] except KeyError: # Return an error JSON in response. - self.processor.push_response({"error": "syntax error", "request": raw_command}) + self.dispatcher.push_response({"error": "syntax error", "request": raw_command}) else: - self.processor.push_request(self.session,command) + self.dispatcher.push_request(self.session,command) return True class TcpServer(threading.Thread): - def __init__(self, shared, processor, host, port): - self.shared = shared - self.processor = processor + def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile): + self.shared = dispatcher.shared + self.dispatcher = dispatcher.request_dispatcher threading.Thread.__init__(self) self.daemon = True self.host = host self.port = port self.lock = threading.Lock() + self.use_ssl = use_ssl + self.ssl_keyfile = ssl_keyfile + self.ssl_certfile = ssl_certfile def run(self): - print "TCP server started." + if self.use_ssl: + print_log( "TCP/SSL server started.") + else: + print_log( "TCP server started.") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(1) while not self.shared.stopped(): - session = TcpSession(*sock.accept()) - client_req = TcpClientRequestor(self.shared, self.processor, session) + session = TcpSession(*sock.accept(), use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile) + self.dispatcher.add_session(session) + self.dispatcher.collect_garbage() + client_req = TcpClientRequestor(self.dispatcher, session) client_req.start() - self.processor.add_session(session) - self.processor.collect_garbage() - - -