X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=stratum.py;h=4a0a5423c612e89ce67f0be72424a92861ef7da2;hb=29597ac47dfa99532835b01d0bf90e4daa0cf213;hp=f89fafd5f1549cab6a4a13bd85d09344c5f3e0fd;hpb=ea96440732a78324f92dc6ec4ad6dcabefbe0a16;p=electrum-server.git diff --git a/stratum.py b/stratum.py index f89fafd..4a0a542 100644 --- a/stratum.py +++ b/stratum.py @@ -8,19 +8,16 @@ class Processor(threading.Thread): def __init__(self): self.shared = None - self.lock = threading.Lock() - self.sessions = [] threading.Thread.__init__(self) self.daemon = True self.request_queue = queue.Queue() self.response_queue = queue.Queue() + self.internal_ids = {} + self.internal_id = 0 + self.lock = threading.Lock() - def add_session(self, session): - with self.lock: - self.sessions.append(session) - - def push_response(self, session, item): - self.response_queue.put((session,item)) + def push_response(self, item): + self.response_queue.put(item) def pop_response(self): return self.response_queue.get() @@ -31,57 +28,43 @@ class Processor(threading.Thread): def pop_request(self): return self.request_queue.get() + def get_session_id(self, internal_id): + with self.lock: + return session_ids.pop(internal_id) + + def store_session_id(self, session, msgid): + with self.lock: + self.internal_ids[self.internal_id] = session, msgid + self.internal_id += 1 + def run(self): if self.shared is None: raise TypeError("self.shared not set in Processor") while not self.shared.stopped(): - # Deep copy entire sessions list and blank it - # This is done to minimise lock contention - with self.lock: - sessions = self.sessions[:] - self.sessions = [] - for session in sessions: - if not session.stopped(): - # If session is still alive then re-add it back - # to our internal register - self.add_session(session) - session, request = self.pop_request() - self.process(session, request) + + method = request['method'] + params = request.get('params',[]) + + if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']: + session.subscribe_to_service(method, params) + + # store session and id locally + request['id'] = self.store_session_id(session, request['id']) + self.process(request) self.stop() def stop(self): pass - def process(self, session, request): + def process(self, request): print "New request", request # Do stuff... # response = request # When ready, you call - # self.push_response(session,response) - - def update_from_blocknum(self,block_number): - for session in self.sessions: - if session.numblocks_sub is not None: - response = { 'id':session.numblocks_sub, 'result':block_number } - self.push_response(session,response) - - def update_from_address(self,addr): - for session in self.sessions: - m = session.addresses_sub.get(addr) - if m: - status = self.get_status( addr ) - message_id, last_status = m - if status != last_status: - session.subscribe_to_address(addr,message_id, status) - response = { 'id':message_id, 'result':status } - self.push_response(session,response) - - def get_status(self,addr): - # return status of an address - # return store.get_status(addr) - pass + # self.push_response(response) + class Session: @@ -91,8 +74,7 @@ class Session: self.address = address self._stopped = False self.lock = threading.Lock() - self.numblocks_sub = None - self.addresses_sub = {} + self.subscriptions = [] print "new session", address def stop(self): @@ -111,34 +93,57 @@ class Session: else: return self._connection - def subscribe_to_numblocks(self,message_id): + def subscribe_to_service(self, method, params): with self.lock: - self.numblocks_sub = message_id + self.subscriptions.append((method, params)) - def subscribe_to_address(self,address,message_id,status): - with self.lock: - self.addresses_sub[address] = message_id,status class TcpResponder(threading.Thread): - def __init__(self, shared, processor): + def __init__(self, shared, processor, server): self.shared = shared self.processor = processor + self.server = server threading.Thread.__init__(self) + def run(self): while not self.shared.stopped(): - session,response = self.processor.pop_response() - raw_response = json.dumps(response) - # Possible race condition here by having session - # close connection? - # I assume Python connections are thread safe interfaces + response = self.processor.pop_response() + + internal_id = response.get('id') + params = response.get('params',[]) try: - connection = session.connection() - connection.send(raw_response + "\n") + method = response['method'] except: - session.stop() + print "no method", response + continue + + if internal_id: + session, message_id = self.processor.get_session_id(internal_id) + if message_id: + response['id'] = message_id + self.send_response(response, session) + + else: + for session in self.server.sessions: + if not session.stopped(): + if (method,params) in session.subscriptions: + self.send_response(response, session) + + + + def send_response(self, response, session): + raw_response = json.dumps(response) + # Possible race condition here by having session + # close connection? + # I assume Python connections are thread safe interfaces + try: + connection = session.connection() + connection.send(raw_response + "\n") + except: + session.stop() class TcpClientRequestor(threading.Thread): @@ -187,8 +192,7 @@ class TcpClientRequestor(threading.Thread): try: command = json.loads(raw_command) except: - self.processor.push_response(self.session, - {"error": "bad JSON", "request": raw_command}) + self.processor.push_response({"error": "bad JSON", "request": raw_command}) return True try: @@ -198,8 +202,7 @@ class TcpClientRequestor(threading.Thread): method = command['method'] except KeyError: # Return an error JSON in response. - self.processor.push_response(self.session, - {"error": "syntax error", "request": raw_command}) + self.processor.push_response({"error": "syntax error", "request": raw_command}) else: self.processor.push_request(self.session,command) @@ -210,11 +213,12 @@ class TcpServer(threading.Thread): def __init__(self, shared, processor, host, port): self.shared = shared self.processor = processor - self.clients = [] + self.sessions = [] threading.Thread.__init__(self) self.daemon = True self.host = host self.port = port + self.lock = threading.Lock() def run(self): print "TCP server started." @@ -222,13 +226,31 @@ class TcpServer(threading.Thread): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(1) - responder = TcpResponder(self.shared, self.processor) + responder = TcpResponder(self.shared, self.processor, self) responder.start() while not self.shared.stopped(): session = Session(*sock.accept()) client_req = TcpClientRequestor(self.shared, self.processor, session) client_req.start() - self.processor.add_session(session) + self.add_session(session) + self.collect_garbage() + + def add_session(self, session): + with self.lock: + self.sessions.append(session) + + def collect_garbage(self): + # Deep copy entire sessions list and blank it + # This is done to minimise lock contention + with self.lock: + sessions = self.sessions[:] + self.sessions = [] + for session in sessions: + if not session.stopped(): + # If session is still alive then re-add it back + # to our internal register + self.add_session(session) + class Shared: