add more debug messages to log
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
index 459e37f..94cfbfa 100644 (file)
@@ -87,7 +87,8 @@ class BlockchainProcessor(Processor):
         self.memorypool_update()
         print_log("Memory pool initialized.")
 
-        threading.Timer(10, self.main_iteration).start()
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
 
 
 
@@ -111,6 +112,7 @@ class BlockchainProcessor(Processor):
         try:
             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
         except:
+            print_log("error calling bitcoind")
             traceback.print_exc(file=sys.stdout)
             self.shared.stop()
 
@@ -238,6 +240,7 @@ class BlockchainProcessor(Processor):
             print_log("ERROR: cannot parse", txid)
             return None
 
+
     def get_history(self, addr, cache_only=False):
         with self.cache_lock:
             hist = self.history_cache.get(addr)
@@ -248,10 +251,10 @@ class BlockchainProcessor(Processor):
 
         with self.dblock:
             try:
-                h = self.storage.get_history(str((addr)))
-                hist = self.storage.deserialize(h)
+                hist = self.storage.get_history(addr)
                 is_known = True
             except:
+                print_log("error get_history")
                 self.shared.stop()
                 raise
             if hist:
@@ -260,19 +263,10 @@ class BlockchainProcessor(Processor):
                 hist = []
                 is_known = False
 
-        # sort history, because redeeming transactions are next to the corresponding txout
-        hist.sort(key=lambda tup: tup[2])
-
         # add memory pool
         with self.mempool_lock:
             for txid in self.mempool_hist.get(addr, []):
-                hist.append((txid, 0, 0))
-
-        # uniqueness
-        hist = set(map(lambda x: (x[0], x[2]), hist))
-
-        # convert to dict
-        hist = map(lambda x: {'tx_hash': x[0], 'height': x[1]}, hist)
+                hist.append({'tx_hash':txid, 'height':0})
 
         # add something to distinguish between unused and empty addresses
         if hist == [] and is_known:
@@ -282,6 +276,7 @@ class BlockchainProcessor(Processor):
             self.history_cache[addr] = hist
         return hist
 
+
     def get_status(self, addr, cache_only=False):
         tx_points = self.get_history(addr, cache_only)
         if cache_only and tx_points == -1:
@@ -454,7 +449,7 @@ class BlockchainProcessor(Processor):
                 if session in l:
                     l.remove(session)
                 if session in l:
-                    print "error rc!!"
+                    print_log("error rc!!")
                     self.shared.stop()
                 if l == []:
                     self.watched_addresses.pop(addr)
@@ -476,7 +471,7 @@ class BlockchainProcessor(Processor):
 
         elif method == 'blockchain.address.subscribe':
             try:
-                address = params[0]
+                address = str(params[0])
                 result = self.get_status(address, cache_only)
             except BaseException, e:
                 error = str(e) + ': ' + address
@@ -484,18 +479,52 @@ class BlockchainProcessor(Processor):
 
         elif method == 'blockchain.address.get_history':
             try:
-                address = params[0]
+                address = str(params[0])
                 result = self.get_history(address, cache_only)
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print_log("error:", error)
 
+        elif method == 'blockchain.address.get_balance':
+            try:
+                address = str(params[0])
+                result = self.storage.get_balance(address)
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log("error:", error)
+
+        elif method == 'blockchain.address.get_proof':
+            try:
+                address = str(params[0])
+                result = self.storage.get_proof(address)
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log("error:", error)
+
+        elif method == 'blockchain.address.listunspent':
+            try:
+                address = str(params[0])
+                result = self.storage.listunspent(address)
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log("error:", error)
+
+        elif method == 'blockchain.utxo.get_address':
+            try:
+                txid = str(params[0])
+                pos = int(params[1])
+                txi = (txid + int_to_hex(pos, 4)).decode('hex')
+                result = self.storage.get_address(txi)
+            except BaseException, e:
+                error = str(e)
+                print_log("error:", error, params)
+
         elif method == 'blockchain.block.get_header':
             if cache_only:
                 result = -1
             else:
                 try:
-                    height = params[0]
+                    height = int(params[0])
                     result = self.get_header(height)
                 except BaseException, e:
                     error = str(e) + ': %d' % height
@@ -506,7 +535,7 @@ class BlockchainProcessor(Processor):
                 result = -1
             else:
                 try:
-                    index = params[0]
+                    index = int(params[0])
                     result = self.get_chunk(index)
                 except BaseException, e:
                     error = str(e) + ': %d' % index
@@ -570,6 +599,7 @@ class BlockchainProcessor(Processor):
         try:
             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
         except:
+            print_log("bitcoind error (getfullblock)")
             traceback.print_exc(file=sys.stdout)
             self.shared.stop()
 
@@ -586,7 +616,7 @@ class BlockchainProcessor(Processor):
 
     def catch_up(self, sync=True):
 
-        prh = None
+        prev_root_hash = None
         while not self.shared.stopped():
 
             self.mtime('')
@@ -610,6 +640,8 @@ class BlockchainProcessor(Processor):
 
             if (next_block.get('previousblockhash') == self.storage.last_hash) and not revert:
 
+                prev_root_hash = self.storage.get_root_hash()
+
                 self.import_block(next_block, next_block_hash, self.storage.height+1, sync)
                 self.storage.height = self.storage.height + 1
                 self.write_header(self.block2header(next_block), sync)
@@ -623,12 +655,7 @@ class BlockchainProcessor(Processor):
                     self.mtimes['daemon'] = 0
                     self.mtimes['import'] = 0
 
-                if prh:
-                    assert prh == self.storage.get_root_hash().encode('hex')
-                    prh = None
-
             else:
-                prh = self.storage.get_root_hash().encode('hex')
 
                 # revert current block
                 block = self.getfullblock(self.storage.last_hash)
@@ -643,8 +670,13 @@ class BlockchainProcessor(Processor):
                 self.header = self.read_header(self.storage.height)
                 self.storage.last_hash = self.hash_header(self.header)
 
+                if prev_root_hash:
+                    assert prev_root_hash == self.storage.get_root_hash()
+                    prev_root_hash = None
+
 
         self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
+        self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
 
         if self.shared.stopped(): 
             print_log( "closing database" )
@@ -720,10 +752,17 @@ class BlockchainProcessor(Processor):
             # TODO: update cache here. if new value equals cached value, do not send notification
             self.address_queue.put((address,sessions))
 
+    
+    def close(self):
+        self.timer.join()
+        print_log("Closing database...")
+        self.storage.close()
+        print_log("Database is closed")
+
+
     def main_iteration(self):
         if self.shared.stopped():
-            print_log("blockchain processor terminating")
-            self.storage.close()
+            print_log("Stopping timer")
             return
 
         with self.dblock:
@@ -766,7 +805,7 @@ class BlockchainProcessor(Processor):
                         'params': [addr, status],
                         })
 
-        if not self.shared.stopped():
-            threading.Timer(10, self.main_iteration).start()
-        else:
-            print_log("blockchain processor terminating")
+        # next iteration 
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
+