fix memleak with subscriptions
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
index fb7b957..f90e1bc 100644 (file)
@@ -37,7 +37,7 @@ class BlockchainProcessor(Processor):
 
         self.mempool_addresses = {}
         self.mempool_hist = {}
-        self.mempool_hashes = []
+        self.mempool_hashes = set([])
         self.mempool_lock = threading.Lock()
 
         self.address_queue = Queue()
@@ -675,6 +675,45 @@ class BlockchainProcessor(Processor):
             self.queue.put((session, request))
 
 
+    def do_subscribe(self, method, params, session):
+        with self.watch_lock:
+            if method == 'blockchain.numblocks.subscribe':
+                if session not in self.watch_blocks:
+                    self.watch_blocks.append(session)
+
+            elif method == 'blockchain.headers.subscribe':
+                if session not in self.watch_headers:
+                    self.watch_headers.append(session)
+
+            elif method == 'blockchain.address.subscribe':
+                address = params[0]
+                l = self.watched_addresses.get(address)
+                if l is None:
+                    self.watched_addresses[address] = [session]
+                elif session not in l:
+                    l.append(session)
+
+
+    def do_unsubscribe(self, method, params, session):
+        with self.watch_lock:
+            if method == 'blockchain.numblocks.subscribe':
+                if session in self.watch_blocks:
+                    self.watch_blocks.remove(session)
+            elif method == 'blockchain.headers.subscribe':
+                if session in self.watch_headers:
+                    self.watch_headers.remove(session)
+            elif method == "blockchain.address.subscribe":
+                addr = params[0]
+                l = self.watched_addresses.get(addr)
+                if not l:
+                    return
+                if session in l:
+                    l.remove(session)
+                if session in l:
+                    print "error rc!!"
+                    self.shared.stop()
+                if l == []:
+                    self.watched_addresses.pop(addr)
 
 
     def process(self, session, request, cache_only=False):
@@ -686,33 +725,19 @@ class BlockchainProcessor(Processor):
         error = None
 
         if method == 'blockchain.numblocks.subscribe':
-            with self.watch_lock:
-                if session not in self.watch_blocks:
-                    self.watch_blocks.append(session)
             result = self.height
 
         elif method == 'blockchain.headers.subscribe':
-            with self.watch_lock:
-                if session not in self.watch_headers:
-                    self.watch_headers.append(session)
             result = self.header
 
         elif method == 'blockchain.address.subscribe':
             try:
                 address = params[0]
                 result = self.get_status(address, cache_only)
-                with self.watch_lock:
-                    l = self.watched_addresses.get(address)
-                    if l is None:
-                        self.watched_addresses[address] = [session]
-                    elif session not in l:
-                        l.append(session)
-
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print_log("error:", error)
 
-
         elif method == 'blockchain.address.get_history':
             try:
                 address = params[0]
@@ -863,10 +888,11 @@ class BlockchainProcessor(Processor):
 
         self.header = self.block2header(self.bitcoind('getblock', [self.last_hash]))
 
+
     def memorypool_update(self):
-        mempool_hashes = self.bitcoind('getrawmempool')
+        mempool_hashes = set(self.bitcoind('getrawmempool'))
+        touched_addresses = set([])
 
-        touched_addresses = []
         for tx_hash in mempool_hashes:
             if tx_hash in self.mempool_hashes:
                 continue
@@ -881,16 +907,16 @@ class BlockchainProcessor(Processor):
                 addr = x.get('address')
                 if addr and addr not in mpa:
                     mpa.append(addr)
-                    touched_addresses.append(addr)
+                    touched_addresses.add(addr)
 
             for x in tx.get('outputs'):
                 addr = x.get('address')
                 if addr and addr not in mpa:
                     mpa.append(addr)
-                    touched_addresses.append(addr)
+                    touched_addresses.add(addr)
 
             self.mempool_addresses[tx_hash] = mpa
-            self.mempool_hashes.append(tx_hash)
+            self.mempool_hashes.add(tx_hash)
 
         # remove older entries from mempool_hashes
         self.mempool_hashes = mempool_hashes
@@ -900,7 +926,7 @@ class BlockchainProcessor(Processor):
             if tx_hash not in self.mempool_hashes:
                 self.mempool_addresses.pop(tx_hash)
                 for addr in addresses:
-                    touched_addresses.append(addr)
+                    touched_addresses.add(addr)
 
         # rebuild mempool histories
         new_mempool_hist = {}