add memory pool results to getaddressbalance
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
index 28e87c8..a1f3822 100644 (file)
@@ -38,6 +38,7 @@ class BlockchainProcessor(Processor):
         self.headers_data = ''
         self.headers_path = config.get('leveldb', 'path_fulltree')
 
+        self.mempool_values = {}
         self.mempool_addresses = {}
         self.mempool_hist = {}
         self.mempool_hashes = set([])
@@ -87,7 +88,8 @@ class BlockchainProcessor(Processor):
         self.memorypool_update()
         print_log("Memory pool initialized.")
 
-        threading.Timer(10, self.main_iteration).start()
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
 
 
 
@@ -111,6 +113,7 @@ class BlockchainProcessor(Processor):
         try:
             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
         except:
+            print_log("error calling bitcoind")
             traceback.print_exc(file=sys.stdout)
             self.shared.stop()
 
@@ -252,6 +255,7 @@ class BlockchainProcessor(Processor):
                 hist = self.storage.get_history(addr)
                 is_known = True
             except:
+                print_log("error get_history")
                 self.shared.stop()
                 raise
             if hist:
@@ -262,7 +266,7 @@ class BlockchainProcessor(Processor):
 
         # add memory pool
         with self.mempool_lock:
-            for txid in self.mempool_hist.get(addr, []):
+            for txid, delta in self.mempool_hist.get(addr, []):
                 hist.append({'tx_hash':txid, 'height':0})
 
         # add something to distinguish between unused and empty addresses
@@ -274,6 +278,14 @@ class BlockchainProcessor(Processor):
         return hist
 
 
+    def get_unconfirmed_value(self, addr):
+        v = 0
+        with self.mempool_lock:
+            for txid, delta in self.mempool_hist.get(addr, []):
+                v += delta
+        return v
+
+
     def get_status(self, addr, cache_only=False):
         tx_points = self.get_history(addr, cache_only)
         if cache_only and tx_points == -1:
@@ -446,7 +458,7 @@ class BlockchainProcessor(Processor):
                 if session in l:
                     l.remove(session)
                 if session in l:
-                    print "error rc!!"
+                    print_log("error rc!!")
                     self.shared.stop()
                 if l == []:
                     self.watched_addresses.pop(addr)
@@ -482,10 +494,20 @@ class BlockchainProcessor(Processor):
                 error = str(e) + ': ' + address
                 print_log("error:", error)
 
+        elif method == 'blockchain.address.get_mempool':
+            try:
+                address = str(params[0])
+                result = self.get_unconfirmed_history(address, cache_only)
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log("error:", error)
+
         elif method == 'blockchain.address.get_balance':
             try:
                 address = str(params[0])
-                result = self.storage.get_balance(address)
+                confirmed = self.storage.get_balance(address)
+                unconfirmed = self.get_unconfirmed_value(address)
+                result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print_log("error:", error)
@@ -514,7 +536,7 @@ class BlockchainProcessor(Processor):
                 result = self.storage.get_address(txi)
             except BaseException, e:
                 error = str(e)
-                print_log("error:", error, txid, pos)
+                print_log("error:", error, params)
 
         elif method == 'blockchain.block.get_header':
             if cache_only:
@@ -596,6 +618,7 @@ class BlockchainProcessor(Processor):
         try:
             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
         except:
+            print_log("bitcoind error (getfullblock)")
             traceback.print_exc(file=sys.stdout)
             self.shared.stop()
 
@@ -612,7 +635,7 @@ class BlockchainProcessor(Processor):
 
     def catch_up(self, sync=True):
 
-        prh = None
+        prev_root_hash = None
         while not self.shared.stopped():
 
             self.mtime('')
@@ -683,6 +706,8 @@ class BlockchainProcessor(Processor):
         mempool_hashes = set(self.bitcoind('getrawmempool'))
         touched_addresses = set([])
 
+        # get new transactions
+        new_tx = {}
         for tx_hash in mempool_hashes:
             if tx_hash in self.mempool_hashes:
                 continue
@@ -691,40 +716,70 @@ class BlockchainProcessor(Processor):
             if not tx:
                 continue
 
-            mpa = self.mempool_addresses.get(tx_hash, [])
+            new_tx[tx_hash] = tx
+            self.mempool_hashes.add(tx_hash)
+
+        # remove older entries from mempool_hashes
+        self.mempool_hashes = mempool_hashes
+
+
+        # check all tx outputs
+        for tx_hash, tx in new_tx.items():
+            mpa = self.mempool_addresses.get(tx_hash, {})
+            out_values = []
+            for x in tx.get('outputs'):
+                out_values.append( x['value'] )
+
+                addr = x.get('address')
+                if not addr:
+                    continue
+                v = mpa.get(addr,0)
+                v += x['value']
+                mpa[addr] = v
+                touched_addresses.add(addr)
+
+            self.mempool_addresses[tx_hash] = mpa
+            self.mempool_values[tx_hash] = out_values
+
+        # check all inputs
+        for tx_hash, tx in new_tx.items():
+            mpa = self.mempool_addresses.get(tx_hash, {})
             for x in tx.get('inputs'):
                 # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
                 addr = x.get('address')
-                if addr and addr not in mpa:
-                    mpa.append(addr)
-                    touched_addresses.add(addr)
+                if not addr:
+                    continue
 
-            for x in tx.get('outputs'):
-                addr = x.get('address')
-                if addr and addr not in mpa:
-                    mpa.append(addr)
-                    touched_addresses.add(addr)
+                v = self.mempool_values.get(x.get('prevout_hash'))
+                if v:
+                    value = v[ x.get('prevout_n')]
+                else:
+                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+                    value = self.storage.get_utxo_value(addr,txi)
+
+                v = mpa.get(addr,0)
+                v -= value
+                mpa[addr] = v
+                touched_addresses.add(addr)
 
             self.mempool_addresses[tx_hash] = mpa
-            self.mempool_hashes.add(tx_hash)
 
-        # remove older entries from mempool_hashes
-        self.mempool_hashes = mempool_hashes
 
         # remove deprecated entries from mempool_addresses
         for tx_hash, addresses in self.mempool_addresses.items():
             if tx_hash not in self.mempool_hashes:
                 self.mempool_addresses.pop(tx_hash)
+                self.mempool_values.pop(tx_hash)
                 for addr in addresses:
                     touched_addresses.add(addr)
 
         # rebuild mempool histories
         new_mempool_hist = {}
         for tx_hash, addresses in self.mempool_addresses.items():
-            for addr in addresses:
+            for addr, delta in addresses.items():
                 h = new_mempool_hist.get(addr, [])
                 if tx_hash not in h:
-                    h.append(tx_hash)
+                    h.append((tx_hash, delta))
                 new_mempool_hist[addr] = h
 
         with self.mempool_lock:
@@ -748,10 +803,17 @@ class BlockchainProcessor(Processor):
             # TODO: update cache here. if new value equals cached value, do not send notification
             self.address_queue.put((address,sessions))
 
+    
+    def close(self):
+        self.timer.join()
+        print_log("Closing database...")
+        self.storage.close()
+        print_log("Database is closed")
+
+
     def main_iteration(self):
         if self.shared.stopped():
-            print_log("blockchain processor terminating")
-            self.storage.close()
+            print_log("Stopping timer")
             return
 
         with self.dblock:
@@ -794,7 +856,7 @@ class BlockchainProcessor(Processor):
                         'params': [addr, status],
                         })
 
-        if not self.shared.stopped():
-            threading.Timer(10, self.main_iteration).start()
-        else:
-            print_log("blockchain processor terminating")
+        # next iteration 
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
+