join threads during server shutdown
authorThomasV <thomasv1@gmx.de>
Mon, 17 Feb 2014 16:57:24 +0000 (17:57 +0100)
committerThomasV <thomasv1@gmx.de>
Mon, 17 Feb 2014 16:57:24 +0000 (17:57 +0100)
backends/bitcoind/blockchain_processor.py
processor.py
server.py

index 73e1a64..0a133db 100644 (file)
@@ -87,7 +87,8 @@ class BlockchainProcessor(Processor):
         self.memorypool_update()
         print_log("Memory pool initialized.")
 
-        threading.Timer(10, self.main_iteration).start()
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
 
 
 
@@ -748,10 +749,17 @@ class BlockchainProcessor(Processor):
             # TODO: update cache here. if new value equals cached value, do not send notification
             self.address_queue.put((address,sessions))
 
+    
+    def close(self):
+        self.timer.join()
+        print_log("Closing database...")
+        self.storage.close()
+        print_log("Database is closed")
+
+
     def main_iteration(self):
         if self.shared.stopped():
-            print_log("blockchain processor terminating")
-            self.storage.close()
+            print_log("Stopping timer")
             return
 
         with self.dblock:
@@ -794,7 +802,7 @@ class BlockchainProcessor(Processor):
                         'params': [addr, status],
                         })
 
-        if not self.shared.stopped():
-            threading.Timer(10, self.main_iteration).start()
-        else:
-            print_log("blockchain processor terminating")
+        # next iteration 
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
+
index b94231b..cd7a66b 100644 (file)
@@ -44,15 +44,21 @@ class Processor(threading.Thread):
         #print "response", response
         self.dispatcher.request_dispatcher.push_response(session, response)
 
+    def close(self):
+        pass
+
     def run(self):
         while not self.shared.stopped():
-            request, session = self.queue.get(10000000000)
+            try:
+                request, session = self.queue.get(True, timeout=1)
+            except:
+                continue
             try:
                 self.process(request, session)
             except:
                 traceback.print_exc(file=sys.stdout)
 
-        print_log("processor terminating")
+        self.close()
 
 
 class Dispatcher:
index eb2b548..57a7b7b 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -196,4 +196,6 @@ if __name__ == '__main__':
         except:
             shared.stop()
 
+    server_proc.join()
+    chain_proc.join()
     print_log("Electrum Server stopped")