use a dict for sessions, instead of a list. Deprecates #48
[electrum-server.git] / backends / bitcoind / blockchain_processor.py
index cd169af..fb7b957 100644 (file)
@@ -24,7 +24,12 @@ class BlockchainProcessor(Processor):
         self.shared = shared
         self.config = config
         self.up_to_date = False
-        self.watched_addresses = []
+
+        self.watch_lock = threading.Lock()
+        self.watch_blocks = []
+        self.watch_headers = []
+        self.watched_addresses = {}
+
         self.history_cache = {}
         self.chunk_cache = {}
         self.cache_lock = threading.Lock()
@@ -664,14 +669,16 @@ class BlockchainProcessor(Processor):
         for addr in self.batch_list.keys():
             self.invalidate_cache(addr)
 
-    def add_request(self, request):
+    def add_request(self, session, request):
         # see if we can get if from cache. if not, add to queue
-        if self.process(request, cache_only=True) == -1:
-            self.queue.put(request)
+        if self.process(session, request, cache_only=True) == -1:
+            self.queue.put((session, request))
+
+
 
-    def process(self, request, cache_only=False):
-        #print "abe process", request
 
+    def process(self, session, request, cache_only=False):
+        
         message_id = request['id']
         method = request['method']
         params = request.get('params', [])
@@ -679,35 +686,33 @@ class BlockchainProcessor(Processor):
         error = None
 
         if method == 'blockchain.numblocks.subscribe':
+            with self.watch_lock:
+                if session not in self.watch_blocks:
+                    self.watch_blocks.append(session)
             result = self.height
 
         elif method == 'blockchain.headers.subscribe':
+            with self.watch_lock:
+                if session not in self.watch_headers:
+                    self.watch_headers.append(session)
             result = self.header
 
         elif method == 'blockchain.address.subscribe':
             try:
                 address = params[0]
                 result = self.get_status(address, cache_only)
-                self.watch_address(address)
-            except BaseException, e:
-                error = str(e) + ': ' + address
-                print_log("error:", error)
+                with self.watch_lock:
+                    l = self.watched_addresses.get(address)
+                    if l is None:
+                        self.watched_addresses[address] = [session]
+                    elif session not in l:
+                        l.append(session)
 
-        elif method == 'blockchain.address.unsubscribe':
-            try:
-                password = params[0]
-                address = params[1]
-                if password == self.config.get('server', 'password'):
-                    self.watched_addresses.remove(address)
-                    # print_log('unsubscribed', address)
-                    result = "ok"
-                else:
-                    print_log('incorrect password')
-                    result = "authentication error"
             except BaseException, e:
                 error = str(e) + ': ' + address
                 print_log("error:", error)
 
+
         elif method == 'blockchain.address.get_history':
             try:
                 address = params[0]
@@ -774,15 +779,12 @@ class BlockchainProcessor(Processor):
             return -1
 
         if error:
-            self.push_response({'id': message_id, 'error': error})
+            self.push_response(session, {'id': message_id, 'error': error})
         elif result != '':
-            self.push_response({'id': message_id, 'result': result})
+            self.push_response(session, {'id': message_id, 'result': result})
 
-    def watch_address(self, addr):
-        if addr not in self.watched_addresses:
-            self.watched_addresses.append(addr)
 
-    def getfullblock(block_hash):
+    def getfullblock(self, block_hash):
         block = self.bitcoind('getblock', [block_hash])
 
         rawtxreq = []
@@ -805,9 +807,11 @@ class BlockchainProcessor(Processor):
         r = loads(respdata)
         rawtxdata = []
         for ir in r:
-            if r['error'] is not None:
-                raise BaseException(r['error'])
-            rawtxdata.append(r['result'])
+            if ir['error'] is not None:
+                self.shared.stop()
+                print_log("Error: make sure you run bitcoind with txindex=1; use -reindex if needed.")
+                raise BaseException(ir['error'])
+            rawtxdata.append(ir['result'])
         block['tx'] = rawtxdata
         return block
 
@@ -921,9 +925,12 @@ class BlockchainProcessor(Processor):
                 print_log("cache: invalidating", address)
                 self.history_cache.pop(address)
 
-        if address in self.watched_addresses:
+        with self.watch_lock:
+            sessions = self.watched_addresses.get(address)
+
+        if sessions:
             # TODO: update cache here. if new value equals cached value, do not send notification
-            self.address_queue.put(address)
+            self.address_queue.put((address,sessions))
 
     def main_iteration(self):
         if self.shared.stopped():
@@ -939,33 +946,36 @@ class BlockchainProcessor(Processor):
 
         if self.sent_height != self.height:
             self.sent_height = self.height
-            self.push_response({
-                'id': None,
-                'method': 'blockchain.numblocks.subscribe',
-                'params': [self.height],
-            })
+            for session in self.watch_blocks:
+                self.push_response(session, {
+                        'id': None,
+                        'method': 'blockchain.numblocks.subscribe',
+                        'params': [self.height],
+                        })
 
         if self.sent_header != self.header:
             print_log("blockchain: %d (%.3fs)" % (self.height, t2 - t1))
             self.sent_header = self.header
-            self.push_response({
-                'id': None,
-                'method': 'blockchain.headers.subscribe',
-                'params': [self.header],
-            })
+            for session in self.watch_headers:
+                self.push_response(session, {
+                        'id': None,
+                        'method': 'blockchain.headers.subscribe',
+                        'params': [self.header],
+                        })
 
         while True:
             try:
-                addr = self.address_queue.get(False)
+                addr, sessions = self.address_queue.get(False)
             except:
                 break
-            if addr in self.watched_addresses:
-                status = self.get_status(addr)
-                self.push_response({
-                    'id': None,
-                    'method': 'blockchain.address.subscribe',
-                    'params': [addr, status],
-                })
+
+            status = self.get_status(addr)
+            for session in sessions:
+                self.push_response(session, {
+                        'id': None,
+                        'method': 'blockchain.address.subscribe',
+                        'params': [addr, status],
+                        })
 
         if not self.shared.stopped():
             threading.Timer(10, self.main_iteration).start()