self.response_queue = queue.Queue()
self.lock = threading.Lock()
self.idlock = threading.Lock()
- self.sessions = []
+ self.sessions = {}
self.processors = {}
def push_response(self, session, item):
def pop_request(self):
return self.request_queue.get()
+ def get_session_by_address(self, address):
+ for x in self.sessions.values():
+ if x.address == address:
+ return x
+
def run(self):
if self.shared is None:
raise TypeError("self.shared not set in Processor")
+
+ lastgc = 0
+
while not self.shared.stopped():
session, request = self.pop_request()
try:
except:
traceback.print_exc(file=sys.stdout)
+ if time.time() - lastgc > 60.0:
+ self.collect_garbage()
+ lastgc = time.time()
+
self.stop()
def stop(self):
def get_sessions(self):
with self.lock:
- r = self.sessions[:]
+ r = self.sessions.values()
return r
def add_session(self, session):
+ key = session.key()
with self.lock:
- self.sessions.append(session)
+ self.sessions[key] = session
- def collect_garbage(self):
- # Deep copy entire sessions list and blank it
- # This is done to minimize lock contention
+ def remove_session(self, session):
+ key = session.key()
with self.lock:
- sessions = self.sessions[:]
-
- active_sessions = []
+ self.sessions.pop(key)
+ def collect_garbage(self):
now = time.time()
- for session in sessions:
- if (now - session.time) > 1000:
+ for session in self.sessions.values():
+ if (now - session.time) > session.timeout:
session.stop()
- bp = self.processors['blockchain']
-
- for session in sessions:
- if not session.stopped():
- # If session is still alive then re-add it back
- # to our internal register
- active_sessions.append(session)
- else:
- session.stop_subscriptions(bp)
-
- with self.lock:
- self.sessions = active_sessions[:]
-
class Session:
- def __init__(self):
+ def __init__(self, dispatcher):
+ self.dispatcher = dispatcher
+ self.bp = self.dispatcher.processors['blockchain']
self._stopped = False
self.lock = threading.Lock()
self.subscriptions = []
threading.Timer(2, self.info).start()
+ def key(self):
+ return self.name + self.address
+
+
# Debugging method. Doesn't need to be threadsafe.
def info(self):
for sub in self.subscriptions:
"%3d" % len(self.subscriptions),
self.version)
+ def stop(self):
+ with self.lock:
+ if self._stopped:
+ return
+ self._stopped = True
+
+ self.shutdown()
+ self.dispatcher.remove_session(self)
+ self.stop_subscriptions()
+
+
+ def shutdown(self):
+ pass
+
+
def stopped(self):
with self.lock:
return self._stopped
def subscribe_to_service(self, method, params):
with self.lock:
+ if self._stopped:
+ return
if (method, params) not in self.subscriptions:
self.subscriptions.append((method,params))
+ self.bp.do_subscribe(method, params, self)
- def stop_subscriptions(self, bp):
+ def stop_subscriptions(self):
with self.lock:
s = self.subscriptions[:]
-
for method, params in s:
- with bp.watch_lock:
- if method == 'blockchain.numblocks.subscribe':
- if self in bp.watch_blocks:
- bp.watch_blocks.remove(self)
- elif method == 'blockchain.headers.subscribe':
- if self in bp.watch_headers:
- bp.watch_headers.remove(self)
- elif method == "blockchain.address.subscribe":
- addr = params[0]
- l = bp.watched_addresses.get(addr)
- if not l:
- continue
- if self in l:
- l.remove(self)
- if l == []:
- bp.watched_addresses.pop(addr)
-
+ self.bp.do_unsubscribe(method, params, self)
with self.lock:
self.subscriptions = []