restructuring: each processor has its own queue
[electrum-server.git] / backends / abe / __init__.py
index 07a0bf2..24d309d 100644 (file)
@@ -1,4 +1,4 @@
-from Abe.abe import hash_to_address, decode_check_address
+from Abe.util import hash_to_address, decode_check_address
 from Abe.DataStore import DataStore as Datastore_class
 from Abe import DataStore, readconf, BCDataStream,  deserialize, util, base58
 
@@ -7,7 +7,7 @@ import binascii
 import thread, traceback, sys, urllib, operator
 from json import dumps, loads
 from Queue import Queue
-import time
+import time, threading
 
 class AbeStore(Datastore_class):
 
@@ -53,6 +53,10 @@ class AbeStore(Datastore_class):
         inrows = self.get_tx_inputs(txid, False)
         for row in inrows:
             _hash = self.binout(row[6])
+            if not _hash:
+                print "WARNING: missing tx_in for tx", txid
+                continue
+
             address = hash_to_address(chr(0), _hash)
             if self.tx_cache.has_key(address):
                 print "cache: invalidating", address
@@ -62,6 +66,10 @@ class AbeStore(Datastore_class):
         outrows = self.get_tx_outputs(txid, False)
         for row in outrows:
             _hash = self.binout(row[6])
+            if not _hash:
+                print "WARNING: missing tx_out for tx", txid
+                continue
+
             address = hash_to_address(chr(0), _hash)
             if self.tx_cache.has_key(address):
                 print "cache: invalidating", address
@@ -265,6 +273,9 @@ class AbeStore(Datastore_class):
             inrows = self.get_tx_inputs(tx_id)
             for row in inrows:
                 _hash = self.binout(row[6])
+                if not _hash:
+                    print "WARNING: missing tx_in for tx", tx_id, addr
+                    continue
                 address = hash_to_address(chr(0), _hash)
                 txinputs.append(address)
             txpoint['inputs'] = txinputs
@@ -272,6 +283,9 @@ class AbeStore(Datastore_class):
             outrows = self.get_tx_outputs(tx_id)
             for row in outrows:
                 _hash = self.binout(row[6])
+                if not _hash:
+                    print "WARNING: missing tx_out for tx", tx_id, addr
+                    continue
                 address = hash_to_address(chr(0), _hash)
                 txoutputs.append(address)
             txpoint['outputs'] = txoutputs
@@ -395,8 +409,11 @@ class BlockchainProcessor(Processor):
         self.store = AbeStore(config)
         self.block_number = -1
         self.watched_addresses = []
+        threading.Timer(10, self.run_store_iteration).start()
 
     def process(self, request):
+        #print "abe process", request
+
         message_id = request['id']
         method = request['method']
         params = request.get('params',[])
@@ -428,26 +445,26 @@ class BlockchainProcessor(Processor):
             self.watched_addresses.append(addr)
 
 
-    def run(self):
+    def run_store_iteration(self):
+        if self.shared.stopped(): 
+            print "exit timer"
+            return
         
-        old_block_number = None
-        while not self.shared.stopped():
-            self.block_number = self.store.main_iteration()
+        block_number = self.store.main_iteration()
+        if self.block_number != block_number:
+            self.block_number = block_number
+            print "block number:", self.block_number
+            self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
 
-            if self.block_number != old_block_number:
-                old_block_number = self.block_number
-                self.push_response({ 'method':'blockchain.numblocks.subscribe', 'params':[self.block_number] })
-
-            while True:
-                try:
-                    addr = self.store.address_queue.get(False)
-                except:
-                    break
-                if addr in self.watched_addresses:
-                    status = self.store.get_status( addr )
-                    self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
-
-            time.sleep(10)
+        while True:
+            try:
+                addr = self.store.address_queue.get(False)
+            except:
+                break
+            if addr in self.watched_addresses:
+                status = self.store.get_status( addr )
+                self.push_response({ 'method':'blockchain.address.subscribe', 'params':[addr, status] })
 
+        threading.Timer(10, self.run_store_iteration).start()