self.queue.put((session, request))
+ def do_subscribe(self, method, params, session):
+ with self.watch_lock:
+ if method == 'blockchain.numblocks.subscribe':
+ if session not in self.watch_blocks:
+ self.watch_blocks.append(session)
+
+ elif method == 'blockchain.headers.subscribe':
+ if session not in self.watch_headers:
+ self.watch_headers.append(session)
+
+ elif method == 'blockchain.address.subscribe':
+ address = params[0]
+ l = self.watched_addresses.get(address)
+ if l is None:
+ self.watched_addresses[address] = [session]
+ elif session not in l:
+ l.append(session)
+
+
+ def do_unsubscribe(self, method, params, session):
+ with self.watch_lock:
+ if method == 'blockchain.numblocks.subscribe':
+ if session in self.watch_blocks:
+ self.watch_blocks.remove(session)
+ elif method == 'blockchain.headers.subscribe':
+ if session in self.watch_headers:
+ self.watch_headers.remove(session)
+ elif method == "blockchain.address.subscribe":
+ addr = params[0]
+ l = self.watched_addresses.get(addr)
+ if not l:
+ return
+ if session in l:
+ l.remove(session)
+ if session in l:
+ print "error rc!!"
+ self.shared.stop()
+ if l == []:
+ self.watched_addresses.pop(addr)
def process(self, session, request, cache_only=False):
error = None
if method == 'blockchain.numblocks.subscribe':
- with self.watch_lock:
- if session not in self.watch_blocks:
- self.watch_blocks.append(session)
result = self.height
elif method == 'blockchain.headers.subscribe':
- with self.watch_lock:
- if session not in self.watch_headers:
- self.watch_headers.append(session)
result = self.header
elif method == 'blockchain.address.subscribe':
try:
address = params[0]
result = self.get_status(address, cache_only)
- with self.watch_lock:
- l = self.watched_addresses.get(address)
- if l is None:
- self.watched_addresses[address] = [session]
- elif session not in l:
- l.append(session)
-
except BaseException, e:
error = str(e) + ': ' + address
print_log("error:", error)
-
elif method == 'blockchain.address.get_history':
try:
address = params[0]
def subscribe_to_service(self, method, params):
with self.lock:
+ if self._stopped:
+ return
if (method, params) not in self.subscriptions:
self.subscriptions.append((method,params))
+ self.bp.do_subscribe(method, params, self)
def stop_subscriptions(self):
- bp = self.bp
-
with self.lock:
s = self.subscriptions[:]
-
for method, params in s:
- with bp.watch_lock:
- if method == 'blockchain.numblocks.subscribe':
- if self in bp.watch_blocks:
- bp.watch_blocks.remove(self)
- elif method == 'blockchain.headers.subscribe':
- if self in bp.watch_headers:
- bp.watch_headers.remove(self)
- elif method == "blockchain.address.subscribe":
- addr = params[0]
- l = bp.watched_addresses.get(addr)
- if not l:
- continue
- if self in l:
- l.remove(self)
- if self in l:
- print "error rc!!"
- bp.shared.stop()
-
- if l == []:
- bp.watched_addresses.pop(addr)
-
+ self.bp.do_unsubscribe(method, params, self)
with self.lock:
self.subscriptions = []