X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=blobdiff_plain;f=stratum.py;h=f364d935725cf8ab2297103d96d8e34de0e7d774;hp=649987ab4a9c4b36f9d9db155baa7dd76570206b;hb=70f23254388eb74ca7a05009ccf2a5d7a723ec36;hpb=36302849173b7038eea5b6701000ecede3a30030 diff --git a/stratum.py b/stratum.py index 649987a..f364d93 100644 --- a/stratum.py +++ b/stratum.py @@ -4,90 +4,14 @@ import threading import time import Queue as queue -class Processor(threading.Thread): +from processor import Session, Dispatcher, Shared - def __init__(self): - self.shared = None - threading.Thread.__init__(self) - self.daemon = True - self.request_queue = queue.Queue() - self.response_queue = queue.Queue() - self.internal_ids = {} - self.internal_id = 1 - self.lock = threading.Lock() - - def push_response(self, item): - self.response_queue.put(item) - - def pop_response(self): - return self.response_queue.get() - - def push_request(self, session, item): - self.request_queue.put((session,item)) - - def pop_request(self): - return self.request_queue.get() - - def get_session_id(self, internal_id): - with self.lock: - return self.internal_ids.pop(internal_id) - - def store_session_id(self, session, msgid): - with self.lock: - self.internal_ids[self.internal_id] = session, msgid - r = self.internal_id - self.internal_id += 1 - return r - - def run(self): - if self.shared is None: - raise TypeError("self.shared not set in Processor") - while not self.shared.stopped(): - session, request = self.pop_request() - - method = request['method'] - params = request.get('params',[]) - - if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']: - session.subscribe_to_service(method, params) - - # store session and id locally - request['id'] = self.store_session_id(session, request['id']) - self.process(request) - - self.stop() - - def stop(self): - pass - - def process(self, request): - print "New request", request - # Do stuff... - # response = request - # When ready, you call - # self.push_response(response) - - - -class Session: +class TcpSession(Session): def __init__(self, connection, address): self._connection = connection self.address = address - self._stopped = False - self.lock = threading.Lock() - self.subscriptions = [] - print "new session", address - - def stop(self): - self._connection.close() - print "Terminating connection:", self.address[0] - with self.lock: - self._stopped = True - - def stopped(self): - with self.lock: - return self._stopped + Session.__init__(self) def connection(self): if self.stopped(): @@ -95,55 +19,24 @@ class Session: else: return self._connection - def subscribe_to_service(self, method, params): + def stop(self): + self._connection.close() + print "Terminating connection:", self.address[0] with self.lock: - self.subscriptions.append((method, params)) - - - -class TcpResponder(threading.Thread): - - def __init__(self, shared, processor, server): - self.shared = shared - self.processor = processor - self.server = server - threading.Thread.__init__(self) - - - def run(self): - while not self.shared.stopped(): - response = self.processor.pop_response() - internal_id = response.get('id') - params = response.get('params',[]) - try: - method = response['method'] - except: - print "no method", response - continue - - if internal_id: - session, message_id = self.processor.get_session_id(internal_id) - response['id'] = message_id - self.send_response(response, session) - - else: - for session in self.server.sessions: - if not session.stopped(): - if (method,params) in session.subscriptions: - self.send_response(response, session) - - + self._stopped = True - def send_response(self, response, session): + def send_response(self, response): raw_response = json.dumps(response) # Possible race condition here by having session # close connection? # I assume Python connections are thread safe interfaces try: - connection = session.connection() + connection = self.connection() connection.send(raw_response + "\n") except: - session.stop() + self.stop() + + class TcpClientRequestor(threading.Thread): @@ -213,7 +106,6 @@ class TcpServer(threading.Thread): def __init__(self, shared, processor, host, port): self.shared = shared self.processor = processor - self.sessions = [] threading.Thread.__init__(self) self.daemon = True self.host = host @@ -226,46 +118,15 @@ class TcpServer(threading.Thread): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(1) - responder = TcpResponder(self.shared, self.processor, self) - responder.start() while not self.shared.stopped(): - session = Session(*sock.accept()) + session = TcpSession(*sock.accept()) client_req = TcpClientRequestor(self.shared, self.processor, session) client_req.start() - self.add_session(session) - self.collect_garbage() - - def add_session(self, session): - with self.lock: - self.sessions.append(session) - - def collect_garbage(self): - # Deep copy entire sessions list and blank it - # This is done to minimise lock contention - with self.lock: - sessions = self.sessions[:] - self.sessions = [] - for session in sessions: - if not session.stopped(): - # If session is still alive then re-add it back - # to our internal register - self.add_session(session) - + self.processor.add_session(session) + self.processor.collect_garbage() -class Shared: - - def __init__(self): - self.lock = threading.Lock() - self._stopped = False - def stop(self): - print "Stopping Stratum" - with self.lock: - self._stopped = True - def stopped(self): - with self.lock: - return self._stopped class Stratum: