From c3cbca85eb03efbd8ff1fd46b6d86364b13a341f Mon Sep 17 00:00:00 2001 From: ThomasV Date: Tue, 27 Mar 2012 23:27:50 +0400 Subject: [PATCH] refactoring --- StratumJSONRPCServer.py | 153 +++++++++++++++++++++++++++++++++------------- 1 files changed, 110 insertions(+), 43 deletions(-) diff --git a/StratumJSONRPCServer.py b/StratumJSONRPCServer.py index b595feb..5929cce 100644 --- a/StratumJSONRPCServer.py +++ b/StratumJSONRPCServer.py @@ -25,7 +25,8 @@ import logging import os import types import traceback -import sys +import sys, threading + try: import fcntl except ImportError: @@ -34,6 +35,25 @@ except ImportError: import json + +""" +sessions are identified with cookies + - each session has a buffer of responses to requests + + +from the processor point of view: + - the user only defines process() ; the rest is session management. thus sessions should not belong to processor + +""" + + +def random_string(N): + import random, string + return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) + + + + def get_version(request): # must be a dict if 'jsonrpc' in request.keys(): @@ -72,7 +92,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): allow_none=True, encoding=encoding) - def _marshaled_dispatch(self, data, dispatch_method = None): + def _marshaled_dispatch(self, session_id, data, dispatch_method = None): response = None try: request = jsonrpclib.loads(data) @@ -90,14 +110,11 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): if type(result) is Fault: responses.append(result.response()) continue - resp_entry = self._marshaled_single_dispatch(req_entry) + resp_entry = self._marshaled_single_dispatch(session_id, req_entry) if resp_entry is not None: responses.append(resp_entry) - # poll - r = self._marshaled_single_dispatch({'method':'session.poll', 'params':[], 'id':'z' }) - r = jsonrpclib.loads(r) - r = r.get('result') + r = self.poll_session(session_id) for item in r: responses.append(json.dumps(item)) @@ -110,18 +127,15 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return response - def _marshaled_single_dispatch(self, request): + def _marshaled_single_dispatch(self, session_id, request): # TODO - Use the multiprocessing and skip the response if # it is a notification # Put in support for custom dispatcher here # (See SimpleXMLRPCServer._marshaled_dispatch) method = request.get('method') params = request.get('params') - if params is None: params=[] - params = [ self.session_id, request['id'] ] + params - #print method, params try: - response = self._dispatch(method, params) + response = self._dispatch(method, session_id, request) except: exc_type, exc_value, exc_tb = sys.exc_info() fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) @@ -141,7 +155,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) return fault.response() - def _dispatch(self, method, params): + def _dispatch(self, method, session_id, request): func = None try: func = self.funcs[method] @@ -160,10 +174,7 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): pass if func is not None: try: - if type(params) is types.ListType: - response = func(*params) - else: - response = func(**params) + response = func(session_id, request) return response except TypeError: return Fault(-32602, 'Invalid parameters.') @@ -184,21 +195,19 @@ class StratumJSONRPCRequestHandler( self.report_404() return try: - self.server.session_id = None + session_id = None c = self.headers.get('cookie') if c: if c[0:8]=='SESSION=': - #print "found cookie", c[8:] - self.server.session_id = c[8:] + print "found cookie", c[8:] + session_id = c[8:] - if self.server.session_id is None: - r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' }) - r = jsonrpclib.loads(r) - self.server.session_id = r.get('result') - #print "setting cookie", self.server.session_id + if session_id is None: + session_id = self.server.create_session() + print "setting cookie", session_id data = json.dumps([]) - response = self.server._marshaled_dispatch(data) + response = self.server._marshaled_dispatch(session_id, data) self.send_response(200) except Exception, e: self.send_response(500) @@ -210,10 +219,8 @@ class StratumJSONRPCRequestHandler( if response == None: response = '' - if hasattr(self.server, 'session_id'): - if self.server.session_id: - self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id) - self.session_id = None + if session_id: + self.send_header("Set-Cookie", "SESSION=%s"%session_id) self.send_header("Content-type", "application/json-rpc") self.send_header("Content-length", str(len(response))) @@ -237,20 +244,18 @@ class StratumJSONRPCRequestHandler( size_remaining -= len(L[-1]) data = ''.join(L) - self.server.session_id = None + session_id = None c = self.headers.get('cookie') if c: if c[0:8]=='SESSION=': - #print "found cookie", c[8:] - self.server.session_id = c[8:] + print "found cookie", c[8:] + session_id = c[8:] - if self.server.session_id is None: - r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' }) - r = jsonrpclib.loads(r) - self.server.session_id = r.get('result') - #print "setting cookie", self.server.session_id + if session_id is None: + session_id = self.server.create_session() + print "setting cookie", session_id - response = self.server._marshaled_dispatch(data) + response = self.server._marshaled_dispatch(session_id, data) self.send_response(200) except Exception, e: self.send_response(500) @@ -262,10 +267,8 @@ class StratumJSONRPCRequestHandler( if response == None: response = '' - if hasattr(self.server, 'session_id'): - if self.server.session_id: - self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id) - self.session_id = None + if session_id: + self.send_header("Set-Cookie", "SESSION=%s"%session_id) self.send_header("Content-type", "application/json-rpc") self.send_header("Content-length", str(len(response))) @@ -308,4 +311,68 @@ class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): flags |= fcntl.FD_CLOEXEC fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) + self.sessions = {} + + + + def create_session(self): + session_id = random_string(10) + self.sessions[session_id] = { 'addresses':{}, 'responses':[]} + return session_id + + def poll_session(self,session_id): + responses = self.sessions[session_id]['responses'] + self.sessions[session_id]['responses'] = [] + print "poll: %d responses"%len(responses) + return responses + + +class HttpResponder(threading.Thread): + """read responses from the queue and dispatch them to sessions""" + def __init__(self, shared, processor): + self.shared = shared + self.processor = processor + threading.Thread.__init__(self) + + def run(self): + while not self.shared.stopped(): + session,response = self.processor.pop_response() + if not session.stopped(): + raw_response = json.dumps(response) + session.responses.append(response) + + + +class HttpServer(threading.Thread): + def __init__(self, shared, _processor, host, port): + self.shared = shared + self.processor = _processor + threading.Thread.__init__(self) + self.daemon = True + self.host = host + self.port = port + self.lock = threading.Lock() + + def run(self): + # see http://code.google.com/p/jsonrpclib/ + from SocketServer import ThreadingMixIn + from StratumJSONRPCServer import StratumJSONRPCServer + class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass + server = StratumThreadedJSONRPCServer(( self.host, self.port)) + for s in ['server.peers', 'server.banner', 'transaction.broadcast', \ + 'address.get_history','address.subscribe', 'numblocks.subscribe', 'client.version']: + server.register_function(self.process, s) + + server.register_function(self.do_stop, 'stop') + print "HTTP server started." + server.serve_forever() + + def process(self, session, request): + print session, request + + def do_stop(self, session, request): + self.shared.stop() + return 'ok' + + -- 1.7.1