From: ThomasV Date: Sun, 25 Mar 2012 20:42:46 +0000 (+0400) Subject: use shared queues X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=9bb9585ad111f0e7a44301530ca29743baa9c4e3 use shared queues --- diff --git a/stratum.py b/stratum.py index ba3ab22..f34bd06 100644 --- a/stratum.py +++ b/stratum.py @@ -12,11 +12,25 @@ class Processor(threading.Thread): self.sessions = [] threading.Thread.__init__(self) self.daemon = True + self.request_queue = queue.Queue() + self.response_queue = queue.Queue() def add_session(self, session): with self.lock: self.sessions.append(session) + def push_response(self, session, item): + self.response_queue.put((session,item)) + + def pop_response(self): + return self.response_queue.get() + + def push_request(self, session, item): + self.request_queue.put((session,item)) + + def pop_request(self): + return self.request_queue.get() + def run(self): if self.shared is None: raise TypeError("self.shared not set in Processor") @@ -31,17 +45,21 @@ class Processor(threading.Thread): # If session is still alive then re-add it back # to our internal register self.add_session(session) - self.process(session) + + session, request = self.pop_request() + self.process(session, request) + self.stop() def stop(self): pass - def process(self, session): - request = session.pop_request() + def process(self, session, request): print "New request", request - # Execute and when ready, you call - # session.push_response(response) + # Do stuff... + # response = request + # When ready, you call + # self.push_response(session,response) class Session: @@ -50,9 +68,6 @@ class Session: self.address = address self._stopped = False self.lock = threading.Lock() - - self.request_queue = queue.Queue() - self.response_queue = queue.Queue() self.numblocks_sub = None self.addresses_sub = {} @@ -72,18 +87,6 @@ class Session: else: return self._connection - def push_request(self, item): - self.request_queue.put(item) - - def pop_request(self): - return self.request_queue.get() - - def push_response(self, item): - self.response_queue.put(item) - - def pop_response(self): - return self.response_queue.get() - def subscribe_to_numblocks(self,message_id): with self.lock: self.numblocks_sub = message_id @@ -93,30 +96,31 @@ class Session: self.addresses_sub[address] = message_id,status -class TcpClientResponder(threading.Thread): +class TcpResponder(threading.Thread): - def __init__(self, shared, session): + def __init__(self, shared, processor): self.shared = shared - self.session = session + self.processor = processor threading.Thread.__init__(self) def run(self): - while not self.shared.stopped() or self.session.stopped(): - response = self.session.pop_response() + while not self.shared.stopped(): + session,response = self.processor.pop_response() raw_response = json.dumps(response) # Possible race condition here by having session # close connection? # I assume Python connections are thread safe interfaces - connection = self.session.connection() + connection = session.connection() try: connection.send(raw_response + "\n") except: - self.session.stop() + session.stop() class TcpClientRequestor(threading.Thread): - def __init__(self, shared, session): + def __init__(self, shared, processor, session): self.shared = shared + self.processor = processor self.message = "" self.session = session threading.Thread.__init__(self) @@ -158,7 +162,7 @@ class TcpClientRequestor(threading.Thread): try: command = json.loads(raw_command) except: - self.session.push_response( + self.processor.push_response(self.session, {"error": "bad JSON", "request": raw_command}) return True @@ -169,10 +173,10 @@ class TcpClientRequestor(threading.Thread): method = command['method'] except KeyError: # Return an error JSON in response. - self.session.push_response( + self.processor.push_response(self.session, {"error": "syntax error", "request": raw_command}) else: - self.session.push_request(command) + self.processor.push_request(self.session,command) return True @@ -193,12 +197,12 @@ class TcpServer(threading.Thread): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(1) + responder = TcpResponder(self.shared, self.processor) + responder.start() while not self.shared.stopped(): session = Session(*sock.accept()) - client_req = TcpClientRequestor(self.shared, session) + client_req = TcpClientRequestor(self.shared, self.processor, session) client_req.start() - client_res = TcpClientResponder(self.shared, session) - client_res.start() self.processor.add_session(session) class Shared: @@ -224,7 +228,7 @@ class Stratum: processor.shared = shared processor.start() # Create various transports we need - transports = TcpServer(shared, processor, "176.31.24.241",50001), + transports = TcpServer(shared, processor, "ecdsa.org", 50002), for server in transports: server.start() while not shared.stopped():