From: genjix Date: Wed, 21 Mar 2012 10:27:33 +0000 (+0000) Subject: Front-end cannibalised from old server code. X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=cc3952f7be9cea7ae84caa37ccea5a349a705c75 Front-end cannibalised from old server code. --- diff --git a/stratum.py b/stratum.py new file mode 100644 index 0000000..d0fd61e --- /dev/null +++ b/stratum.py @@ -0,0 +1,218 @@ +import json +import socket +import threading +import time +import Queue as queue + +class Processor(threading.Thread): + + def __init__(self): + self.shared = None + self.lock = threading.Lock() + self.sessions = [] + threading.Thread.__init__(self) + self.daemon = True + + def add_session(self, session): + with self.lock: + self.sessions.append(session) + + def run(self): + if self.shared is None: + raise TypeError("self.shared not set in Processor") + while not self.shared.stopped(): + # Deep copy entire sessions list and blank it + # This is done to minimise lock contention + with self.lock: + sessions = self.sessions[:] + self.sessions = [] + for session in sessions: + if not session.stopped(): + # If session is still alive then re-add it back + # to our internal register + self.add_session(session) + self.process(session) + + def process(self, session): + request = session.pop_request() + print "New request", request + # Execute and when ready, you call + # session.push_response(response) + +class Session: + + def __init__(self, connection, address): + self._connection = connection + self.address = address + self._stopped = False + self.lock = threading.Lock() + + self.request_queue = queue.Queue() + self.response_queue = queue.Queue() + + def stop(self): + self._connection.close() + print "Terminating connection:", self.address[0] + with self.lock: + self._stopped = True + + def stopped(self): + with self.lock: + return self._stopped + + def connection(self): + if self.stopped(): + raise Exception("Session was stopped") + else: + return self._connection + + def push_request(self, item): + self.request_queue.put(item) + + def pop_request(self): + return self.request_queue.get() + + def push_response(self): + self.response_queue.put(item) + + def pop_response(self): + return self.response_queue.get() + +class TcpClientResponder(threading.Thread): + + def __init__(self, shared, session): + self.shared = shared + self.session = session + threading.Thread.__init__(self) + + def run(self): + while not self.shared.stopped() or self.session.stopped(): + response = self.session.pop_response() + # Possible race condition here by having session + # close connection? + # I assume Python connections are thread safe interfaces + connection = self.session.connection() + try: + connection.send(response + "\n") + except: + self.session.stop() + +class TcpClientRequestor(threading.Thread): + + def __init__(self, shared, session): + self.shared = shared + self.message = "" + self.session = session + threading.Thread.__init__(self) + + def run(self): + while not self.shared.stopped(): + if not self.update(): + self.session.stop() + return + + def update(self): + data = self.receive() + if data is None: + # close_session + self.stop() + return False + + self.message += data + if not self.parse(): + return False + return True + + def receive(self): + try: + return self.session.connection().recv(1024) + except socket.error: + return None + + def parse(self): + while True: + raw_buffer = self.message.find('\n') + if raw_buffer == -1: + return True + + command = self.message[0:raw_buffer].strip() + self.message = self.message[raw_buffer + 1:] + if command == 'quit': + return False + + try: + command = json.loads(command) + except: + print "json error", repr(command) + continue + + try: + message_id = command.get('id') + method = command.get('method') + params = command.get('params') + except: + print "syntax error", repr(command), self.session.address[0] + continue + + self.session.push_request((message_id, method, params)) + print message_id, method, params + +class TcpServer(threading.Thread): + + def __init__(self, shared, processor): + self.shared = shared + self.processor = processor + self.clients = [] + threading.Thread.__init__(self) + self.daemon = True + + def run(self): + print "TCP server started." + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(("localhost", 50001)) + sock.listen(1) + while not self.shared.stopped(): + session = Session(*sock.accept()) + client_req = TcpClientRequestor(self.shared, session) + client_req.start() + client_res = TcpClientResponder(self.shared, session) + client_res.start() + self.processor.add_session(session) + +class Shared: + + def __init__(self): + self.lock = threading.Lock() + self._stopped = False + + def stop(self): + print "Stopping Stratum" + with self.lock: + self._stopped = True + + def stopped(self): + with self.lock: + return self._stopped + +class Stratum: + + def start(self, processor): + shared = Shared() + # Bind shared to processor since constructor is user defined + processor.shared = shared + processor.start() + # Create various transports we need + transports = TcpServer(shared, processor), + for server in transports: + server.start() + while not shared.stopped(): + if raw_input() == "quit": + shared.stop() + time.sleep(1) + +if __name__ == "__main__": + processor = Processor() + app = Stratum() + app.start(processor) +