From: ThomasV Date: Wed, 28 Mar 2012 20:34:24 +0000 (+0400) Subject: generic processor; register backends X-Git-Url: https://git.novaco.in/?p=electrum-server.git;a=commitdiff_plain;h=cb1ebe091b58cb616458c1afd1b97251d61eabed generic processor; register backends --- diff --git a/abe_backend.py b/abe_backend.py index ae43dfd..f66ddf7 100644 --- a/abe_backend.py +++ b/abe_backend.py @@ -381,7 +381,7 @@ class AbeStore(Datastore_class): if self.block_number != old_block_number: old_block_number = self.block_number - processor.push_response({ 'method':'numblocks.subscribe', 'result':self.block_number }) + processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number }) while True: try: @@ -390,6 +390,44 @@ class AbeStore(Datastore_class): break if addr in self.watched_addresses: status = self.get_status( addr ) - processor.push_response({ 'method':'address.subscribe', 'params':[addr], 'result':status }) + processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status }) time.sleep(10) + + + +class AbeBackend: + + def __init__(self,config, processor): + self.store = AbeStore(config) + self.store.processor = processor + thread.start_new_thread(self.store.run,(processor,)) + + def process(self, request, queue): + message_id = request['id'] + method = request['method'] + params = request.get('params',[]) + result = '' + if method == 'blockchain.numblocks.subscribe': + result = self.store.block_number + elif method == 'blockchain.address.subscribe': + address = params[0] + self.store.watch_address(address) + status = self.store.get_status(address) + result = status + elif method == 'blockchain.address.get_history': + address = params[0] + result = self.store.get_history( address ) + elif method == 'blockchain.transaction.broadcast': + txo = self.store.send_tx(params[0]) + print "sent tx:", txo + result = txo + else: + print "unknown method", request + + if result != '': + response = { 'id':message_id, 'method':method, 'params':params, 'result':result } + queue.put(response) + + + diff --git a/irc.py b/irc.py index 8143593..bae1f80 100644 --- a/irc.py +++ b/irc.py @@ -60,3 +60,31 @@ class Irc(threading.Thread): finally: sf.close() s.close() + + +class ServerBackend: + + def __init__(self, config, processor): + self.banner = config.get('server','banner') + self.irc = Irc(processor, config.get('server','host'), config.get('server','ircname')) + self.irc.processor = processor + if (config.get('server','irc') == 'yes' ): + self.irc.start() + + def process(self, request, queue): + method = request['method'] + + result = '' + if method == 'server.banner': + result = self.banner.replace('\\n','\n') + elif method == 'server.peers.subscribe': + result = self.irc.get_peers() + else: + print "unknown method", request + + if result!='': + response = { 'id':request['id'], 'method':method, 'params':request['params'], 'result':result } + queue.put(response) + + + diff --git a/modules/python_bitcoin/__init__.py b/modules/python_bitcoin/__init__.py index cd9a4db..361b7dc 100644 --- a/modules/python_bitcoin/__init__.py +++ b/modules/python_bitcoin/__init__.py @@ -1,5 +1,5 @@ import bitcoin -import stratum +from processor import Processor import threading import time @@ -102,13 +102,6 @@ class NumblocksSubscribe: self.backend.blockchain.subscribe_reorganize(self.reorganize) self.backend.blockchain.fetch_last_depth(self.set_last_depth) self.latest = GhostValue() - self.subscribed = [] - - def subscribe(self, session, request): - last = self.latest.get() - self.push_response(session,{"id": request["id"], "result": last}) - with self.lock: - self.subscribed.append((session, request)) def set_last_depth(self, ec, last_depth): if ec: @@ -119,74 +112,72 @@ class NumblocksSubscribe: def reorganize(self, ec, fork_point, arrivals, replaced): latest = fork_point + len(arrivals) self.latest.set(latest) - subscribed = self.spring_clean() - for session, request in subscribed: - self.push_response(session,{"id": request["id"], "result": latest}) + self.push_response({"method":"numblocks.subscribe", "result": latest}) self.backend.blockchain.subscribe_reorganize(self.reorganize) - def spring_clean(self): - with self.lock: - self.subscribed = [sub for sub in self.subscribed - if not sub[0].stopped()] - return self.subscribed[:] class AddressGetHistory: def __init__(self, backend): self.backend = backend - def get(self, session, request): + def get(self, request): address = str(request["params"]) composed.payment_history(self.backend.blockchain, address, - bitcoin.bind(self.respond, session, request, bitcoin._1)) + bitcoin.bind(self.respond, request, bitcoin._1)) - def respond(self, session, request, result): - self.push_response(session,{"id": request["id"], "result": result}) + def respond(self, request, result): + self.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result}) -class LibbitcoinProcessor(stratum.Processor): +class LibbitcoinProcessor(Processor): def __init__(self): self.backend = Backend() self.numblocks_subscribe = NumblocksSubscribe(self.backend) self.address_get_history = AddressGetHistory(self.backend) - stratum.Processor.__init__(self) + Processor.__init__(self) def stop(self): self.backend.stop() - def process(self, session, request): + def process(self, request): print "New request (lib)", request if request["method"] == "numblocks.subscribe": self.numblocks_subscribe.subscribe(session, request) elif request["method"] == "address.get_history": - self.address_get_history.get(session, request) + self.address_get_history.get(request) elif request["method"] == "server.banner": - self.push_response(session, {"id": request["id"], + self.push_response({"id": request["id"], "method": request["method"], "params":request["params"], "result": "libbitcoin using python-bitcoin bindings"}) elif request["method"] == "transaction.broadcast": - self.broadcast_transaction(session, request) + self.broadcast_transaction(request) # Execute and when ready, you call - # self.push_response(session,response) + # self.push_response(response) - def broadcast_transaction(self, session, request): + def broadcast_transaction(self, request): raw_tx = bitcoin.data_chunk(str(request["params"])) exporter = bitcoin.satoshi_exporter() try: tx = exporter.load_transaction(raw_tx) except RuntimeError: - response = {"id": request["id"], "result": None, + response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": None, "error": {"message": "Exception while parsing the transaction data.", "code": -4}} else: self.backend.protocol.broadcast_transaction(tx) tx_hash = str(bitcoin.hash_transaction(tx)) - response = {"id": request["id"], "result": tx_hash} - self.push_response(session,response) + response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": tx_hash} + self.push_response(response) + + -def run(stratum): +def run(processor): + #processor = LibbitcoinProcessor() print "Warning: pre-alpha prototype. Full of bugs." - processor = LibbitcoinProcessor() - stratum.start(processor) + while not processor.shared.stopped(): + if raw_input() == "quit": + shared.stop() + time.sleep(1) diff --git a/processor.py b/processor.py index 28df39e..1fd7909 100644 --- a/processor.py +++ b/processor.py @@ -2,6 +2,7 @@ import json import socket import threading import time +import traceback, sys import Queue as queue class Shared: @@ -32,6 +33,7 @@ class Processor(threading.Thread): self.internal_id = 1 self.lock = threading.Lock() self.sessions = [] + self.processors = {} def push_response(self, item): self.response_queue.put(item) @@ -56,6 +58,9 @@ class Processor(threading.Thread): self.internal_id += 1 return r + def register(self, prefix, function): + self.processors[prefix] = function + def run(self): if self.shared is None: raise TypeError("self.shared not set in Processor") @@ -65,12 +70,26 @@ class Processor(threading.Thread): method = request['method'] params = request.get('params',[]) - if method in [ 'numblocks.subscribe', 'address.subscribe', 'server.peers']: + suffix = method.split('.')[-1] + if suffix == 'subscribe': session.subscribe_to_service(method, params) # store session and id locally request['id'] = self.store_session_id(session, request['id']) - self.process(request) + + # dispatch request to the relevant module.. + prefix = method.split('.')[0] + try: + func = self.processors[prefix] + except: + print "error: no processor for", prefix + continue + + try: + func(request,self.response_queue) + except: + traceback.print_exc(file=sys.stdout) + continue self.stop() diff --git a/server.py b/server.py index 675ef5f..97a9773 100755 --- a/server.py +++ b/server.py @@ -56,47 +56,11 @@ from processor import Shared, Processor, Dispatcher from stratum_http import HttpServer from stratum import TcpServer from native import NativeServer -from irc import Irc -from abe_backend import AbeStore - -class AbeProcessor(Processor): - def process(self,request): - message_id = request['id'] - method = request['method'] - params = request.get('params',[]) - #print request - - result = '' - if method == 'numblocks.subscribe': - result = store.block_number - elif method == 'address.subscribe': - address = params[0] - store.watch_address(address) - status = store.get_status(address) - result = status - elif method == 'client.version': - #session.version = params[0] - pass - elif method == 'server.banner': - result = config.get('server','banner').replace('\\n','\n') - elif method == 'server.peers': - result = irc.get_peers() - elif method == 'address.get_history': - address = params[0] - result = store.get_history( address ) - elif method == 'transaction.broadcast': - txo = store.send_tx(params[0]) - print "sent tx:", txo - result = txo - else: - print "unknown method", request - if result!='': - response = { 'id':message_id, 'method':method, 'params':params, 'result':result } - self.push_response(response) - def get_status(self,addr): - return store.get_status(addr) +import irc +import abe_backend +from processor import Processor @@ -116,25 +80,25 @@ if __name__ == '__main__': print out sys.exit(0) - processor = AbeProcessor() + processor = Processor() shared = Shared() # Bind shared to processor since constructor is user defined processor.shared = shared processor.start() - irc = Irc(processor, config.get('server','host'), config.get('server','ircname')) - if (config.get('server','irc') == 'yes' ): irc.start() + abe = abe_backend.AbeBackend(config, processor) + processor.register('blockchain', abe.process) - # backend - store = AbeStore(config) + sb = irc.ServerBackend(config, processor) + processor.register('server', sb.process) # dispatcher dispatcher = Dispatcher(shared, processor) dispatcher.start() - host = config.get('server','host') # Create various transports we need - transports = [ NativeServer(shared, store, irc, config.get('server','banner'), host, 50000), + host = config.get('server','host') + transports = [ NativeServer(shared, abe.store, sb.irc, config.get('server','banner'), host, 50000), TcpServer(shared, processor, host, 50001), HttpServer(shared, processor, host, 8081), ] @@ -142,6 +106,7 @@ if __name__ == '__main__': server.start() print "starting Electrum server on", host - store.run(processor) + while not shared.stopped(): + time.sleep(1) print "server stopped" diff --git a/stratum.py b/stratum.py index f364d93..ce39aca 100644 --- a/stratum.py +++ b/stratum.py @@ -12,6 +12,7 @@ class TcpSession(Session): self._connection = connection self.address = address Session.__init__(self) + print "New session", address def connection(self): if self.stopped(): @@ -128,24 +129,3 @@ class TcpServer(threading.Thread): -class Stratum: - - def start(self, processor): - shared = Shared() - # Bind shared to processor since constructor is user defined - processor.shared = shared - processor.start() - # Create various transports we need - transports = TcpServer(shared, processor, "176.31.24.241", 50001), - for server in transports: - server.start() - while not shared.stopped(): - if raw_input() == "quit": - shared.stop() - time.sleep(1) - -if __name__ == "__main__": - processor = Processor() - app = Stratum() - app.start(processor) -