create dispatcher class; redefine processors as threads
[electrum-server.git] / abe_backend.py
index f66ddf7..dc7056a 100644 (file)
@@ -31,12 +31,11 @@ class AbeStore(Datastore_class):
         self.address_queue = Queue()
 
         self.dblock = thread.allocate_lock()
-        self.block_number = -1
-        self.watched_addresses = []
 
 
 
     def import_block(self, b, chain_ids=frozenset()):
+        #print "import block"
         block_id = super(AbeStore, self).import_block(b, chain_ids)
         for pos in xrange(len(b['transactions'])):
             tx = b['transactions'][pos]
@@ -369,50 +368,27 @@ class AbeStore(Datastore_class):
 
         return block_number
 
-    def watch_address(self, addr):
-        if addr not in self.watched_addresses:
-            self.watched_addresses.append(addr)
 
-    def run(self, processor):
-        
-        old_block_number = None
-        while not processor.shared.stopped():
-            self.block_number = self.main_iteration()
+from processor import Processor
 
-            if self.block_number != old_block_number:
-                old_block_number = self.block_number
-                processor.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+class AbeProcessor(Processor):
 
-            while True:
-                try:
-                    addr = self.address_queue.get(False)
-                except:
-                    break
-                if addr in self.watched_addresses:
-                    status = self.get_status( addr )
-                    processor.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
-
-            time.sleep(10)
-
-
-
-class AbeBackend:
-
-    def __init__(self,config, processor):
+    def __init__(self, config):
+        Processor.__init__(self)
         self.store = AbeStore(config)
-        self.store.processor = processor
-        thread.start_new_thread(self.store.run,(processor,))
+        self.block_number = -1
+        self.watched_addresses = []
 
-    def process(self, request, queue):
+    def process(self, request):
         message_id = request['id']
         method = request['method']
         params = request.get('params',[])
         result = ''
         if method == 'blockchain.numblocks.subscribe':
-            result = self.store.block_number
+            result = self.block_number
         elif method == 'blockchain.address.subscribe':
             address = params[0]
-            self.store.watch_address(address)
+            self.watch_address(address)
             status = self.store.get_status(address)
             result = status
         elif method == 'blockchain.address.get_history':
@@ -427,7 +403,34 @@ class AbeBackend:
 
         if result != '':
             response = { 'id':message_id, 'method':method, 'params':params, 'result':result }
-            queue.put(response)
+            self.push_response(response)
+
+
+    def watch_address(self, addr):
+        if addr not in self.watched_addresses:
+            self.watched_addresses.append(addr)
+
+
+    def run(self):
+        
+        old_block_number = None
+        while not self.shared.stopped():
+            self.block_number = self.store.main_iteration()
+
+            if self.block_number != old_block_number:
+                old_block_number = self.block_number
+                self.push_response({ 'method':'blockchain.numblocks.subscribe', 'result':self.block_number })
+
+            while True:
+                try:
+                    addr = self.store.address_queue.get(False)
+                except:
+                    break
+                if addr in self.watched_addresses:
+                    status = self.get_status( addr )
+                    self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr], 'result':status })
+
+            time.sleep(10)