From 8c0a7d9c52070a81870bbc8c2043612e72ac3aa8 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Thu, 29 Mar 2012 00:46:14 +0400 Subject: [PATCH] directory for transports --- native.py | 197 ----------------------- server.py | 8 +- stratum.py | 131 --------------- stratum_http.py | 381 -------------------------------------------- transports/native.py | 197 +++++++++++++++++++++++ transports/stratum_http.py | 381 ++++++++++++++++++++++++++++++++++++++++++++ transports/stratum_tcp.py | 131 +++++++++++++++ 7 files changed, 714 insertions(+), 712 deletions(-) delete mode 100644 native.py delete mode 100644 stratum.py delete mode 100644 stratum_http.py create mode 100644 transports/__init__.py create mode 100644 transports/native.py create mode 100644 transports/stratum_http.py create mode 100644 transports/stratum_tcp.py diff --git a/native.py b/native.py deleted file mode 100644 index 1b6a6ec..0000000 --- a/native.py +++ /dev/null @@ -1,197 +0,0 @@ -import thread, threading, time, socket, traceback, ast - - - -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - -def timestr(): - return time.strftime("[%d/%m/%Y-%H:%M:%S]") - - -class NativeServer(threading.Thread): - - def __init__(self, shared, store, irc, banner, host, port): - threading.Thread.__init__(self) - self.banner = banner - self.store = store - self.irc = irc - self.sessions = {} - self.host = host - self.port = port - self.daemon = True - self.shared = shared - - - def modified_addresses(self,a_session): - import copy - session = copy.deepcopy(a_session) - addresses = session['addresses'] - session['last_time'] = time.time() - ret = {} - k = 0 - for addr in addresses: - status = self.store.get_status( addr ) - msg_id, last_status = addresses.get( addr ) - if last_status != status: - addresses[addr] = msg_id, status - ret[addr] = status - - return ret, addresses - - - def poll_session(self, session_id): - session = self.sessions.get(session_id) - if session is None: - print time.asctime(), "session not found", session_id - return -1, {} - else: - self.sessions[session_id]['last_time'] = time.time() - ret, addresses = self.modified_addresses(session) - if ret: self.sessions[session_id]['addresses'] = addresses - return repr( (self.store.block_number,ret)) - - - def add_address_to_session(self, session_id, address): - status = self.store.get_status(address) - self.sessions[session_id]['addresses'][address] = ("", status) - self.sessions[session_id]['last_time'] = time.time() - return status - - - def new_session(self, version, addresses): - session_id = random_string(10) - self.sessions[session_id] = { 'addresses':{}, 'version':version } - for a in addresses: - self.sessions[session_id]['addresses'][a] = ('','') - out = repr( (session_id, self.banner.replace('\\n','\n') ) ) - self.sessions[session_id]['last_time'] = time.time() - return out - - - def update_session(self, session_id,addresses): - """deprecated in 0.42, wad replaced by add_address_to_session""" - self.sessions[session_id]['addresses'] = {} - for a in addresses: - self.sessions[session_id]['addresses'][a] = '' - self.sessions[session_id]['last_time'] = time.time() - return 'ok' - - - - def native_client_thread(self, ipaddr,conn): - try: - ipaddr = ipaddr[0] - msg = '' - while 1: - d = conn.recv(1024) - msg += d - if not d: - break - if '#' in msg: - msg = msg.split('#', 1)[0] - break - try: - cmd, data = ast.literal_eval(msg) - except: - print "syntax error", repr(msg), ipaddr - conn.close() - return - - out = self.do_command(cmd, data, ipaddr) - if out: - #print ipaddr, cmd, len(out) - try: - conn.send(out) - except: - print "error, could not send" - - finally: - conn.close() - - - - def do_command(self, cmd, data, ipaddr): - - if cmd=='b': - out = "%d"%block_number - - elif cmd in ['session','new_session']: - try: - if cmd == 'session': - addresses = ast.literal_eval(data) - version = "old" - else: - version, addresses = ast.literal_eval(data) - if version[0]=="0": version = "v" + version - except: - print "error", data - return None - print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version - out = self.new_session(version, addresses) - - elif cmd=='address.subscribe': - try: - session_id, addr = ast.literal_eval(data) - except: - traceback.print_exc(file=sys.stdout) - print data - return None - out = self.add_address_to_session(session_id,addr) - - elif cmd=='update_session': - try: - session_id, addresses = ast.literal_eval(data) - except: - traceback.print_exc(file=sys.stdout) - return None - print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses) - out = self.update_session(session_id,addresses) - - elif cmd=='poll': - out = self.poll_session(data) - - elif cmd == 'h': - address = data - out = repr( self.store.get_history( address ) ) - - elif cmd =='tx': - out = self.store.send_tx(data) - print timestr(), "sent tx:", ipaddr, out - - elif cmd == 'peers': - out = repr(self.irc.get_peers()) - - else: - out = None - - return out - - - def clean_session_thread(self): - while not self.shared.stopped(): - time.sleep(30) - t = time.time() - for k,s in self.sessions.items(): - if s.get('type') == 'persistent': continue - t0 = s['last_time'] - if t - t0 > 5*60: - self.sessions.pop(k) - print "lost session", k - - def run(self): - thread.start_new_thread(self.clean_session_thread, ()) - - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((self.host, self.port)) - s.listen(1) - while not self.shared.stopped(): - conn, addr = s.accept() - try: - thread.start_new_thread(self.native_client_thread, (addr, conn,)) - except: - # can't start new thread if there is no memory.. - traceback.print_exc(file=sys.stdout) - diff --git a/server.py b/server.py index 97a9773..9ee8dc8 100755 --- a/server.py +++ b/server.py @@ -53,9 +53,11 @@ password = config.get('server','password') from processor import Shared, Processor, Dispatcher -from stratum_http import HttpServer -from stratum import TcpServer -from native import NativeServer + + +from transports.stratum_http import HttpServer +from transports.stratum_tcp import TcpServer +from transports.native import NativeServer import irc diff --git a/stratum.py b/stratum.py deleted file mode 100644 index ce39aca..0000000 --- a/stratum.py +++ /dev/null @@ -1,131 +0,0 @@ -import json -import socket -import threading -import time -import Queue as queue - -from processor import Session, Dispatcher, Shared - -class TcpSession(Session): - - def __init__(self, connection, address): - self._connection = connection - self.address = address - Session.__init__(self) - print "New session", address - - def connection(self): - if self.stopped(): - raise Exception("Session was stopped") - else: - return self._connection - - def stop(self): - self._connection.close() - print "Terminating connection:", self.address[0] - with self.lock: - self._stopped = True - - def send_response(self, response): - raw_response = json.dumps(response) - # Possible race condition here by having session - # close connection? - # I assume Python connections are thread safe interfaces - try: - connection = self.connection() - connection.send(raw_response + "\n") - except: - self.stop() - - - -class TcpClientRequestor(threading.Thread): - - def __init__(self, shared, processor, session): - self.shared = shared - self.processor = processor - self.message = "" - self.session = session - threading.Thread.__init__(self) - - def run(self): - while not self.shared.stopped(): - if not self.update(): - break - - while self.parse(): - pass - - def update(self): - data = self.receive() - if not data: - # close_session - self.session.stop() - return False - - self.message += data - return True - - def receive(self): - try: - return self.session.connection().recv(1024) - except: - return '' - - def parse(self): - raw_buffer = self.message.find('\n') - if raw_buffer == -1: - return False - - raw_command = self.message[0:raw_buffer].strip() - self.message = self.message[raw_buffer + 1:] - if raw_command == 'quit': - self.session.stop() - return False - - try: - command = json.loads(raw_command) - except: - self.processor.push_response({"error": "bad JSON", "request": raw_command}) - return True - - try: - # Try to load vital fields, and return an error if - # unsuccessful. - message_id = command['id'] - method = command['method'] - except KeyError: - # Return an error JSON in response. - self.processor.push_response({"error": "syntax error", "request": raw_command}) - else: - self.processor.push_request(self.session,command) - - return True - -class TcpServer(threading.Thread): - - def __init__(self, shared, processor, host, port): - self.shared = shared - self.processor = processor - threading.Thread.__init__(self) - self.daemon = True - self.host = host - self.port = port - self.lock = threading.Lock() - - def run(self): - print "TCP server started." - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((self.host, self.port)) - sock.listen(1) - while not self.shared.stopped(): - session = TcpSession(*sock.accept()) - client_req = TcpClientRequestor(self.shared, self.processor, session) - client_req.start() - self.processor.add_session(session) - self.processor.collect_garbage() - - - - diff --git a/stratum_http.py b/stratum_http.py deleted file mode 100644 index 499628b..0000000 --- a/stratum_http.py +++ /dev/null @@ -1,381 +0,0 @@ -#!/usr/bin/env python -# Copyright(C) 2012 thomasv@gitorious - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public -# License along with this program. If not, see -# . - -import jsonrpclib -from jsonrpclib import Fault -from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS -import SimpleXMLRPCServer -import SocketServer -import socket -import logging -import os -import types -import traceback -import sys, threading - -try: - import fcntl -except ImportError: - # For Windows - fcntl = None - -import json - - -""" -sessions are identified with cookies - - each session has a buffer of responses to requests - - -from the processor point of view: - - the user only defines process() ; the rest is session management. thus sessions should not belong to processor - -""" - - -def random_string(N): - import random, string - return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - - - - -def get_version(request): - # must be a dict - if 'jsonrpc' in request.keys(): - return 2.0 - if 'id' in request.keys(): - return 1.0 - return None - -def validate_request(request): - if type(request) is not types.DictType: - fault = Fault( - -32600, 'Request must be {}, not %s.' % type(request) - ) - return fault - rpcid = request.get('id', None) - version = get_version(request) - if not version: - fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) - return fault - request.setdefault('params', []) - method = request.get('method', None) - params = request.get('params') - param_types = (types.ListType, types.DictType, types.TupleType) - if not method or type(method) not in types.StringTypes or \ - type(params) not in param_types: - fault = Fault( - -32600, 'Invalid request parameters or method.', rpcid=rpcid - ) - return fault - return True - -class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): - - def __init__(self, encoding=None): - SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, - allow_none=True, - encoding=encoding) - - def _marshaled_dispatch(self, session_id, data, dispatch_method = None): - response = None - try: - request = jsonrpclib.loads(data) - except Exception, e: - fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e)) - response = fault.response() - return response - - responses = [] - if type(request) is not types.ListType: - request = [ request ] - - for req_entry in request: - result = validate_request(req_entry) - if type(result) is Fault: - responses.append(result.response()) - continue - resp_entry = self._marshaled_single_dispatch(session_id, req_entry) - if resp_entry is not None: - responses.append(resp_entry) - - r = self.poll_session(session_id) - for item in r: - responses.append(json.dumps(item)) - - if len(responses) > 1: - response = '[%s]' % ','.join(responses) - elif len(responses) == 1: - response = responses[0] - else: - response = '' - - return response - - def _marshaled_single_dispatch(self, session_id, request): - # TODO - Use the multiprocessing and skip the response if - # it is a notification - # Put in support for custom dispatcher here - # (See SimpleXMLRPCServer._marshaled_dispatch) - method = request.get('method') - params = request.get('params') - try: - response = self._dispatch(method, session_id, request) - except: - exc_type, exc_value, exc_tb = sys.exc_info() - fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) - return fault.response() - if 'id' not in request.keys() or request['id'] == None: - # It's a notification - return None - - try: - response = jsonrpclib.dumps(response, - methodresponse=True, - rpcid=request['id'] - ) - return response - except: - exc_type, exc_value, exc_tb = sys.exc_info() - fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) - return fault.response() - - def _dispatch(self, method, session_id, request): - func = None - try: - func = self.funcs[method] - except KeyError: - if self.instance is not None: - if hasattr(self.instance, '_dispatch'): - return self.instance._dispatch(method, params) - else: - try: - func = SimpleXMLRPCServer.resolve_dotted_attribute( - self.instance, - method, - True - ) - except AttributeError: - pass - if func is not None: - try: - response = func(session_id, request) - return response - except TypeError: - return Fault(-32602, 'Invalid parameters.') - except: - err_lines = traceback.format_exc().splitlines() - trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) - fault = jsonrpclib.Fault(-32603, 'Server error: %s' % - trace_string) - return fault - else: - return Fault(-32601, 'Method %s not supported.' % method) - -class StratumJSONRPCRequestHandler( - SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - - def do_GET(self): - if not self.is_rpc_path_valid(): - self.report_404() - return - try: - session_id = None - c = self.headers.get('cookie') - if c: - if c[0:8]=='SESSION=': - #print "found cookie", c[8:] - session_id = c[8:] - - if session_id is None: - session_id = self.server.create_session() - #print "setting cookie", session_id - - data = json.dumps([]) - response = self.server._marshaled_dispatch(session_id, data) - self.send_response(200) - except Exception, e: - self.send_response(500) - err_lines = traceback.format_exc().splitlines() - trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) - fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) - response = fault.response() - print "500", trace_string - if response == None: - response = '' - - if session_id: - self.send_header("Set-Cookie", "SESSION=%s"%session_id) - - self.send_header("Content-type", "application/json-rpc") - self.send_header("Content-length", str(len(response))) - self.end_headers() - self.wfile.write(response) - self.wfile.flush() - self.connection.shutdown(1) - - - def do_POST(self): - if not self.is_rpc_path_valid(): - self.report_404() - return - try: - max_chunk_size = 10*1024*1024 - size_remaining = int(self.headers["content-length"]) - L = [] - while size_remaining: - chunk_size = min(size_remaining, max_chunk_size) - L.append(self.rfile.read(chunk_size)) - size_remaining -= len(L[-1]) - data = ''.join(L) - - session_id = None - c = self.headers.get('cookie') - if c: - if c[0:8]=='SESSION=': - print "found cookie", c[8:] - session_id = c[8:] - - if session_id is None: - session_id = self.server.create_session() - print "setting cookie", session_id - - response = self.server._marshaled_dispatch(session_id, data) - self.send_response(200) - except Exception, e: - self.send_response(500) - err_lines = traceback.format_exc().splitlines() - trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) - fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) - response = fault.response() - print "500", trace_string - if response == None: - response = '' - - if session_id: - self.send_header("Set-Cookie", "SESSION=%s"%session_id) - - self.send_header("Content-type", "application/json-rpc") - self.send_header("Content-length", str(len(response))) - self.end_headers() - self.wfile.write(response) - self.wfile.flush() - self.connection.shutdown(1) - - -class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): - - allow_reuse_address = True - - def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler, - logRequests=False, encoding=None, bind_and_activate=True, - address_family=socket.AF_INET): - self.logRequests = logRequests - StratumJSONRPCDispatcher.__init__(self, encoding) - # TCPServer.__init__ has an extra parameter on 2.6+, so - # check Python version and decide on how to call it - vi = sys.version_info - self.address_family = address_family - if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX: - # Unix sockets can't be bound if they already exist in the - # filesystem. The convention of e.g. X11 is to unlink - # before binding again. - if os.path.exists(addr): - try: - os.unlink(addr) - except OSError: - logging.warning("Could not unlink socket %s", addr) - # if python 2.5 and lower - if vi[0] < 3 and vi[1] < 6: - SocketServer.TCPServer.__init__(self, addr, requestHandler) - else: - SocketServer.TCPServer.__init__(self, addr, requestHandler, - bind_and_activate) - if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'): - flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD) - flags |= fcntl.FD_CLOEXEC - fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) - - self.sessions = {} - - - - def create_session(self): - session_id = random_string(10) - self.sessions[session_id] = HttpSession(session_id) - return session_id - - def poll_session(self,session_id): - responses = self.sessions[session_id].pending_responses[:] - self.sessions[session_id].pending_responses = [] - print "poll: %d responses"%len(responses) - return responses - - -from processor import Session - -class HttpSession(Session): - - def __init__(self, session_id): - Session.__init__(self) - self.pending_responses = [] - print "new http session", session_id - - def send_response(self, response): - raw_response = json.dumps(response) - self.pending_responses.append(response) - -class HttpServer(threading.Thread): - def __init__(self, shared, _processor, host, port): - self.shared = shared - self.processor = _processor - threading.Thread.__init__(self) - self.daemon = True - self.host = host - self.port = port - self.lock = threading.Lock() - - def run(self): - # see http://code.google.com/p/jsonrpclib/ - from SocketServer import ThreadingMixIn - class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass - self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) - for s in ['server.peers', 'server.banner', 'transaction.broadcast', \ - 'address.get_history','address.subscribe', 'numblocks.subscribe', 'client.version']: - self.server.register_function(self.process, s) - - self.server.register_function(self.do_stop, 'stop') - - print "HTTP server started." - self.server.serve_forever() - - - def process(self, session_id, request): - #print session, request - session = self.server.sessions.get(session_id) - if session: - #print "zz",session_id,session - request['id'] = self.processor.store_session_id(session, request['id']) - self.processor.process(request) - - def do_stop(self, session, request): - self.shared.stop() - return 'ok' - - - diff --git a/transports/__init__.py b/transports/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/transports/native.py b/transports/native.py new file mode 100644 index 0000000..1b6a6ec --- /dev/null +++ b/transports/native.py @@ -0,0 +1,197 @@ +import thread, threading, time, socket, traceback, ast + + + +def random_string(N): + import random, string + return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) + +def timestr(): + return time.strftime("[%d/%m/%Y-%H:%M:%S]") + + +class NativeServer(threading.Thread): + + def __init__(self, shared, store, irc, banner, host, port): + threading.Thread.__init__(self) + self.banner = banner + self.store = store + self.irc = irc + self.sessions = {} + self.host = host + self.port = port + self.daemon = True + self.shared = shared + + + def modified_addresses(self,a_session): + import copy + session = copy.deepcopy(a_session) + addresses = session['addresses'] + session['last_time'] = time.time() + ret = {} + k = 0 + for addr in addresses: + status = self.store.get_status( addr ) + msg_id, last_status = addresses.get( addr ) + if last_status != status: + addresses[addr] = msg_id, status + ret[addr] = status + + return ret, addresses + + + def poll_session(self, session_id): + session = self.sessions.get(session_id) + if session is None: + print time.asctime(), "session not found", session_id + return -1, {} + else: + self.sessions[session_id]['last_time'] = time.time() + ret, addresses = self.modified_addresses(session) + if ret: self.sessions[session_id]['addresses'] = addresses + return repr( (self.store.block_number,ret)) + + + def add_address_to_session(self, session_id, address): + status = self.store.get_status(address) + self.sessions[session_id]['addresses'][address] = ("", status) + self.sessions[session_id]['last_time'] = time.time() + return status + + + def new_session(self, version, addresses): + session_id = random_string(10) + self.sessions[session_id] = { 'addresses':{}, 'version':version } + for a in addresses: + self.sessions[session_id]['addresses'][a] = ('','') + out = repr( (session_id, self.banner.replace('\\n','\n') ) ) + self.sessions[session_id]['last_time'] = time.time() + return out + + + def update_session(self, session_id,addresses): + """deprecated in 0.42, wad replaced by add_address_to_session""" + self.sessions[session_id]['addresses'] = {} + for a in addresses: + self.sessions[session_id]['addresses'][a] = '' + self.sessions[session_id]['last_time'] = time.time() + return 'ok' + + + + def native_client_thread(self, ipaddr,conn): + try: + ipaddr = ipaddr[0] + msg = '' + while 1: + d = conn.recv(1024) + msg += d + if not d: + break + if '#' in msg: + msg = msg.split('#', 1)[0] + break + try: + cmd, data = ast.literal_eval(msg) + except: + print "syntax error", repr(msg), ipaddr + conn.close() + return + + out = self.do_command(cmd, data, ipaddr) + if out: + #print ipaddr, cmd, len(out) + try: + conn.send(out) + except: + print "error, could not send" + + finally: + conn.close() + + + + def do_command(self, cmd, data, ipaddr): + + if cmd=='b': + out = "%d"%block_number + + elif cmd in ['session','new_session']: + try: + if cmd == 'session': + addresses = ast.literal_eval(data) + version = "old" + else: + version, addresses = ast.literal_eval(data) + if version[0]=="0": version = "v" + version + except: + print "error", data + return None + print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version + out = self.new_session(version, addresses) + + elif cmd=='address.subscribe': + try: + session_id, addr = ast.literal_eval(data) + except: + traceback.print_exc(file=sys.stdout) + print data + return None + out = self.add_address_to_session(session_id,addr) + + elif cmd=='update_session': + try: + session_id, addresses = ast.literal_eval(data) + except: + traceback.print_exc(file=sys.stdout) + return None + print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses) + out = self.update_session(session_id,addresses) + + elif cmd=='poll': + out = self.poll_session(data) + + elif cmd == 'h': + address = data + out = repr( self.store.get_history( address ) ) + + elif cmd =='tx': + out = self.store.send_tx(data) + print timestr(), "sent tx:", ipaddr, out + + elif cmd == 'peers': + out = repr(self.irc.get_peers()) + + else: + out = None + + return out + + + def clean_session_thread(self): + while not self.shared.stopped(): + time.sleep(30) + t = time.time() + for k,s in self.sessions.items(): + if s.get('type') == 'persistent': continue + t0 = s['last_time'] + if t - t0 > 5*60: + self.sessions.pop(k) + print "lost session", k + + def run(self): + thread.start_new_thread(self.clean_session_thread, ()) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind((self.host, self.port)) + s.listen(1) + while not self.shared.stopped(): + conn, addr = s.accept() + try: + thread.start_new_thread(self.native_client_thread, (addr, conn,)) + except: + # can't start new thread if there is no memory.. + traceback.print_exc(file=sys.stdout) + diff --git a/transports/stratum_http.py b/transports/stratum_http.py new file mode 100644 index 0000000..499628b --- /dev/null +++ b/transports/stratum_http.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python +# Copyright(C) 2012 thomasv@gitorious + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public +# License along with this program. If not, see +# . + +import jsonrpclib +from jsonrpclib import Fault +from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS +import SimpleXMLRPCServer +import SocketServer +import socket +import logging +import os +import types +import traceback +import sys, threading + +try: + import fcntl +except ImportError: + # For Windows + fcntl = None + +import json + + +""" +sessions are identified with cookies + - each session has a buffer of responses to requests + + +from the processor point of view: + - the user only defines process() ; the rest is session management. thus sessions should not belong to processor + +""" + + +def random_string(N): + import random, string + return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) + + + + +def get_version(request): + # must be a dict + if 'jsonrpc' in request.keys(): + return 2.0 + if 'id' in request.keys(): + return 1.0 + return None + +def validate_request(request): + if type(request) is not types.DictType: + fault = Fault( + -32600, 'Request must be {}, not %s.' % type(request) + ) + return fault + rpcid = request.get('id', None) + version = get_version(request) + if not version: + fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) + return fault + request.setdefault('params', []) + method = request.get('method', None) + params = request.get('params') + param_types = (types.ListType, types.DictType, types.TupleType) + if not method or type(method) not in types.StringTypes or \ + type(params) not in param_types: + fault = Fault( + -32600, 'Invalid request parameters or method.', rpcid=rpcid + ) + return fault + return True + +class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): + + def __init__(self, encoding=None): + SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, + allow_none=True, + encoding=encoding) + + def _marshaled_dispatch(self, session_id, data, dispatch_method = None): + response = None + try: + request = jsonrpclib.loads(data) + except Exception, e: + fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e)) + response = fault.response() + return response + + responses = [] + if type(request) is not types.ListType: + request = [ request ] + + for req_entry in request: + result = validate_request(req_entry) + if type(result) is Fault: + responses.append(result.response()) + continue + resp_entry = self._marshaled_single_dispatch(session_id, req_entry) + if resp_entry is not None: + responses.append(resp_entry) + + r = self.poll_session(session_id) + for item in r: + responses.append(json.dumps(item)) + + if len(responses) > 1: + response = '[%s]' % ','.join(responses) + elif len(responses) == 1: + response = responses[0] + else: + response = '' + + return response + + def _marshaled_single_dispatch(self, session_id, request): + # TODO - Use the multiprocessing and skip the response if + # it is a notification + # Put in support for custom dispatcher here + # (See SimpleXMLRPCServer._marshaled_dispatch) + method = request.get('method') + params = request.get('params') + try: + response = self._dispatch(method, session_id, request) + except: + exc_type, exc_value, exc_tb = sys.exc_info() + fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) + return fault.response() + if 'id' not in request.keys() or request['id'] == None: + # It's a notification + return None + + try: + response = jsonrpclib.dumps(response, + methodresponse=True, + rpcid=request['id'] + ) + return response + except: + exc_type, exc_value, exc_tb = sys.exc_info() + fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) + return fault.response() + + def _dispatch(self, method, session_id, request): + func = None + try: + func = self.funcs[method] + except KeyError: + if self.instance is not None: + if hasattr(self.instance, '_dispatch'): + return self.instance._dispatch(method, params) + else: + try: + func = SimpleXMLRPCServer.resolve_dotted_attribute( + self.instance, + method, + True + ) + except AttributeError: + pass + if func is not None: + try: + response = func(session_id, request) + return response + except TypeError: + return Fault(-32602, 'Invalid parameters.') + except: + err_lines = traceback.format_exc().splitlines() + trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) + fault = jsonrpclib.Fault(-32603, 'Server error: %s' % + trace_string) + return fault + else: + return Fault(-32601, 'Method %s not supported.' % method) + +class StratumJSONRPCRequestHandler( + SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): + + def do_GET(self): + if not self.is_rpc_path_valid(): + self.report_404() + return + try: + session_id = None + c = self.headers.get('cookie') + if c: + if c[0:8]=='SESSION=': + #print "found cookie", c[8:] + session_id = c[8:] + + if session_id is None: + session_id = self.server.create_session() + #print "setting cookie", session_id + + data = json.dumps([]) + response = self.server._marshaled_dispatch(session_id, data) + self.send_response(200) + except Exception, e: + self.send_response(500) + err_lines = traceback.format_exc().splitlines() + trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) + fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) + response = fault.response() + print "500", trace_string + if response == None: + response = '' + + if session_id: + self.send_header("Set-Cookie", "SESSION=%s"%session_id) + + self.send_header("Content-type", "application/json-rpc") + self.send_header("Content-length", str(len(response))) + self.end_headers() + self.wfile.write(response) + self.wfile.flush() + self.connection.shutdown(1) + + + def do_POST(self): + if not self.is_rpc_path_valid(): + self.report_404() + return + try: + max_chunk_size = 10*1024*1024 + size_remaining = int(self.headers["content-length"]) + L = [] + while size_remaining: + chunk_size = min(size_remaining, max_chunk_size) + L.append(self.rfile.read(chunk_size)) + size_remaining -= len(L[-1]) + data = ''.join(L) + + session_id = None + c = self.headers.get('cookie') + if c: + if c[0:8]=='SESSION=': + print "found cookie", c[8:] + session_id = c[8:] + + if session_id is None: + session_id = self.server.create_session() + print "setting cookie", session_id + + response = self.server._marshaled_dispatch(session_id, data) + self.send_response(200) + except Exception, e: + self.send_response(500) + err_lines = traceback.format_exc().splitlines() + trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) + fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) + response = fault.response() + print "500", trace_string + if response == None: + response = '' + + if session_id: + self.send_header("Set-Cookie", "SESSION=%s"%session_id) + + self.send_header("Content-type", "application/json-rpc") + self.send_header("Content-length", str(len(response))) + self.end_headers() + self.wfile.write(response) + self.wfile.flush() + self.connection.shutdown(1) + + +class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): + + allow_reuse_address = True + + def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler, + logRequests=False, encoding=None, bind_and_activate=True, + address_family=socket.AF_INET): + self.logRequests = logRequests + StratumJSONRPCDispatcher.__init__(self, encoding) + # TCPServer.__init__ has an extra parameter on 2.6+, so + # check Python version and decide on how to call it + vi = sys.version_info + self.address_family = address_family + if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX: + # Unix sockets can't be bound if they already exist in the + # filesystem. The convention of e.g. X11 is to unlink + # before binding again. + if os.path.exists(addr): + try: + os.unlink(addr) + except OSError: + logging.warning("Could not unlink socket %s", addr) + # if python 2.5 and lower + if vi[0] < 3 and vi[1] < 6: + SocketServer.TCPServer.__init__(self, addr, requestHandler) + else: + SocketServer.TCPServer.__init__(self, addr, requestHandler, + bind_and_activate) + if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'): + flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD) + flags |= fcntl.FD_CLOEXEC + fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) + + self.sessions = {} + + + + def create_session(self): + session_id = random_string(10) + self.sessions[session_id] = HttpSession(session_id) + return session_id + + def poll_session(self,session_id): + responses = self.sessions[session_id].pending_responses[:] + self.sessions[session_id].pending_responses = [] + print "poll: %d responses"%len(responses) + return responses + + +from processor import Session + +class HttpSession(Session): + + def __init__(self, session_id): + Session.__init__(self) + self.pending_responses = [] + print "new http session", session_id + + def send_response(self, response): + raw_response = json.dumps(response) + self.pending_responses.append(response) + +class HttpServer(threading.Thread): + def __init__(self, shared, _processor, host, port): + self.shared = shared + self.processor = _processor + threading.Thread.__init__(self) + self.daemon = True + self.host = host + self.port = port + self.lock = threading.Lock() + + def run(self): + # see http://code.google.com/p/jsonrpclib/ + from SocketServer import ThreadingMixIn + class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass + self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) + for s in ['server.peers', 'server.banner', 'transaction.broadcast', \ + 'address.get_history','address.subscribe', 'numblocks.subscribe', 'client.version']: + self.server.register_function(self.process, s) + + self.server.register_function(self.do_stop, 'stop') + + print "HTTP server started." + self.server.serve_forever() + + + def process(self, session_id, request): + #print session, request + session = self.server.sessions.get(session_id) + if session: + #print "zz",session_id,session + request['id'] = self.processor.store_session_id(session, request['id']) + self.processor.process(request) + + def do_stop(self, session, request): + self.shared.stop() + return 'ok' + + + diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py new file mode 100644 index 0000000..ce39aca --- /dev/null +++ b/transports/stratum_tcp.py @@ -0,0 +1,131 @@ +import json +import socket +import threading +import time +import Queue as queue + +from processor import Session, Dispatcher, Shared + +class TcpSession(Session): + + def __init__(self, connection, address): + self._connection = connection + self.address = address + Session.__init__(self) + print "New session", address + + def connection(self): + if self.stopped(): + raise Exception("Session was stopped") + else: + return self._connection + + def stop(self): + self._connection.close() + print "Terminating connection:", self.address[0] + with self.lock: + self._stopped = True + + def send_response(self, response): + raw_response = json.dumps(response) + # Possible race condition here by having session + # close connection? + # I assume Python connections are thread safe interfaces + try: + connection = self.connection() + connection.send(raw_response + "\n") + except: + self.stop() + + + +class TcpClientRequestor(threading.Thread): + + def __init__(self, shared, processor, session): + self.shared = shared + self.processor = processor + self.message = "" + self.session = session + threading.Thread.__init__(self) + + def run(self): + while not self.shared.stopped(): + if not self.update(): + break + + while self.parse(): + pass + + def update(self): + data = self.receive() + if not data: + # close_session + self.session.stop() + return False + + self.message += data + return True + + def receive(self): + try: + return self.session.connection().recv(1024) + except: + return '' + + def parse(self): + raw_buffer = self.message.find('\n') + if raw_buffer == -1: + return False + + raw_command = self.message[0:raw_buffer].strip() + self.message = self.message[raw_buffer + 1:] + if raw_command == 'quit': + self.session.stop() + return False + + try: + command = json.loads(raw_command) + except: + self.processor.push_response({"error": "bad JSON", "request": raw_command}) + return True + + try: + # Try to load vital fields, and return an error if + # unsuccessful. + message_id = command['id'] + method = command['method'] + except KeyError: + # Return an error JSON in response. + self.processor.push_response({"error": "syntax error", "request": raw_command}) + else: + self.processor.push_request(self.session,command) + + return True + +class TcpServer(threading.Thread): + + def __init__(self, shared, processor, host, port): + self.shared = shared + self.processor = processor + threading.Thread.__init__(self) + self.daemon = True + self.host = host + self.port = port + self.lock = threading.Lock() + + def run(self): + print "TCP server started." + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((self.host, self.port)) + sock.listen(1) + while not self.shared.stopped(): + session = TcpSession(*sock.accept()) + client_req = TcpClientRequestor(self.shared, self.processor, session) + client_req.start() + self.processor.add_session(session) + self.processor.collect_garbage() + + + + -- 1.7.1