Front-end cannibalised from old server code.
authorgenjix <fake@lol.u>
Wed, 21 Mar 2012 10:27:33 +0000 (10:27 +0000)
committergenjix <fake@lol.u>
Wed, 21 Mar 2012 10:27:33 +0000 (10:27 +0000)
stratum.py [new file with mode: 0644]

diff --git a/stratum.py b/stratum.py
new file mode 100644 (file)
index 0000000..d0fd61e
--- /dev/null
@@ -0,0 +1,218 @@
+import json
+import socket
+import threading
+import time
+import Queue as queue
+
+class Processor(threading.Thread):
+
+    def __init__(self):
+        self.shared = None
+        self.lock = threading.Lock()
+        self.sessions = []
+        threading.Thread.__init__(self)
+        self.daemon = True
+
+    def add_session(self, session):
+        with self.lock:
+            self.sessions.append(session)
+
+    def run(self):
+        if self.shared is None:
+            raise TypeError("self.shared not set in Processor")
+        while not self.shared.stopped():
+            # Deep copy entire sessions list and blank it
+            # This is done to minimise lock contention
+            with self.lock:
+                sessions = self.sessions[:]
+                self.sessions = []
+            for session in sessions:
+                if not session.stopped():
+                    # If session is still alive then re-add it back
+                    # to our internal register
+                    self.add_session(session)
+                    self.process(session)
+
+    def process(self, session):
+        request = session.pop_request()
+        print "New request", request
+        # Execute and when ready, you call
+        # session.push_response(response)
+
+class Session:
+
+    def __init__(self, connection, address):
+        self._connection = connection
+        self.address = address
+        self._stopped = False
+        self.lock = threading.Lock()
+
+        self.request_queue = queue.Queue()
+        self.response_queue = queue.Queue()
+
+    def stop(self):
+        self._connection.close()
+        print "Terminating connection:", self.address[0]
+        with self.lock:
+            self._stopped = True
+
+    def stopped(self):
+        with self.lock:
+            return self._stopped
+
+    def connection(self):
+        if self.stopped():
+            raise Exception("Session was stopped")
+        else:
+            return self._connection
+
+    def push_request(self, item):
+        self.request_queue.put(item)
+
+    def pop_request(self):
+        return self.request_queue.get()
+
+    def push_response(self):
+        self.response_queue.put(item)
+
+    def pop_response(self):
+        return self.response_queue.get()
+
+class TcpClientResponder(threading.Thread):
+
+    def __init__(self, shared, session):
+        self.shared = shared
+        self.session = session
+        threading.Thread.__init__(self)
+
+    def run(self):
+        while not self.shared.stopped() or self.session.stopped():
+            response = self.session.pop_response()
+            # Possible race condition here by having session
+            # close connection?
+            # I assume Python connections are thread safe interfaces
+            connection = self.session.connection()
+            try:
+                connection.send(response + "\n")
+            except:
+                self.session.stop()
+
+class TcpClientRequestor(threading.Thread):
+
+    def __init__(self, shared, session):
+        self.shared = shared
+        self.message = ""
+        self.session = session
+        threading.Thread.__init__(self)
+
+    def run(self):
+        while not self.shared.stopped():
+            if not self.update():
+                self.session.stop()
+                return
+
+    def update(self):
+        data = self.receive()
+        if data is None:
+            # close_session
+            self.stop()
+            return False
+
+        self.message += data
+        if not self.parse():
+            return False
+        return True
+
+    def receive(self):
+        try:
+            return self.session.connection().recv(1024)
+        except socket.error:
+            return None
+
+    def parse(self):
+        while True:
+            raw_buffer = self.message.find('\n')
+            if raw_buffer == -1:
+                return True
+
+            command = self.message[0:raw_buffer].strip()
+            self.message = self.message[raw_buffer + 1:]
+            if command == 'quit': 
+                return False
+
+            try:
+                command = json.loads(command)
+            except:
+                print "json error", repr(command)
+                continue
+
+            try:
+                message_id = command.get('id')
+                method = command.get('method')
+                params = command.get('params')
+            except:
+                print "syntax error", repr(command), self.session.address[0]
+                continue
+
+            self.session.push_request((message_id, method, params))
+            print message_id, method, params
+
+class TcpServer(threading.Thread):
+
+    def __init__(self, shared, processor):
+        self.shared = shared
+        self.processor = processor
+        self.clients = []
+        threading.Thread.__init__(self)
+        self.daemon = True
+
+    def run(self):
+        print "TCP server started."
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        sock.bind(("localhost", 50001))
+        sock.listen(1)
+        while not self.shared.stopped():
+            session = Session(*sock.accept())
+            client_req = TcpClientRequestor(self.shared, session)
+            client_req.start()
+            client_res = TcpClientResponder(self.shared, session)
+            client_res.start()
+            self.processor.add_session(session)
+
+class Shared:
+
+    def __init__(self):
+        self.lock = threading.Lock()
+        self._stopped = False
+
+    def stop(self):
+        print "Stopping Stratum"
+        with self.lock:
+            self._stopped = True
+
+    def stopped(self):
+        with self.lock:
+            return self._stopped
+
+class Stratum:
+
+    def start(self, processor):
+        shared = Shared()
+        # Bind shared to processor since constructor is user defined
+        processor.shared = shared
+        processor.start()
+        # Create various transports we need
+        transports = TcpServer(shared, processor),
+        for server in transports:
+            server.start()
+        while not shared.stopped():
+            if raw_input() == "quit":
+                shared.stop()
+            time.sleep(1)
+
+if __name__ == "__main__":
+    processor = Processor()
+    app = Stratum()
+    app.start(processor)
+