restructuring: each processor has its own queue
authorThomasV <thomasv@gitorious>
Wed, 13 Jun 2012 14:24:15 +0000 (18:24 +0400)
committerThomasV <thomasv@gitorious>
Wed, 13 Jun 2012 14:24:15 +0000 (18:24 +0400)
backends/abe/__init__.py
backends/irc/__init__.py
processor.py

index eea0a03..24d309d 100644 (file)
@@ -7,7 +7,7 @@ import binascii
 import thread, traceback, sys, urllib, operator
 from json import dumps, loads
 from Queue import Queue
-import time
+import time, threading
 
 class AbeStore(Datastore_class):
 
@@ -409,8 +409,11 @@ class BlockchainProcessor(Processor):
         self.store = AbeStore(config)
         self.block_number = -1
         self.watched_addresses = []
+        threading.Timer(10, self.run_store_iteration).start()
 
     def process(self, request):
+        #print "abe process", request
+
         message_id = request['id']
         method = request['method']
         params = request.get('params',[])
@@ -442,27 +445,26 @@ class BlockchainProcessor(Processor):
             self.watched_addresses.append(addr)
 
 
-    def run(self):
+    def run_store_iteration(self):
+        if self.shared.stopped(): 
+            print "exit timer"
+            return
         
-        old_block_number = None
-        while not self.shared.stopped():
-            self.block_number = self.store.main_iteration()
-
-            if self.block_number != old_block_number:
-                old_block_number = self.block_number
-                print "block number:", self.block_number
-                self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
-
-            while True:
-                try:
-                    addr = self.store.address_queue.get(False)
-                except:
-                    break
-                if addr in self.watched_addresses:
-                    status = self.store.get_status( addr )
-                    self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
-
-            time.sleep(10)
+        block_number = self.store.main_iteration()
+        if self.block_number != block_number:
+            self.block_number = block_number
+            print "block number:", self.block_number
+            self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
+
+        while True:
+            try:
+                addr = self.store.address_queue.get(False)
+            except:
+                break
+            if addr in self.watched_addresses:
+                status = self.store.get_status( addr )
+                self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
 
+        threading.Timer(10, self.run_store_iteration).start()
 
 
index 200d19f..87b6fa0 100644 (file)
@@ -6,24 +6,20 @@ def random_string(N):
 
 from processor import Processor
 
-class ServerProcessor(Processor):
 
-    def __init__(self, config):
-        Processor.__init__(self)
-        self.daemon = True
-        self.peers = {}
-        self.banner = config.get('server','banner')
-        self.host = config.get('server','host')
-        self.password = config.get('server','password')
+class IrcThread(threading.Thread):
 
+    def __init__(self, processor, config):
+        threading.Thread.__init__(self)
+        self.processor = processor
+        self.daemon = True
         self.stratum_tcp_port = config.get('server','stratum_tcp_port')
         self.stratum_http_port = config.get('server','stratum_http_port')
-
-        self.irc = config.get('server', 'irc') == 'yes'
-        self.nick = config.get('server', 'irc_nick') 
+        self.peers = {}
+        self.host = config.get('server','host')
+        self.nick = config.get('server', 'irc_nick')
         if not self.nick: self.nick = random_string(10)
 
-
     def get_peers(self):
         return self.peers.values()
 
@@ -38,12 +34,9 @@ class ServerProcessor(Processor):
 
 
     def run(self):
-        if not self.irc: 
-            return
-
         ircname = self.getname()
 
-        while not self.shared.stopped():
+        while not self.processor.shared.stopped():
             try:
                 s = socket.socket()
                 s.connect(('irc.freenode.net', 6667))
@@ -52,7 +45,7 @@ class ServerProcessor(Processor):
                 s.send('JOIN #electrum\n')
                 sf = s.makefile('r', 0)
                 t = 0
-                while not self.shared.stopped():
+                while not self.processor.shared.stopped():
                     line = sf.readline()
                     line = line.rstrip('\r\n')
                     line = line.split()
@@ -73,7 +66,7 @@ class ServerProcessor(Processor):
                         ports  = line[k+10:]
                         self.peers[name] = (ip, host, ports)
                     if time.time() - t > 5*60:
-                        self.push_response({'method':'server.peers', 'params':[self.get_peers()]})
+                        self.processor.push_response({'method':'server.peers', 'params':[self.get_peers()]})
                         s.send('NAMES #electrum\n')
                         t = time.time()
                         self.peers = {}
@@ -83,7 +76,35 @@ class ServerProcessor(Processor):
                 sf.close()
                 s.close()
 
+        print "quitting IRC"
+
+
 
+class ServerProcessor(Processor):
+
+    def __init__(self, config):
+        Processor.__init__(self)
+        self.daemon = True
+        self.banner = config.get('server','banner')
+        self.password = config.get('server','password')
+
+        if config.get('server', 'irc') == 'yes':
+            self.irc = IrcThread(self, config)
+        else: 
+            self.irc = None
+
+
+    def get_peers(self):
+        if self.irc:
+            return self.irc.get_peers()
+        else:
+            return []
+
+
+    def run(self):
+        if self.irc:
+            self.irc.start()
+        Processor.run(self)
 
     def process(self, request):
         method = request['method']
index 4cbaa85..96fb3c2 100644 (file)
@@ -35,6 +35,7 @@ class Processor(threading.Thread):
         threading.Thread.__init__(self)
         self.daemon = True
         self.dispatcher = None
+        self.queue = queue.Queue()
 
     def process(self, request):
         pass
@@ -43,6 +44,16 @@ class Processor(threading.Thread):
         #print "response", response
         self.dispatcher.request_dispatcher.push_response(response)
 
+    def run(self):
+        while not self.shared.stopped():
+            request = self.queue.get(10000000000)
+            try:
+                self.process(request)
+            except:
+                traceback.print_exc(file=sys.stdout)
+
+        print "processor terminating"
+            
 
 
 class Dispatcher:
@@ -110,17 +121,18 @@ class RequestDispatcher(threading.Thread):
             raise TypeError("self.shared not set in Processor")
         while not self.shared.stopped():
             session, request = self.pop_request()
-            self.process(session, request)
+            self.do_dispatch(session, request)
 
         self.stop()
 
     def stop(self):
         pass
 
-    def process(self, session, request):
+    def do_dispatch(self, session, request):
+        """ dispatch request to the relevant processor """
+
         method = request['method']
         params = request.get('params',[])
-
         suffix = method.split('.')[-1]
         if suffix == 'subscribe':
             session.subscribe_to_service(method, params)
@@ -128,17 +140,14 @@ class RequestDispatcher(threading.Thread):
         # store session and id locally
         request['id'] = self.store_session_id(session, request['id'])
 
-        # dispatch request to the relevant module..
         prefix = request['method'].split('.')[0]
         try:
             p = self.processors[prefix]
         except:
             print "error: no processor for", prefix
             return
-        try:
-            p.process(request)
-        except:
-            traceback.print_exc(file=sys.stdout)
+
+        p.queue.put(request)
 
         if method in ['server.version']:
             session.version = params[0]