X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=transports%2Fstratum_http.py;h=659bae033d139d33546202bdb9b3de74eb0d22eb;hb=561047a590b4f1546174c8dc8814d0523533aae1;hp=a7ef49d40bcb3b69d634bac3bad67f21e174d75e;hpb=5ee042c213baa4ae8a0f538de64644ae9abe7b36;p=electrum-server.git diff --git a/transports/stratum_http.py b/transports/stratum_http.py index a7ef49d..659bae0 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -47,11 +47,7 @@ from the processor point of view: """ -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - - +from processor import random_string def get_version(request): @@ -110,9 +106,12 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): if type(result) is Fault: responses.append(result.response()) continue - resp_entry = self._marshaled_single_dispatch(session_id, req_entry) - if resp_entry is not None: - responses.append(resp_entry) + + session = self.dispatcher.get_session_by_address(session_id) + self.dispatcher.process(session, req_entry) + + if req_entry['method'] == 'server.stop': + return json.dumps({'result':'ok'}) r = self.poll_session(session_id) for item in r: @@ -127,65 +126,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return response - def _marshaled_single_dispatch(self, session_id, request): - # TODO - Use the multiprocessing and skip the response if - # it is a notification - # Put in support for custom dispatcher here - # (See SimpleXMLRPCServer._marshaled_dispatch) - method = request.get('method') - params = request.get('params') - try: - response = self._dispatch(method, session_id, request) - except: - exc_type, exc_value, exc_tb = sys.exc_info() - fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) - return fault.response() - if 'id' not in request.keys() or request['id'] == None: - # It's a notification - return None - - try: - response = jsonrpclib.dumps(response, - methodresponse=True, - rpcid=request['id'] - ) - return response - except: - exc_type, exc_value, exc_tb = sys.exc_info() - fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) - return fault.response() - def _dispatch(self, method, session_id, request): - func = None - try: - func = self.funcs[method] - except KeyError: - if self.instance is not None: - if hasattr(self.instance, '_dispatch'): - return self.instance._dispatch(method, params) - else: - try: - func = SimpleXMLRPCServer.resolve_dotted_attribute( - self.instance, - method, - True - ) - except AttributeError: - pass - if func is not None: - try: - response = func(session_id, request) - return response - except TypeError: - return Fault(-32602, 'Invalid parameters.') - except: - err_lines = traceback.format_exc().splitlines() - trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) - fault = jsonrpclib.Fault(-32603, 'Server error: %s' % - trace_string) - return fault - else: - return Fault(-32601, 'Method %s not supported.' % method) class StratumJSONRPCRequestHandler( SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): @@ -223,6 +164,7 @@ class StratumJSONRPCRequestHandler( self.send_header("Set-Cookie", "SESSION=%s"%session_id) self.send_header("Content-type", "application/json-rpc") + self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) @@ -248,12 +190,12 @@ class StratumJSONRPCRequestHandler( c = self.headers.get('cookie') if c: if c[0:8]=='SESSION=': - print "found cookie", c[8:] + #print "found cookie", c[8:] session_id = c[8:] if session_id is None: session_id = self.server.create_session() - print "setting cookie", session_id + #print "setting cookie", session_id response = self.server._marshaled_dispatch(session_id, data) self.send_response(200) @@ -271,6 +213,7 @@ class StratumJSONRPCRequestHandler( self.send_header("Set-Cookie", "SESSION=%s"%session_id) self.send_header("Content-type", "application/json-rpc") + self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) @@ -311,34 +254,39 @@ class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): flags |= fcntl.FD_CLOEXEC fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) - self.sessions = {} - def create_session(self): session_id = random_string(10) - self.sessions[session_id] = HttpSession(session_id) + session = HttpSession(session_id) + self.dispatcher.add_session(session) return session_id - def poll_session(self,session_id): - responses = self.sessions[session_id].pending_responses[:] - self.sessions[session_id].pending_responses = [] - print "poll: %d responses"%len(responses) + def poll_session(self, session_id): + session = self.dispatcher.get_session_by_address(session_id) + q = session.pending_responses + responses = [] + while not q.empty(): + r = q.get() + responses.append(r) + #print "poll: %d responses"%len(responses) return responses from processor import Session +import Queue class HttpSession(Session): def __init__(self, session_id): Session.__init__(self) - self.pending_responses = [] - print "new http session", session_id + self.pending_responses = Queue.Queue() + self.address = session_id + self.name = "HTTP session" def send_response(self, response): raw_response = json.dumps(response) - self.pending_responses.append(response) + self.pending_responses.put(response) class HttpServer(threading.Thread): def __init__(self, dispatcher, host, port): @@ -354,27 +302,12 @@ class HttpServer(threading.Thread): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass - self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) - for s in ['server.peers.subscribe', 'server.banner', 'blockchain.transaction.broadcast', \ - 'blockchain.address.get_history','blockchain.address.subscribe', \ - 'blockchain.numblocks.subscribe', 'client.version' ]: - self.server.register_function(self.process, s) - self.server.register_function(self.do_stop, 'stop') + self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) + self.server.dispatcher = self.dispatcher + self.server.register_function(None, 'server.stop') + self.server.register_function(None, 'server.info') print "HTTP server started." self.server.serve_forever() - - def process(self, session_id, request): - #print session, request - session = self.server.sessions.get(session_id) - if session: - self.dispatcher.process(session, request) - - def do_stop(self, session, request): - self.shared.stop() - return 'ok' - - -