From: ThomasV Date: Thu, 29 Mar 2012 10:02:07 +0000 (+0400) Subject: create dispatcher class; redefine processors as threads X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=5ee042c213baa4ae8a0f538de64644ae9abe7b36 create dispatcher class; redefine processors as threads --- diff --git a/abe_backend.py b/abe_backend.py index f66ddf7..dc7056a 100644 --- a/abe_backend.py +++ b/abe_backend.py @@ -31,12 +31,11 @@ class AbeStore(Datastore_class): self.address_queue = Queue() self.dblock = thread.allocate_lock() - self.block_number = -1 - self.watched_addresses = [] def import_block(self, b, chain_ids=frozenset()): + #print "import block" block_id = super(AbeStore, self).import_block(b, chain_ids) for pos in xrange(len(b['transactions'])): tx = b['transactions'][pos] @@ -369,50 +368,27 @@ class AbeStore(Datastore_class): return block_number - def watch_address(self, addr): - if addr not in self.watched_addresses: - self.watched_addresses.append(addr) - def run(self, processor): - - old_block_number = None - while not processor.shared.stopped(): - self.block_number = self.main_iteration() +from processor import Processor - if self.block_number != old_block_number: - old_block_number = self.block_number - processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number }) +class AbeProcessor(Processor): - while True: - try: - addr = self.address_queue.get(False) - except: - break - if addr in self.watched_addresses: - status = self.get_status( addr ) - processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status }) - - time.sleep(10) - - - -class AbeBackend: - - def __init__(self,config, processor): + def __init__(self, config): + Processor.__init__(self) self.store = AbeStore(config) - self.store.processor = processor - thread.start_new_thread(self.store.run,(processor,)) + self.block_number = -1 + self.watched_addresses = [] - def process(self, request, queue): + def process(self, request): message_id = request['id'] method = request['method'] params = request.get('params',[]) result = '' if method == 'blockchain.numblocks.subscribe': - result = self.store.block_number + result = self.block_number elif method == 'blockchain.address.subscribe': address = params[0] - self.store.watch_address(address) + self.watch_address(address) status = self.store.get_status(address) result = status elif method == 'blockchain.address.get_history': @@ -427,7 +403,34 @@ class AbeBackend: if result != '': response = { 'id':message_id, 'method':method, 'params':params, 'result':result } - queue.put(response) + self.push_response(response) + + + def watch_address(self, addr): + if addr not in self.watched_addresses: + self.watched_addresses.append(addr) + + + def run(self): + + old_block_number = None + while not self.shared.stopped(): + self.block_number = self.store.main_iteration() + + if self.block_number != old_block_number: + old_block_number = self.block_number + self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number }) + + while True: + try: + addr = self.store.address_queue.get(False) + except: + break + if addr in self.watched_addresses: + status = self.get_status( addr ) + self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status }) + + time.sleep(10) diff --git a/irc.py b/irc.py index bae1f80..908089a 100644 --- a/irc.py +++ b/irc.py @@ -6,23 +6,27 @@ def random_string(N): import random, string return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) +from processor import Processor -class Irc(threading.Thread): +class ServerProcessor(Processor): - def __init__(self, processor, host, nick): - self.processor = processor - threading.Thread.__init__(self) + def __init__(self, config): + Processor.__init__(self) self.daemon = True self.peers = {} - self.host = host - self.nick = nick + self.banner = config.get('server','banner') + self.host = config.get('server','host') + self.nick = config.get('server','ircname') + self.irc = config.get('server','irc') == 'yes' def get_peers(self): return self.peers.values() def run(self): + if not self.irc: + return NICK = 'E_'+random_string(10) - while not self.processor.shared.stopped(): + while not self.shared.stopped(): try: s = socket.socket() s.connect(('irc.freenode.net', 6667)) @@ -31,7 +35,7 @@ class Irc(threading.Thread): s.send('JOIN #electrum\n') sf = s.makefile('r', 0) t = 0 - while not self.processor.shared.stopped(): + while not self.shared.stopped(): line = sf.readline() line = line.rstrip('\r\n') line = line.split() @@ -51,7 +55,7 @@ class Irc(threading.Thread): host = line[k+9] self.peers[name] = (ip,host) if time.time() - t > 5*60: - self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]}) + self.push_response({'method':'server.peers', 'result':[self.get_peers()]}) s.send('NAMES #electrum\n') t = time.time() self.peers = {} @@ -62,29 +66,21 @@ class Irc(threading.Thread): s.close() -class ServerBackend: - - def __init__(self, config, processor): - self.banner = config.get('server','banner') - self.irc = Irc(processor, config.get('server','host'), config.get('server','ircname')) - self.irc.processor = processor - if (config.get('server','irc') == 'yes' ): - self.irc.start() - def process(self, request, queue): + def process(self, request): method = request['method'] result = '' if method == 'server.banner': result = self.banner.replace('\\n','\n') elif method == 'server.peers.subscribe': - result = self.irc.get_peers() + result = self.get_peers() else: print "unknown method", request if result!='': response = { 'id':request['id'], 'method':method, 'params':request['params'], 'result':result } - queue.put(response) + self.push_response(response) diff --git a/modules/__init__.pyc b/modules/__init__.pyc index f6cbf88..67cc5f8 100644 Binary files a/modules/__init__.pyc and b/modules/__init__.pyc differ diff --git a/modules/python_bitcoin/__init__.py b/modules/python_bitcoin/__init__.py index 361b7dc..6f91305 100644 --- a/modules/python_bitcoin/__init__.py +++ b/modules/python_bitcoin/__init__.py @@ -96,8 +96,9 @@ class GhostValue: class NumblocksSubscribe: - def __init__(self, backend): + def __init__(self, backend, processor): self.backend = backend + self.processor = processor self.lock = threading.Lock() self.backend.blockchain.subscribe_reorganize(self.reorganize) self.backend.blockchain.fetch_last_depth(self.set_last_depth) @@ -112,14 +113,15 @@ class NumblocksSubscribe: def reorganize(self, ec, fork_point, arrivals, replaced): latest = fork_point + len(arrivals) self.latest.set(latest) - self.push_response({"method":"numblocks.subscribe", "result": latest}) + self.processor.push_response({"method":"numblocks.subscribe", "result": latest}) self.backend.blockchain.subscribe_reorganize(self.reorganize) class AddressGetHistory: - def __init__(self, backend): + def __init__(self, backend, processor): self.backend = backend + self.processor = processor def get(self, request): address = str(request["params"]) @@ -127,15 +129,16 @@ class AddressGetHistory: bitcoin.bind(self.respond, request, bitcoin._1)) def respond(self, request, result): - self.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result}) + self.processor.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result}) + class LibbitcoinProcessor(Processor): - def __init__(self): - self.backend = Backend() - self.numblocks_subscribe = NumblocksSubscribe(self.backend) - self.address_get_history = AddressGetHistory(self.backend) + def __init__(self, config): Processor.__init__(self) + self.backend = Backend() + self.numblocks_subscribe = NumblocksSubscribe(self.backend, self) + self.address_get_history = AddressGetHistory(self.backend, self) def stop(self): self.backend.stop() @@ -171,13 +174,9 @@ class LibbitcoinProcessor(Processor): response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": tx_hash} self.push_response(response) - - -def run(processor): - #processor = LibbitcoinProcessor() - print "Warning: pre-alpha prototype. Full of bugs." - while not processor.shared.stopped(): - if raw_input() == "quit": - shared.stop() - time.sleep(1) + def run(self): + # this class is a thread. it does nothing in this example. + print "Warning: pre-alpha prototype. Full of bugs." + while not self.shared.stopped(): + time.sleep(1) diff --git a/processor.py b/processor.py index a18594d..c6da150 100644 --- a/processor.py +++ b/processor.py @@ -24,7 +24,39 @@ class Shared: class Processor(threading.Thread): def __init__(self): - self.shared = None + threading.Thread.__init__(self) + self.daemon = True + self.dispatcher = None + + def process(self, request): + pass + + def push_response(self, response): + self.dispatcher.request_dispatcher.push_response(response) + + + +class Dispatcher: + + def __init__(self): + self.shared = Shared() + self.request_dispatcher = RequestDispatcher(self.shared) + self.request_dispatcher.start() + self.response_dispatcher = ResponseDispatcher(self.shared, self.request_dispatcher) + self.response_dispatcher.start() + + def register(self, prefix, processor): + processor.dispatcher = self + processor.shared = self.shared + processor.start() + self.request_dispatcher.processors[prefix] = processor + + + +class RequestDispatcher(threading.Thread): + + def __init__(self, shared): + self.shared = shared threading.Thread.__init__(self) self.daemon = True self.request_queue = queue.Queue() @@ -58,9 +90,6 @@ class Processor(threading.Thread): self.internal_id += 1 return r - def register(self, prefix, function): - self.processors[prefix] = function - def run(self): if self.shared is None: raise TypeError("self.shared not set in Processor") @@ -87,12 +116,12 @@ class Processor(threading.Thread): # dispatch request to the relevant module.. prefix = request['method'].split('.')[0] try: - func = self.processors[prefix] + p = self.processors[prefix] except: print "error: no processor for", prefix return try: - func(request,self.response_queue) + p.process(request) except: traceback.print_exc(file=sys.stdout) @@ -130,7 +159,7 @@ class Session: self.subscriptions.append((method, params)) -class Dispatcher(threading.Thread): +class ResponseDispatcher(threading.Thread): def __init__(self, shared, processor): self.shared = shared diff --git a/server.py b/server.py index 9ee8dc8..2005d8b 100755 --- a/server.py +++ b/server.py @@ -15,11 +15,8 @@ # License along with this program. If not, see # . -import time, json, socket, operator, thread, ast, sys, re, traceback +import time, sys, traceback import ConfigParser -from json import dumps, loads -import urllib -import threading config = ConfigParser.ConfigParser() # set some defaults, which will be overwritten by the config file @@ -48,32 +45,28 @@ try: except: pass - password = config.get('server','password') +host = config.get('server','host') +use_libbitcoin = False -from processor import Shared, Processor, Dispatcher - - +from processor import Dispatcher from transports.stratum_http import HttpServer from transports.stratum_tcp import TcpServer from transports.native import NativeServer +from irc import ServerProcessor +from abe_backend import AbeProcessor - -import irc -import abe_backend -from processor import Processor - - - - - +if use_libbitcoin: + from modules.python_bitcoin import LibBitcoinProcessor as BlockchainProcessor +else: + from abe_backend import AbeProcessor as BlockchainProcessor if __name__ == '__main__': if len(sys.argv)>1: import jsonrpclib - server = jsonrpclib.Server('http://%s:8081'%config.get('server','host')) + server = jsonrpclib.Server('http://%s:8081'%host) cmd = sys.argv[1] if cmd == 'stop': out = server.stop(password) @@ -82,27 +75,21 @@ if __name__ == '__main__': print out sys.exit(0) - processor = Processor() - shared = Shared() - # Bind shared to processor since constructor is user defined - processor.shared = shared - processor.start() + # Create hub + dispatcher = Dispatcher() + shared = dispatcher.shared - abe = abe_backend.AbeBackend(config, processor) - processor.register('blockchain', abe.process) + # Create and register processors + abe = BlockchainProcessor(config) + dispatcher.register('blockchain', abe) - sb = irc.ServerBackend(config, processor) - processor.register('server', sb.process) - - # dispatcher - dispatcher = Dispatcher(shared, processor) - dispatcher.start() + sb = ServerProcessor(config) + dispatcher.register('server', sb) # Create various transports we need - host = config.get('server','host') - transports = [ NativeServer(shared, abe.store, sb.irc, config.get('server','banner'), host, 50000), - TcpServer(shared, processor, host, 50001), - HttpServer(shared, processor, host, 8081), + transports = [ NativeServer(shared, abe, sb, config.get('server','banner'), host, 50000), + TcpServer(dispatcher, host, 50001), + HttpServer(dispatcher, host, 8081), ] for server in transports: server.start() @@ -111,4 +98,3 @@ if __name__ == '__main__': while not shared.stopped(): time.sleep(1) print "server stopped" - diff --git a/transports/native.py b/transports/native.py index 1b6a6ec..729a428 100644 --- a/transports/native.py +++ b/transports/native.py @@ -1,4 +1,4 @@ -import thread, threading, time, socket, traceback, ast +import thread, threading, time, socket, traceback, ast, sys @@ -12,10 +12,11 @@ def timestr(): class NativeServer(threading.Thread): - def __init__(self, shared, store, irc, banner, host, port): + def __init__(self, shared, abe, irc, banner, host, port): threading.Thread.__init__(self) self.banner = banner - self.store = store + self.abe = abe + self.store = abe.store self.irc = irc self.sessions = {} self.host = host @@ -50,7 +51,7 @@ class NativeServer(threading.Thread): self.sessions[session_id]['last_time'] = time.time() ret, addresses = self.modified_addresses(session) if ret: self.sessions[session_id]['addresses'] = addresses - return repr( (self.store.block_number,ret)) + return repr( (self.abe.block_number,ret)) def add_address_to_session(self, session_id, address): @@ -115,7 +116,7 @@ class NativeServer(threading.Thread): def do_command(self, cmd, data, ipaddr): if cmd=='b': - out = "%d"%block_number + out = "%d"%self.abe.block_number elif cmd in ['session','new_session']: try: diff --git a/transports/stratum_http.py b/transports/stratum_http.py index 7a8ece2..a7ef49d 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -341,9 +341,9 @@ class HttpSession(Session): self.pending_responses.append(response) class HttpServer(threading.Thread): - def __init__(self, shared, _processor, host, port): - self.shared = shared - self.processor = _processor + def __init__(self, dispatcher, host, port): + self.shared = dispatcher.shared + self.dispatcher = dispatcher.request_dispatcher threading.Thread.__init__(self) self.daemon = True self.host = host @@ -370,7 +370,7 @@ class HttpServer(threading.Thread): #print session, request session = self.server.sessions.get(session_id) if session: - self.processor.process(session, request) + self.dispatcher.process(session, request) def do_stop(self, session, request): self.shared.stop() diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index ce39aca..ab6c5c3 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -4,7 +4,7 @@ import threading import time import Queue as queue -from processor import Session, Dispatcher, Shared +from processor import Session, Dispatcher class TcpSession(Session): @@ -41,9 +41,9 @@ class TcpSession(Session): class TcpClientRequestor(threading.Thread): - def __init__(self, shared, processor, session): - self.shared = shared - self.processor = processor + def __init__(self, dispatcher, session): + self.shared = dispatcher.shared + self.dispatcher = dispatcher self.message = "" self.session = session threading.Thread.__init__(self) @@ -86,7 +86,7 @@ class TcpClientRequestor(threading.Thread): try: command = json.loads(raw_command) except: - self.processor.push_response({"error": "bad JSON", "request": raw_command}) + self.dispatcher.push_response({"error": "bad JSON", "request": raw_command}) return True try: @@ -96,17 +96,17 @@ class TcpClientRequestor(threading.Thread): method = command['method'] except KeyError: # Return an error JSON in response. - self.processor.push_response({"error": "syntax error", "request": raw_command}) + self.dispatcher.push_response({"error": "syntax error", "request": raw_command}) else: - self.processor.push_request(self.session,command) + self.dispatcher.push_request(self.session,command) return True class TcpServer(threading.Thread): - def __init__(self, shared, processor, host, port): - self.shared = shared - self.processor = processor + def __init__(self, dispatcher, host, port): + self.shared = dispatcher.shared + self.dispatcher = dispatcher.request_dispatcher threading.Thread.__init__(self) self.daemon = True self.host = host @@ -121,10 +121,10 @@ class TcpServer(threading.Thread): sock.listen(1) while not self.shared.stopped(): session = TcpSession(*sock.accept()) - client_req = TcpClientRequestor(self.shared, self.processor, session) + client_req = TcpClientRequestor(self.dispatcher, session) client_req.start() - self.processor.add_session(session) - self.processor.collect_garbage() + self.dispatcher.add_session(session) + self.dispatcher.collect_garbage()