From 70f23254388eb74ca7a05009ccf2a5d7a723ec36 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Wed, 28 Mar 2012 16:53:21 +0400 Subject: [PATCH] major reorganisation, http works now --- processor.py | 149 ++++++++++++++++++++++++++++++++++++++++++++++++ server.py | 141 +++++---------------------------------------- stratum.py | 169 +++++-------------------------------------------------- stratum_http.py | 52 +++++++++-------- 4 files changed, 207 insertions(+), 304 deletions(-) create mode 100644 processor.py diff --git a/processor.py b/processor.py new file mode 100644 index 0000000..d0fe91f --- /dev/null +++ b/processor.py @@ -0,0 +1,149 @@ +import json +import socket +import threading +import time +import Queue as queue + +class Shared: + + def __init__(self): + self.lock = threading.Lock() + self._stopped = False + + def stop(self): + print "Stopping Stratum" + with self.lock: + self._stopped = True + + def stopped(self): + with self.lock: + return self._stopped + + +class Processor(threading.Thread): + + def __init__(self): + self.shared = None + threading.Thread.__init__(self) + self.daemon = True + self.request_queue = queue.Queue() + self.response_queue = queue.Queue() + self.internal_ids = {} + self.internal_id = 1 + self.lock = threading.Lock() + self.sessions = [] + + def push_response(self, item): + self.response_queue.put(item) + + def pop_response(self): + return self.response_queue.get() + + def push_request(self, session, item): + self.request_queue.put((session,item)) + + def pop_request(self): + return self.request_queue.get() + + def get_session_id(self, internal_id): + with self.lock: + return self.internal_ids.pop(internal_id) + + def store_session_id(self, session, msgid): + with self.lock: + self.internal_ids[self.internal_id] = session, msgid + r = self.internal_id + self.internal_id += 1 + return r + + def run(self): + if self.shared is None: + raise TypeError("self.shared not set in Processor") + while not self.shared.stopped(): + session, request = self.pop_request() + + method = request['method'] + params = request.get('params',[]) + + if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']: + session.subscribe_to_service(method, params) + + # store session and id locally + request['id'] = self.store_session_id(session, request['id']) + self.process(request) + + self.stop() + + def stop(self): + pass + + def process(self, request): + print "New request", request + # Do stuff... + # response = request + # When ready, you call + # self.push_response(response) + + def add_session(self, session): + with self.lock: + self.sessions.append(session) + + def collect_garbage(self): + # Deep copy entire sessions list and blank it + # This is done to minimise lock contention + with self.lock: + sessions = self.sessions[:] + self.sessions = [] + for session in sessions: + if not session.stopped(): + # If session is still alive then re-add it back + # to our internal register + self.add_session(session) + + +class Session: + + def __init__(self): + self._stopped = False + self.lock = threading.Lock() + self.subscriptions = [] + + def stopped(self): + with self.lock: + return self._stopped + + def subscribe_to_service(self, method, params): + with self.lock: + self.subscriptions.append((method, params)) + + +class Dispatcher(threading.Thread): + + def __init__(self, shared, processor): + self.shared = shared + self.processor = processor + threading.Thread.__init__(self) + + def run(self): + while not self.shared.stopped(): + response = self.processor.pop_response() + #print "pop response", response + internal_id = response.get('id') + params = response.get('params',[]) + try: + method = response['method'] + except: + print "no method", response + continue + + if internal_id: + session, message_id = self.processor.get_session_id(internal_id) + response['id'] = message_id + session.send_response(response) + + else: + for session in self.processor.sessions: + if not session.stopped(): + if (method,params) in session.subscriptions: + session.send_response(response) + diff --git a/server.py b/server.py index 1c34fae..6ab0512 100755 --- a/server.py +++ b/server.py @@ -64,42 +64,15 @@ except: password = config.get('server','password') - stopping = False sessions = {} -m_sessions = [{}] # served by http - - -from Queue import Queue -input_queue = Queue() -output_queue = Queue() - - def random_string(N): import random, string return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) - - -def cmd_stop(_,__,pw): - global stopping - if password == pw: - stopping = True - return 'ok' - else: - return 'wrong password' - -def cmd_load(_,__,pw): - if password == pw: - return repr( len(sessions) ) - else: - return 'wrong password' - - - def modified_addresses(a_session): @@ -135,54 +108,13 @@ def poll_session(session_id): return repr( (store.block_number,ret)) -def poll_session_json(session_id, message_id): - session = m_sessions[0].get(session_id) - if session is None: - raise BaseException("session not found %s"%session_id) - else: - m_sessions[0][session_id]['last_time'] = time.time() - out = [] - ret, addresses = modified_addresses(session) - if ret: - m_sessions[0][session_id]['addresses'] = addresses - for addr in ret: - msg_id, status = addresses[addr] - out.append( { 'id':msg_id, 'result':status } ) - - msg_id, last_nb = session.get('numblocks') - if last_nb: - if last_nb != block_number: - m_sessions[0][session_id]['numblocks'] = msg_id, block_number - out.append( {'id':msg_id, 'result':block_number} ) - - return out - - - - -def address_get_history_json(_,message_id,address): - return store.get_history(address) - -def subscribe_to_numblocks_json(session_id, message_id): - global m_sessions - m_sessions[0][session_id]['numblocks'] = message_id,block_number - return block_number - -def add_address_to_session_json(session_id, message_id, address): - global m_sessions - sessions = m_sessions[0] - status = store.get_status(address) - sessions[session_id]['addresses'][address] = (message_id, status) - sessions[session_id]['last_time'] = time.time() - m_sessions[0] = sessions - return status - def add_address_to_session(session_id, address): status = store.get_status(address) sessions[session_id]['addresses'][address] = ("", status) sessions[session_id]['last_time'] = time.time() return status + def new_session(version, addresses): session_id = random_string(10) sessions[session_id] = { 'addresses':{}, 'version':version } @@ -193,17 +125,6 @@ def new_session(version, addresses): return out -def client_version_json(session_id, _, version): - global m_sessions - sessions = m_sessions[0] - sessions[session_id]['version'] = version - m_sessions[0] = sessions - - - -def get_banner(_,__): - return config.get('server','banner').replace('\\n','\n') - def update_session(session_id,addresses): """deprecated in 0.42, wad replaced by add_address_to_session""" sessions[session_id]['addresses'] = {} @@ -212,6 +133,7 @@ def update_session(session_id,addresses): sessions[session_id]['last_time'] = time.time() return 'ok' + def native_server_thread(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -303,20 +225,13 @@ def do_command(cmd, data, ipaddr): out = poll_session(data) elif cmd == 'h': - # history address = data out = repr( store.get_history( address ) ) - elif cmd == 'load': - out = cmd_load(None,None,data) - elif cmd =='tx': out = store.send_tx(data) print timestr(), "sent tx:", ipaddr, out - elif cmd == 'stop': - out = cmd_stop(data) - elif cmd == 'peers': out = repr(irc.get_peers()) @@ -341,9 +256,11 @@ def clean_session_thread(): #################################################################### -import stratum +from processor import Shared, Processor, Dispatcher +from stratum_http import HttpServer +from stratum import TcpServer -class AbeProcessor(stratum.Processor): +class AbeProcessor(Processor): def process(self,request): message_id = request['id'] method = request['method'] @@ -441,27 +358,6 @@ class Irc(threading.Thread): s.close() -def get_peers_json(_,__): - return irc.get_peers() - -def http_server_thread(): - # see http://code.google.com/p/jsonrpclib/ - from SocketServer import ThreadingMixIn - from StratumJSONRPCServer import StratumJSONRPCServer - class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass - server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081)) - server.register_function(get_peers_json, 'server.peers') - server.register_function(cmd_stop, 'stop') - server.register_function(cmd_load, 'load') - server.register_function(get_banner, 'server.banner') - server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast') - server.register_function(address_get_history_json, 'address.get_history') - server.register_function(add_address_to_session_json, 'address.subscribe') - server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe') - server.register_function(client_version_json, 'client.version') - server.register_function(create_session_json, 'session.create') # internal message (not part of protocol) - server.register_function(poll_session_json, 'session.poll') # internal message (not part of protocol) - server.serve_forever() if __name__ == '__main__': @@ -494,29 +390,24 @@ if __name__ == '__main__': # backend store = abe_backend.AbeStore(config) - # supported protocols + # old protocol thread.start_new_thread(native_server_thread, ()) thread.start_new_thread(clean_session_thread, ()) - #thread.start_new_thread(http_server_thread, ()) - - processor = AbeProcessor() - shared = stratum.Shared() + shared = Shared() # Bind shared to processor since constructor is user defined processor.shared = shared processor.start() - + # dispatcher + dispatcher = Dispatcher(shared, processor) + dispatcher.start() # Create various transports we need - - #tcp stratum - tcpserver = stratum.TcpServer(shared, processor, "ecdsa.org",50001) - tcpserver.start() - - #http stratum - from stratum_http import HttpServer - server = HttpServer(shared, processor, "ecdsa.org",8081) - server.start() + transports = [ TcpServer(shared, processor, "ecdsa.org",50001), + HttpServer(shared, processor, "ecdsa.org",8081) + ] + for server in transports: + server.start() if (config.get('server','irc') == 'yes' ): diff --git a/stratum.py b/stratum.py index 649987a..f364d93 100644 --- a/stratum.py +++ b/stratum.py @@ -4,90 +4,14 @@ import threading import time import Queue as queue -class Processor(threading.Thread): +from processor import Session, Dispatcher, Shared - def __init__(self): - self.shared = None - threading.Thread.__init__(self) - self.daemon = True - self.request_queue = queue.Queue() - self.response_queue = queue.Queue() - self.internal_ids = {} - self.internal_id = 1 - self.lock = threading.Lock() - - def push_response(self, item): - self.response_queue.put(item) - - def pop_response(self): - return self.response_queue.get() - - def push_request(self, session, item): - self.request_queue.put((session,item)) - - def pop_request(self): - return self.request_queue.get() - - def get_session_id(self, internal_id): - with self.lock: - return self.internal_ids.pop(internal_id) - - def store_session_id(self, session, msgid): - with self.lock: - self.internal_ids[self.internal_id] = session, msgid - r = self.internal_id - self.internal_id += 1 - return r - - def run(self): - if self.shared is None: - raise TypeError("self.shared not set in Processor") - while not self.shared.stopped(): - session, request = self.pop_request() - - method = request['method'] - params = request.get('params',[]) - - if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']: - session.subscribe_to_service(method, params) - - # store session and id locally - request['id'] = self.store_session_id(session, request['id']) - self.process(request) - - self.stop() - - def stop(self): - pass - - def process(self, request): - print "New request", request - # Do stuff... - # response = request - # When ready, you call - # self.push_response(response) - - - -class Session: +class TcpSession(Session): def __init__(self, connection, address): self._connection = connection self.address = address - self._stopped = False - self.lock = threading.Lock() - self.subscriptions = [] - print "new session", address - - def stop(self): - self._connection.close() - print "Terminating connection:", self.address[0] - with self.lock: - self._stopped = True - - def stopped(self): - with self.lock: - return self._stopped + Session.__init__(self) def connection(self): if self.stopped(): @@ -95,55 +19,24 @@ class Session: else: return self._connection - def subscribe_to_service(self, method, params): + def stop(self): + self._connection.close() + print "Terminating connection:", self.address[0] with self.lock: - self.subscriptions.append((method, params)) - - - -class TcpResponder(threading.Thread): - - def __init__(self, shared, processor, server): - self.shared = shared - self.processor = processor - self.server = server - threading.Thread.__init__(self) - - - def run(self): - while not self.shared.stopped(): - response = self.processor.pop_response() - internal_id = response.get('id') - params = response.get('params',[]) - try: - method = response['method'] - except: - print "no method", response - continue - - if internal_id: - session, message_id = self.processor.get_session_id(internal_id) - response['id'] = message_id - self.send_response(response, session) - - else: - for session in self.server.sessions: - if not session.stopped(): - if (method,params) in session.subscriptions: - self.send_response(response, session) - - + self._stopped = True - def send_response(self, response, session): + def send_response(self, response): raw_response = json.dumps(response) # Possible race condition here by having session # close connection? # I assume Python connections are thread safe interfaces try: - connection = session.connection() + connection = self.connection() connection.send(raw_response + "\n") except: - session.stop() + self.stop() + + class TcpClientRequestor(threading.Thread): @@ -213,7 +106,6 @@ class TcpServer(threading.Thread): def __init__(self, shared, processor, host, port): self.shared = shared self.processor = processor - self.sessions = [] threading.Thread.__init__(self) self.daemon = True self.host = host @@ -226,46 +118,15 @@ class TcpServer(threading.Thread): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(1) - responder = TcpResponder(self.shared, self.processor, self) - responder.start() while not self.shared.stopped(): - session = Session(*sock.accept()) + session = TcpSession(*sock.accept()) client_req = TcpClientRequestor(self.shared, self.processor, session) client_req.start() - self.add_session(session) - self.collect_garbage() - - def add_session(self, session): - with self.lock: - self.sessions.append(session) - - def collect_garbage(self): - # Deep copy entire sessions list and blank it - # This is done to minimise lock contention - with self.lock: - sessions = self.sessions[:] - self.sessions = [] - for session in sessions: - if not session.stopped(): - # If session is still alive then re-add it back - # to our internal register - self.add_session(session) - + self.processor.add_session(session) + self.processor.collect_garbage() -class Shared: - - def __init__(self): - self.lock = threading.Lock() - self._stopped = False - def stop(self): - print "Stopping Stratum" - with self.lock: - self._stopped = True - def stopped(self): - with self.lock: - return self._stopped class Stratum: diff --git a/stratum_http.py b/stratum_http.py index 5e49b4f..499628b 100644 --- a/stratum_http.py +++ b/stratum_http.py @@ -199,12 +199,12 @@ class StratumJSONRPCRequestHandler( c = self.headers.get('cookie') if c: if c[0:8]=='SESSION=': - print "found cookie", c[8:] + #print "found cookie", c[8:] session_id = c[8:] if session_id is None: session_id = self.server.create_session() - print "setting cookie", session_id + #print "setting cookie", session_id data = json.dumps([]) response = self.server._marshaled_dispatch(session_id, data) @@ -317,31 +317,28 @@ class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): def create_session(self): session_id = random_string(10) - self.sessions[session_id] = { 'addresses':{}, 'responses':[]} + self.sessions[session_id] = HttpSession(session_id) return session_id def poll_session(self,session_id): - responses = self.sessions[session_id]['responses'] - self.sessions[session_id]['responses'] = [] + responses = self.sessions[session_id].pending_responses[:] + self.sessions[session_id].pending_responses = [] print "poll: %d responses"%len(responses) return responses -class HttpResponder(threading.Thread): - """read responses from the queue and dispatch them to sessions""" - def __init__(self, shared, processor): - self.shared = shared - self.processor = processor - threading.Thread.__init__(self) +from processor import Session - def run(self): - while not self.shared.stopped(): - session,response = self.processor.pop_response() - if not session.stopped(): - raw_response = json.dumps(response) - session.responses.append(response) +class HttpSession(Session): + def __init__(self, session_id): + Session.__init__(self) + self.pending_responses = [] + print "new http session", session_id + def send_response(self, response): + raw_response = json.dumps(response) + self.pending_responses.append(response) class HttpServer(threading.Thread): def __init__(self, shared, _processor, host, port): @@ -356,20 +353,25 @@ class HttpServer(threading.Thread): def run(self): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn - from StratumJSONRPCServer import StratumJSONRPCServer class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass - server = StratumThreadedJSONRPCServer(( self.host, self.port)) + self.server = StratumThreadedJSONRPCServer(( self.host, self.port)) for s in ['server.peers', 'server.banner', 'transaction.broadcast', \ 'address.get_history','address.subscribe', 'numblocks.subscribe', 'client.version']: - server.register_function(self.process, s) + self.server.register_function(self.process, s) + + self.server.register_function(self.do_stop, 'stop') - server.register_function(self.do_stop, 'stop') print "HTTP server started." - server.serve_forever() + self.server.serve_forever() + - def process(self, session, request): - print session, request - self.processor.process(request) + def process(self, session_id, request): + #print session, request + session = self.server.sessions.get(session_id) + if session: + #print "zz",session_id,session + request['id'] = self.processor.store_session_id(session, request['id']) + self.processor.process(request) def do_stop(self, session, request): self.shared.stop() -- 1.7.1