From 5ee042c213baa4ae8a0f538de64644ae9abe7b36 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Thu, 29 Mar 2012 14:02:07 +0400 Subject: [PATCH] create dispatcher class; redefine processors as threads --- abe_backend.py | 73 ++++++++++++++++++----------------- irc.py | 36 ++++++++---------- modules/__init__.pyc | Bin 106 -> 127 bytes modules/python_bitcoin/__init__.py | 33 ++++++++-------- processor.py | 43 ++++++++++++++++++--- server.py | 58 +++++++++++------------------ transports/native.py | 11 +++-- transports/stratum_http.py | 8 ++-- transports/stratum_tcp.py | 26 ++++++------ 9 files changed, 151 insertions(+), 137 deletions(-) diff --git a/abe_backend.py b/abe_backend.py index f66ddf7..dc7056a 100644 --- a/abe_backend.py +++ b/abe_backend.py @@ -31,12 +31,11 @@ class AbeStore(Datastore_class): self.address_queue = Queue() self.dblock = thread.allocate_lock() - self.block_number = -1 - self.watched_addresses = [] def import_block(self, b, chain_ids=frozenset()): + #print "import block" block_id = super(AbeStore, self).import_block(b, chain_ids) for pos in xrange(len(b['transactions'])): tx = b['transactions'][pos] @@ -369,50 +368,27 @@ class AbeStore(Datastore_class): return block_number - def watch_address(self, addr): - if addr not in self.watched_addresses: - self.watched_addresses.append(addr) - def run(self, processor): - - old_block_number = None - while not processor.shared.stopped(): - self.block_number = self.main_iteration() +from processor import Processor - if self.block_number != old_block_number: - old_block_number = self.block_number - processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number }) +class AbeProcessor(Processor): - while True: - try: - addr = self.address_queue.get(False) - except: - break - if addr in self.watched_addresses: - status = self.get_status( addr ) - processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status }) - - time.sleep(10) - - - -class AbeBackend: - - def __init__(self,config, processor): + def __init__(self, config): + Processor.__init__(self) self.store = AbeStore(config) - self.store.processor = processor - thread.start_new_thread(self.store.run,(processor,)) + self.block_number = -1 + self.watched_addresses = [] - def process(self, request, queue): + def process(self, request): message_id = request['id'] method = request['method'] params = request.get('params',[]) result = '' if method == 'blockchain.numblocks.subscribe': - result = self.store.block_number + result = self.block_number elif method == 'blockchain.address.subscribe': address = params[0] - self.store.watch_address(address) + self.watch_address(address) status = self.store.get_status(address) result = status elif method == 'blockchain.address.get_history': @@ -427,7 +403,34 @@ class AbeBackend: if result != '': response = { 'id':message_id, 'method':method, 'params':params, 'result':result } - queue.put(response) + self.push_response(response) + + + def watch_address(self, addr): + if addr not in self.watched_addresses: + self.watched_addresses.append(addr) + + + def run(self): + + old_block_number = None + while not self.shared.stopped(): + self.block_number = self.store.main_iteration() + + if self.block_number != old_block_number: + old_block_number = self.block_number + self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number }) + + while True: + try: + addr = self.store.address_queue.get(False) + except: + break + if addr in self.watched_addresses: + status = self.get_status( addr ) + self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status }) + + time.sleep(10) diff --git a/irc.py b/irc.py index bae1f80..908089a 100644 --- a/irc.py +++ b/irc.py @@ -6,23 +6,27 @@ def random_string(N): import random, string return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) +from processor import Processor -class Irc(threading.Thread): +class ServerProcessor(Processor): - def __init__(self, processor, host, nick): - self.processor = processor - threading.Thread.__init__(self) + def __init__(self, config): + Processor.__init__(self) self.daemon = True self.peers = {} - self.host = host - self.nick = nick + self.banner = config.get('server','banner') + self.host = config.get('server','host') + self.nick = config.get('server','ircname') + self.irc = config.get('server','irc') == 'yes' def get_peers(self): return self.peers.values() def run(self): + if not self.irc: + return NICK = 'E_'+random_string(10) - while not self.processor.shared.stopped(): + while not self.shared.stopped(): try: s = socket.socket() s.connect(('irc.freenode.net', 6667)) @@ -31,7 +35,7 @@ class Irc(threading.Thread): s.send('JOIN #electrum\n') sf = s.makefile('r', 0) t = 0 - while not self.processor.shared.stopped(): + while not self.shared.stopped(): line = sf.readline() line = line.rstrip('\r\n') line = line.split() @@ -51,7 +55,7 @@ class Irc(threading.Thread): host = line[k+9] self.peers[name] = (ip,host) if time.time() - t > 5*60: - self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]}) + self.push_response({'method':'server.peers', 'result':[self.get_peers()]}) s.send('NAMES #electrum\n') t = time.time() self.peers = {} @@ -62,29 +66,21 @@ class Irc(threading.Thread): s.close() -class ServerBackend: - - def __init__(self, config, processor): - self.banner = config.get('server','banner') - self.irc = Irc(processor, config.get('server','host'), config.get('server','ircname')) - self.irc.processor = processor - if (config.get('server','irc') == 'yes' ): - self.irc.start() - def process(self, request, queue): + def process(self, request): method = request['method'] result = '' if method == 'server.banner': result = self.banner.replace('\\n','\n') elif method == 'server.peers.subscribe': - result = self.irc.get_peers() + result = self.get_peers() else: print "unknown method", request if result!='': response = { 'id':request['id'], 'method':method, 'params':request['params'], 'result':result } - queue.put(response) + self.push_response(response) diff --git a/modules/__init__.pyc b/modules/__init__.pyc index f6cbf88c89e298a753820d181c36a5043d68197f..67cc5f84919dc5198fce6ce6cbebb6b407d49a38 100644 GIT binary patch delta 41 wcmd0*XJh`%%Qe$#BAdCS1_J|wettoTerir?a!FBXu6}W9QCVt{{zOMj0Q$oW;Q#;t delta 20 bcmb==Vq^Zy%jIW2k. -import time, json, socket, operator, thread, ast, sys, re, traceback +import time, sys, traceback import ConfigParser -from json import dumps, loads -import urllib -import threading config = ConfigParser.ConfigParser() # set some defaults, which will be overwritten by the config file @@ -48,32 +45,28 @@ try: except: pass - password = config.get('server','password') +host = config.get('server','host') +use_libbitcoin = False -from processor import Shared, Processor, Dispatcher - - +from processor import Dispatcher from transports.stratum_http import HttpServer from transports.stratum_tcp import TcpServer from transports.native import NativeServer +from irc import ServerProcessor +from abe_backend import AbeProcessor - -import irc -import abe_backend -from processor import Processor - - - - - +if use_libbitcoin: + from modules.python_bitcoin import LibBitcoinProcessor as BlockchainProcessor +else: + from abe_backend import AbeProcessor as BlockchainProcessor if __name__ == '__main__': if len(sys.argv)>1: import jsonrpclib - server = jsonrpclib.Server('http://%s:8081'%config.get('server','host')) + server = jsonrpclib.Server('http://%s:8081'%host) cmd = sys.argv[1] if cmd == 'stop': out = server.stop(password) @@ -82,27 +75,21 @@ if __name__ == '__main__': print out sys.exit(0) - processor = Processor() - shared = Shared() - # Bind shared to processor since constructor is user defined - processor.shared = shared - processor.start() + # Create hub + dispatcher = Dispatcher() + shared = dispatcher.shared - abe = abe_backend.AbeBackend(config, processor) - processor.register('blockchain', abe.process) + # Create and register processors + abe = BlockchainProcessor(config) + dispatcher.register('blockchain', abe) - sb = irc.ServerBackend(config, processor) - processor.register('server', sb.process) - - # dispatcher - dispatcher = Dispatcher(shared, processor) - dispatcher.start() + sb = ServerProcessor(config) + dispatcher.register('server', sb) # Create various transports we need - host = config.get('server','host') - transports = [ NativeServer(shared, abe.store, sb.irc, config.get('server','banner'), host, 50000), - TcpServer(shared, processor, host, 50001), - HttpServer(shared, processor, host, 8081), + transports = [ NativeServer(shared, abe, sb, config.get('server','banner'), host, 50000), + TcpServer(dispatcher, host, 50001), + HttpServer(dispatcher, host, 8081), ] for server in transports: server.start() @@ -111,4 +98,3 @@ if __name__ == '__main__': while not shared.stopped(): time.sleep(1) print "server stopped" - diff --git a/transports/native.py b/transports/native.py index 1b6a6ec..729a428 100644 --- a/transports/native.py +++ b/transports/native.py @@ -1,4 +1,4 @@ -import thread, threading, time, socket, traceback, ast +import thread, threading, time, socket, traceback, ast, sys @@ -12,10 +12,11 @@ def timestr(): class NativeServer(threading.Thread): - def __init__(self, shared, store, irc, banner, host, port): + def __init__(self, shared, abe, irc, banner, host, port): threading.Thread.__init__(self) self.banner = banner - self.store = store + self.abe = abe + self.store = abe.store self.irc = irc self.sessions = {} self.host = host @@ -50,7 +51,7 @@ class NativeServer(threading.Thread): self.sessions[session_id]['last_time'] = time.time() ret, addresses = self.modified_addresses(session) if ret: self.sessions[session_id]['addresses'] = addresses - return repr( (self.store.block_number,ret)) + return repr( (self.abe.block_number,ret)) def add_address_to_session(self, session_id, address): @@ -115,7 +116,7 @@ class NativeServer(threading.Thread): def do_command(self, cmd, data, ipaddr): if cmd=='b': - out = "%d"%block_number + out = "%d"%self.abe.block_number elif cmd in ['session','new_session']: try: diff --git a/transports/stratum_http.py b/transports/stratum_http.py index 7a8ece2..a7ef49d 100644 --- a/transports/stratum_http.py +++ b/transports/stratum_http.py @@ -341,9 +341,9 @@ class HttpSession(Session): self.pending_responses.append(response) class HttpServer(threading.Thread): - def __init__(self, shared, _processor, host, port): - self.shared = shared - self.processor = _processor + def __init__(self, dispatcher, host, port): + self.shared = dispatcher.shared + self.dispatcher = dispatcher.request_dispatcher threading.Thread.__init__(self) self.daemon = True self.host = host @@ -370,7 +370,7 @@ class HttpServer(threading.Thread): #print session, request session = self.server.sessions.get(session_id) if session: - self.processor.process(session, request) + self.dispatcher.process(session, request) def do_stop(self, session, request): self.shared.stop() diff --git a/transports/stratum_tcp.py b/transports/stratum_tcp.py index ce39aca..ab6c5c3 100644 --- a/transports/stratum_tcp.py +++ b/transports/stratum_tcp.py @@ -4,7 +4,7 @@ import threading import time import Queue as queue -from processor import Session, Dispatcher, Shared +from processor import Session, Dispatcher class TcpSession(Session): @@ -41,9 +41,9 @@ class TcpSession(Session): class TcpClientRequestor(threading.Thread): - def __init__(self, shared, processor, session): - self.shared = shared - self.processor = processor + def __init__(self, dispatcher, session): + self.shared = dispatcher.shared + self.dispatcher = dispatcher self.message = "" self.session = session threading.Thread.__init__(self) @@ -86,7 +86,7 @@ class TcpClientRequestor(threading.Thread): try: command = json.loads(raw_command) except: - self.processor.push_response({"error": "bad JSON", "request": raw_command}) + self.dispatcher.push_response({"error": "bad JSON", "request": raw_command}) return True try: @@ -96,17 +96,17 @@ class TcpClientRequestor(threading.Thread): method = command['method'] except KeyError: # Return an error JSON in response. - self.processor.push_response({"error": "syntax error", "request": raw_command}) + self.dispatcher.push_response({"error": "syntax error", "request": raw_command}) else: - self.processor.push_request(self.session,command) + self.dispatcher.push_request(self.session,command) return True class TcpServer(threading.Thread): - def __init__(self, shared, processor, host, port): - self.shared = shared - self.processor = processor + def __init__(self, dispatcher, host, port): + self.shared = dispatcher.shared + self.dispatcher = dispatcher.request_dispatcher threading.Thread.__init__(self) self.daemon = True self.host = host @@ -121,10 +121,10 @@ class TcpServer(threading.Thread): sock.listen(1) while not self.shared.stopped(): session = TcpSession(*sock.accept()) - client_req = TcpClientRequestor(self.shared, self.processor, session) + client_req = TcpClientRequestor(self.dispatcher, session) client_req.start() - self.processor.add_session(session) - self.processor.collect_garbage() + self.dispatcher.add_session(session) + self.dispatcher.collect_garbage() -- 1.7.1