#print "response", response
self.dispatcher.request_dispatcher.push_response(session, response)
+ def close(self):
+ pass
+
def run(self):
while not self.shared.stopped():
- request, session = self.queue.get(10000000000)
+ try:
+ request, session = self.queue.get(True, timeout=1)
+ except:
+ continue
try:
self.process(request, session)
except:
traceback.print_exc(file=sys.stdout)
- print_log("processor terminating")
+ self.close()
class Dispatcher:
p.add_request(session, request)
if method in ['server.version']:
- session.version = params[0]
try:
+ session.version = params[0]
session.protocol_version = float(params[1])
except:
pass
def subscribe_to_service(self, method, params):
with self.lock:
+ if self._stopped:
+ return
if (method, params) not in self.subscriptions:
self.subscriptions.append((method,params))
+ self.bp.do_subscribe(method, params, self)
def stop_subscriptions(self):
- bp = self.bp
-
with self.lock:
s = self.subscriptions[:]
-
for method, params in s:
- with bp.watch_lock:
- if method == 'blockchain.numblocks.subscribe':
- if self in bp.watch_blocks:
- bp.watch_blocks.remove(self)
- elif method == 'blockchain.headers.subscribe':
- if self in bp.watch_headers:
- bp.watch_headers.remove(self)
- elif method == "blockchain.address.subscribe":
- addr = params[0]
- l = bp.watched_addresses.get(addr)
- if not l:
- continue
- if self in l:
- l.remove(self)
- if self in l:
- print "error rc!!"
- bp.shared.stop()
-
- if l == []:
- bp.watched_addresses.pop(addr)
-
+ self.bp.do_unsubscribe(method, params, self)
with self.lock:
self.subscriptions = []