X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=transports%2Fstratum_http.py;h=5a3fe1d418055fa050913c67ec16d41443419a5e;hb=f73c44e47158e4d8ad137c8f111519a472d3c798;hp=419b1ad71bb42b44303d86fd3e26b51bfd673f1d;hpb=638b04908c8a25b8e40511eb5b5b6c0fce99b99a;p=electrum-server.git diff --git a/transports/stratum_http.py b/transports/stratum_http.py index 419b1ad..5a3fe1d 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -22,11 +22,13 @@ import SimpleXMLRPCServer import SocketServer import socket import logging -import os +import os, time import types import traceback import sys, threading +from OpenSSL import SSL + try: import fcntl except ImportError: @@ -47,11 +49,7 @@ from the processor point of view: """ -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - - +from processor import random_string def get_version(request): @@ -101,6 +99,11 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): response = fault.response() return response + session = self.dispatcher.get_session_by_address(session_id) + if not session: + return 'Error: session not found' + session.time = time.time() + responses = [] if type(request) is not types.ListType: request = [ request ] @@ -111,14 +114,12 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): responses.append(result.response()) continue - session = self.sessions.get(session_id) - if session: - self.dispatcher.process(session, req_entry) - if req_entry['method'] == 'server.stop': - return json.dumps({'result':'ok'}) + self.dispatcher.do_dispatch(session, req_entry) + + if req_entry['method'] == 'server.stop': + return json.dumps({'result':'ok'}) - - r = self.poll_session(session_id) + r = self.poll_session(session) for item in r: responses.append(json.dumps(item)) @@ -132,6 +133,23 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return response + def create_session(self): + session_id = random_string(10) + session = HttpSession(session_id) + self.dispatcher.add_session(session) + return session_id + + def poll_session(self, session): + q = session.pending_responses + responses = [] + while not q.empty(): + r = q.get() + responses.append(r) + #print "poll: %d responses"%len(responses) + return responses + + + class StratumJSONRPCRequestHandler( SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): @@ -169,6 +187,7 @@ class StratumJSONRPCRequestHandler( self.send_header("Set-Cookie", "SESSION=%s"%session_id) self.send_header("Content-type", "application/json-rpc") + self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) @@ -194,12 +213,12 @@ class StratumJSONRPCRequestHandler( c = self.headers.get('cookie') if c: if c[0:8]=='SESSION=': - print "found cookie", c[8:] + #print "found cookie", c[8:] session_id = c[8:] if session_id is None: session_id = self.server.create_session() - print "setting cookie", session_id + #print "setting cookie", session_id response = self.server._marshaled_dispatch(session_id, data) self.send_response(200) @@ -217,6 +236,7 @@ class StratumJSONRPCRequestHandler( self.send_header("Set-Cookie", "SESSION=%s"%session_id) self.send_header("Content-type", "application/json-rpc") + self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) @@ -224,7 +244,25 @@ class StratumJSONRPCRequestHandler( self.connection.shutdown(1) -class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): + + +class SSLTCPServer(SocketServer.TCPServer): + + def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True): + SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass) + ctx = SSL.Context(SSL.SSLv3_METHOD) + ctx.use_privatekey_file(keyfile) + ctx.use_certificate_file(certfile) + self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type)) + if bind_and_activate: + self.server_bind() + self.server_activate() + + def shutdown_request(self,request): + request.shutdown() + + +class StratumHTTPServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): allow_reuse_address = True @@ -246,65 +284,102 @@ class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): os.unlink(addr) except OSError: logging.warning("Could not unlink socket %s", addr) - # if python 2.5 and lower - if vi[0] < 3 and vi[1] < 6: - SocketServer.TCPServer.__init__(self, addr, requestHandler) - else: - SocketServer.TCPServer.__init__(self, addr, requestHandler, - bind_and_activate) + + SocketServer.TCPServer.__init__(self, addr, requestHandler, bind_and_activate) + if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'): flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD) flags |= fcntl.FD_CLOEXEC fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) - self.sessions = {} + +class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher): + + allow_reuse_address = True + + def __init__(self, addr, certfile, keyfile, + requestHandler=StratumJSONRPCRequestHandler, + logRequests=False, encoding=None, bind_and_activate=True, + address_family=socket.AF_INET): + + self.logRequests = logRequests + StratumJSONRPCDispatcher.__init__(self, encoding) + # TCPServer.__init__ has an extra parameter on 2.6+, so + # check Python version and decide on how to call it + vi = sys.version_info + self.address_family = address_family + if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX: + # Unix sockets can't be bound if they already exist in the + # filesystem. The convention of e.g. X11 is to unlink + # before binding again. + if os.path.exists(addr): + try: + os.unlink(addr) + except OSError: + logging.warning("Could not unlink socket %s", addr) + + SSLTCPServer.__init__(self, addr, certfile, keyfile, requestHandler, bind_and_activate) + + if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'): + flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD) + flags |= fcntl.FD_CLOEXEC + fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) - def create_session(self): - session_id = random_string(10) - self.sessions[session_id] = HttpSession(session_id) - return session_id - def poll_session(self,session_id): - responses = self.sessions[session_id].pending_responses[:] - self.sessions[session_id].pending_responses = [] - print "poll: %d responses"%len(responses) - return responses from processor import Session +import Queue class HttpSession(Session): def __init__(self, session_id): Session.__init__(self) - self.pending_responses = [] - print "new http session", session_id + self.pending_responses = Queue.Queue() + self.address = session_id + self.name = "HTTP" def send_response(self, response): raw_response = json.dumps(response) - self.pending_responses.append(response) + self.pending_responses.put(response) + + def stopped(self): + with self.lock: + if time.time() - self.time > 60: + self._stopped = True + return self._stopped class HttpServer(threading.Thread): - def __init__(self, dispatcher, host, port): + def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile): self.shared = dispatcher.shared self.dispatcher = dispatcher.request_dispatcher threading.Thread.__init__(self) self.daemon = True self.host = host self.port = port + self.use_ssl = use_ssl + self.certfile = certfile + self.keyfile = keyfile self.lock = threading.Lock() + def run(self): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn - class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass + if self.use_ssl: + class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): pass + self.server = StratumThreadedServer(( self.host, self.port), self.certfile, self.keyfile) + print "HTTPS server started." + else: + class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): pass + self.server = StratumThreadedServer(( self.host, self.port)) + print "HTTP server started." - self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) self.server.dispatcher = self.dispatcher self.server.register_function(None, 'server.stop') + self.server.register_function(None, 'server.info') - print "HTTP server started." self.server.serve_forever()