2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
18 sessions are identified with cookies
19 - each session has a buffer of responses to requests
22 From the processor's point of view:
23 - the user only defines process(); the rest is session management, so sessions should not belong to the processor
30 import SimpleXMLRPCServer
40 from jsonrpclib import Fault
41 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
42 from OpenSSL import SSL
51 from processor import Session
52 from utils import random_string, print_log
# get_version: infer which JSON-RPC flavour a request uses from the keys it carries.
# The 'jsonrpc' key is tested before the 'id' key, so it takes precedence when
# both are present.
# NOTE(review): the value returned by each branch is not visible in this view -
# presumably a version marker (and a falsy value when neither key is present); confirm.
55 def get_version(request):
# `request` must be a dict here; validate_request() checks that before calling.
57 if 'jsonrpc' in request.keys():
59 if 'id' in request.keys():
# validate_request: check that one decoded JSON-RPC request entry is well formed.
#
# Returns a jsonrpclib Fault with code -32600 when the entry is rejected.
# NOTE(review): the success return value is not visible in this view; the caller
# only tests `type(result) is Fault`, so anything that is not a Fault is
# treated as "valid".
#
# Side effect: a missing 'params' key is added to `request` as an empty list.
64 def validate_request(request):
# Anything that is not a dict cannot carry 'id'/'method', so reject it up front.
65 if not isinstance(request, types.DictType):
66 return Fault(-32600, 'Request must be {}, not %s.' % type(request))
# Keep the request id so that a later Fault can be tied back to this request.
67 rpcid = request.get('id', None)
68 version = get_version(request)
# NOTE(review): the condition guarding this return is not visible here -
# presumably it fires when get_version() found no usable version; confirm.
70 return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
# Normalise: from here on 'params' is always present (mutates the caller's dict).
71 request.setdefault('params', [])
72 method = request.get('method', None)
73 params = request.get('params')
74 param_types = (types.ListType, types.DictType, types.TupleType)
# The method must be a non-empty str/unicode and the params a list, dict or tuple.
# Exact type() matches are used, so subclasses of these types are rejected.
75 if not method or type(method) not in types.StringTypes or type(params) not in param_types:
76 return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid)
# StratumJSONRPCDispatcher: JSON-RPC front end layered on SimpleXMLRPCDispatcher.
# It decodes a request body, validates each entry, hands valid entries to
# `self.dispatcher` on behalf of a session, and returns whatever responses are
# queued for that session.
# `self.dispatcher` is not set in this class; HttpServer assigns it on the server
# instance before serving.
80 class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
# The base class is initialised explicitly with allow_none=True.
82 def __init__(self, encoding=None):
84 SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, allow_none=True, encoding=encoding)
# _marshaled_dispatch: process one HTTP request body for the given session id.
#   session_id      - key passed to dispatcher.get_session_by_address()
#   data            - raw JSON text (one request object or a list of them)
#   dispatch_method - not used by any line visible in this view
# Returns a string: a single JSON document, a JSON array assembled from several
# documents, a fault response, or the plain text 'Error: session not found'.
86 def _marshaled_dispatch(self, session_id, data, dispatch_method=None):
89 request = jsonrpclib.loads(data)
# Parse failure path: -32700 fault echoing the offending data.
# NOTE(review): `e` is presumably bound by an except clause not visible here.
91 fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
92 response = fault.response()
95 session = self.dispatcher.get_session_by_address(session_id)
# NOTE(review): this is a plain string, not a JSON-RPC fault; the guard that
# selects it is not visible here (presumably "session is missing").
97 return 'Error: session not found'
# Record when the session was last used.
98 session.time = time.time()
# NOTE(review): the body of this branch is not visible - presumably a single
# request is wrapped so that the loop below always iterates a list; confirm.
101 if not isinstance(request, types.ListType):
104 for req_entry in request:
105 result = validate_request(req_entry)
# Invalid entries contribute their fault response instead of being dispatched.
106 if type(result) is Fault:
107 responses.append(result.response())
# Valid entries go to the shared dispatcher; results arrive later through the
# session's pending_responses queue rather than as a return value.
110 self.dispatcher.do_dispatch(session, req_entry)
# 'server.stop' is acknowledged immediately, skipping the poll below.
112 if req_entry['method'] == 'server.stop':
113 return json.dumps({'result': 'ok'})
# Collect everything currently queued for this session.
115 r = self.poll_session(session)
117 responses.append(json.dumps(item))
# Several responses are joined by hand into one JSON array; a single response
# is returned as is.
119 if len(responses) > 1:
120 response = '[%s]' % ','.join(responses)
121 elif len(responses) == 1:
122 response = responses[0]
# create_session: make a new HttpSession under a random 20-character id.
# HttpSession.__init__ registers itself with the dispatcher, so the local
# `session` is not otherwise used in the lines visible here.
128 def create_session(self):
129 session_id = random_string(20)
130 session = HttpSession(self.dispatcher, session_id)
# poll_session: read from the session's pending_responses queue.
# NOTE(review): the draining loop and the return are not visible in this view.
133 def poll_session(self, session):
134 q = session.pending_responses
139 #print "poll: %d responses"%len(responses)
# StratumJSONRPCRequestHandler: HTTP request handler for the Stratum JSON-RPC
# transport. Sessions are tracked with a "SESSION=<id>" cookie; a new session is
# created when the request carries no session id.
143 class StratumJSONRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
# do_OPTIONS: answer with 200, the allowed methods and permissive CORS headers
# (any origin), and an empty body.
145 def do_OPTIONS(self):
146 self.send_response(200)
147 self.send_header('Allow', 'GET, POST, OPTIONS')
148 self.send_header('Access-Control-Allow-Origin', '*')
149 self.send_header('Access-Control-Allow-Headers', 'Cache-Control, Content-Language, Content-Type, Expires, Last-Modified, Pragma, Accept-Language, Accept, Origin')
150 self.send_header('Content-Length', '0')
# --- First request handler (its `def` line is not visible in this view;
# presumably do_GET, given the 'Allow' header above - confirm). It sends an
# empty request list, so the response holds only what is queued for the session.
154 if not self.is_rpc_path_valid():
159 c = self.headers.get('cookie')
# NOTE(review): headers.get() returns None when there is no Cookie header; the
# guard that must precede this slice is not visible here - confirm it exists.
# Only a cookie string that starts with 'SESSION=' is recognised.
161 if c[0:8] == 'SESSION=':
162 #print "found cookie", c[8:]
165 if session_id is None:
166 session_id = self.server.create_session()
167 #print "setting cookie", session_id
# An empty JSON list means "nothing to dispatch".
169 data = json.dumps([])
170 response = self.server._marshaled_dispatch(session_id, data)
171 self.send_response(200)
# Failure path: reply 500 with a -32603 fault built from two lines of the
# formatted traceback (third from last and last).
173 self.send_response(500)
174 err_lines = traceback.format_exc().splitlines()
175 trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
176 fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
177 response = fault.response()
178 print "500", trace_string
# NOTE(review): the condition under which the cookie is (re)sent is not visible.
183 self.send_header("Set-Cookie", "SESSION=%s" % session_id)
185 self.send_header("Content-type", "application/json-rpc")
186 self.send_header("Access-Control-Allow-Origin", "*")
187 self.send_header("Content-length", str(len(response)))
189 self.wfile.write(response)
191 self.shutdown_connection()
# --- Second request handler (its `def` line is not visible in this view;
# presumably do_POST - confirm). Reads the request body and dispatches it.
194 if not self.is_rpc_path_valid():
# Read the body in pieces of at most 10 MiB rather than in one read() call.
198 max_chunk_size = 10*1024*1024
# NOTE(review): a missing or non-numeric Content-Length header raises here;
# whether this line sits inside the try block is not visible - confirm.
199 size_remaining = int(self.headers["content-length"])
201 while size_remaining:
202 chunk_size = min(size_remaining, max_chunk_size)
203 L.append(self.rfile.read(chunk_size))
# Subtract what was actually read, which may be less than the size requested.
# NOTE(review): if read() returns an empty string this does not advance.
204 size_remaining -= len(L[-1])
208 c = self.headers.get('cookie')
# NOTE(review): same possible None cookie as in the handler above.
210 if c[0:8] == 'SESSION=':
211 #print "found cookie", c[8:]
214 if session_id is None:
215 session_id = self.server.create_session()
216 #print "setting cookie", session_id
218 response = self.server._marshaled_dispatch(session_id, data)
219 self.send_response(200)
# Failure path, identical in shape to the one in the handler above.
221 self.send_response(500)
222 err_lines = traceback.format_exc().splitlines()
223 trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
224 fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
225 response = fault.response()
226 print "500", trace_string
231 self.send_header("Set-Cookie", "SESSION=%s" % session_id)
233 self.send_header("Content-type", "application/json-rpc")
234 self.send_header("Access-Control-Allow-Origin", "*")
235 self.send_header("Content-length", str(len(response)))
237 self.wfile.write(response)
239 self.shutdown_connection()
# shutdown_connection: called by both handlers after the response is written.
# The argument 1 is the `how` value for socket.shutdown (presumably SHUT_WR).
# SSLRequestHandler overrides this method.
241 def shutdown_connection(self):
242 self.connection.shutdown(1)
# SSLRequestHandler: variant of the request handler for the SSL-wrapped
# connections produced by SSLTCPServer.
245 class SSLRequestHandler(StratumJSONRPCRequestHandler):
# NOTE(review): the `def` line of the method holding the next three statements
# is not visible in this view - presumably setup(); confirm.
# Builds the read/write file objects directly on the request object.
# socket._fileobject is a private Python 2 helper.
247 self.connection = self.request
248 self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
249 self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
# Override: shutdown() is called with no argument here, unlike the base class
# which passes 1 - presumably because the SSL connection's shutdown() takes no
# `how` parameter; confirm against pyOpenSSL.
251 def shutdown_connection(self):
252 self.connection.shutdown()
# SSLTCPServer: TCPServer whose listening socket is wrapped in a pyOpenSSL
# SSL.Connection configured from the given certificate and private key files.
255 class SSLTCPServer(SocketServer.TCPServer):
#   server_address     - address handed to BaseServer
#   certfile / keyfile - paths loaded into the SSL context
#   RequestHandlerClass, bind_and_activate - as for SocketServer.TCPServer
256 def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
# BaseServer.__init__ is called instead of TCPServer.__init__, and the socket is
# then created by hand below as an SSL-wrapped one.
257 SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
258 ctx = SSL.Context(SSL.SSLv23_METHOD)
259 ctx.use_privatekey_file(keyfile)
260 ctx.use_certificate_file(certfile)
261 self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
# NOTE(review): only server_activate() is visible in this branch; a bind call
# presumably precedes it on a line not shown - confirm.
262 if bind_and_activate:
264 self.server_activate()
# NOTE(review): the body of this override is not visible in this view.
266 def shutdown_request(self, request):
# StratumHTTPServer: plain HTTP server combining SocketServer.TCPServer with the
# Stratum JSON-RPC dispatcher.
271 class StratumHTTPServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
273 allow_reuse_address = True
#   addr             - address passed on to TCPServer.__init__
#   requestHandler   - handler class, StratumJSONRPCRequestHandler by default
#   logRequests      - stored on the instance as self.logRequests
#   encoding         - forwarded to the dispatcher
#   bind_and_activate, address_family - forwarded to / set for TCPServer
275 def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
276 logRequests=False, encoding=None, bind_and_activate=True,
277 address_family=socket.AF_INET):
278 self.logRequests = logRequests
279 StratumJSONRPCDispatcher.__init__(self, encoding)
280 # TCPServer.__init__ has an extra parameter on 2.6+, so
281 # check Python version and decide on how to call it
# NOTE(review): `vi` is not used by any line visible in this view, and
# TCPServer.__init__ below is always called with three arguments.
282 vi = sys.version_info
# address_family is set before TCPServer.__init__ runs.
283 self.address_family = address_family
284 if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
285 # Unix sockets can't be bound if they already exist in the
286 # filesystem. The convention of e.g. X11 is to unlink
287 # before binding again.
288 if os.path.exists(addr):
# NOTE(review): the unlink attempt and its error handling are not visible here;
# only the warning for a failed unlink is.
292 logging.warning("Could not unlink socket %s", addr)
294 SocketServer.TCPServer.__init__(self, addr, requestHandler, bind_and_activate)
# Where fcntl offers FD_CLOEXEC, set that flag on the server's file descriptor.
# `fcntl` may be None (it is compared against None below).
296 if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
297 flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
298 flags |= fcntl.FD_CLOEXEC
299 fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
# StratumHTTPSSLServer: HTTPS counterpart of StratumHTTPServer, built on
# SSLTCPServer plus the Stratum JSON-RPC dispatcher.
302 class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher):
304 allow_reuse_address = True
#   addr               - address passed on to SSLTCPServer.__init__
#   certfile / keyfile - forwarded to SSLTCPServer
#   requestHandler     - handler class, SSLRequestHandler by default
#   logRequests        - stored on the instance as self.logRequests
#   encoding           - forwarded to the dispatcher
#   bind_and_activate, address_family - forwarded to / set for the server
306 def __init__(self, addr, certfile, keyfile,
307 requestHandler=SSLRequestHandler,
308 logRequests=False, encoding=None, bind_and_activate=True,
309 address_family=socket.AF_INET):
311 self.logRequests = logRequests
312 StratumJSONRPCDispatcher.__init__(self, encoding)
313 # TCPServer.__init__ has an extra parameter on 2.6+, so
314 # check Python version and decide on how to call it
# NOTE(review): `vi` is not used by any line visible in this view.
315 vi = sys.version_info
# address_family is set before SSLTCPServer.__init__ creates the socket.
316 self.address_family = address_family
317 if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
318 # Unix sockets can't be bound if they already exist in the
319 # filesystem. The convention of e.g. X11 is to unlink
320 # before binding again.
321 if os.path.exists(addr):
# NOTE(review): the unlink attempt and its error handling are not visible here;
# only the warning for a failed unlink is.
325 logging.warning("Could not unlink socket %s", addr)
327 SSLTCPServer.__init__(self, addr, certfile, keyfile, requestHandler, bind_and_activate)
# Where fcntl offers FD_CLOEXEC, set that flag on the server's file descriptor.
# `fcntl` may be None (it is compared against None below).
329 if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
330 flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
331 flags |= fcntl.FD_CLOEXEC
332 fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
# HttpSession: Session variant for the HTTP transport. Responses are buffered in
# a queue instead of being written straight to a connection, and are read back
# through StratumJSONRPCDispatcher.poll_session().
335 class HttpSession(Session):
#   dispatcher - passed to Session.__init__ and used to register the session
#   session_id - stored as self.address, the key used for session lookup
337 def __init__(self, dispatcher, session_id):
338 Session.__init__(self, dispatcher)
339 self.pending_responses = Queue.Queue()
340 self.address = session_id
# The session registers itself with the dispatcher as part of construction.
343 self.dispatcher.add_session(self)
# send_response: queue a response object for the next poll of this session.
345 def send_response(self, response):
# NOTE(review): `raw_response` is not used by any line visible here; the
# unserialised `response` is what gets queued, and _marshaled_dispatch calls
# json.dumps on each queued item later.
346 raw_response = json.dumps(response)
347 self.pending_responses.put(response)
# HttpServer: thread that owns the Stratum HTTP or HTTPS server.
351 class HttpServer(threading.Thread):
#   dispatcher - provides `.shared` and `.request_dispatcher`
#   host, port - NOTE(review): their assignment to self.host / self.port is not
#                visible in this view, although both are read further down
#   use_ssl, certfile, keyfile - stored on the instance
352 def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile):
353 self.shared = dispatcher.shared
# self.dispatcher holds the request dispatcher, not the object passed in.
354 self.dispatcher = dispatcher.request_dispatcher
355 threading.Thread.__init__(self)
359 self.use_ssl = use_ssl
360 self.certfile = certfile
361 self.keyfile = keyfile
362 self.lock = threading.Lock()
# NOTE(review): the `def` line of the method holding the statements below is
# not visible in this view - presumably run(); confirm.
365 # see http://code.google.com/p/jsonrpclib/
366 from SocketServer import ThreadingMixIn
# Two alternative definitions of StratumThreadedServer follow, one per transport.
# NOTE(review): the branch conditions are not visible - presumably
# self.use_ssl selects between them; confirm.
368 class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer):
370 self.server = StratumThreadedServer((self.host, self.port), self.certfile, self.keyfile)
371 print_log("HTTPS server started.")
373 class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer):
375 self.server = StratumThreadedServer((self.host, self.port))
376 print_log("HTTP server started.")
# Give the server the dispatcher that _marshaled_dispatch() reads.
378 self.server.dispatcher = self.dispatcher
# Both names are registered with None as the function.
# NOTE(review): presumably only the names matter, since requests are routed
# through self.dispatcher.do_dispatch; confirm.
379 self.server.register_function(None, 'server.stop')
380 self.server.register_function(None, 'server.info')
382 self.server.serve_forever()