X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=blobdiff_plain;f=transports%2Fstratum_http.py;h=2b86869f92705deb58306ccdb46c404d68842a67;hp=b3e9a724aa9023508a517bb3001ec71339a73438;hb=4ce69b7ea24ead59ebbcc7ed335ea9762ae3724b;hpb=4ba89c7c7f2ede820ebe99dfa459cc3180ed9b72 diff --git a/transports/stratum_http.py b/transports/stratum_http.py index b3e9a72..2b86869 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -14,19 +14,31 @@ # You should have received a copy of the GNU Affero General Public # License along with this program. If not, see # . +""" +sessions are identified with cookies + - each session has a buffer of responses to requests -import jsonrpclib -from jsonrpclib import Fault -from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS + +from the processor point of view: + - the user only defines process() ; the rest is session management. thus sessions should not belong to processor + +""" +import json +import logging +import os +import Queue import SimpleXMLRPCServer -import SocketServer import socket -import logging -import os, time -import types +import SocketServer +import sys +import time +import threading import traceback -import sys, threading +import types +import jsonrpclib +from jsonrpclib import Fault +from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS from OpenSSL import SSL try: @@ -35,21 +47,9 @@ except ImportError: # For Windows fcntl = None -import json - - -""" -sessions are identified with cookies - - each session has a buffer of responses to requests - -from the processor point of view: - - the user only defines process() ; the rest is session management. thus sessions should not belong to processor - -""" - - -from processor import random_string, print_log +from processor import Session +from utils import random_string, print_log def get_version(request): @@ -59,38 +59,31 @@ def get_version(request): if 'id' in request.keys(): return 1.0 return None - + + def validate_request(request): - if type(request) is not types.DictType: - fault = Fault( - -32600, 'Request must be {}, not %s.' % type(request) - ) - return fault + if not isinstance(request, types.DictType): + return Fault(-32600, 'Request must be {}, not %s.' % type(request)) rpcid = request.get('id', None) version = get_version(request) if not version: - fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) - return fault + return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) request.setdefault('params', []) method = request.get('method', None) params = request.get('params') param_types = (types.ListType, types.DictType, types.TupleType) - if not method or type(method) not in types.StringTypes or \ - type(params) not in param_types: - fault = Fault( - -32600, 'Invalid request parameters or method.', rpcid=rpcid - ) - return fault + if not method or type(method) not in types.StringTypes or type(params) not in param_types: + return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid) return True + class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): def __init__(self, encoding=None): - SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, - allow_none=True, - encoding=encoding) + # todo: use super + SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, allow_none=True, encoding=encoding) - def _marshaled_dispatch(self, session_id, data, dispatch_method = None): + def _marshaled_dispatch(self, session_id, data, dispatch_method=None): response = None try: request = jsonrpclib.loads(data) @@ -105,8 +98,8 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): session.time = time.time() responses = [] - if type(request) is not types.ListType: - request = [ request ] + if not isinstance(request, types.ListType): + request = [request] for req_entry in request: result = validate_request(req_entry) @@ -115,14 +108,14 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): continue self.dispatcher.do_dispatch(session, req_entry) - + if req_entry['method'] == 'server.stop': - return json.dumps({'result':'ok'}) + return json.dumps({'result': 'ok'}) r = self.poll_session(session) for item in r: responses.append(json.dumps(item)) - + if len(responses) > 1: response = '[%s]' % ','.join(responses) elif len(responses) == 1: @@ -132,11 +125,9 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return response - def create_session(self): - session_id = random_string(10) - session = HttpSession(session_id) - self.dispatcher.add_session(session) + session_id = random_string(20) + session = HttpSession(self.dispatcher, session_id) return session_id def poll_session(self, session): @@ -149,19 +140,16 @@ class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): return responses +class StratumJSONRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - -class StratumJSONRPCRequestHandler( - SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - def do_OPTIONS(self): self.send_response(200) self.send_header('Allow', 'GET, POST, OPTIONS') self.send_header('Access-Control-Allow-Origin', '*') - self.send_header('Access-Control-Allow-Headers', '*') + self.send_header('Access-Control-Allow-Headers', 'Cache-Control, Content-Language, Content-Type, Expires, Last-Modified, Pragma, Accept-Language, Accept, Origin') self.send_header('Content-Length', '0') self.end_headers() - + def do_GET(self): if not self.is_rpc_path_valid(): self.report_404() @@ -170,7 +158,7 @@ class StratumJSONRPCRequestHandler( session_id = None c = self.headers.get('cookie') if c: - if c[0:8]=='SESSION=': + if c[0:8] == 'SESSION=': #print "found cookie", c[8:] session_id = c[8:] @@ -188,11 +176,11 @@ class StratumJSONRPCRequestHandler( fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) response = fault.response() print "500", trace_string - if response == None: + if response is None: response = '' if session_id: - self.send_header("Set-Cookie", "SESSION=%s"%session_id) + self.send_header("Set-Cookie", "SESSION=%s" % session_id) self.send_header("Content-type", "application/json-rpc") self.send_header("Access-Control-Allow-Origin", "*") @@ -202,7 +190,6 @@ class StratumJSONRPCRequestHandler( self.wfile.flush() self.shutdown_connection() - def do_POST(self): if not self.is_rpc_path_valid(): self.report_404() @@ -220,7 +207,7 @@ class StratumJSONRPCRequestHandler( session_id = None c = self.headers.get('cookie') if c: - if c[0:8]=='SESSION=': + if c[0:8] == 'SESSION=': #print "found cookie", c[8:] session_id = c[8:] @@ -237,11 +224,11 @@ class StratumJSONRPCRequestHandler( fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) response = fault.response() print "500", trace_string - if response == None: + if response is None: response = '' if session_id: - self.send_header("Set-Cookie", "SESSION=%s"%session_id) + self.send_header("Set-Cookie", "SESSION=%s" % session_id) self.send_header("Content-type", "application/json-rpc") self.send_header("Access-Control-Allow-Origin", "*") @@ -276,7 +263,7 @@ class SSLTCPServer(SocketServer.TCPServer): self.server_bind() self.server_activate() - def shutdown_request(self,request): + def shutdown_request(self, request): #request.shutdown() pass @@ -298,7 +285,7 @@ class StratumHTTPServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): # Unix sockets can't be bound if they already exist in the # filesystem. The convention of e.g. X11 is to unlink # before binding again. - if os.path.exists(addr): + if os.path.exists(addr): try: os.unlink(addr) except OSError: @@ -331,7 +318,7 @@ class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher): # Unix sockets can't be bound if they already exist in the # filesystem. The convention of e.g. X11 is to unlink # before binding again. - if os.path.exists(addr): + if os.path.exists(addr): try: os.unlink(addr) except OSError: @@ -345,30 +332,21 @@ class StratumHTTPSSLServer(SSLTCPServer, StratumJSONRPCDispatcher): fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) - - - - -from processor import Session -import Queue - class HttpSession(Session): - def __init__(self, session_id): - Session.__init__(self) + def __init__(self, dispatcher, session_id): + Session.__init__(self, dispatcher) self.pending_responses = Queue.Queue() self.address = session_id self.name = "HTTP" + self.timeout = 60 + self.dispatcher.add_session(self) def send_response(self, response): raw_response = json.dumps(response) self.pending_responses.put(response) - def stopped(self): - with self.lock: - if time.time() - self.time > 60: - self._stopped = True - return self._stopped + class HttpServer(threading.Thread): def __init__(self, dispatcher, host, port, use_ssl, certfile, keyfile): @@ -383,22 +361,22 @@ class HttpServer(threading.Thread): self.keyfile = keyfile self.lock = threading.Lock() - def run(self): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn if self.use_ssl: - class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): pass - self.server = StratumThreadedServer(( self.host, self.port), self.certfile, self.keyfile) - print_log( "HTTPS server started.") + class StratumThreadedServer(ThreadingMixIn, StratumHTTPSSLServer): + pass + self.server = StratumThreadedServer((self.host, self.port), self.certfile, self.keyfile) + print_log("HTTPS server started.") else: - class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): pass - self.server = StratumThreadedServer(( self.host, self.port)) - print_log( "HTTP server started.") + class StratumThreadedServer(ThreadingMixIn, StratumHTTPServer): + pass + self.server = StratumThreadedServer((self.host, self.port)) + print_log("HTTP server started.") self.server.dispatcher = self.dispatcher self.server.register_function(None, 'server.stop') self.server.register_function(None, 'server.info') self.server.serve_forever() -