X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=stratum.py;h=f364d935725cf8ab2297103d96d8e34de0e7d774;hb=70f23254388eb74ca7a05009ccf2a5d7a723ec36;hp=6643815ef3c5e5c2dbc044da0e67cc29d9ef2c80;hpb=5c89878db307ef2c0687a801751eeec2997d6db6;p=electrum-server.git diff --git a/stratum.py b/stratum.py index 6643815..f364d93 100644 --- a/stratum.py +++ b/stratum.py @@ -4,65 +4,14 @@ import threading import time import Queue as queue -class Processor(threading.Thread): +from processor import Session, Dispatcher, Shared - def __init__(self): - self.shared = None - self.lock = threading.Lock() - self.sessions = [] - threading.Thread.__init__(self) - self.daemon = True - - def add_session(self, session): - with self.lock: - self.sessions.append(session) - - def run(self): - if self.shared is None: - raise TypeError("self.shared not set in Processor") - while not self.shared.stopped(): - # Deep copy entire sessions list and blank it - # This is done to minimise lock contention - with self.lock: - sessions = self.sessions[:] - self.sessions = [] - for session in sessions: - if not session.stopped(): - # If session is still alive then re-add it back - # to our internal register - self.add_session(session) - self.process(session) - self.stop() - - def stop(self): - pass - - def process(self, session): - request = session.pop_request() - print "New request", request - # Execute and when ready, you call - # session.push_response(response) - -class Session: +class TcpSession(Session): def __init__(self, connection, address): self._connection = connection self.address = address - self._stopped = False - self.lock = threading.Lock() - - self.request_queue = queue.Queue() - self.response_queue = queue.Queue() - - def stop(self): - self._connection.close() - print "Terminating connection:", self.address[0] - with self.lock: - self._stopped = True - - def stopped(self): - with self.lock: - return self._stopped + Session.__init__(self) def connection(self): if self.stopped(): @@ -70,42 +19,30 @@ class Session: else: return self._connection - def push_request(self, item): - self.request_queue.put(item) - - def pop_request(self): - return self.request_queue.get() - - def push_response(self, item): - self.response_queue.put(item) - - def pop_response(self): - return self.response_queue.get() + def stop(self): + self._connection.close() + print "Terminating connection:", self.address[0] + with self.lock: + self._stopped = True -class TcpClientResponder(threading.Thread): + def send_response(self, response): + raw_response = json.dumps(response) + # Possible race condition here by having session + # close connection? + # I assume Python connections are thread safe interfaces + try: + connection = self.connection() + connection.send(raw_response + "\n") + except: + self.stop() - def __init__(self, shared, session): - self.shared = shared - self.session = session - threading.Thread.__init__(self) - def run(self): - while not self.shared.stopped() or self.session.stopped(): - response = self.session.pop_response() - raw_response = json.dumps(response) - # Possible race condition here by having session - # close connection? - # I assume Python connections are thread safe interfaces - connection = self.session.connection() - try: - connection.send(raw_response + "\n") - except: - self.session.stop() class TcpClientRequestor(threading.Thread): - def __init__(self, shared, session): + def __init__(self, shared, processor, session): self.shared = shared + self.processor = processor self.message = "" self.session = session threading.Thread.__init__(self) @@ -113,42 +50,42 @@ class TcpClientRequestor(threading.Thread): def run(self): while not self.shared.stopped(): if not self.update(): - self.session.stop() - return + break + + while self.parse(): + pass def update(self): data = self.receive() - if data is None: + if not data: # close_session - self.stop() + self.session.stop() return False self.message += data - if not self.parse(): - return False return True def receive(self): try: return self.session.connection().recv(1024) - except socket.error: - return None + except: + return '' def parse(self): raw_buffer = self.message.find('\n') if raw_buffer == -1: - return True + return False raw_command = self.message[0:raw_buffer].strip() self.message = self.message[raw_buffer + 1:] if raw_command == 'quit': + self.session.stop() return False try: command = json.loads(raw_command) except: - self.session.push_response( - {"error": "bad JSON", "request": raw_command}) + self.processor.push_response({"error": "bad JSON", "request": raw_command}) return True try: @@ -158,50 +95,38 @@ class TcpClientRequestor(threading.Thread): method = command['method'] except KeyError: # Return an error JSON in response. - self.session.push_response( - {"error": "syntax error", "request": raw_command}) + self.processor.push_response({"error": "syntax error", "request": raw_command}) else: - self.session.push_request(command) + self.processor.push_request(self.session,command) return True class TcpServer(threading.Thread): - def __init__(self, shared, processor): + def __init__(self, shared, processor, host, port): self.shared = shared self.processor = processor - self.clients = [] threading.Thread.__init__(self) self.daemon = True + self.host = host + self.port = port + self.lock = threading.Lock() def run(self): print "TCP server started." sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(("176.31.24.241", 50001)) + sock.bind((self.host, self.port)) sock.listen(1) while not self.shared.stopped(): - session = Session(*sock.accept()) - client_req = TcpClientRequestor(self.shared, session) + session = TcpSession(*sock.accept()) + client_req = TcpClientRequestor(self.shared, self.processor, session) client_req.start() - client_res = TcpClientResponder(self.shared, session) - client_res.start() self.processor.add_session(session) + self.processor.collect_garbage() -class Shared: - def __init__(self): - self.lock = threading.Lock() - self._stopped = False - - def stop(self): - print "Stopping Stratum" - with self.lock: - self._stopped = True - def stopped(self): - with self.lock: - return self._stopped class Stratum: @@ -211,7 +136,7 @@ class Stratum: processor.shared = shared processor.start() # Create various transports we need - transports = TcpServer(shared, processor), + transports = TcpServer(shared, processor, "176.31.24.241", 50001), for server in transports: server.start() while not shared.stopped():