self.history_cache.pop(address)
if address in self.watched_addresses:
+ # TODO: update cache here. if new value equals cached value, do not send notification
self.address_queue.put(address)
def main_iteration(self):
self.internal_ids = {}
self.internal_id = 1
self.lock = threading.Lock()
+ self.idlock = threading.Lock()
self.sessions = []
self.processors = {}
return x
def get_session_id(self, internal_id):
- with self.lock:
+ with self.idlock:
return self.internal_ids.pop(internal_id)
def store_session_id(self, session, msgid):
- with self.lock:
+ with self.idlock:
self.internal_ids[self.internal_id] = session, msgid
r = self.internal_id
self.internal_id += 1
suffix = method.split('.')[-1]
if session is not None:
- is_new = session.protocol_version >= 0.5
if suffix == 'subscribe':
session.subscribe_to_service(method, params)
except:
pass
- #if session.protocol_version < 0.6:
- # print_log("stopping session from old client", session.protocol_version)
- # session.stop()
def get_sessions(self):
with self.lock:
import socket
import threading
import time
+import traceback, sys
from processor import Session, Dispatcher
from utils import print_log
return self._connection
def stop(self):
+ if self.stopped():
+ return
+
+ try:
+ self._connection.shutdown(socket.SHUT_RDWR)
+ except:
+ # print_log("problem shutting down", self.address)
+ # traceback.print_exc(file=sys.stdout)
+ pass
+
self._connection.close()
- #print "Terminating connection:", self.address
with self.lock:
self._stopped = True
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((self.host, self.port))
- sock.listen(1)
+ sock.listen(5)
+
while not self.shared.stopped():
+
try:
connection, address = sock.accept()
+ except:
+ traceback.print_exc(file=sys.stdout)
+ time.sleep(0.1)
+ continue
+
+ try:
session = TcpSession(connection, address, use_ssl=self.use_ssl, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile)
except BaseException, e:
error = str(e)
print_log("cannot start TCP session", error, address)
+ connection.close()
time.sleep(0.1)
continue
+
self.dispatcher.add_session(session)
self.dispatcher.collect_garbage()
client_req = TcpClientRequestor(self.dispatcher, session)