sanity check session_id
[electrum-server.git] / transports / stratum_http.py
index 7a8ece2..ef738e3 100644 (file)
@@ -47,11 +47,7 @@ from the processor point of view:
 """
 
 
-def random_string(N):
-    import random, string
-    return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
-
-
+from processor import random_string
 
 
 def get_version(request):
@@ -101,6 +97,10 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
             response = fault.response()
             return response
 
+        session = self.dispatcher.get_session_by_address(session_id)
+        if not session:
+            return 'Error: session not found'
+
         responses = []
         if type(request) is not types.ListType:
             request = [ request ]
@@ -110,11 +110,13 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
             if type(result) is Fault:
                 responses.append(result.response())
                 continue
-            resp_entry = self._marshaled_single_dispatch(session_id, req_entry)
-            if resp_entry is not None:
-                responses.append(resp_entry)
 
-        r = self.poll_session(session_id)
+            self.dispatcher.process(session, req_entry)
+                
+            if req_entry['method'] == 'server.stop':
+                return json.dumps({'result':'ok'})
+
+        r = self.poll_session(session)
         for item in r:
             responses.append(json.dumps(item))
             
@@ -127,65 +129,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
 
         return response
 
-    def _marshaled_single_dispatch(self, session_id, request):
-        # TODO - Use the multiprocessing and skip the response if
-        # it is a notification
-        # Put in support for custom dispatcher here
-        # (See SimpleXMLRPCServer._marshaled_dispatch)
-        method = request.get('method')
-        params = request.get('params')
-        try:
-            response = self._dispatch(method, session_id, request)
-        except:
-            exc_type, exc_value, exc_tb = sys.exc_info()
-            fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
-            return fault.response()
-        if 'id' not in request.keys() or request['id'] == None:
-            # It's a notification
-            return None
-
-        try:
-            response = jsonrpclib.dumps(response,
-                                        methodresponse=True,
-                                        rpcid=request['id']
-                                        )
-            return response
-        except:
-            exc_type, exc_value, exc_tb = sys.exc_info()
-            fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
-            return fault.response()
 
-    def _dispatch(self, method, session_id, request):
-        func = None
-        try:
-            func = self.funcs[method]
-        except KeyError:
-            if self.instance is not None:
-                if hasattr(self.instance, '_dispatch'):
-                    return self.instance._dispatch(method, params)
-                else:
-                    try:
-                        func = SimpleXMLRPCServer.resolve_dotted_attribute(
-                            self.instance,
-                            method,
-                            True
-                            )
-                    except AttributeError:
-                        pass
-        if func is not None:
-            try:
-                response = func(session_id, request)
-                return response
-            except TypeError:
-                return Fault(-32602, 'Invalid parameters.')
-            except:
-                err_lines = traceback.format_exc().splitlines()
-                trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
-                fault = jsonrpclib.Fault(-32603, 'Server error: %s' % 
-                                         trace_string)
-                return fault
-        else:
-            return Fault(-32601, 'Method %s not supported.' % method)
 
 class StratumJSONRPCRequestHandler(
         SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
@@ -223,6 +167,7 @@ class StratumJSONRPCRequestHandler(
             self.send_header("Set-Cookie", "SESSION=%s"%session_id)
 
         self.send_header("Content-type", "application/json-rpc")
+        self.send_header("Access-Control-Allow-Origin", "*")
         self.send_header("Content-length", str(len(response)))
         self.end_headers()
         self.wfile.write(response)
@@ -248,12 +193,12 @@ class StratumJSONRPCRequestHandler(
             c = self.headers.get('cookie')
             if c:
                 if c[0:8]=='SESSION=':
-                    print "found cookie", c[8:]
+                    #print "found cookie", c[8:]
                     session_id = c[8:]
 
             if session_id is None:
                 session_id = self.server.create_session()
-                print "setting cookie", session_id
+                #print "setting cookie", session_id
 
             response = self.server._marshaled_dispatch(session_id, data)
             self.send_response(200)
@@ -271,6 +216,7 @@ class StratumJSONRPCRequestHandler(
             self.send_header("Set-Cookie", "SESSION=%s"%session_id)
 
         self.send_header("Content-type", "application/json-rpc")
+        self.send_header("Access-Control-Allow-Origin", "*")
         self.send_header("Content-length", str(len(response)))
         self.end_headers()
         self.wfile.write(response)
@@ -311,39 +257,43 @@ class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
             flags |= fcntl.FD_CLOEXEC
             fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
 
-        self.sessions = {}
-
 
 
     def create_session(self):
         session_id = random_string(10)
-        self.sessions[session_id] = HttpSession(session_id)
+        session = HttpSession(session_id)
+        self.dispatcher.add_session(session)
         return session_id
 
-    def poll_session(self,session_id):
-        responses = self.sessions[session_id].pending_responses[:]
-        self.sessions[session_id].pending_responses = []
-        print "poll: %d responses"%len(responses)
+    def poll_session(self, session):
+        q = session.pending_responses
+        responses = []
+        while not q.empty():
+            r = q.get()
+            responses.append(r)
+        #print "poll: %d responses"%len(responses)
         return responses
 
 
 from processor import Session
+import Queue
 
 class HttpSession(Session):
 
     def __init__(self, session_id):
         Session.__init__(self)
-        self.pending_responses = []
-        print "new http session", session_id
+        self.pending_responses = Queue.Queue()
+        self.address = session_id
+        self.name = "HTTP session"
 
     def send_response(self, response):
         raw_response = json.dumps(response)
-        self.pending_responses.append(response)
+        self.pending_responses.put(response)
 
 class HttpServer(threading.Thread):
-    def __init__(self, shared, _processor, host, port):
-        self.shared = shared
-        self.processor = _processor
+    def __init__(self, dispatcher, host, port):
+        self.shared = dispatcher.shared
+        self.dispatcher = dispatcher.request_dispatcher
         threading.Thread.__init__(self)
         self.daemon = True
         self.host = host
@@ -354,27 +304,12 @@ class HttpServer(threading.Thread):
         # see http://code.google.com/p/jsonrpclib/
         from SocketServer import ThreadingMixIn
         class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
-        self.server = StratumThreadedJSONRPCServer(( self.host, self.port))
-        for s in ['server.peers.subscribe', 'server.banner', 'blockchain.transaction.broadcast', \
-                      'blockchain.address.get_history','blockchain.address.subscribe', \
-                      'blockchain.numblocks.subscribe', 'client.version' ]:
-            self.server.register_function(self.process, s)
 
-        self.server.register_function(self.do_stop, 'stop')
+        self.server = StratumThreadedJSONRPCServer(( self.host, self.port))
+        self.server.dispatcher = self.dispatcher
+        self.server.register_function(None, 'server.stop')
+        self.server.register_function(None, 'server.info')
 
         print "HTTP server started."
         self.server.serve_forever()
 
-
-    def process(self, session_id, request):
-        #print session, request
-        session = self.server.sessions.get(session_id)
-        if session:
-            self.processor.process(session, request)
-
-    def do_stop(self, session, request):
-        self.shared.stop()
-        return 'ok'
-
-
-