self.address = address[0]
self.name = "TCP " if not use_ssl else "SSL "
+ self.response_queue = queue.Queue()
def do_handshake(self):
if self.use_ssl:
self._stopped = True
def send_response(self, response):
- data = json.dumps(response) + "\n"
- # Possible race condition here by having session
- # close connection?
- # I assume Python connections are thread safe interfaces
- try:
- connection = self.connection()
- while data:
- l = connection.send(data)
- data = data[l:]
- except:
- self.stop()
+ self.response_queue.put(response)
+
+
+class TcpClientResponder(threading.Thread):
+
+ def __init__(self, session):
+ self.session = session
+ threading.Thread.__init__(self)
+
+ def run(self):
+ while not self.session.stopped():
+ try:
+ response = self.session.response_queue.get(timeout=10)
+ except queue.Empty:
+ continue
+ data = json.dumps(response) + "\n"
+ try:
+ while data:
+ l = self.session.connection().send(data)
+ data = data[l:]
+ except:
+ self.session.stop()
+
class TcpClientRequestor(threading.Thread):
try:
self.session.do_handshake()
except:
+ self.session.stop()
return
while not self.shared.stopped():
- if not self.update():
+
+ data = self.receive()
+ if not data:
+ self.session.stop()
break
+ self.message += data
self.session.time = time.time()
while self.parse():
pass
- def update(self):
- data = self.receive()
- if not data:
- # close_session
- self.session.stop()
- return False
-
- self.message += data
- return True
def receive(self):
try:
while not self.shared.stopped():
+ #if self.use_ssl: print_log("SSL: socket listening")
try:
connection, address = sock.accept()
except:
time.sleep(0.1)
continue
+ #if self.use_ssl: print_log("SSL: new session", address)
try:
session = TcpSession(connection, address, use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile)
except BaseException, e:
self.dispatcher.collect_garbage()
client_req = TcpClientRequestor(self.dispatcher, session)
client_req.start()
+ responder = TcpClientResponder(session)
+ responder.start()