create dispatcher class; redefine processors as threads
authorThomasV <thomasv@gitorious>
Thu, 29 Mar 2012 10:02:07 +0000 (14:02 +0400)
committerThomasV <thomasv@gitorious>
Thu, 29 Mar 2012 10:02:07 +0000 (14:02 +0400)
abe_backend.py
irc.py
modules/__init__.pyc
modules/python_bitcoin/__init__.py
processor.py
server.py
transports/native.py
transports/stratum_http.py
transports/stratum_tcp.py

index f66ddf7..dc7056a 100644 (file)
@@ -31,12 +31,11 @@ class AbeStore(Datastore_class):
         self.address_queue = Queue()
 
         self.dblock = thread.allocate_lock()
-        self.block_number = -1
-        self.watched_addresses = []
 
 
 
     def import_block(self, b, chain_ids=frozenset()):
+        #print "import block"
         block_id = super(AbeStore, self).import_block(b, chain_ids)
         for pos in xrange(len(b['transactions'])):
             tx = b['transactions'][pos]
@@ -369,50 +368,27 @@ class AbeStore(Datastore_class):
 
         return block_number
 
-    def watch_address(self, addr):
-        if addr not in self.watched_addresses:
-            self.watched_addresses.append(addr)
 
-    def run(self, processor):
-        
-        old_block_number = None
-        while not processor.shared.stopped():
-            self.block_number = self.main_iteration()
+from processor import Processor
 
-            if self.block_number != old_block_number:
-                old_block_number = self.block_number
-                processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+class AbeProcessor(Processor):
 
-            while True:
-                try:
-                    addr = self.address_queue.get(False)
-                except:
-                    break
-                if addr in self.watched_addresses:
-                    status = self.get_status( addr )
-                    processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
-
-            time.sleep(10)
-
-
-
-class AbeBackend:
-
-    def __init__(self,config, processor):
+    def __init__(self, config):
+        Processor.__init__(self)
         self.store = AbeStore(config)
-        self.store.processor = processor
-        thread.start_new_thread(self.store.run,(processor,))
+        self.block_number = -1
+        self.watched_addresses = []
 
-    def process(self, request, queue):
+    def process(self, request):
         message_id = request['id']
         method = request['method']
         params = request.get('params',[])
         result = ''
         if method == 'blockchain.numblocks.subscribe':
-            result = self.store.block_number
+            result = self.block_number
         elif method == 'blockchain.address.subscribe':
             address = params[0]
-            self.store.watch_address(address)
+            self.watch_address(address)
             status = self.store.get_status(address)
             result = status
         elif method == 'blockchain.address.get_history':
@@ -427,7 +403,34 @@ class AbeBackend:
 
         if result != '':
             response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
-            queue.put(response)
+            self.push_response(response)
+
+
+    def watch_address(self, addr):
+        if addr not in self.watched_addresses:
+            self.watched_addresses.append(addr)
+
+
+    def run(self):
+        
+        old_block_number = None
+        while not self.shared.stopped():
+            self.block_number = self.store.main_iteration()
+
+            if self.block_number != old_block_number:
+                old_block_number = self.block_number
+                self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+
+            while True:
+                try:
+                    addr = self.store.address_queue.get(False)
+                except:
+                    break
+                if addr in self.watched_addresses:
+                    status = self.get_status( addr )
+                    self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
+
+            time.sleep(10)
 
 
 
diff --git a/irc.py b/irc.py
index bae1f80..908089a 100644 (file)
--- a/irc.py
+++ b/irc.py
@@ -6,23 +6,27 @@ def random_string(N):
     import random, string
     return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
 
+from processor import Processor
 
-class Irc(threading.Thread):
+class ServerProcessor(Processor):
 
-    def __init__(self, processor, host, nick):
-        self.processor = processor
-        threading.Thread.__init__(self)
+    def __init__(self, config):
+        Processor.__init__(self)
         self.daemon = True
         self.peers = {}
-        self.host = host
-        self.nick = nick
+        self.banner = config.get('server','banner')
+        self.host = config.get('server','host')
+        self.nick = config.get('server','ircname')
+        self.irc = config.get('server','irc') == 'yes'
 
     def get_peers(self):
         return self.peers.values()
 
     def run(self):
+        if not self.irc: 
+            return
         NICK = 'E_'+random_string(10)
-        while not self.processor.shared.stopped():
+        while not self.shared.stopped():
             try:
                 s = socket.socket()
                 s.connect(('irc.freenode.net', 6667))
@@ -31,7 +35,7 @@ class Irc(threading.Thread):
                 s.send('JOIN #electrum\n')
                 sf = s.makefile('r', 0)
                 t = 0
-                while not self.processor.shared.stopped():
+                while not self.shared.stopped():
                     line = sf.readline()
                     line = line.rstrip('\r\n')
                     line = line.split()
@@ -51,7 +55,7 @@ class Irc(threading.Thread):
                         host = line[k+9]
                         self.peers[name] = (ip,host)
                     if time.time() - t > 5*60:
-                        self.processor.push_response({'method':'server.peers', 'result':[self.get_peers()]})
+                        self.push_response({'method':'server.peers', 'result':[self.get_peers()]})
                         s.send('NAMES #electrum\n')
                         t = time.time()
                         self.peers = {}
@@ -62,29 +66,21 @@ class Irc(threading.Thread):
                 s.close()
 
 
-class ServerBackend:
-
-    def __init__(self, config, processor):
-        self.banner = config.get('server','banner')
-        self.irc = Irc(processor, config.get('server','host'), config.get('server','ircname'))
-        self.irc.processor = processor
-        if (config.get('server','irc') == 'yes' ): 
-            self.irc.start()
 
-    def process(self, request, queue):
+    def process(self, request):
         method = request['method']
 
         result = ''
         if method == 'server.banner':
             result = self.banner.replace('\\n','\n')
         elif method == 'server.peers.subscribe':
-            result = self.irc.get_peers()
+            result = self.get_peers()
         else:
             print "unknown method", request
 
         if result!='':
             response = { 'id':request['id'], 'method':method, 'params':request['params'], 'result':result }
-            queue.put(response)
+            self.push_response(response)
 
 
 
index f6cbf88..67cc5f8 100644 (file)
Binary files a/modules/__init__.pyc and b/modules/__init__.pyc differ
index 361b7dc..6f91305 100644 (file)
@@ -96,8 +96,9 @@ class GhostValue:
 
 class NumblocksSubscribe:
 
-    def __init__(self, backend):
+    def __init__(self, backend, processor):
         self.backend = backend
+        self.processor = processor
         self.lock = threading.Lock()
         self.backend.blockchain.subscribe_reorganize(self.reorganize)
         self.backend.blockchain.fetch_last_depth(self.set_last_depth)
@@ -112,14 +113,15 @@ class NumblocksSubscribe:
     def reorganize(self, ec, fork_point, arrivals, replaced):
         latest = fork_point + len(arrivals)
         self.latest.set(latest)
-        self.push_response({"method":"numblocks.subscribe", "result": latest})
+        self.processor.push_response({"method":"numblocks.subscribe", "result": latest})
         self.backend.blockchain.subscribe_reorganize(self.reorganize)
 
 
 class AddressGetHistory:
 
-    def __init__(self, backend):
+    def __init__(self, backend, processor):
         self.backend = backend
+        self.processor = processor
 
     def get(self, request):
         address = str(request["params"])
@@ -127,15 +129,16 @@ class AddressGetHistory:
             bitcoin.bind(self.respond, request, bitcoin._1))
 
     def respond(self, request, result):
-        self.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result})
+        self.processor.push_response({"id": request["id"], "method":request["method"], "params":request["params"], "result": result})
+
 
 class LibbitcoinProcessor(Processor):
 
-    def __init__(self):
-        self.backend = Backend()
-        self.numblocks_subscribe = NumblocksSubscribe(self.backend)
-        self.address_get_history = AddressGetHistory(self.backend)
+    def __init__(self, config):
         Processor.__init__(self)
+        self.backend = Backend()
+        self.numblocks_subscribe = NumblocksSubscribe(self.backend, self)
+        self.address_get_history = AddressGetHistory(self.backend, self)
 
     def stop(self):
         self.backend.stop()
@@ -171,13 +174,9 @@ class LibbitcoinProcessor(Processor):
             response = {"id": request["id"], "method": request["method"], "params":request["params"], "result": tx_hash}
         self.push_response(response)
 
-
-
-def run(processor):
-    #processor = LibbitcoinProcessor()
-    print "Warning: pre-alpha prototype. Full of bugs."
-    while not processor.shared.stopped():
-        if raw_input() == "quit":
-            shared.stop()
-        time.sleep(1)
+    def run(self):
+        # this class is a thread. it does nothing in this example.
+        print "Warning: pre-alpha prototype. Full of bugs."
+        while not self.shared.stopped():
+            time.sleep(1)
 
index a18594d..c6da150 100644 (file)
@@ -24,7 +24,39 @@ class Shared:
 class Processor(threading.Thread):
 
     def __init__(self):
-        self.shared = None
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.dispatcher = None
+
+    def process(self, request):
+        pass
+
+    def push_response(self, response):
+        self.dispatcher.request_dispatcher.push_response(response)
+
+
+
+class Dispatcher:
+
+    def __init__(self):
+        self.shared = Shared()
+        self.request_dispatcher = RequestDispatcher(self.shared)
+        self.request_dispatcher.start()
+        self.response_dispatcher = ResponseDispatcher(self.shared, self.request_dispatcher)
+        self.response_dispatcher.start()
+
+    def register(self, prefix, processor):
+        processor.dispatcher = self
+        processor.shared = self.shared
+        processor.start()
+        self.request_dispatcher.processors[prefix] = processor
+
+
+
+class RequestDispatcher(threading.Thread):
+
+    def __init__(self, shared):
+        self.shared = shared
         threading.Thread.__init__(self)
         self.daemon = True
         self.request_queue = queue.Queue()
@@ -58,9 +90,6 @@ class Processor(threading.Thread):
             self.internal_id += 1
             return r
 
-    def register(self, prefix, function):
-        self.processors[prefix] = function
-
     def run(self):
         if self.shared is None:
             raise TypeError("self.shared not set in Processor")
@@ -87,12 +116,12 @@ class Processor(threading.Thread):
         # dispatch request to the relevant module..
         prefix = request['method'].split('.')[0]
         try:
-            func = self.processors[prefix]
+            p = self.processors[prefix]
         except:
             print "error: no processor for", prefix
             return
         try:
-            func(request,self.response_queue)
+            p.process(request)
         except:
             traceback.print_exc(file=sys.stdout)
 
@@ -130,7 +159,7 @@ class Session:
             self.subscriptions.append((method, params))
     
 
-class Dispatcher(threading.Thread):
+class ResponseDispatcher(threading.Thread):
 
     def __init__(self, shared, processor):
         self.shared = shared
index 9ee8dc8..2005d8b 100755 (executable)
--- a/server.py
+++ b/server.py
 # License along with this program.  If not, see
 # <http://www.gnu.org/licenses/agpl.html>.
 
-import time, json, socket, operator, thread, ast, sys, re, traceback
+import time, sys, traceback
 import ConfigParser
-from json import dumps, loads
-import urllib
-import threading
 
 config = ConfigParser.ConfigParser()
 # set some defaults, which will be overwritten by the config file
@@ -48,32 +45,28 @@ try:
 except:
     pass
 
-
 password = config.get('server','password')
+host = config.get('server','host')
+use_libbitcoin = False
 
 
-from processor import Shared, Processor, Dispatcher
-
-
+from processor import Dispatcher
 from transports.stratum_http import HttpServer
 from transports.stratum_tcp import TcpServer
 from transports.native import NativeServer
+from irc import ServerProcessor
+from abe_backend import AbeProcessor
 
-
-import irc 
-import abe_backend 
-from processor import Processor
-
-
-
-
-
+if use_libbitcoin:
+    from modules.python_bitcoin import LibBitcoinProcessor as BlockchainProcessor
+else:
+    from abe_backend import AbeProcessor as BlockchainProcessor
 
 if __name__ == '__main__':
 
     if len(sys.argv)>1:
         import jsonrpclib
-        server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
+        server = jsonrpclib.Server('http://%s:8081'%host)
         cmd = sys.argv[1]
         if cmd == 'stop':
             out = server.stop(password)
@@ -82,27 +75,21 @@ if __name__ == '__main__':
         print out
         sys.exit(0)
 
-    processor = Processor()
-    shared = Shared()
-    # Bind shared to processor since constructor is user defined
-    processor.shared = shared
-    processor.start()
+    # Create hub
+    dispatcher = Dispatcher()
+    shared = dispatcher.shared
 
-    abe = abe_backend.AbeBackend(config, processor)
-    processor.register('blockchain', abe.process)
+    # Create and register processors
+    abe = BlockchainProcessor(config)
+    dispatcher.register('blockchain', abe)
 
-    sb = irc.ServerBackend(config, processor)
-    processor.register('server', sb.process)
-
-    # dispatcher
-    dispatcher = Dispatcher(shared, processor)
-    dispatcher.start()
+    sb = ServerProcessor(config)
+    dispatcher.register('server', sb)
 
     # Create various transports we need
-    host = config.get('server','host')
-    transports = [ NativeServer(shared, abe.store, sb.irc, config.get('server','banner'), host, 50000),
-                   TcpServer(shared, processor, host, 50001),
-                   HttpServer(shared, processor, host, 8081),
+    transports = [ NativeServer(shared, abe, sb, config.get('server','banner'), host, 50000),
+                   TcpServer(dispatcher, host, 50001),
+                   HttpServer(dispatcher, host, 8081),
                    ]
     for server in transports:
         server.start()
@@ -111,4 +98,3 @@ if __name__ == '__main__':
     while not shared.stopped():
         time.sleep(1)
     print "server stopped"
-
index 1b6a6ec..729a428 100644 (file)
@@ -1,4 +1,4 @@
-import thread, threading, time, socket, traceback, ast
+import thread, threading, time, socket, traceback, ast, sys
 
 
 
@@ -12,10 +12,11 @@ def timestr():
 
 class NativeServer(threading.Thread):
 
-    def __init__(self, shared, store, irc, banner, host, port):
+    def __init__(self, shared, abe, irc, banner, host, port):
         threading.Thread.__init__(self)
         self.banner = banner
-        self.store = store
+        self.abe = abe
+        self.store = abe.store
         self.irc = irc
         self.sessions = {}
         self.host = host
@@ -50,7 +51,7 @@ class NativeServer(threading.Thread):
             self.sessions[session_id]['last_time'] = time.time()
             ret, addresses = self.modified_addresses(session)
             if ret: self.sessions[session_id]['addresses'] = addresses
-            return repr( (self.store.block_number,ret))
+            return repr( (self.abe.block_number,ret))
 
 
     def add_address_to_session(self, session_id, address):
@@ -115,7 +116,7 @@ class NativeServer(threading.Thread):
     def do_command(self, cmd, data, ipaddr):
 
         if cmd=='b':
-            out = "%d"%block_number
+            out = "%d"%self.abe.block_number
 
         elif cmd in ['session','new_session']:
             try:
index 7a8ece2..a7ef49d 100644 (file)
@@ -341,9 +341,9 @@ class HttpSession(Session):
         self.pending_responses.append(response)
 
 class HttpServer(threading.Thread):
-    def __init__(self, shared, _processor, host, port):
-        self.shared = shared
-        self.processor = _processor
+    def __init__(self, dispatcher, host, port):
+        self.shared = dispatcher.shared
+        self.dispatcher = dispatcher.request_dispatcher
         threading.Thread.__init__(self)
         self.daemon = True
         self.host = host
@@ -370,7 +370,7 @@ class HttpServer(threading.Thread):
         #print session, request
         session = self.server.sessions.get(session_id)
         if session:
-            self.processor.process(session, request)
+            self.dispatcher.process(session, request)
 
     def do_stop(self, session, request):
         self.shared.stop()
index ce39aca..ab6c5c3 100644 (file)
@@ -4,7 +4,7 @@ import threading
 import time
 import Queue as queue
 
-from processor import Session, Dispatcher, Shared
+from processor import Session, Dispatcher
 
 class TcpSession(Session):
 
@@ -41,9 +41,9 @@ class TcpSession(Session):
 
 class TcpClientRequestor(threading.Thread):
 
-    def __init__(self, shared, processor, session):
-        self.shared = shared
-        self.processor = processor
+    def __init__(self, dispatcher, session):
+        self.shared = dispatcher.shared
+        self.dispatcher = dispatcher
         self.message = ""
         self.session = session
         threading.Thread.__init__(self)
@@ -86,7 +86,7 @@ class TcpClientRequestor(threading.Thread):
         try:
             command = json.loads(raw_command)
         except:
-            self.processor.push_response({"error": "bad JSON", "request": raw_command})
+            self.dispatcher.push_response({"error": "bad JSON", "request": raw_command})
             return True
 
         try:
@@ -96,17 +96,17 @@ class TcpClientRequestor(threading.Thread):
             method = command['method']
         except KeyError:
             # Return an error JSON in response.
-            self.processor.push_response({"error": "syntax error", "request": raw_command})
+            self.dispatcher.push_response({"error": "syntax error", "request": raw_command})
         else:
-            self.processor.push_request(self.session,command)
+            self.dispatcher.push_request(self.session,command)
 
         return True
 
 class TcpServer(threading.Thread):
 
-    def __init__(self, shared, processor, host, port):
-        self.shared = shared
-        self.processor = processor
+    def __init__(self, dispatcher, host, port):
+        self.shared = dispatcher.shared
+        self.dispatcher = dispatcher.request_dispatcher
         threading.Thread.__init__(self)
         self.daemon = True
         self.host = host
@@ -121,10 +121,10 @@ class TcpServer(threading.Thread):
         sock.listen(1)
         while not self.shared.stopped():
             session = TcpSession(*sock.accept())
-            client_req = TcpClientRequestor(self.shared, self.processor, session)
+            client_req = TcpClientRequestor(self.dispatcher, session)
             client_req.start()
-            self.processor.add_session(session)
-            self.processor.collect_garbage()
+            self.dispatcher.add_session(session)
+            self.dispatcher.collect_garbage()