From 638b04908c8a25b8e40511eb5b5b6c0fce99b99a Mon Sep 17 00:00:00 2001 From: ThomasV Date: Fri, 30 Mar 2012 02:24:32 +0400 Subject: [PATCH] simplification, server.stop --- modules/irc/__init__.py | 16 ++++++- server.py | 6 +- transports/stratum_http.py | 98 +++++--------------------------------------- 3 files changed, 28 insertions(+), 92 deletions(-) diff --git a/modules/irc/__init__.py b/modules/irc/__init__.py index 41c468e..83c3c63 100644 --- a/modules/irc/__init__.py +++ b/modules/irc/__init__.py @@ -14,6 +14,7 @@ class ServerProcessor(Processor): self.peers = {} self.banner = config.get('server','banner') self.host = config.get('server','host') + self.password = config.get('server','password') self.native_port = config.get('server','native_port') self.stratum_tcp_port = config.get('server','stratum_tcp_port') @@ -91,12 +92,25 @@ class ServerProcessor(Processor): method = request['method'] params = request['params'] result = None + if method == 'server.banner': result = self.banner.replace('\\n','\n') + elif method == 'server.peers.subscribe': result = self.get_peers() + elif method == 'server.version': print "version", params + + elif method == 'server.stop': + print "stopping..." + try: + password = request['params'][0] + except: + password = None + if password == self.password: + self.shared.stop() + result = 'ok' else: print "unknown method", request @@ -104,5 +118,3 @@ class ServerProcessor(Processor): response = { 'id':request['id'], 'method':method, 'params':params, 'result':result } self.push_response(response) - - diff --git a/server.py b/server.py index 8b67e49..0b13733 100755 --- a/server.py +++ b/server.py @@ -72,10 +72,10 @@ if __name__ == '__main__': if len(sys.argv)>1: import jsonrpclib - server = jsonrpclib.Server('http://%s:8081'%host) + server = jsonrpclib.Server('http://%s:%s'%(host,stratum_http_port)) cmd = sys.argv[1] if cmd == 'stop': - out = server.stop(password) + out = server.server.stop(password) else: out = "Unknown command: '%s'" % cmd print out @@ -96,7 +96,7 @@ if __name__ == '__main__': transports = [] if native_port: transports.append( NativeServer(shared, abe, sb, config.get('server','banner'), host, int(native_port)) ) if stratum_tcp_port: transports.append( TcpServer(dispatcher, host, int(stratum_tcp_port)) ) - if stratum_http_port: transports.append( HttpServer(dispatcher, host, int(stratum_http_port), password) ) + if stratum_http_port: transports.append( HttpServer(dispatcher, host, int(stratum_http_port)) ) for server in transports: server.start() diff --git a/transports/stratum_http.py b/transports/stratum_http.py index b9fe30c..419b1ad 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -110,9 +110,13 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): if type(result) is Fault: responses.append(result.response()) continue - resp_entry = self._marshaled_single_dispatch(session_id, req_entry) - if resp_entry is not None: - responses.append(resp_entry) + + session = self.sessions.get(session_id) + if session: + self.dispatcher.process(session, req_entry) + if req_entry['method'] == 'server.stop': + return json.dumps({'result':'ok'}) + r = self.poll_session(session_id) for item in r: @@ -127,65 +131,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return response - def _marshaled_single_dispatch(self, session_id, request): - # TODO - Use the multiprocessing and skip the response if - # it is a notification - # Put in support for custom dispatcher here - # (See SimpleXMLRPCServer._marshaled_dispatch) - method = request.get('method') - params = request.get('params') - try: - response = self._dispatch(method, session_id, request) - except: - exc_type, exc_value, exc_tb = sys.exc_info() - fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) - return fault.response() - if 'id' not in request.keys() or request['id'] == None: - # It's a notification - return None - - try: - response = jsonrpclib.dumps(response, - methodresponse=True, - rpcid=request['id'] - ) - return response - except: - exc_type, exc_value, exc_tb = sys.exc_info() - fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) - return fault.response() - def _dispatch(self, method, session_id, request): - func = None - try: - func = self.funcs[method] - except KeyError: - if self.instance is not None: - if hasattr(self.instance, '_dispatch'): - return self.instance._dispatch(method, params) - else: - try: - func = SimpleXMLRPCServer.resolve_dotted_attribute( - self.instance, - method, - True - ) - except AttributeError: - pass - if func is not None: - try: - response = func(session_id, request) - return response - except TypeError: - return Fault(-32602, 'Invalid parameters.') - except: - err_lines = traceback.format_exc().splitlines() - trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) - fault = jsonrpclib.Fault(-32603, 'Server error: %s' % - trace_string) - return fault - else: - return Fault(-32601, 'Method %s not supported.' % method) class StratumJSONRPCRequestHandler( SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): @@ -341,46 +287,24 @@ class HttpSession(Session): self.pending_responses.append(response) class HttpServer(threading.Thread): - def __init__(self, dispatcher, host, port, password): + def __init__(self, dispatcher, host, port): self.shared = dispatcher.shared self.dispatcher = dispatcher.request_dispatcher threading.Thread.__init__(self) self.daemon = True self.host = host self.port = port - self.password = password self.lock = threading.Lock() def run(self): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass - self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) - for s in ['server.peers.subscribe', 'server.banner', 'blockchain.transaction.broadcast', \ - 'blockchain.address.get_history','blockchain.address.subscribe', \ - 'blockchain.numblocks.subscribe', 'client.version' ]: - self.server.register_function(self.process, s) - self.server.register_function(self.do_stop, 'stop') + self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) + self.server.dispatcher = self.dispatcher + self.server.register_function(None, 'server.stop') print "HTTP server started." self.server.serve_forever() - - def process(self, session_id, request): - #print session, request - session = self.server.sessions.get(session_id) - if session: - self.dispatcher.process(session, request) - - def do_stop(self, session, request): - try: - password = request['params'][0] - except: - password = None - if password == self.password: - self.shared.stop() - return 'ok' - - - -- 1.7.1