def pop_request(self):
return self.request_queue.get()
+ def collect_garbage(self):
+ # Deep copy entire sessions list and blank it
+ # This is done to minimise lock contention
+ with self.lock:
+ sessions = self.sessions[:]
+ self.sessions = []
+ for session in sessions:
+ if not session.stopped():
+ # If session is still alive then re-add it back
+ # to our internal register
+ self.add_session(session)
+
def run(self):
if self.shared is None:
raise TypeError("self.shared not set in Processor")
while not self.shared.stopped():
- # Deep copy entire sessions list and blank it
- # This is done to minimise lock contention
- with self.lock:
- sessions = self.sessions[:]
- self.sessions = []
- for session in sessions:
- if not session.stopped():
- # If session is still alive then re-add it back
- # to our internal register
- self.add_session(session)
-
+ self.collect_garbage()
session, request = self.pop_request()
self.process(session, request)
# When ready, you call
# self.push_response(session,response)
+ def update_from_blocknum(self,block_number):
+ for session in self.sessions:
+ if not session.stopped():
+ if session.numblocks_sub is not None:
+ response = { 'id':session.numblocks_sub, 'result':block_number }
+ self.push_response(session,response)
+
+ def update_from_address(self,addr):
+ for session in self.sessions:
+ if not session.stopped():
+ m = session.addresses_sub.get(addr)
+ if m:
+ status = self.get_status( addr )
+ message_id, last_status = m
+ if status != last_status:
+ session.subscribe_to_address(addr,message_id, status)
+ response = { 'id':message_id, 'result':status }
+ self.push_response(session,response)
+
+ def get_status(self,addr):
+ # return status of an address
+ # return store.get_status(addr)
+ pass
+
+
class Session:
def __init__(self, connection, address):
self.lock = threading.Lock()
self.numblocks_sub = None
self.addresses_sub = {}
+ print "new session", address
def stop(self):
self._connection.close()
# Possible race condition here by having session
# close connection?
# I assume Python connections are thread safe interfaces
- connection = session.connection()
try:
+ connection = session.connection()
connection.send(raw_response + "\n")
except:
session.stop()
def run(self):
while not self.shared.stopped():
if not self.update():
- self.session.stop()
- return
+ break
+
+ while self.parse():
+ pass
def update(self):
data = self.receive()
- if data is None:
+ if not data:
# close_session
- self.stop()
+ self.session.stop()
return False
self.message += data
- if not self.parse():
- return False
return True
def receive(self):
try:
return self.session.connection().recv(1024)
except socket.error:
- return None
+ return ''
def parse(self):
raw_buffer = self.message.find('\n')
if raw_buffer == -1:
- return True
+ return False
raw_command = self.message[0:raw_buffer].strip()
self.message = self.message[raw_buffer + 1:]
if raw_command == 'quit':
+ self.session.stop()
return False
try:
processor.shared = shared
processor.start()
# Create various transports we need
- transports = TcpServer(shared, processor, "ecdsa.org", 50002),
+ transports = TcpServer(shared, processor, "176.31.24.241", 50001),
for server in transports:
server.start()
while not shared.stopped():