traceback.print_exc(file=sys.stdout)
+def close_session(session_id):
+ print "lost connection", session_id
+ sessions.pop(session_id)
+ if session_id in sessions_sub_numblocks:
+ sessions_sub_numblocks.remove(session_id)
+
+
# one thread per client. put requests in a queue.
def tcp_client_thread(ipaddr,conn):
""" use a persistent connection. put commands in a queue."""
d = conn.recv(1024)
msg += d
if not d:
- print "lost connection", session_id
- sessions.pop(session_id)
- sessions_sub_numblocks.remove(session_id)
+ close_session(session_id)
break
while True:
if s ==-1:
break
else:
- c = msg[0:s]
+ c = msg[0:s].strip()
msg = msg[s+1:]
- c = json.loads(c)
+ if c == 'quit':
+ conn.close()
+ close_session(session_id)
+ return
+ try:
+ c = json.loads(c)
+ except:
+ print "json error", repr(c)
+ continue
try:
cmd = c['method']
data = c['params']
input_queue.put((session_id, cmd, data))
+
# read commands from the input queue. perform requests, etc. this should be called from the main thread.
def process_input_queue():
while not stopping:
session_id, cmd, data = input_queue.get()
+ if session_id not in sessions.keys():
+ continue
out = None
if cmd == 'address.subscribe':
subscribe_to_address(session_id,data)
sessions[session_id]['version'] = data
elif cmd == 'server.banner':
out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } )
+ elif cmd == 'server.peers':
+ out = json.dumps( { 'method':'server.peers', 'result':peer_list.values() } )
elif cmd == 'address.get_history':
address = data
out = json.dumps( { 'method':'address.get_history', 'address':address, 'result':store.get_history( address ) } )
session_id, out = output_queue.get()
session = sessions.get(session_id)
if session:
- conn = session.get('conn')
- conn.send(out+'\n')
+ try:
+ conn = session.get('conn')
+ conn.send(out+'\n')
+ except:
+ close_session(session_id)
+