From 9526ce60749182cea0be7da5838fa04e3d59e0f1 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Thu, 15 Nov 2012 20:12:41 +0400 Subject: [PATCH] remove unsubscribed addresses --- backends/bitcoind/blockchain_processor.py | 11 +++---- processor.py | 41 +++++++++++++--------------- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/backends/bitcoind/blockchain_processor.py b/backends/bitcoind/blockchain_processor.py index 3a0fdda..e6dcd57 100644 --- a/backends/bitcoind/blockchain_processor.py +++ b/backends/bitcoind/blockchain_processor.py @@ -537,16 +537,17 @@ class BlockchainProcessor(Processor): error = str(e) + ': ' + address print_log( "error:", error ) - elif method == 'blockchain.address.subscribe2': + elif method == 'blockchain.address.unsubscribe': try: address = params[0] - result = self.get_status(address, cache_only) - self.watch_address(address) + self.watched_addresses.remove(address) + print_log('unsubscribed', address) + result = "ok" except BaseException, e: error = str(e) + ': ' + address print_log( "error:", error ) - elif method == 'blockchain.address.get_history2': + elif method == 'blockchain.address.get_history': try: address = params[0] result = self.get_history( address, cache_only ) @@ -779,8 +780,6 @@ class BlockchainProcessor(Processor): if addr in self.watched_addresses: status = self.get_status( addr ) self.push_response({ 'id': None, 'method':'blockchain.address.subscribe', 'params':[addr, status] }) - self.push_response({ 'id': None, 'method':'blockchain.address.subscribe2', 'params':[addr, status] }) - if not self.shared.stopped(): threading.Timer(10, self.main_iteration).start() diff --git a/processor.py b/processor.py index 5590874..82124e0 100644 --- a/processor.py +++ b/processor.py @@ -151,18 +151,10 @@ class RequestDispatcher(threading.Thread): params = request.get('params',[]) suffix = method.split('.')[-1] - is_new = session.protocol_version >= 0.5 - - if is_new and method == 'blockchain.address.get_history': - method = 'blockchain.address.get_history2' - request['method'] = method - - if suffix == 'subscribe': - if is_new and method == 'blockchain.address.subscribe': - method = 'blockchain.address.subscribe2' - request['method'] = method - - session.subscribe_to_service(method, params) + if session is not None: + is_new = session.protocol_version >= 0.5 + if suffix == 'subscribe': + session.subscribe_to_service(method, params) # store session and id locally request['id'] = self.store_session_id(session, request['id']) @@ -253,7 +245,7 @@ class Session: return method, elif method == "blockchain.headers.subscribe": return method, - elif method in ["blockchain.address.subscribe", "blockchain.address.subscribe2"]: + elif method in ["blockchain.address.subscribe"]: if not params: return None else: @@ -268,9 +260,9 @@ class Session: class ResponseDispatcher(threading.Thread): - def __init__(self, shared, processor): + def __init__(self, shared, request_dispatcher): self.shared = shared - self.processor = processor + self.request_dispatcher = request_dispatcher threading.Thread.__init__(self) self.daemon = True @@ -279,7 +271,7 @@ class ResponseDispatcher(threading.Thread): self.update() def update(self): - response = self.processor.pop_response() + response = self.request_dispatcher.pop_response() #print "pop response", response internal_id = response.get('id') method = response.get('method') @@ -287,25 +279,30 @@ class ResponseDispatcher(threading.Thread): # A notification if internal_id is None: # and method is not None and params is not None: - self.notification(method, params, response) + found = self.notification(method, params, response) + if not found and method == 'blockchain.address.subscribe': + self.request_dispatcher.push_request(None,{'method':method.replace('.subscribe', '.unsubscribe'), 'params':params, 'id':None}) + pass # A response - elif internal_id is not None: # and method is None and params is None: + elif internal_id is not None: self.send_response(internal_id, response) else: print_log( "no method", response) def notification(self, method, params, response): subdesc = Session.build_subdesc(method, params) - for session in self.processor.sessions: + found = False + for session in self.request_dispatcher.sessions: if session.stopped(): continue if session.contains_subscription(subdesc): - if response.get('method') == "blockchain.address.subscribe2": - response['method'] = "blockchain.address.subscribe" session.send_response(response) + found = True + if not found: print "no subscriber for", subdesc + return found def send_response(self, internal_id, response): - session, message_id = self.processor.get_session_id(internal_id) + session, message_id = self.request_dispatcher.get_session_id(internal_id) if session: response['id'] = message_id session.send_response(response) -- 1.7.1