def __init__(self, session):
self.session = session
- self.connection = self.session.connection()
threading.Thread.__init__(self)
def run(self):
while not self.session.stopped():
- response = self.session.response_queue.get()
+ try:
+ response = self.session.response_queue.get(timeout=10)
+ except queue.Empty:
+ continue
data = json.dumps(response) + "\n"
try:
while data:
- l = self.connection.send(data)
+ l = self.session.connection().send(data)
data = data[l:]
except:
self.session.stop()
try:
self.session.do_handshake()
except:
+ self.session.stop()
return
while not self.shared.stopped():
- if not self.update():
+
+ data = self.receive()
+ if not data:
+ self.session.stop()
break
+ self.message += data
self.session.time = time.time()
while self.parse():
pass
- def update(self):
- data = self.receive()
- if not data:
- # close_session
- self.session.stop()
- return False
-
- self.message += data
- return True
def receive(self):
try:
self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
else:
self.dispatcher.push_request(self.session, command)
+ # sleep a bit to prevent a single session from DOSing the queue
+ time.sleep(0.01)
return True