fix memory pool
authorThomasV <thomasv@gitorious>
Thu, 15 Nov 2012 07:17:22 +0000 (11:17 +0400)
committerThomasV <thomasv@gitorious>
Thu, 15 Nov 2012 07:17:22 +0000 (11:17 +0400)
backends/bitcoind/blockchain_processor.py

index 9218fba..78ef690 100644 (file)
@@ -62,8 +62,11 @@ class BlockchainProcessor(Processor):
         self.cache_lock = threading.Lock()
         self.headers_data = ''
 
+        self.mempool_addresses = {}
         self.mempool_hist = {}
-        self.known_mempool_hashes = []
+        self.mempool_hashes = []
+        self.mempool_lock = threading.Lock()
+
         self.address_queue = Queue()
         self.dbpath = config.get('leveldb', 'path')
 
@@ -264,8 +267,9 @@ class BlockchainProcessor(Processor):
         # check uniqueness too...
 
         # add memory pool
-        for txid in self.mempool_hist.get(addr,[]):
-            hist.append((txid, 0, 0))
+        with self.mempool_lock:
+            for txid in self.mempool_hist.get(addr,[]):
+                hist.append((txid, 0, 0))
 
         hist = map(lambda x: {'tx_hash':x[0], 'height':x[2]}, hist)
         # add something to distinguish between unused and empty addresses
@@ -453,8 +457,7 @@ class BlockchainProcessor(Processor):
                       "write:%.2f "%(t3-t2), 
                       "max:", max_len, max_addr)
 
-        # invalidate cache
-        for addr in self.batch_list.keys(): self.update_history_cache(addr)
+        for addr in self.batch_list.keys(): self.invalidate_cache(addr)
 
 
 
@@ -632,24 +635,54 @@ class BlockchainProcessor(Processor):
         mempool_hashes = self.bitcoind('getrawmempool')
 
         for tx_hash in mempool_hashes:
-            if tx_hash in self.known_mempool_hashes: continue
-            self.known_mempool_hashes.append(tx_hash)
+            if tx_hash in self.mempool_hashes: continue
 
             tx = self.get_transaction(tx_hash)
             if not tx: continue
 
-            for x in tx.get('inputs') + tx.get('outputs'):
+            for x in tx.get('inputs'):
+                txi = (x.get('prevout_hash') + int_to_hex(x.get('prevout_n'), 4)).decode('hex')
+                try:
+                    addr = self.db.Get(txi)    
+                except:
+                    continue
+                l = self.mempool_addresses.get(tx_hash, [])
+                if addr not in l: 
+                    l.append( addr )
+                    self.mempool_addresses[tx_hash] = l
+
+            for x in tx.get('outputs'):
                 addr = x.get('address')
-                hist = self.mempool_hist.get(addr, [])
-                if tx_hash not in hist: 
-                    hist.append( tx_hash )
-                    self.mempool_hist[addr] = hist
-                    self.update_history_cache(addr)
+                l = self.mempool_addresses.get(tx_hash, [])
+                if addr not in l: 
+                    l.append( addr )
+                    self.mempool_addresses[tx_hash] = l
+
+            self.mempool_hashes.append(tx_hash)
 
-        self.known_mempool_hashes = mempool_hashes
+        # remove older entries from mempool_hashes
+        self.mempool_hashes = mempool_hashes
 
+        # remove deprecated entries from mempool_addresses
+        for tx_hash, addresses in self.mempool_addresses.items():
+            if tx_hash not in self.mempool_hashes:
+                self.mempool_addresses.pop(tx_hash)
 
-    def update_history_cache(self, address):
+        # rebuild histories
+        with self.mempool_lock:
+            self.mempool_hist = {}
+            for tx_hash, addresses in self.mempool_addresses.items():
+                for addr in addresses:
+                    h = self.mempool_hist.get(addr, [])
+                    if tx_hash not in h: 
+                        h.append( tx_hash )
+                        self.mempool_hist[addr] = h
+                        self.invalidate_cache(addr)
+
+
+
+
+    def invalidate_cache(self, address):
         with self.cache_lock:
             if self.history_cache.has_key(address):
                 print_log( "cache: invalidating", address )
@@ -669,6 +702,9 @@ class BlockchainProcessor(Processor):
             t2 = time.time()
 
         self.memorypool_update()
+        t3 = time.time()
+        print "mempool:", len(self.mempool_addresses), len(self.mempool_hist), "%.3fs"%(t3 - t2)
+
 
         if self.sent_height != self.height:
             self.sent_height = self.height