From: ThomasV Date: Tue, 27 Mar 2012 19:10:24 +0000 (+0400) Subject: new workflow X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=aa3bd16ea7859ac656d58e3f0b8cd06b3e88af05 new workflow --- diff --git a/abe_backend.py b/abe_backend.py index 8f6edf3..ae43dfd 100644 --- a/abe_backend.py +++ b/abe_backend.py @@ -7,6 +7,7 @@ import psycopg2, binascii import thread, traceback, sys, urllib, operator from json import dumps, loads from Queue import Queue +import time class AbeStore(Datastore_class): @@ -30,6 +31,8 @@ class AbeStore(Datastore_class): self.address_queue = Queue() self.dblock = thread.allocate_lock() + self.block_number = -1 + self.watched_addresses = [] @@ -365,3 +368,28 @@ class AbeStore(Datastore_class): store.dblock.release() return block_number + + def watch_address(self, addr): + if addr not in self.watched_addresses: + self.watched_addresses.append(addr) + + def run(self, processor): + + old_block_number = None + while not processor.shared.stopped(): + self.block_number = self.main_iteration() + + if self.block_number != old_block_number: + old_block_number = self.block_number + processor.push_response({ 'method':'numblocks.subscribe', 'result':self.block_number }) + + while True: + try: + addr = self.address_queue.get(False) + except: + break + if addr in self.watched_addresses: + status = self.get_status( addr ) + processor.push_response({ 'method':'address.subscribe', 'params':[addr], 'result':status }) + + time.sleep(10) diff --git a/server.py b/server.py index 9c2e2b8..1c9a409 100755 --- a/server.py +++ b/server.py @@ -66,7 +66,6 @@ except: password = config.get('server','password') stopping = False -block_number = -1 sessions = {} m_sessions = [{}] # served by http @@ -134,7 +133,7 @@ def poll_session(session_id): sessions[session_id]['last_time'] = time.time() ret, addresses = modified_addresses(session) if ret: sessions[session_id]['addresses'] = addresses - return repr( (block_number,ret)) + return repr( (store.block_number,ret)) def poll_session_json(session_id, message_id): @@ -201,15 +200,6 @@ def client_version_json(session_id, _, version): sessions[session_id]['version'] = version m_sessions[0] = sessions -def create_session_json(_, __): - sessions = m_sessions[0] - session_id = random_string(10) - print "creating session", session_id - sessions[session_id] = { 'addresses':{}, 'numblocks':('','') } - sessions[session_id]['last_time'] = time.time() - m_sessions[0] = sessions - return session_id - def get_banner(_,__): @@ -355,7 +345,7 @@ def clean_session_thread(): import stratum class AbeProcessor(stratum.Processor): - def process(self,session,request): + def process(self,request): message_id = request['id'] method = request['method'] params = request.get('params',[]) @@ -363,15 +353,15 @@ class AbeProcessor(stratum.Processor): result = '' if method == 'numblocks.subscribe': - session.subscribe_to_numblocks(message_id) - result = block_number + result = store.block_number elif method == 'address.subscribe': address = params[0] + store.watch_address(address) status = store.get_status(address) - session.subscribe_to_address(address,message_id,status) result = status elif method == 'client.version': - session.version = params[0] + #session.version = params[0] + pass elif method == 'server.banner': result = config.get('server','banner').replace('\\n','\n') elif method == 'server.peers': @@ -387,8 +377,8 @@ class AbeProcessor(stratum.Processor): print "unknown method", request if result!='': - response = { 'id':message_id, 'result':result } - self.push_response(session,response) + response = { 'id':message_id, 'method':method, 'params':params, 'result':result } + self.push_response(response) def get_status(self,addr): return store.get_status(addr) @@ -498,41 +488,33 @@ if __name__ == '__main__': # supported protocols thread.start_new_thread(native_server_thread, ()) - - thread.start_new_thread(http_server_thread, ()) thread.start_new_thread(clean_session_thread, ()) - #tcp stratum - stratum_processor = AbeProcessor() + #thread.start_new_thread(http_server_thread, ()) + + + processor = AbeProcessor() shared = stratum.Shared() # Bind shared to processor since constructor is user defined - stratum_processor.shared = shared - stratum_processor.start() + processor.shared = shared + processor.start() + # Create various transports we need - server = stratum.TcpServer(shared, stratum_processor, "ecdsa.org",50001) + + #tcp stratum + tcpserver = stratum.TcpServer(shared, processor, "ecdsa.org",50001) + tcpserver.start() + + #http stratum + from StratumJSONRPCServer import HttpServer + server = HttpServer(shared, processor, "ecdsa.org",8081) server.start() + if (config.get('server','irc') == 'yes' ): thread.start_new_thread(irc_thread, ()) print "starting Electrum server" - - old_block_number = None - while not stopping: - block_number = store.main_iteration() - - if block_number != old_block_number: - old_block_number = block_number - stratum_processor.update_from_blocknum(block_number) - - while True: - try: - addr = store.address_queue.get(False) - except: - break - - stratum_processor.update_from_address(addr) - - time.sleep(10) + store.run(processor) print "server stopped" diff --git a/stratum.py b/stratum.py index deb58b9..0ce7485 100644 --- a/stratum.py +++ b/stratum.py @@ -8,19 +8,14 @@ class Processor(threading.Thread): def __init__(self): self.shared = None - self.lock = threading.Lock() - self.sessions = [] threading.Thread.__init__(self) self.daemon = True self.request_queue = queue.Queue() self.response_queue = queue.Queue() + self.id_session = {} - def add_session(self, session): - with self.lock: - self.sessions.append(session) - - def push_response(self, session, item): - self.response_queue.put((session,item)) + def push_response(self, item): + self.response_queue.put(item) def pop_response(self): return self.response_queue.get() @@ -31,61 +26,41 @@ class Processor(threading.Thread): def pop_request(self): return self.request_queue.get() - def collect_garbage(self): - # Deep copy entire sessions list and blank it - # This is done to minimise lock contention - with self.lock: - sessions = self.sessions[:] - self.sessions = [] - for session in sessions: - if not session.stopped(): - # If session is still alive then re-add it back - # to our internal register - self.add_session(session) - def run(self): if self.shared is None: raise TypeError("self.shared not set in Processor") while not self.shared.stopped(): - self.collect_garbage() session, request = self.pop_request() - self.process(session, request) + + method = request['method'] + params = request.get('params',[]) + + if method == 'numblocks.subscribe': + session.subscribe_to_numblocks() + + elif method == 'address.subscribe': + address = params[0] + session.subscribe_to_address(address) + + elif method == 'server.peers': + session.subscribe_to_peers() + + message_id = request['id'] + self.id_session[message_id] = session + self.process(request) self.stop() def stop(self): pass - def process(self, session, request): + def process(self, request): print "New request", request # Do stuff... # response = request # When ready, you call - # self.push_response(session,response) + # self.push_response(response) - def update_from_blocknum(self,block_number): - for session in self.sessions: - if not session.stopped(): - if session.numblocks_sub is not None: - response = { 'id':session.numblocks_sub, 'result':block_number } - self.push_response(session,response) - - def update_from_address(self,addr): - for session in self.sessions: - if not session.stopped(): - m = session.addresses_sub.get(addr) - if m: - status = self.get_status( addr ) - message_id, last_status = m - if status != last_status: - session.subscribe_to_address(addr,message_id, status) - response = { 'id':message_id, 'result':status } - self.push_response(session,response) - - def get_status(self,addr): - # return status of an address - # return store.get_status(addr) - pass class Session: @@ -115,34 +90,76 @@ class Session: else: return self._connection - def subscribe_to_numblocks(self,message_id): + def subscribe_to_numblocks(self): with self.lock: - self.numblocks_sub = message_id + self.numblocks_sub = True - def subscribe_to_address(self,address,message_id,status): + def subscribe_to_peers(self): + pass + + def subscribe_to_address(self,address): with self.lock: - self.addresses_sub[address] = message_id,status + self.addresses_sub[address] = 'unknown' class TcpResponder(threading.Thread): - def __init__(self, shared, processor): + def __init__(self, shared, processor, server): self.shared = shared self.processor = processor + self.server = server threading.Thread.__init__(self) + def run(self): while not self.shared.stopped(): - session,response = self.processor.pop_response() - raw_response = json.dumps(response) - # Possible race condition here by having session - # close connection? - # I assume Python connections are thread safe interfaces + response = self.processor.pop_response() + # if it is a subscription, find the list of sessions that suuscribed + + # if there is an id, there should be a session + # note: I must add and remove the session id to the message id.. + + message_id = response.get('id') try: - connection = session.connection() - connection.send(raw_response + "\n") + method = response['method'] except: - session.stop() + print "no method", response + continue + + if message_id: + session = self.processor.id_session.pop(message_id) + self.send_response(response, session) + + elif method == 'numblocks.subscribe': + for session in self.server.sessions: + if not session.stopped(): + if session.numblocks_sub: + self.send_response(response, session) + + elif method == 'address.subscribe': + for session in self.server.sessions: + if not session.stopped(): + addr = response['params'][0] + last_status = session.addresses_sub.get(addr) + if last_status: + new_status = response.get('result') + if new_status != last_status: + session.addresses_sub[addr] = new_status + self.send_response(response, session) + else: + print "error", response + + + def send_response(self, response, session): + raw_response = json.dumps(response) + # Possible race condition here by having session + # close connection? + # I assume Python connections are thread safe interfaces + try: + connection = session.connection() + connection.send(raw_response + "\n") + except: + session.stop() class TcpClientRequestor(threading.Thread): @@ -191,8 +208,7 @@ class TcpClientRequestor(threading.Thread): try: command = json.loads(raw_command) except: - self.processor.push_response(self.session, - {"error": "bad JSON", "request": raw_command}) + self.processor.push_response({"error": "bad JSON", "request": raw_command}) return True try: @@ -202,8 +218,7 @@ class TcpClientRequestor(threading.Thread): method = command['method'] except KeyError: # Return an error JSON in response. - self.processor.push_response(self.session, - {"error": "syntax error", "request": raw_command}) + self.processor.push_response({"error": "syntax error", "request": raw_command}) else: self.processor.push_request(self.session,command) @@ -214,11 +229,12 @@ class TcpServer(threading.Thread): def __init__(self, shared, processor, host, port): self.shared = shared self.processor = processor - self.clients = [] + self.sessions = [] threading.Thread.__init__(self) self.daemon = True self.host = host self.port = port + self.lock = threading.Lock() def run(self): print "TCP server started." @@ -226,13 +242,31 @@ class TcpServer(threading.Thread): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.host, self.port)) sock.listen(1) - responder = TcpResponder(self.shared, self.processor) + responder = TcpResponder(self.shared, self.processor, self) responder.start() while not self.shared.stopped(): session = Session(*sock.accept()) client_req = TcpClientRequestor(self.shared, self.processor, session) client_req.start() - self.processor.add_session(session) + self.add_session(session) + self.collect_garbage() + + def add_session(self, session): + with self.lock: + self.sessions.append(session) + + def collect_garbage(self): + # Deep copy entire sessions list and blank it + # This is done to minimise lock contention + with self.lock: + sessions = self.sessions[:] + self.sessions = [] + for session in sessions: + if not session.stopped(): + # If session is still alive then re-add it back + # to our internal register + self.add_session(session) + class Shared: