2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
19 from jsonrpclib import Fault
20 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
21 import SimpleXMLRPCServer
40 sessions are identified with cookies
41 - each session has a buffer of responses to requests
44 from the processor point of view:
45 - the user only defines process(); the rest is session management, so sessions should not belong to the processor
50 from processor import random_string
# get_version(request): classify a JSON-RPC request by the keys it carries.
# The visible checks test for a 'jsonrpc' key first and an 'id' key second.
# NOTE(review): this listing keeps the original file's line numbers as a
# prefix and skips lines 54, 56 and 58-60, so the value produced by each
# branch (and the fall-through result) is not visible here -- confirm against
# the full file. validate_request() treats a falsy result as "no version".
53 def get_version(request):
55 if 'jsonrpc' in request.keys():
57 if 'id' in request.keys():
# validate_request(request): sanity-check one decoded JSON-RPC request.
# The caller (_marshaled_dispatch) tests whether the result is a Fault, so a
# Fault is how a rejected request is reported.
# NOTE(review): the lines that open the Fault(...) calls, the guard before the
# 'Request %s invalid.' fault and every return statement are missing from this
# listing; the comments below describe only what is visible.
61 def validate_request(request):
# The request must be a dict; the fragment below is the argument list of the
# error raised for anything else (code -32600).
62 if type(request) is not types.DictType:
64 -32600, 'Request must be {}, not %s.' % type(request)
# Keep the request id so error replies can be correlated by the client.
67 rpcid = request.get('id', None)
68 version = get_version(request)
70 fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
# Side effect: a request without 'params' gets an empty list stored into it.
72 request.setdefault('params', [])
73 method = request.get('method', None)
74 params = request.get('params')
75 param_types = (types.ListType, types.DictType, types.TupleType)
# Reject a missing/empty or non-string method, or params that are not a
# list, dict or tuple.
76 if not method or type(method) not in types.StringTypes or \
77 type(params) not in param_types:
79 -32600, 'Invalid request parameters or method.', rpcid=rpcid
# StratumJSONRPCDispatcher: SimpleXMLRPCDispatcher subclass that decodes
# JSON-RPC payloads, hands each request to self.dispatcher for the given
# session, and returns whatever responses are currently queued for it.
# NOTE(review): this listing drops several original lines (try/except headers,
# the session-missing test, the initialisation of `responses`, the final
# return); the comments below cover only the visible statements.
84 class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
# Delegates construction to the base dispatcher; the remaining arguments of
# the call are on lines not shown here.
86 def __init__(self, encoding=None):
87 SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self,
# _marshaled_dispatch(session_id, data): process a raw JSON body for one
# session and return the serialized reply. dispatch_method is not used in
# the visible lines.
91 def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
# Decode the body; the fault below (code -32700) is built from the exception
# `e` when decoding fails -- the surrounding try/except is not visible.
94 request = jsonrpclib.loads(data)
96 fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
97 response = fault.response()
# Look the session up by its id; self.dispatcher is attached to the server
# instance by HttpServer.
100 session = self.dispatcher.get_session_by_address(session_id)
102 return 'Error: session not found'
# Refresh the session's last-activity timestamp (HttpSession compares it
# against a 60 second threshold).
103 session.time = time.time()
# Treat a single request as a batch of one.
106 if type(request) is not types.ListType:
107 request = [ request ]
109 for req_entry in request:
110 result = validate_request(req_entry)
# Invalid entries are answered immediately with their fault response.
111 if type(result) is Fault:
112 responses.append(result.response())
115 self.dispatcher.process(session, req_entry)
# 'server.stop' is acknowledged right away instead of waiting for a queued
# response.
117 if req_entry['method'] == 'server.stop':
118 return json.dumps({'result':'ok'})
# Collect the responses queued for this session; poll_session is defined on
# StratumJSONRPCServer, which inherits from this class.
120 r = self.poll_session(session)
122 responses.append(json.dumps(item))
# Several responses are joined into a JSON array; a single one is sent bare.
# NOTE(review): the value used when `responses` is empty is not visible here.
124 if len(responses) > 1:
125 response = '[%s]' % ','.join(responses)
126 elif len(responses) == 1:
127 response = responses[0]
# StratumJSONRPCRequestHandler: HTTP request handler that maps a SESSION
# cookie to a server-side session and forwards the request to the server's
# _marshaled_dispatch().
# NOTE(review): the `def` lines of both handler methods (original lines 138
# and 179) and their try/except headers are missing from this listing. The
# first group of statements dispatches an empty JSON list and the second
# reads a request body, so they are presumably do_GET and do_POST -- confirm
# against the full file. The two groups are otherwise near-identical.
135 class StratumJSONRPCRequestHandler(
136 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
# --- first handler (no request body) ---
# Only serve paths the base class accepts as RPC paths.
139 if not self.is_rpc_path_valid():
# Recover the session id from a cookie header that starts with 'SESSION='.
# NOTE(review): only the first 8 characters are inspected, so a header that
# lists another cookie first would not match -- confirm this is intended.
144 c = self.headers.get('cookie')
146 if c[0:8]=='SESSION=':
147 #print "found cookie", c[8:]
# No usable cookie: open a fresh session on the server.
150 if session_id is None:
151 session_id = self.server.create_session()
152 #print "setting cookie", session_id
# With no body to process, dispatch an empty batch; this still returns any
# responses already queued for the session.
154 data = json.dumps([])
155 response = self.server._marshaled_dispatch(session_id, data)
156 self.send_response(200)
# Failure path: reply 500 with a JSON-RPC fault (code -32603) summarising
# the traceback (third-from-last and last lines) and print it to stdout.
158 self.send_response(500)
159 err_lines = traceback.format_exc().splitlines()
160 trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
161 fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
162 response = fault.response()
163 print "500", trace_string
# Hand the session id back to the client as a cookie (the condition guarding
# this header is not visible here).
168 self.send_header("Set-Cookie", "SESSION=%s"%session_id)
170 self.send_header("Content-type", "application/json-rpc")
171 self.send_header("Access-Control-Allow-Origin", "*")
172 self.send_header("Content-length", str(len(response)))
174 self.wfile.write(response)
# Shut down the sending side of the connection once the reply is written.
176 self.connection.shutdown(1)
# --- second handler (request with a body) ---
180 if not self.is_rpc_path_valid():
# Read the body in chunks of at most 10 MiB until content-length bytes have
# been consumed.
# NOTE(review): if rfile.read() returns an empty string before that (client
# disconnect), size_remaining never decreases and no guard is visible in
# these lines -- confirm the loop cannot spin forever.
184 max_chunk_size = 10*1024*1024
185 size_remaining = int(self.headers["content-length"])
187 while size_remaining:
188 chunk_size = min(size_remaining, max_chunk_size)
189 L.append(self.rfile.read(chunk_size))
190 size_remaining -= len(L[-1])
# Same cookie-to-session lookup as in the first handler.
194 c = self.headers.get('cookie')
196 if c[0:8]=='SESSION=':
197 #print "found cookie", c[8:]
200 if session_id is None:
201 session_id = self.server.create_session()
202 #print "setting cookie", session_id
# Dispatch the received body for this session.
204 response = self.server._marshaled_dispatch(session_id, data)
205 self.send_response(200)
# Failure path: identical 500 / fault handling to the first handler.
207 self.send_response(500)
208 err_lines = traceback.format_exc().splitlines()
209 trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
210 fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
211 response = fault.response()
212 print "500", trace_string
217 self.send_header("Set-Cookie", "SESSION=%s"%session_id)
219 self.send_header("Content-type", "application/json-rpc")
220 self.send_header("Access-Control-Allow-Origin", "*")
221 self.send_header("Content-length", str(len(response)))
223 self.wfile.write(response)
225 self.connection.shutdown(1)
# StratumJSONRPCServer: TCP server combined with the Stratum JSON-RPC
# dispatcher; it also creates HTTP sessions and polls their response queues.
# NOTE(review): this listing drops some original lines (the try/except around
# the socket unlink, the else branch of the version check, the bodies and
# returns of create_session/poll_session); comments cover the visible lines.
228 class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
# Lets the listening address be re-bound right after a restart.
230 allow_reuse_address = True
232 def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
233 logRequests=False, encoding=None, bind_and_activate=True,
234 address_family=socket.AF_INET):
235 self.logRequests = logRequests
236 StratumJSONRPCDispatcher.__init__(self, encoding)
237 # TCPServer.__init__ has an extra parameter on 2.6+, so
238 # check Python version and decide on how to call it
239 vi = sys.version_info
# address_family is stored before TCPServer.__init__ runs.
240 self.address_family = address_family
241 if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
242 # Unix sockets can't be bound if they already exist in the
243 # filesystem. The convention of e.g. X11 is to unlink
244 # before binding again.
245 if os.path.exists(addr):
249 logging.warning("Could not unlink socket %s", addr)
250 # if python 2.5 and lower
251 if vi[0] < 3 and vi[1] < 6:
252 SocketServer.TCPServer.__init__(self, addr, requestHandler)
254 SocketServer.TCPServer.__init__(self, addr, requestHandler,
# Where fcntl is available, set FD_CLOEXEC on the listening socket's
# descriptor.
256 if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
257 flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
258 flags |= fcntl.FD_CLOEXEC
259 fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
# create_session(): make an HttpSession keyed by a 10-character random id and
# register it with the dispatcher. The request handler assigns the result to
# session_id; the return statement itself is not visible here.
# NOTE(review): the id is sent to clients as the SESSION cookie and is the only
# thing identifying a session -- confirm random_string is unpredictable.
263 def create_session(self):
264 session_id = random_string(10)
265 session = HttpSession(session_id)
266 self.dispatcher.add_session(session)
# poll_session(session): read from the session's pending_responses queue.
# The draining loop and the return value are on lines not shown; the caller
# iterates over the result and JSON-encodes each item.
269 def poll_session(self, session):
270 q = session.pending_responses
275 #print "poll: %d responses"%len(responses)
279 from processor import Session
# HttpSession: a Session whose outgoing responses are buffered in a queue
# until the HTTP client's next request collects them.
# NOTE(review): this listing drops some original lines, including the header
# of the method that contains the 60-second check at the end.
282 class HttpSession(Session):
284 def __init__(self, session_id):
285 Session.__init__(self)
# Responses wait here until StratumJSONRPCServer.poll_session reads them.
286 self.pending_responses = Queue.Queue()
# The session id is stored as `address`; the dispatcher looks sessions up
# through get_session_by_address(session_id).
287 self.address = session_id
288 self.name = "HTTP session"
# send_response(response): queue a response object for later delivery.
# raw_response is not used in the visible lines; the object itself, not the
# JSON string, is what gets queued.
290 def send_response(self, response):
291 raw_response = json.dumps(response)
292 self.pending_responses.put(response)
# Compares the time since self.time (refreshed on every dispatched request)
# against 60 seconds; what happens when it is exceeded is not visible here.
296 if time.time() - self.time > 60:
# HttpServer: thread that owns the threaded Stratum JSON-RPC HTTP server.
# NOTE(review): this listing drops original lines 305-307 (presumably where
# self.host / self.port are assigned) and line 310 (presumably the header of
# the method that starts the server) -- confirm against the full file.
300 class HttpServer(threading.Thread):
301 def __init__(self, dispatcher, host, port):
302 self.shared = dispatcher.shared
# Keep only the request dispatcher; it is later attached to the server.
303 self.dispatcher = dispatcher.request_dispatcher
304 threading.Thread.__init__(self)
308 self.lock = threading.Lock()
# Build a server class that mixes in ThreadingMixIn.
311 # see http://code.google.com/p/jsonrpclib/
312 from SocketServer import ThreadingMixIn
313 class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
315 self.server = StratumThreadedJSONRPCServer(( self.host, self.port))
# The server's dispatch and session methods reach the request dispatcher
# through this attribute.
316 self.server.dispatcher = self.dispatcher
# Register the two method names with no callable attached.
317 self.server.register_function(None, 'server.stop')
318 self.server.register_function(None, 'server.info')
320 print "HTTP server started."
# Blocks, serving requests.
321 self.server.serve_forever()