2 # Copyright(C) 2012 thomasv@gitorious
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public
15 # License along with this program. If not, see
16 # <http://www.gnu.org/licenses/agpl.html>.
19 from jsonrpclib import Fault
20 from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
21 import SimpleXMLRPCServer
40 sessions are identified with cookies
41 - each session has a buffer of responses to requests
44 from the processor point of view:
45 - the user only defines process() ; the rest is session management. thus sessions should not belong to processor
52 return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
57 def get_version(request):
59 if 'jsonrpc' in request.keys():
61 if 'id' in request.keys():
65 def validate_request(request):
66 if type(request) is not types.DictType:
68 -32600, 'Request must be {}, not %s.' % type(request)
71 rpcid = request.get('id', None)
72 version = get_version(request)
74 fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
76 request.setdefault('params', [])
77 method = request.get('method', None)
78 params = request.get('params')
79 param_types = (types.ListType, types.DictType, types.TupleType)
80 if not method or type(method) not in types.StringTypes or \
81 type(params) not in param_types:
83 -32600, 'Invalid request parameters or method.', rpcid=rpcid
88 class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
90 def __init__(self, encoding=None):
91 SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self,
95 def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
98 request = jsonrpclib.loads(data)
100 fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
101 response = fault.response()
105 if type(request) is not types.ListType:
106 request = [ request ]
108 for req_entry in request:
109 result = validate_request(req_entry)
110 if type(result) is Fault:
111 responses.append(result.response())
113 resp_entry = self._marshaled_single_dispatch(session_id, req_entry)
114 if resp_entry is not None:
115 responses.append(resp_entry)
117 r = self.poll_session(session_id)
119 responses.append(json.dumps(item))
121 if len(responses) > 1:
122 response = '[%s]' % ','.join(responses)
123 elif len(responses) == 1:
124 response = responses[0]
130 def _marshaled_single_dispatch(self, session_id, request):
131 # TODO - Use the multiprocessing and skip the response if
132 # it is a notification
133 # Put in support for custom dispatcher here
134 # (See SimpleXMLRPCServer._marshaled_dispatch)
135 method = request.get('method')
136 params = request.get('params')
138 response = self._dispatch(method, session_id, request)
140 exc_type, exc_value, exc_tb = sys.exc_info()
141 fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
142 return fault.response()
143 if 'id' not in request.keys() or request['id'] == None:
144 # It's a notification
148 response = jsonrpclib.dumps(response,
154 exc_type, exc_value, exc_tb = sys.exc_info()
155 fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
156 return fault.response()
158 def _dispatch(self, method, session_id, request):
161 func = self.funcs[method]
163 if self.instance is not None:
164 if hasattr(self.instance, '_dispatch'):
165 return self.instance._dispatch(method, params)
168 func = SimpleXMLRPCServer.resolve_dotted_attribute(
173 except AttributeError:
177 response = func(session_id, request)
180 return Fault(-32602, 'Invalid parameters.')
182 err_lines = traceback.format_exc().splitlines()
183 trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
184 fault = jsonrpclib.Fault(-32603, 'Server error: %s' %
188 return Fault(-32601, 'Method %s not supported.' % method)
190 class StratumJSONRPCRequestHandler(
191 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
194 if not self.is_rpc_path_valid():
199 c = self.headers.get('cookie')
201 if c[0:8]=='SESSION=':
202 #print "found cookie", c[8:]
205 if session_id is None:
206 session_id = self.server.create_session()
207 #print "setting cookie", session_id
209 data = json.dumps([])
210 response = self.server._marshaled_dispatch(session_id, data)
211 self.send_response(200)
213 self.send_response(500)
214 err_lines = traceback.format_exc().splitlines()
215 trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
216 fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
217 response = fault.response()
218 print "500", trace_string
223 self.send_header("Set-Cookie", "SESSION=%s"%session_id)
225 self.send_header("Content-type", "application/json-rpc")
226 self.send_header("Content-length", str(len(response)))
228 self.wfile.write(response)
230 self.connection.shutdown(1)
234 if not self.is_rpc_path_valid():
238 max_chunk_size = 10*1024*1024
239 size_remaining = int(self.headers["content-length"])
241 while size_remaining:
242 chunk_size = min(size_remaining, max_chunk_size)
243 L.append(self.rfile.read(chunk_size))
244 size_remaining -= len(L[-1])
248 c = self.headers.get('cookie')
250 if c[0:8]=='SESSION=':
251 print "found cookie", c[8:]
254 if session_id is None:
255 session_id = self.server.create_session()
256 print "setting cookie", session_id
258 response = self.server._marshaled_dispatch(session_id, data)
259 self.send_response(200)
261 self.send_response(500)
262 err_lines = traceback.format_exc().splitlines()
263 trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
264 fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
265 response = fault.response()
266 print "500", trace_string
271 self.send_header("Set-Cookie", "SESSION=%s"%session_id)
273 self.send_header("Content-type", "application/json-rpc")
274 self.send_header("Content-length", str(len(response)))
276 self.wfile.write(response)
278 self.connection.shutdown(1)
281 class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
283 allow_reuse_address = True
285 def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
286 logRequests=False, encoding=None, bind_and_activate=True,
287 address_family=socket.AF_INET):
288 self.logRequests = logRequests
289 StratumJSONRPCDispatcher.__init__(self, encoding)
290 # TCPServer.__init__ has an extra parameter on 2.6+, so
291 # check Python version and decide on how to call it
292 vi = sys.version_info
293 self.address_family = address_family
294 if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
295 # Unix sockets can't be bound if they already exist in the
296 # filesystem. The convention of e.g. X11 is to unlink
297 # before binding again.
298 if os.path.exists(addr):
302 logging.warning("Could not unlink socket %s", addr)
303 # if python 2.5 and lower
304 if vi[0] < 3 and vi[1] < 6:
305 SocketServer.TCPServer.__init__(self, addr, requestHandler)
307 SocketServer.TCPServer.__init__(self, addr, requestHandler,
309 if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
310 flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
311 flags |= fcntl.FD_CLOEXEC
312 fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
318 def create_session(self):
319 session_id = random_string(10)
320 self.sessions[session_id] = HttpSession(session_id)
323 def poll_session(self,session_id):
324 responses = self.sessions[session_id].pending_responses[:]
325 self.sessions[session_id].pending_responses = []
326 print "poll: %d responses"%len(responses)
330 from processor import Session
332 class HttpSession(Session):
334 def __init__(self, session_id):
335 Session.__init__(self)
336 self.pending_responses = []
337 print "new http session", session_id
339 def send_response(self, response):
340 raw_response = json.dumps(response)
341 self.pending_responses.append(response)
343 class HttpServer(threading.Thread):
344 def __init__(self, shared, _processor, host, port):
346 self.processor = _processor
347 threading.Thread.__init__(self)
351 self.lock = threading.Lock()
354 # see http://code.google.com/p/jsonrpclib/
355 from SocketServer import ThreadingMixIn
356 class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
357 self.server = StratumThreadedJSONRPCServer(( self.host, self.port))
358 for s in ['server.peers', 'server.banner', 'transaction.broadcast', \
359 'address.get_history','address.subscribe', 'numblocks.subscribe', 'client.version']:
360 self.server.register_function(self.process, s)
362 self.server.register_function(self.do_stop, 'stop')
364 print "HTTP server started."
365 self.server.serve_forever()
368 def process(self, session_id, request):
369 #print session, request
370 session = self.server.sessions.get(session_id)
372 #print "zz",session_id,session
373 request['id'] = self.processor.store_session_id(session, request['id'])
374 self.processor.process(request)
376 def do_stop(self, session, request):