From 9d26d0c3417e94a653136589dd744c45576c6928 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Wed, 13 Jun 2012 18:24:15 +0400 Subject: [PATCH] restructuring: each processor has its own queue --- backends/abe/__init__.py | 44 ++++++++++++++++++----------------- backends/irc/__init__.py | 57 +++++++++++++++++++++++++++++++-------------- processor.py | 25 +++++++++++++------ 3 files changed, 79 insertions(+), 47 deletions(-) diff --git a/backends/abe/__init__.py b/backends/abe/__init__.py index eea0a03..24d309d 100644 --- a/backends/abe/__init__.py +++ b/backends/abe/__init__.py @@ -7,7 +7,7 @@ import binascii import thread, traceback, sys, urllib, operator from json import dumps, loads from Queue import Queue -import time +import time, threading class AbeStore(Datastore_class): @@ -409,8 +409,11 @@ class BlockchainProcessor(Processor): self.store = AbeStore(config) self.block_number = -1 self.watched_addresses = [] + threading.Timer(10, self.run_store_iteration).start() def process(self, request): + #print "abe process", request + message_id = request['id'] method = request['method'] params = request.get('params',[]) @@ -442,27 +445,26 @@ class BlockchainProcessor(Processor): self.watched_addresses.append(addr) - def run(self): + def run_store_iteration(self): + if self.shared.stopped(): + print "exit timer" + return - old_block_number = None - while not self.shared.stopped(): - self.block_number = self.store.main_iteration() - - if self.block_number != old_block_number: - old_block_number = self.block_number - print "block number:", self.block_number - self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] }) - - while True: - try: - addr = self.store.address_queue.get(False) - except: - break - if addr in self.watched_addresses: - status = self.store.get_status( addr ) - self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] }) - - time.sleep(10) + block_number = self.store.main_iteration() + if self.block_number != block_number: + self.block_number = block_number + print "block number:", self.block_number + self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] }) + + while True: + try: + addr = self.store.address_queue.get(False) + except: + break + if addr in self.watched_addresses: + status = self.store.get_status( addr ) + self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] }) + threading.Timer(10, self.run_store_iteration).start() diff --git a/backends/irc/__init__.py b/backends/irc/__init__.py index 200d19f..87b6fa0 100644 --- a/backends/irc/__init__.py +++ b/backends/irc/__init__.py @@ -6,24 +6,20 @@ def random_string(N): from processor import Processor -class ServerProcessor(Processor): - def __init__(self, config): - Processor.__init__(self) - self.daemon = True - self.peers = {} - self.banner = config.get('server','banner') - self.host = config.get('server','host') - self.password = config.get('server','password') +class IrcThread(threading.Thread): + def __init__(self, processor, config): + threading.Thread.__init__(self) + self.processor = processor + self.daemon = True self.stratum_tcp_port = config.get('server','stratum_tcp_port') self.stratum_http_port = config.get('server','stratum_http_port') - - self.irc = config.get('server', 'irc') == 'yes' - self.nick = config.get('server', 'irc_nick') + self.peers = {} + self.host = config.get('server','host') + self.nick = config.get('server', 'irc_nick') if not self.nick: self.nick = random_string(10) - def get_peers(self): return self.peers.values() @@ -38,12 +34,9 @@ class ServerProcessor(Processor): def run(self): - if not self.irc: - return - ircname = self.getname() - while not self.shared.stopped(): + while not self.processor.shared.stopped(): try: s = socket.socket() s.connect(('irc.freenode.net', 6667)) @@ -52,7 +45,7 @@ class ServerProcessor(Processor): s.send('JOIN #electrum\n') sf = s.makefile('r', 0) t = 0 - while not self.shared.stopped(): + while not self.processor.shared.stopped(): line = sf.readline() line = line.rstrip('\r\n') line = line.split() @@ -73,7 +66,7 @@ class ServerProcessor(Processor): ports = line[k+10:] self.peers[name] = (ip, host, ports) if time.time() - t > 5*60: - self.push_response({'method':'server.peers', 'params':[self.get_peers()]}) + self.processor.push_response({'method':'server.peers', 'params':[self.get_peers()]}) s.send('NAMES #electrum\n') t = time.time() self.peers = {} @@ -83,7 +76,35 @@ class ServerProcessor(Processor): sf.close() s.close() + print "quitting IRC" + + +class ServerProcessor(Processor): + + def __init__(self, config): + Processor.__init__(self) + self.daemon = True + self.banner = config.get('server','banner') + self.password = config.get('server','password') + + if config.get('server', 'irc') == 'yes': + self.irc = IrcThread(self, config) + else: + self.irc = None + + + def get_peers(self): + if self.irc: + return self.irc.get_peers() + else: + return [] + + + def run(self): + if self.irc: + self.irc.start() + Processor.run(self) def process(self, request): method = request['method'] diff --git a/processor.py b/processor.py index 4cbaa85..96fb3c2 100644 --- a/processor.py +++ b/processor.py @@ -35,6 +35,7 @@ class Processor(threading.Thread): threading.Thread.__init__(self) self.daemon = True self.dispatcher = None + self.queue = queue.Queue() def process(self, request): pass @@ -43,6 +44,16 @@ class Processor(threading.Thread): #print "response", response self.dispatcher.request_dispatcher.push_response(response) + def run(self): + while not self.shared.stopped(): + request = self.queue.get(10000000000) + try: + self.process(request) + except: + traceback.print_exc(file=sys.stdout) + + print "processor terminating" + class Dispatcher: @@ -110,17 +121,18 @@ class RequestDispatcher(threading.Thread): raise TypeError("self.shared not set in Processor") while not self.shared.stopped(): session, request = self.pop_request() - self.process(session, request) + self.do_dispatch(session, request) self.stop() def stop(self): pass - def process(self, session, request): + def do_dispatch(self, session, request): + """ dispatch request to the relevant processor """ + method = request['method'] params = request.get('params',[]) - suffix = method.split('.')[-1] if suffix == 'subscribe': session.subscribe_to_service(method, params) @@ -128,17 +140,14 @@ class RequestDispatcher(threading.Thread): # store session and id locally request['id'] = self.store_session_id(session, request['id']) - # dispatch request to the relevant module.. prefix = request['method'].split('.')[0] try: p = self.processors[prefix] except: print "error: no processor for", prefix return - try: - p.process(request) - except: - traceback.print_exc(file=sys.stdout) + + p.queue.put(request) if method in ['server.version']: session.version = params[0] -- 1.7.1