refactoring
authorThomasV <thomasv@gitorious>
Tue, 27 Mar 2012 19:27:50 +0000 (23:27 +0400)
committerThomasV <thomasv@gitorious>
Tue, 27 Mar 2012 19:27:50 +0000 (23:27 +0400)
StratumJSONRPCServer.py

index b595feb..5929cce 100644 (file)
@@ -25,7 +25,8 @@ import logging
 import os
 import types
 import traceback
-import sys
+import sys, threading
+
 try:
     import fcntl
 except ImportError:
@@ -34,6 +35,25 @@ except ImportError:
 
 import json
 
+
+"""
+sessions are identified with cookies
+ - each session has a buffer of responses to requests
+
+
+from the processor point of view: 
+ - the user only defines process() ; the rest is session management.  thus sessions should not belong to processor
+
+"""
+
+
+def random_string(N):
+    import random, string
+    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
+
+
+
+
 def get_version(request):
     # must be a dict
     if 'jsonrpc' in request.keys():
@@ -72,7 +92,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
                                         allow_none=True,
                                         encoding=encoding)
 
-    def _marshaled_dispatch(self, data, dispatch_method = None):
+    def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
         response = None
         try:
             request = jsonrpclib.loads(data)
@@ -90,14 +110,11 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
             if type(result) is Fault:
                 responses.append(result.response())
                 continue
-            resp_entry = self._marshaled_single_dispatch(req_entry)
+            resp_entry = self._marshaled_single_dispatch(session_id, req_entry)
             if resp_entry is not None:
                 responses.append(resp_entry)
 
-        # poll
-        r = self._marshaled_single_dispatch({'method':'session.poll', 'params':[], 'id':'z' })
-        r = jsonrpclib.loads(r)
-        r = r.get('result')
+        r = self.poll_session(session_id)
         for item in r:
             responses.append(json.dumps(item))
             
@@ -110,18 +127,15 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
 
         return response
 
-    def _marshaled_single_dispatch(self, request):
+    def _marshaled_single_dispatch(self, session_id, request):
         # TODO - Use the multiprocessing and skip the response if
         # it is a notification
         # Put in support for custom dispatcher here
         # (See SimpleXMLRPCServer._marshaled_dispatch)
         method = request.get('method')
         params = request.get('params')
-        if params is None: params=[]
-        params = [ self.session_id, request['id'] ] + params
-        #print method, params
         try:
-            response = self._dispatch(method, params)
+            response = self._dispatch(method, session_id, request)
         except:
             exc_type, exc_value, exc_tb = sys.exc_info()
             fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
@@ -141,7 +155,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
             fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
             return fault.response()
 
-    def _dispatch(self, method, params):
+    def _dispatch(self, method, session_id, request):
         func = None
         try:
             func = self.funcs[method]
@@ -160,10 +174,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
                         pass
         if func is not None:
             try:
-                if type(params) is types.ListType:
-                    response = func(*params)
-                else:
-                    response = func(**params)
+                response = func(session_id, request)
                 return response
             except TypeError:
                 return Fault(-32602, 'Invalid parameters.')
@@ -184,21 +195,19 @@ class StratumJSONRPCRequestHandler(
             self.report_404()
             return
         try:
-            self.server.session_id = None
+            session_id = None
             c = self.headers.get('cookie')
             if c:
                 if c[0:8]=='SESSION=':
-                    #print "found cookie", c[8:]
-                    self.server.session_id = c[8:]
+                    print "found cookie", c[8:]
+                    session_id = c[8:]
 
-            if self.server.session_id is None:
-                r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
-                r = jsonrpclib.loads(r)
-                self.server.session_id = r.get('result')
-                #print "setting cookie", self.server.session_id
+            if session_id is None:
+                session_id = self.server.create_session()
+                print "setting cookie", session_id
 
             data = json.dumps([])
-            response = self.server._marshaled_dispatch(data)
+            response = self.server._marshaled_dispatch(session_id, data)
             self.send_response(200)
         except Exception, e:
             self.send_response(500)
@@ -210,10 +219,8 @@ class StratumJSONRPCRequestHandler(
         if response == None:
             response = ''
 
-        if hasattr(self.server, 'session_id'):
-            if self.server.session_id:
-                self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
-                self.session_id = None
+        if session_id:
+            self.send_header("Set-Cookie", "SESSION=%s"%session_id)
 
         self.send_header("Content-type", "application/json-rpc")
         self.send_header("Content-length", str(len(response)))
@@ -237,20 +244,18 @@ class StratumJSONRPCRequestHandler(
                 size_remaining -= len(L[-1])
             data = ''.join(L)
 
-            self.server.session_id = None
+            session_id = None
             c = self.headers.get('cookie')
             if c:
                 if c[0:8]=='SESSION=':
-                    #print "found cookie", c[8:]
-                    self.server.session_id = c[8:]
+                    print "found cookie", c[8:]
+                    session_id = c[8:]
 
-            if self.server.session_id is None:
-                r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
-                r = jsonrpclib.loads(r)
-                self.server.session_id = r.get('result')
-                #print "setting cookie", self.server.session_id
+            if session_id is None:
+                session_id = self.server.create_session()
+                print "setting cookie", session_id
 
-            response = self.server._marshaled_dispatch(data)
+            response = self.server._marshaled_dispatch(session_id, data)
             self.send_response(200)
         except Exception, e:
             self.send_response(500)
@@ -262,10 +267,8 @@ class StratumJSONRPCRequestHandler(
         if response == None:
             response = ''
 
-        if hasattr(self.server, 'session_id'):
-            if self.server.session_id:
-                self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
-                self.session_id = None
+        if session_id:
+            self.send_header("Set-Cookie", "SESSION=%s"%session_id)
 
         self.send_header("Content-type", "application/json-rpc")
         self.send_header("Content-length", str(len(response)))
@@ -308,4 +311,68 @@ class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
             flags |= fcntl.FD_CLOEXEC
             fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
 
+        self.sessions = {}
+
+
+
+    def create_session(self):
+        session_id = random_string(10)
+        self.sessions[session_id] = { 'addresses':{}, 'responses':[]}
+        return session_id
+
+    def poll_session(self,session_id):
+        responses = self.sessions[session_id]['responses']
+        self.sessions[session_id]['responses'] = []
+        print "poll: %d responses"%len(responses)
+        return responses
+
+
+class HttpResponder(threading.Thread):
+    """read responses from the queue and dispatch them to sessions"""
+    def __init__(self, shared, processor):
+        self.shared = shared
+        self.processor = processor
+        threading.Thread.__init__(self)
+
+    def run(self):
+        while not self.shared.stopped():
+            session,response = self.processor.pop_response()
+            if not session.stopped():
+                raw_response = json.dumps(response)
+                session.responses.append(response)
+
+
+
+class HttpServer(threading.Thread):
+    def __init__(self, shared, _processor, host, port):
+        self.shared = shared
+        self.processor = _processor
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.host = host
+        self.port = port
+        self.lock = threading.Lock()
+
+    def run(self):
+        # see http://code.google.com/p/jsonrpclib/
+        from SocketServer import ThreadingMixIn
+        from StratumJSONRPCServer import StratumJSONRPCServer
+        class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
+        server = StratumThreadedJSONRPCServer(( self.host, self.port))
+        for s in ['server.peers', 'server.banner', 'transaction.broadcast', \
+                      'address.get_history','address.subscribe', 'numblocks.subscribe', 'client.version']:
+            server.register_function(self.process, s)
+
+        server.register_function(self.do_stop, 'stop')
+        print "HTTP server started."
+        server.serve_forever()
+
+    def process(self, session, request):
+        print session, request
+
+    def do_stop(self, session, request):
+        self.shared.stop()
+        return 'ok'
+
 
+