Simplify block parsing
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
index 09644b9..3d93fd8 100644 (file)
@@ -9,7 +9,7 @@ import time
 import threading
 import traceback
 import urllib
-
+from ltc_scrypt import getPoWHash
 from backends.bitcoind import deserialize
 from processor import Processor, print_log
 from utils import *
@@ -38,6 +38,7 @@ class BlockchainProcessor(Processor):
         self.headers_data = ''
         self.headers_path = config.get('leveldb', 'path_fulltree')
 
+        self.mempool_values = {}
         self.mempool_addresses = {}
         self.mempool_hist = {}
         self.mempool_hashes = set([])
@@ -64,7 +65,7 @@ class BlockchainProcessor(Processor):
                 self.bitcoind('getinfo')
                 break
             except:
-                print_log('cannot contact bitcoind...')
+                print_log('cannot contact novacoind...')
                 time.sleep(5)
                 continue
 
@@ -87,7 +88,8 @@ class BlockchainProcessor(Processor):
         self.memorypool_update()
         print_log("Memory pool initialized.")
 
-        threading.Timer(10, self.main_iteration).start()
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
 
 
 
@@ -111,6 +113,7 @@ class BlockchainProcessor(Processor):
         try:
             respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
         except:
+            print_log("error calling novacoind")
             traceback.print_exc(file=sys.stdout)
             self.shared.stop()
 
@@ -132,8 +135,7 @@ class BlockchainProcessor(Processor):
         }
 
     def get_header(self, height):
-        block_hash = self.bitcoind('getblockhash', [height])
-        b = self.bitcoind('getblock', [block_hash])
+        b = self.bitcoind('getblockbynumber', [height])
         return self.block2header(b)
 
     def init_headers(self, db_height):
@@ -171,7 +173,7 @@ class BlockchainProcessor(Processor):
         self.flush_headers()
 
     def hash_header(self, header):
-        return rev_hex(Hash(header_to_string(header).decode('hex')).encode('hex'))
+        return rev_hex(getPoWHash(header_to_string(header).decode('hex')).encode('hex'))
 
     def read_header(self, block_height):
         if os.path.exists(self.headers_filename):
@@ -252,7 +254,8 @@ class BlockchainProcessor(Processor):
                 hist = self.storage.get_history(addr)
                 is_known = True
             except:
-                self.shared.stop()
+                print_log("error get_history")
+                traceback.print_exc(file=sys.stdout)
                 raise
             if hist:
                 is_known = True
@@ -262,7 +265,7 @@ class BlockchainProcessor(Processor):
 
         # add memory pool
         with self.mempool_lock:
-            for txid in self.mempool_hist.get(addr, []):
+            for txid, delta in self.mempool_hist.get(addr, []):
                 hist.append({'tx_hash':txid, 'height':0})
 
         # add something to distinguish between unused and empty addresses
@@ -274,6 +277,14 @@ class BlockchainProcessor(Processor):
         return hist
 
 
+    def get_unconfirmed_value(self, addr):
+        v = 0
+        with self.mempool_lock:
+            for txid, delta in self.mempool_hist.get(addr, []):
+                v += delta
+        return v
+
+
     def get_status(self, addr, cache_only=False):
         tx_points = self.get_history(addr, cache_only)
         if cache_only and tx_points == -1:
@@ -290,8 +301,7 @@ class BlockchainProcessor(Processor):
 
     def get_merkle(self, tx_hash, height):
 
-        block_hash = self.bitcoind('getblockhash', [height])
-        b = self.bitcoind('getblock', [block_hash])
+        b = self.bitcoind('getblockbynumber', [height])
         tx_list = b.get('tx')
         tx_pos = tx_list.index(tx_hash)
 
@@ -347,21 +357,21 @@ class BlockchainProcessor(Processor):
 
     def deserialize_block(self, block):
         txlist = block.get('tx')
+
         tx_hashes = []  # ordered txids
         txdict = {}     # deserialized tx
-        is_coinbase = True
-        for raw_tx in txlist:
+
+        for i, raw_tx in enumerate(txlist):
             tx_hash = hash_encode(Hash(raw_tx.decode('hex')))
             vds = deserialize.BCDataStream()
             vds.write(raw_tx.decode('hex'))
             try:
-                tx = deserialize.parse_Transaction(vds, is_coinbase)
+                tx = deserialize.parse_Transaction(vds, i == 0) # first transaction is always coinbase
             except:
                 print_log("ERROR: cannot parse", tx_hash)
                 continue
             tx_hashes.append(tx_hash)
             txdict[tx_hash] = tx
-            is_coinbase = False
         return tx_hashes, txdict
 
 
@@ -446,7 +456,7 @@ class BlockchainProcessor(Processor):
                 if session in l:
                     l.remove(session)
                 if session in l:
-                    print "error rc!!"
+                    print_log("error rc!!")
                     self.shared.stop()
                 if l == []:
                     self.watched_addresses.pop(addr)
@@ -482,18 +492,28 @@ class BlockchainProcessor(Processor):
                 error = str(e) + ': ' + address
                 print_log("error:", error)
 
+        elif method == 'blockchain.address.get_mempool':
+            try:
+                address = str(params[0])
+                result = self.get_unconfirmed_history(address, cache_only)
+            except BaseException, e:
+                error = str(e) + ': ' + address
+                print_log("error:", error)
+
         elif method == 'blockchain.address.get_balance':
             try:
                 address = str(params[0])
-                result = self.storage.get_balance(address)
+                confirmed = self.storage.get_balance(address)
+                unconfirmed = self.get_unconfirmed_value(address)
+                result = { 'confirmed':confirmed, 'unconfirmed':unconfirmed }
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print_log("error:", error)
 
-        elif method == 'blockchain.address.get_path':
+        elif method == 'blockchain.address.get_proof':
             try:
                 address = str(params[0])
-                result = self.storage.get_address_path(address)
+                result = self.storage.get_proof(address)
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print_log("error:", error)
@@ -506,12 +526,22 @@ class BlockchainProcessor(Processor):
                 error = str(e) + ': ' + address
                 print_log("error:", error)
 
+        elif method == 'blockchain.utxo.get_address':
+            try:
+                txid = str(params[0])
+                pos = int(params[1])
+                txi = (txid + int_to_hex(pos, 4)).decode('hex')
+                result = self.storage.get_address(txi)
+            except BaseException, e:
+                error = str(e)
+                print_log("error:", error, params)
+
         elif method == 'blockchain.block.get_header':
             if cache_only:
                 result = -1
             else:
                 try:
-                    height = params[0]
+                    height = int(params[0])
                     result = self.get_header(height)
                 except BaseException, e:
                     error = str(e) + ': %d' % height
@@ -522,7 +552,7 @@ class BlockchainProcessor(Processor):
                 result = -1
             else:
                 try:
-                    index = params[0]
+                    index = int(params[0])
                     result = self.get_chunk(index)
                 except BaseException, e:
                     error = str(e) + ': %d' % index
@@ -568,41 +598,9 @@ class BlockchainProcessor(Processor):
         elif result != '':
             self.push_response(session, {'id': message_id, 'result': result})
 
-
-    def getfullblock(self, block_hash):
-        block = self.bitcoind('getblock', [block_hash])
-
-        rawtxreq = []
-        i = 0
-        for txid in block['tx']:
-            rawtxreq.append({
-                "method": "getrawtransaction",
-                "params": [txid],
-                "id": i,
-            })
-            i += 1
-
-        postdata = dumps(rawtxreq)
-        try:
-            respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
-        except:
-            traceback.print_exc(file=sys.stdout)
-            self.shared.stop()
-
-        r = loads(respdata)
-        rawtxdata = []
-        for ir in r:
-            if ir['error'] is not None:
-                self.shared.stop()
-                print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
-                raise BaseException(ir['error'])
-            rawtxdata.append(ir['result'])
-        block['tx'] = rawtxdata
-        return block
-
     def catch_up(self, sync=True):
 
-        prh = None
+        prev_root_hash = None
         while not self.shared.stopped():
 
             self.mtime('')
@@ -617,8 +615,8 @@ class BlockchainProcessor(Processor):
 
             # not done..
             self.up_to_date = False
-            next_block_hash = self.bitcoind('getblockhash', [self.storage.height + 1])
-            next_block = self.getfullblock(next_block_hash)
+            next_block = self.bitcoind('getblockbynumber', [self.storage.height + 1, True])
+            next_block_hash = next_block.get('hash')
             self.mtime('daemon')
 
             # fixme: this is unsafe, if we revert when the undo info is not yet written
@@ -644,7 +642,7 @@ class BlockchainProcessor(Processor):
             else:
 
                 # revert current block
-                block = self.getfullblock(self.storage.last_hash)
+                block = self.bitcoind('getblock', [self.storage.last_hash, True])
                 print_log("blockchain reorg", self.storage.height, block.get('previousblockhash'), self.storage.last_hash)
                 self.import_block(block, self.storage.last_hash, self.storage.height, sync, revert=True)
                 self.pop_header()
@@ -661,8 +659,8 @@ class BlockchainProcessor(Processor):
                     prev_root_hash = None
 
 
-
         self.header = self.block2header(self.bitcoind('getblock', [self.storage.last_hash]))
+        self.header['utxo_root'] = self.storage.get_root_hash().encode('hex')
 
         if self.shared.stopped(): 
             print_log( "closing database" )
@@ -673,6 +671,8 @@ class BlockchainProcessor(Processor):
         mempool_hashes = set(self.bitcoind('getrawmempool'))
         touched_addresses = set([])
 
+        # get new transactions
+        new_tx = {}
         for tx_hash in mempool_hashes:
             if tx_hash in self.mempool_hashes:
                 continue
@@ -681,40 +681,74 @@ class BlockchainProcessor(Processor):
             if not tx:
                 continue
 
-            mpa = self.mempool_addresses.get(tx_hash, [])
-            for x in tx.get('inputs'):
-                # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
-                addr = x.get('address')
-                if addr and addr not in mpa:
-                    mpa.append(addr)
-                    touched_addresses.add(addr)
+            new_tx[tx_hash] = tx
+            self.mempool_hashes.add(tx_hash)
 
+        # remove older entries from mempool_hashes
+        self.mempool_hashes = mempool_hashes
+
+
+        # check all tx outputs
+        for tx_hash, tx in new_tx.items():
+            mpa = self.mempool_addresses.get(tx_hash, {})
+            out_values = []
             for x in tx.get('outputs'):
+                out_values.append( x['value'] )
+
                 addr = x.get('address')
-                if addr and addr not in mpa:
-                    mpa.append(addr)
-                    touched_addresses.add(addr)
+                if not addr:
+                    continue
+                v = mpa.get(addr,0)
+                v += x['value']
+                mpa[addr] = v
+                touched_addresses.add(addr)
+
+            self.mempool_addresses[tx_hash] = mpa
+            self.mempool_values[tx_hash] = out_values
+
+        # check all inputs
+        for tx_hash, tx in new_tx.items():
+            mpa = self.mempool_addresses.get(tx_hash, {})
+            for x in tx.get('inputs'):
+                # we assume that the input address can be parsed by deserialize(); this is true for Electrum transactions
+                addr = x.get('address')
+                if not addr:
+                    continue
+
+                v = self.mempool_values.get(x.get('prevout_hash'))
+                if v:
+                    value = v[ x.get('prevout_n')]
+                else:
+                    txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+                    try:
+                        value = self.storage.get_utxo_value(addr,txi)
+                    except:
+                        print_log("utxo not in database; postponing mempool update")
+                        return
+
+                v = mpa.get(addr,0)
+                v -= value
+                mpa[addr] = v
+                touched_addresses.add(addr)
 
             self.mempool_addresses[tx_hash] = mpa
-            self.mempool_hashes.add(tx_hash)
 
-        # remove older entries from mempool_hashes
-        self.mempool_hashes = mempool_hashes
 
         # remove deprecated entries from mempool_addresses
         for tx_hash, addresses in self.mempool_addresses.items():
             if tx_hash not in self.mempool_hashes:
                 self.mempool_addresses.pop(tx_hash)
+                self.mempool_values.pop(tx_hash)
                 for addr in addresses:
                     touched_addresses.add(addr)
 
         # rebuild mempool histories
         new_mempool_hist = {}
         for tx_hash, addresses in self.mempool_addresses.items():
-            for addr in addresses:
+            for addr, delta in addresses.items():
                 h = new_mempool_hist.get(addr, [])
                 if tx_hash not in h:
-                    h.append(tx_hash)
+                    h.append((tx_hash, delta))
                 new_mempool_hist[addr] = h
 
         with self.mempool_lock:
@@ -738,10 +772,17 @@ class BlockchainProcessor(Processor):
             # TODO: update cache here. if new value equals cached value, do not send notification
             self.address_queue.put((address,sessions))
 
+    
+    def close(self):
+        self.timer.join()
+        print_log("Closing database...")
+        self.storage.close()
+        print_log("Database is closed")
+
+
     def main_iteration(self):
         if self.shared.stopped():
-            print_log("blockchain processor terminating")
-            self.storage.close()
+            print_log("Stopping timer")
             return
 
         with self.dblock:
@@ -784,7 +825,7 @@ class BlockchainProcessor(Processor):
                         'params': [addr, status],
                         })
 
-        if not self.shared.stopped():
-            threading.Timer(10, self.main_iteration).start()
-        else:
-            print_log("blockchain processor terminating")
+        # next iteration 
+        self.timer = threading.Timer(10, self.main_iteration)
+        self.timer.start()
+