restructuring: each processor has its own queue
[electrum-server.git] / backends / abe / __init__.py
index eea0a03..24d309d 100644 (file)
@@ -7,7 +7,7 @@ import binascii
 import thread, traceback, sys, urllib, operator
 from json import dumps, loads
 from Queue import Queue
-import time
+import time, threading
 
 class AbeStore(Datastore_class):
 
@@ -409,8 +409,11 @@ class BlockchainProcessor(Processor):
         self.store = AbeStore(config)
         self.block_number = -1
         self.watched_addresses = []
+        threading.Timer(10, self.run_store_iteration).start()
 
     def process(self, request):
+        #print "abe process", request
+
         message_id = request['id']
         method = request['method']
         params = request.get('params',[])
@@ -442,27 +445,26 @@ class BlockchainProcessor(Processor):
             self.watched_addresses.append(addr)
 
 
-    def run(self):
+    def run_store_iteration(self):
+        if self.shared.stopped(): 
+            print "exit timer"
+            return
         
-        old_block_number = None
-        while not self.shared.stopped():
-            self.block_number = self.store.main_iteration()
-
-            if self.block_number != old_block_number:
-                old_block_number = self.block_number
-                print "block number:", self.block_number
-                self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
-
-            while True:
-                try:
-                    addr = self.store.address_queue.get(False)
-                except:
-                    break
-                if addr in self.watched_addresses:
-                    status = self.store.get_status( addr )
-                    self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
-
-            time.sleep(10)
+        block_number = self.store.main_iteration()
+        if self.block_number != block_number:
+            self.block_number = block_number
+            print "block number:", self.block_number
+            self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
+
+        while True:
+            try:
+                addr = self.store.address_queue.get(False)
+            except:
+                break
+            if addr in self.watched_addresses:
+                status = self.store.get_status( addr )
+                self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
 
+        threading.Timer(10, self.run_store_iteration).start()