2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
19 from jsonrpclib import Fault
20 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
21 import SimpleXMLRPCServer
30 from OpenSSL import SSL
# Sessions are identified with cookies:
#  - each session has a buffer of responses to requests.
#
# From the processor's point of view:
#  - the user only defines process(); the rest is session management,
#    so sessions should not belong to the processor.
52 from processor import random_string
def get_version(request):
    """Guess the JSON-RPC protocol version of a decoded request.

    ``request`` must already be a dict.  A ``'jsonrpc'`` member marks a
    2.0 request; otherwise an ``'id'`` member marks a 1.0 request.

    Returns 2.0, 1.0, or None when neither member is present.

    NOTE(review): the ``return`` lines were missing from the reviewed copy.
    The values follow the jsonrpclib helper this function mirrors -- confirm.
    """
    # Membership on the dict itself is equivalent to ``in request.keys()``
    # but does not build a throw-away key list on Python 2.
    if 'jsonrpc' in request:
        return 2.0
    if 'id' in request:
        return 1.0
    return None
def validate_request(request):
    """Check that one decoded JSON-RPC request entry is well formed.

    Side effect: a missing ``'params'`` member is defaulted to ``[]`` on
    the request dict itself.

    Returns a ``Fault`` with code -32600 when the entry is not a dict, has
    no recognisable protocol version, has an empty or non-string method,
    or has params that are not a list, dict or tuple.

    NOTE(review): the success return value was missing from the reviewed
    copy; ``True`` is assumed.  The visible caller only checks whether the
    result is a ``Fault``.
    """
    if not isinstance(request, dict):
        return Fault(
            -32600, 'Request must be {}, not %s.' % type(request)
        )

    rpcid = request.get('id', None)
    if not get_version(request):
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)

    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')
    if (not method
            or not isinstance(method, types.StringTypes)
            or not isinstance(params, (list, dict, tuple))):
        return Fault(
            -32600, 'Invalid request parameters or method.', rpcid=rpcid
        )
    return True
class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
    """JSON-RPC dispatcher that routes requests through per-session queues.

    Requests are handed to ``self.dispatcher`` (attached to the server
    instance by its owner) and answers are collected from the session's
    ``pending_responses`` queue.
    """

    def __init__(self, encoding=None):
        # NOTE(review): the continuation of this call was missing from the
        # reviewed copy; ``allow_none=True`` mirrors jsonrpclib's own
        # dispatcher -- confirm.
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(
            self, allow_none=True, encoding=encoding)

    def _marshaled_dispatch(self, session_id, data, dispatch_method=None):
        """Decode ``data``, dispatch each request, and return the reply text.

        ``data`` is a JSON document holding one request or a batch (list).
        ``dispatch_method`` is accepted for signature compatibility and is
        not used.

        Returns a JSON string: a parse-error fault, a single response, a
        ``[...]`` batch, or ``''`` when nothing is pending.  Returns the
        plain text ``'Error: session not found'`` for an unknown session.
        """
        try:
            request = jsonrpclib.loads(data)
        except Exception as e:
            fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
            return fault.response()

        session = self.dispatcher.get_session_by_address(session_id)
        if not session:
            return 'Error: session not found'
        # Record activity; the session's idle check compares against this.
        session.time = time.time()

        if not isinstance(request, list):
            request = [request]

        responses = []
        for req_entry in request:
            result = validate_request(req_entry)
            if isinstance(result, Fault):
                responses.append(result.response())
                continue

            self.dispatcher.do_dispatch(session, req_entry)

            # A stop request is acknowledged immediately; anything already
            # collected for this batch is intentionally not returned.
            if req_entry['method'] == 'server.stop':
                return json.dumps({'result': 'ok'})

        responses.extend(
            json.dumps(item) for item in self.poll_session(session))

        if len(responses) > 1:
            return '[%s]' % ','.join(responses)
        if len(responses) == 1:
            return responses[0]
        return ''

    def create_session(self):
        """Register a new HTTP session and return its random identifier."""
        session_id = random_string(10)
        session = HttpSession(session_id)
        self.dispatcher.add_session(session)
        return session_id

    def poll_session(self, session):
        """Return every response currently queued for ``session``.

        NOTE(review): the loop body was missing from the reviewed copy; a
        full drain of ``session.pending_responses`` is assumed.
        """
        q = session.pending_responses
        responses = []
        # get_nowait() instead of empty()/get(): another handler thread may
        # drain the queue between the check and a blocking get().
        while True:
            try:
                responses.append(q.get_nowait())
            except Queue.Empty:
                break
        return responses
class StratumJSONRPCRequestHandler(
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """HTTP handler for Stratum JSON-RPC with cookie-identified sessions.

    GET polls the session for pending responses; POST submits requests and
    returns whatever is pending afterwards.  The session id travels in a
    ``SESSION=<id>`` cookie; a request without one gets a fresh session.
    """

    def do_OPTIONS(self):
        """Answer a CORS pre-flight request."""
        self.send_response(200)
        self.send_header('Allow', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Headers', '*')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def do_GET(self):
        """Poll: dispatch an empty batch so only queued responses return."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._handle_rpc(json.dumps([]))

    def do_POST(self):
        """Dispatch the JSON-RPC document carried in the request body."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return
        self._handle_rpc()

    def _read_body(self):
        """Read the request body as announced by Content-Length."""
        max_chunk_size = 10*1024*1024
        size_remaining = int(self.headers["content-length"])
        chunks = []
        while size_remaining > 0:
            chunk = self.rfile.read(min(size_remaining, max_chunk_size))
            if not chunk:
                # Client closed early.  Without this check the loop never
                # terminates, because size_remaining stops decreasing.
                break
            chunks.append(chunk)
            size_remaining -= len(chunk)
        return ''.join(chunks)

    def _session_id_from_cookie(self):
        """Return the SESSION cookie value, or None when it is absent."""
        # The Cookie header may carry several ';'-separated cookies, so
        # SESSION is not necessarily the first one.
        for part in (self.headers.get('cookie') or '').split(';'):
            part = part.strip()
            if part[0:8] == 'SESSION=':
                return part[8:]
        return None

    def _handle_rpc(self, data=None):
        """Shared GET/POST pipeline: resolve session, dispatch, reply.

        ``data`` is the JSON text to dispatch.  When it is None the request
        body is read instead, inside the error handler so a bad body
        produces a 500 fault rather than an unhandled exception.
        """
        # Bound before the try block so the cookie check below is safe
        # even when the body read fails.
        session_id = None
        try:
            if data is None:
                data = self._read_body()

            session_id = self._session_id_from_cookie()
            if session_id is None:
                session_id = self.server.create_session()

            response = self.server._marshaled_dispatch(session_id, data)
            self.send_response(200)
        except Exception:
            self.send_response(500)
            err_lines = traceback.format_exc().splitlines()
            trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
            fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
            response = fault.response()
            print("500 %s" % trace_string)
        if response is None:
            response = ''

        if session_id:
            self.send_header("Set-Cookie", "SESSION=%s" % session_id)

        self.send_header("Content-type", "application/json-rpc")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)
        self.wfile.flush()
        self.shutdown_connection()

    def shutdown_connection(self):
        """Shut down the sending side of the connection."""
        self.connection.shutdown(1)
class SSLRequestHandler(StratumJSONRPCRequestHandler):
    """Request handler variant for connections wrapped by pyOpenSSL."""

    # NOTE(review): the ``def`` line of this method was missing from the
    # reviewed copy; ``setup`` is the stream-handler hook that assigns
    # connection/rfile/wfile -- confirm.
    def setup(self):
        """Build the read/write file objects on top of the SSL connection."""
        self.connection = self.request
        self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
        self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)

    def shutdown_connection(self):
        """Shut down the SSL connection (called without arguments here)."""
        self.connection.shutdown()
class SSLTCPServer(SocketServer.TCPServer):
    """TCP server whose listening socket is a pyOpenSSL connection."""

    def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
        """Create the SSL listening socket and optionally bind and activate.

        ``certfile`` and ``keyfile`` are paths to the certificate and
        private key files loaded into the SSL context.
        """
        SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
        # Negotiate the best protocol both sides support but refuse
        # SSLv2/SSLv3: pinning SSLv3_METHOD forces a broken protocol and
        # shuts out TLS-only clients.
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
        ctx.use_privatekey_file(keyfile)
        ctx.use_certificate_file(certfile)
        self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
        if bind_and_activate:
            self.server_bind()
            self.server_activate()

    def shutdown_request(self, request):
        """Override the base-class request shutdown.

        NOTE(review): the body was missing from the reviewed copy; a no-op
        is assumed.  The visible SSL request handler shuts the connection
        down itself via ``shutdown_connection`` -- confirm.
        """
        pass
class StratumHTTPServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
    """Plain-HTTP Stratum server: TCP listener plus JSON-RPC dispatcher."""

    allow_reuse_address = True

    def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        """Initialise the dispatcher, then bind the listening socket.

        ``addr`` is the bind address: a ``(host, port)`` pair, or a
        filesystem path when ``address_family`` is ``socket.AF_UNIX``.
        """
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        self.address_family = address_family
        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
            # Unix sockets can't be bound if they already exist in the
            # filesystem. The convention of e.g. X11 is to unlink
            # before binding again.
            if os.path.exists(addr):
                try:
                    os.unlink(addr)
                except OSError:
                    logging.warning("Could not unlink socket %s", addr)

        SocketServer.TCPServer.__init__(self, addr, requestHandler, bind_and_activate)

        # Set close-on-exec on the listening socket where the platform
        # supports it (fcntl is None when the module is unavailable).
        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher):
    """HTTPS Stratum server: SSL TCP listener plus JSON-RPC dispatcher."""

    allow_reuse_address = True

    def __init__(self, addr, certfile, keyfile,
                 requestHandler=SSLRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        """Initialise the dispatcher, then bind the SSL listening socket.

        ``addr`` is the bind address: a ``(host, port)`` pair, or a
        filesystem path when ``address_family`` is ``socket.AF_UNIX``.
        ``certfile`` and ``keyfile`` are passed through to ``SSLTCPServer``.
        """
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        self.address_family = address_family
        if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
            # Unix sockets can't be bound if they already exist in the
            # filesystem. The convention of e.g. X11 is to unlink
            # before binding again.
            if os.path.exists(addr):
                try:
                    os.unlink(addr)
                except OSError:
                    logging.warning("Could not unlink socket %s", addr)

        SSLTCPServer.__init__(self, addr, certfile, keyfile, requestHandler, bind_and_activate)

        # Set close-on-exec on the listening socket where the platform
        # supports it (fcntl is None when the module is unavailable).
        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
            flags |= fcntl.FD_CLOEXEC
            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
352 from processor import Session
class HttpSession(Session):
    """Session whose responses are queued until the HTTP client polls."""

    def __init__(self, session_id):
        Session.__init__(self)
        # Responses wait here until a GET/POST from the client drains them.
        self.pending_responses = Queue.Queue()
        self.address = session_id
        # NOTE(review): the embedded numbering of the reviewed copy skips
        # one line after this assignment -- check whether another
        # attribute was set here.

    def send_response(self, response):
        """Queue ``response`` for delivery on the client's next request."""
        # The serialised text is not used; the call is kept because it
        # raises here, in the producer, if ``response`` is not
        # JSON-serialisable.
        json.dumps(response)
        self.pending_responses.put(response)

    # NOTE(review): only the idle-time comparison of this method survived
    # in the reviewed copy.  The method name, the lock, and the
    # ``_stopped`` flag are assumed to come from the Session base class
    # -- confirm.
    def stopped(self):
        """Report whether the session is stopped; 60 s idle stops it."""
        with self.lock:
            if time.time() - self.time > 60:
                self._stopped = True
            return self._stopped
class HttpServer(threading.Thread):
    """Thread that runs the Stratum HTTP or HTTPS server."""

    def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile):
        """Store the configuration; the server itself is built in run().

        ``certfile`` and ``keyfile`` are only used when ``use_ssl`` is true.
        """
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        threading.Thread.__init__(self)
        # NOTE(review): three lines were missing here in the reviewed copy.
        # host/port must be stored because run() reads them; the daemon
        # flag is an assumption -- confirm.
        self.daemon = True
        self.host = host
        self.port = port
        self.use_ssl = use_ssl
        self.certfile = certfile
        self.keyfile = keyfile
        self.lock = threading.Lock()

    def run(self):
        """Create the threaded server and serve until it is shut down."""
        # see http://code.google.com/p/jsonrpclib/
        from SocketServer import ThreadingMixIn
        if self.use_ssl:
            class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer):
                pass
            self.server = StratumThreadedServer(
                (self.host, self.port), self.certfile, self.keyfile)
            print("HTTPS server started.")
        else:
            class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer):
                pass
            self.server = StratumThreadedServer((self.host, self.port))
            print("HTTP server started.")

        # The request handlers reach the session dispatcher via the server.
        self.server.dispatcher = self.dispatcher
        self.server.register_function(None, 'server.stop')
        self.server.register_function(None, 'server.info')

        self.server.serve_forever()