self.queue.put((session, request))
+ def do_subscribe(self, method, params, session):
+ with self.watch_lock:
+ if method == 'blockchain.numblocks.subscribe':
+ if session not in self.watch_blocks:
+ self.watch_blocks.append(session)
+
+ elif method == 'blockchain.headers.subscribe':
+ if session not in self.watch_headers:
+ self.watch_headers.append(session)
+
+ elif method == 'blockchain.address.subscribe':
+ address = params[0]
+ l = self.watched_addresses.get(address)
+ if l is None:
+ self.watched_addresses[address] = [session]
+ elif session not in l:
+ l.append(session)
+
+
+ def do_unsubscribe(self, method, params, session):
+ with self.watch_lock:
+ if method == 'blockchain.numblocks.subscribe':
+ if session in self.watch_blocks:
+ self.watch_blocks.remove(session)
+ elif method == 'blockchain.headers.subscribe':
+ if session in self.watch_headers:
+ self.watch_headers.remove(session)
+ elif method == "blockchain.address.subscribe":
+ addr = params[0]
+ l = self.watched_addresses.get(addr)
+ if not l:
+ return
+ if session in l:
+ l.remove(session)
+ if session in l:
+ print "error rc!!"
+ self.shared.stop()
+ if l == []:
+ self.watched_addresses.pop(addr)
def process(self, session, request, cache_only=False):
error = None
if method == 'blockchain.numblocks.subscribe':
- with self.watch_lock:
- if session not in self.watch_blocks:
- self.watch_blocks.append(session)
result = self.height
elif method == 'blockchain.headers.subscribe':
- with self.watch_lock:
- if session not in self.watch_headers:
- self.watch_headers.append(session)
result = self.header
elif method == 'blockchain.address.subscribe':
try:
address = params[0]
result = self.get_status(address, cache_only)
- with self.watch_lock:
- l = self.watched_addresses.get(address)
- if l is None:
- self.watched_addresses[address] = [session]
- elif session not in l:
- l.append(session)
-
except BaseException, e:
error = str(e) + ': ' + address
print_log("error:", error)
-
elif method == 'blockchain.address.get_history':
try:
address = params[0]