#!/usr/bin/env python
# Copyright(C) 2012 thomasv@gitorious

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.

"""
HTTP / HTTPS transport for the Stratum JSON-RPC server.

Sessions are identified with cookies - each session has a buffer of
responses to requests.

From the processor point of view:
 - the user only defines process() ; the rest is session management.
   Thus sessions should not belong to processor.
"""

import jsonrpclib
from jsonrpclib import Fault
from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
import SimpleXMLRPCServer
import SocketServer
import socket
import logging
import os, time
import types
import traceback
import sys, threading
import Queue

from OpenSSL import SSL

try:
    import fcntl
except ImportError:
    # For Windows
    fcntl = None

import json

from processor import random_string, print_log


def get_version(request):
    """Return the JSON-RPC version of *request* (which must be a dict).

    Returns 2.0 when a 'jsonrpc' key is present, 1.0 when only an 'id'
    key is present, and None when neither is.
    """
    if 'jsonrpc' in request:
        return 2.0
    if 'id' in request:
        return 1.0
    return None


def validate_request(request):
    """Check that *request* is a well-formed JSON-RPC request.

    Returns True when the request is valid, otherwise a ``Fault``
    (code -32600) describing the problem.  As a side effect a missing
    'params' entry is defaulted to an empty list on the request dict.
    """
    if not isinstance(request, dict):
        return Fault(
            -32600, 'Request must be {}, not %s.' % type(request)
        )

    rpcid = request.get('id', None)
    if not get_version(request):
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)

    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')

    method_ok = bool(method) and isinstance(method, types.StringTypes)
    params_ok = isinstance(params, (list, dict, tuple))
    if not (method_ok and params_ok):
        return Fault(
            -32600, 'Invalid request parameters or method.', rpcid=rpcid
        )
    return True


def _unlink_stale_unix_socket(addr, address_family):
    """Remove a leftover unix socket file at *addr* before binding.

    Unix sockets can't be bound if they already exist in the
    filesystem.  The convention of e.g. X11 is to unlink before binding
    again.  Does nothing for other address families.
    """
    if not (USE_UNIX_SOCKETS and address_family == socket.AF_UNIX):
        return
    if os.path.exists(addr):
        try:
            os.unlink(addr)
        except OSError:
            logging.warning("Could not unlink socket %s", addr)


def _set_close_on_exec(fd):
    """Set FD_CLOEXEC on *fd* where the platform provides fcntl."""
    if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        flags |= fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)


class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
    """Dispatcher that routes JSON-RPC requests to per-session queues.

    Expects a ``dispatcher`` attribute to be set on the instance before
    requests are served (HttpServer.run() assigns it).
    """

    def __init__(self, encoding=None):
        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(
            self, allow_none=True, encoding=encoding)

    def _marshaled_dispatch(self, session_id, data, dispatch_method = None):
        """Dispatch the JSON payload *data* on behalf of *session_id*.

        *data* may hold a single request or a batch (list).  Every valid
        entry is handed to the request dispatcher; afterwards all
        responses currently pending for the session are collected.

        Returns a string: a JSON object for one response, a JSON array
        for several, '' for none, a fault response when *data* cannot
        be parsed, or 'Error: session not found' for an unknown session.
        *dispatch_method* is accepted for interface compatibility and
        is not used.
        """
        try:
            request = jsonrpclib.loads(data)
        except Exception as e:
            fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
            return fault.response()

        session = self.dispatcher.get_session_by_address(session_id)
        if not session:
            return 'Error: session not found'
        # Refresh the activity timestamp used by HttpSession.stopped().
        session.time = time.time()

        if not isinstance(request, list):
            request = [request]

        responses = []
        for req_entry in request:
            result = validate_request(req_entry)
            if isinstance(result, Fault):
                responses.append(result.response())
                continue

            self.dispatcher.do_dispatch(session, req_entry)

            if req_entry['method'] == 'server.stop':
                return json.dumps({'result': 'ok'})

        for item in self.poll_session(session):
            responses.append(json.dumps(item))

        if len(responses) > 1:
            return '[%s]' % ','.join(responses)
        if len(responses) == 1:
            return responses[0]
        return ''

    def create_session(self):
        """Create and register a new HttpSession; return its id."""
        session_id = random_string(10)
        session = HttpSession(session_id)
        self.dispatcher.add_session(session)
        return session_id

    def poll_session(self, session):
        """Return the list of responses currently queued for *session*.

        Uses get_nowait() instead of an empty()/get() pair: requests are
        handled in separate threads, so another thread may drain the
        queue between the two calls and a blocking get() would then
        hang forever.
        """
        q = session.pending_responses
        responses = []
        while True:
            try:
                responses.append(q.get_nowait())
            except Queue.Empty:
                break
        return responses


class StratumJSONRPCRequestHandler(
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
    """HTTP request handler speaking JSON-RPC with cookie sessions."""

    def do_OPTIONS(self):
        """Answer OPTIONS (CORS preflight) requests with permissive headers."""
        self.send_response(200)
        self.send_header('Allow', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Headers', '*')
        self.send_header('Content-Length', '0')
        self.end_headers()

    def do_GET(self):
        """Poll: return the responses pending for the caller's session."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return

        session_id = None
        try:
            session_id = self._get_or_create_session_id()
            # An empty batch dispatches nothing and only polls the session.
            data = json.dumps([])
            response = self.server._marshaled_dispatch(session_id, data)
            self.send_response(200)
        except Exception:
            response = self._begin_error_response()

        self._finish_response(session_id, response)

    def do_POST(self):
        """Dispatch the posted JSON-RPC request(s) and return responses."""
        if not self.is_rpc_path_valid():
            self.report_404()
            return

        # Bound before the try block so that the error path below can
        # test it even when reading the request body fails.
        session_id = None
        try:
            data = self._read_request_body()
            session_id = self._get_or_create_session_id()
            response = self.server._marshaled_dispatch(session_id, data)
            self.send_response(200)
        except Exception:
            response = self._begin_error_response()

        self._finish_response(session_id, response)

    def shutdown_connection(self):
        """Shut down the sending side of the client connection."""
        self.connection.shutdown(1)

    def _read_request_body(self):
        """Read and return the request body announced by Content-Length.

        Reads in chunks of at most 10 MiB.  Stops early if the client
        closes the connection, so a short body cannot make the loop
        spin forever.  Raises if the header is missing or not an int.
        """
        max_chunk_size = 10 * 1024 * 1024
        size_remaining = int(self.headers["content-length"])
        chunks = []
        while size_remaining > 0:
            chunk = self.rfile.read(min(size_remaining, max_chunk_size))
            if not chunk:
                break
            chunks.append(chunk)
            size_remaining -= len(chunk)
        return ''.join(chunks)

    def _get_or_create_session_id(self):
        """Return the session id from the cookie, or create a session.

        Only a Cookie header that starts with 'SESSION=' is recognised.
        """
        cookie = self.headers.get('cookie')
        if cookie and cookie[0:8] == 'SESSION=':
            return cookie[8:]
        return self.server.create_session()

    def _begin_error_response(self):
        """Send a 500 status line and return a JSON-RPC fault body.

        Must be called from within an ``except`` block: the fault text
        is built from the exception currently being handled.
        """
        self.send_response(500)
        err_lines = traceback.format_exc().splitlines()
        trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
        fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
        print("500 %s" % trace_string)
        return fault.response()

    def _finish_response(self, session_id, response):
        """Send headers and body, then shut the connection down.

        The status line must already have been sent.
        """
        if response is None:
            response = ''

        if session_id:
            self.send_header("Set-Cookie", "SESSION=%s" % session_id)

        self.send_header("Content-type", "application/json-rpc")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)
        self.wfile.flush()
        self.shutdown_connection()


class SSLRequestHandler(StratumJSONRPCRequestHandler):
    """Request handler variant for connections wrapped by pyOpenSSL."""

    def setup(self):
        """Build the read/write file objects on top of the SSL connection."""
        self.connection = self.request
        self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
        self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)

    def shutdown_connection(self):
        """Shut the SSL connection down (takes no 'how' argument here)."""
        self.connection.shutdown()


class SSLTCPServer(SocketServer.TCPServer):
    """TCPServer whose listening socket is a pyOpenSSL connection."""

    def __init__(self, server_address, certfile, keyfile,
                 RequestHandlerClass, bind_and_activate=True):
        SocketServer.BaseServer.__init__(
            self, server_address, RequestHandlerClass)
        # SSLv3 is deprecated and insecure; negotiate the best protocol
        # both sides support and refuse SSLv2 / SSLv3 explicitly.
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
        ctx.use_privatekey_file(keyfile)
        ctx.use_certificate_file(certfile)
        self.socket = SSL.Connection(
            ctx, socket.socket(self.address_family, self.socket_type))
        if bind_and_activate:
            self.server_bind()
            self.server_activate()

    def shutdown_request(self, request):
        # Deliberately a no-op.  NOTE(review): presumably because the
        # handler's shutdown_connection() already shuts the SSL
        # connection down - confirm before changing.
        pass


class StratumHTTPServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
    """Plain HTTP JSON-RPC server."""

    allow_reuse_address = True

    def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        self.address_family = address_family
        _unlink_stale_unix_socket(addr, address_family)

        SocketServer.TCPServer.__init__(
            self, addr, requestHandler, bind_and_activate)
        _set_close_on_exec(self.fileno())


class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher):
    """HTTPS JSON-RPC server."""

    allow_reuse_address = True

    def __init__(self, addr, certfile, keyfile,
                 requestHandler=SSLRequestHandler,
                 logRequests=False, encoding=None, bind_and_activate=True,
                 address_family=socket.AF_INET):
        self.logRequests = logRequests
        StratumJSONRPCDispatcher.__init__(self, encoding)
        self.address_family = address_family
        _unlink_stale_unix_socket(addr, address_family)

        SSLTCPServer.__init__(
            self, addr, certfile, keyfile, requestHandler, bind_and_activate)
        _set_close_on_exec(self.fileno())


from processor import Session


class HttpSession(Session):
    """Session whose responses are buffered until the client polls."""

    def __init__(self, session_id):
        Session.__init__(self)
        self.pending_responses = Queue.Queue()
        self.address = session_id
        self.name = "HTTP"

    def send_response(self, response):
        """Queue *response* for delivery on the client's next request."""
        # The serialised form is not kept (the HTTP handler serialises
        # again when polling); dumping here makes an unserialisable
        # response fail in the caller instead of in the handler thread.
        json.dumps(response)
        self.pending_responses.put(response)

    def stopped(self):
        """Return True once the session has been idle for over 60 seconds."""
        with self.lock:
            if time.time() - self.time > 60:
                self._stopped = True
            return self._stopped


class HttpServer(threading.Thread):
    """Daemon thread running the HTTP or HTTPS JSON-RPC server."""

    def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile):
        threading.Thread.__init__(self)
        self.daemon = True
        self.shared = dispatcher.shared
        self.dispatcher = dispatcher.request_dispatcher
        self.host = host
        self.port = port
        self.use_ssl = use_ssl
        self.certfile = certfile
        self.keyfile = keyfile
        self.lock = threading.Lock()

    def run(self):
        """Create the threaded server and serve requests forever."""
        # see http://code.google.com/p/jsonrpclib/
        from SocketServer import ThreadingMixIn

        address = (self.host, self.port)
        if self.use_ssl:
            class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer):
                pass
            self.server = StratumThreadedServer(
                address, self.certfile, self.keyfile)
            print_log("HTTPS server started.")
        else:
            class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer):
                pass
            self.server = StratumThreadedServer(address)
            print_log("HTTP server started.")

        self.server.dispatcher = self.dispatcher
        self.server.register_function(None, 'server.stop')
        self.server.register_function(None, 'server.info')
        self.server.serve_forever()