8 from processor import Session, Dispatcher
9 from utils import print_log
# Session bound to one accepted socket, optionally wrapped in SSL.
# Outgoing responses are buffered in a per-session queue (response_queue).
12 class TcpSession(Session):
# Build the session around an accepted socket.
#   connection   -- the socket to use (stored as-is, or wrapped by ssl.wrap_socket)
#   address      -- peer address; only its first element is kept, as self.address
#   use_ssl      -- stored on self; also selects the "SSL " vs "TCP " session name
#   ssl_certfile -- passed to ssl.wrap_socket as certfile
#   ssl_keyfile  -- NOTE(review): not referenced on any line visible here;
#                   presumably passed to ssl.wrap_socket as keyfile -- confirm
14 def __init__(self, connection, address, use_ssl, ssl_certfile, ssl_keyfile):
15 Session.__init__(self)
16 self.use_ssl = use_ssl
# SSL variant: do_handshake_on_connect=False defers the handshake, which is
# performed later through do_handshake().
19 self._connection = ssl.wrap_socket(
22 certfile=ssl_certfile,
24 ssl_version=ssl.PROTOCOL_SSLv23,
25 do_handshake_on_connect=False)
# Plain variant: keep the raw socket.
# NOTE(review): the condition selecting between these two assignments is not
# visible here -- presumably use_ssl; confirm.
27 self._connection = connection
29 self.address = address[0]
30 self.name = "TCP " if not use_ssl else "SSL "
# Responses queued by send_response() wait here until they are read back out.
31 self.response_queue = queue.Queue()
# Perform the handshake on the underlying connection.
# NOTE(review): a plain socket has no do_handshake(); presumably this call is
# guarded by self.use_ssl -- the guard is not visible here, confirm.
33 def do_handshake(self):
35 self._connection.do_handshake()
# Accessor for the underlying socket: raises Exception("Session was stopped")
# in one case and returns self._connection otherwise.
# NOTE(review): the method header and the deciding condition are not visible
# here; other classes in this file call it as session.connection().
39 raise Exception("Session was stopped")
41 return self._connection
# Teardown: shut the socket down in both directions (SHUT_RDWR), then close it.
# The two commented-out lines are disabled diagnostics for a failed shutdown.
# NOTE(review): the enclosing method header and error handling are not visible here.
48 self._connection.shutdown(socket.SHUT_RDWR)
50 # print_log("problem shutting down", self.address)
51 # traceback.print_exc(file=sys.stdout)
54 self._connection.close()
# Enqueue a response on this session's response_queue; nothing is written to
# the socket here.
58 def send_response(self, response):
59 self.response_queue.put(response)
# Thread that takes responses off one session's response_queue and writes each
# to the session's socket as a JSON document followed by a newline.
62 class TcpClientResponder(threading.Thread):
# Remember the session to serve, then initialise the Thread base class.
64 def __init__(self, session):
65 self.session = session
66 threading.Thread.__init__(self)
# Main loop: repeat until the session reports that it has stopped.
# NOTE(review): the enclosing method header is not visible here -- presumably run().
69 while not self.session.stopped():
# Block for at most 10 seconds waiting for the next queued response.
# NOTE(review): get(timeout=...) raises when the queue stays empty; the
# handling of that case is not visible here -- confirm.
71 response = self.session.response_queue.get(timeout=10)
# One response per line: JSON text terminated by "\n".
74 data = json.dumps(response) + "\n"
# socket.send() returns the number of bytes actually written, which may be
# less than len(data); that count is bound to `l`.
# NOTE(review): handling of a partial send is not visible here -- confirm.
77 l = self.session.connection().send(data)
# Thread that reads newline-delimited commands from one session's socket,
# decodes them as JSON and pushes them to the dispatcher.
84 class TcpClientRequestor(threading.Thread):
# Keep references to the dispatcher, its `shared` object and the session,
# then initialise the Thread base class.
# NOTE(review): self.message is read further down but its initialisation is
# not visible here -- confirm it is set before first use.
86 def __init__(self, dispatcher, session):
87 self.shared = dispatcher.shared
88 self.dispatcher = dispatcher
90 self.session = session
91 threading.Thread.__init__(self)
# Complete the session handshake before any data is read.
95 self.session.do_handshake()
# Read loop: repeat until the shared object reports stopped.
100 while not self.shared.stopped():
102 data = self.receive()
# Stamp the session with the current wall-clock time.
108 self.session.time = time.time()
# Read at most 2048 bytes from the session's socket.
116 return self.session.connection().recv(2048)
# Despite its name, raw_buffer is an index: the position of the first newline
# in the buffered input (str.find returns -1 when there is none).
# NOTE(review): the handling of the -1 case is not visible here -- confirm.
121 raw_buffer = self.message.find('\n')
# Split off everything before the newline as one command (whitespace
# stripped) and keep the remainder buffered for the next pass.
125 raw_command = self.message[0:raw_buffer].strip()
126 self.message = self.message[raw_buffer + 1:]
# The literal command 'quit' is tested for explicitly.
# NOTE(review): the action taken on 'quit' is not visible here.
127 if raw_command == 'quit':
# Decode the command text as JSON.
132 command = json.loads(raw_command)
# Reply sent for input that is reported as "bad JSON"; the raw text is echoed back.
134 self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
138 # Try to load vital fields, and return an error if
# Both lookups raise KeyError when the field is absent from the decoded command.
140 message_id = command['id']
141 method = command['method']
143 # Return an error JSON in response.
144 self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
# Hand the decoded command, together with its session, to the dispatcher.
146 self.dispatcher.push_request(self.session, command)
147 # sleep a bit to prevent a single session from DOSing the queue
# Thread that binds a listening TCP socket and, for each accepted connection,
# creates a TcpSession plus a TcpClientRequestor and a TcpClientResponder.
153 class TcpServer(threading.Thread):
# Store the server configuration.
#   dispatcher   -- object exposing `shared` and `request_dispatcher`
#   host, port   -- bind address; NOTE(review): self.host / self.port are read
#                   below but their assignment is not visible here -- confirm
#   use_ssl, ssl_certfile, ssl_keyfile -- stored and forwarded to every TcpSession
155 def __init__(self, dispatcher, host, port, use_ssl, ssl_certfile, ssl_keyfile):
156 self.shared = dispatcher.shared
# Note: self.dispatcher is the *request* dispatcher, not the object passed in.
157 self.dispatcher = dispatcher.request_dispatcher
158 threading.Thread.__init__(self)
162 self.lock = threading.Lock()
163 self.use_ssl = use_ssl
164 self.ssl_keyfile = ssl_keyfile
165 self.ssl_certfile = ssl_certfile
# Log which flavour of server is starting and on which port.
168 print_log( ("SSL" if self.use_ssl else "TCP") + " server started on port %d"%self.port)
# IPv4 stream socket; SO_REUSEADDR is set before bind.
169 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
170 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
171 sock.bind((self.host, self.port))
# Accept loop: repeat until the shared object reports stopped.
174 while not self.shared.stopped():
176 #if self.use_ssl: print_log("SSL: socket listening")
178 connection, address = sock.accept()
# Print the current exception's traceback to stdout.
180 traceback.print_exc(file=sys.stdout)
184 #if self.use_ssl: print_log("SSL: new session", address)
# Wrap the accepted socket in a session, using this server's SSL settings.
186 session = TcpSession(connection, address, use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile)
# Python 2 `except X, e` syntax. BaseException also covers KeyboardInterrupt
# and SystemExit, so those are caught here too.
187 except BaseException, e:
# NOTE(review): `error` is not assigned on any line visible here -- confirm it
# is defined (e.g. derived from `e`) before this call.
189 print_log("cannot start TCP session", error, address)
# Register the new session with the request dispatcher, then ask it to
# collect garbage.
194 self.dispatcher.add_session(session)
195 self.dispatcher.collect_garbage()
# One requestor and one responder thread object per session.
# NOTE(review): the start() calls for these threads are not visible here.
196 client_req = TcpClientRequestor(self.dispatcher, session)
198 responder = TcpClientResponder(session)