1 import thread, threading, time, socket, traceback, ast
# Body fragment; the enclosing `def` line is not part of this excerpt
# (presumably random_string(N), which new_session calls -- confirm).
# Returns a string of N characters, each picked at random from the uppercase
# ASCII letters and the digits.
# NOTE(review): random.choice is not a cryptographically secure source; if the
# result is used as a session id, consider whether ids must be unguessable.
7 return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
# Body fragment; the enclosing `def` line is not part of this excerpt
# (presumably timestr(), which the log lines in do_command call -- confirm).
# Returns the current local time formatted as "[DD/MM/YYYY-HH:MM:SS]".
10 return time.strftime("[%d/%m/%Y-%H:%M:%S]")
# NativeServer is a threading.Thread subclass. Its visible methods keep
# per-client session state in self.sessions and answer client commands using
# self.store and self.irc.
13 class NativeServer(threading.Thread):
# Constructor taking shared, store, irc, banner, host and port.
# NOTE(review): presumably each argument is stored on self under the same name
# (the methods read self.shared, self.store, self.irc, self.banner, self.host,
# self.port) and self.sessions is created here -- those lines are not visible
# in this excerpt; confirm.
15 def __init__(self, shared, store, irc, banner, host, port):
# Initialise the threading.Thread base class.
16 threading.Thread.__init__(self)
# modified_addresses(a_session): compares, for every address in the session,
# the status currently reported by self.store with the status recorded in the
# session. It works on a deep copy, so a_session itself is not mutated by the
# visible lines.
# NOTE(review): poll_session unpacks the result as `ret, addresses`; the lines
# that build `ret` and return the pair are not visible in this excerpt.
27 def modified_addresses(self,a_session):
# NOTE(review): `copy` is not in the visible import line -- presumably imported
# elsewhere in the file; confirm.
29 session = copy.deepcopy(a_session)
30 addresses = session['addresses']
# This timestamp is written to the copy only, not to the caller's session.
31 session['last_time'] = time.time()
# Each entry of `addresses` is unpacked below as (msg_id, last_status).
34 for addr in addresses:
35 status = self.store.get_status( addr )
36 msg_id, last_status = addresses.get( addr )
37 if last_status != status:
# Record the new status in the copied map, keeping the existing msg_id.
38 addresses[addr] = msg_id, status
# poll_session(session_id): refreshes the session's 'last_time', asks
# modified_addresses() which addresses changed, and returns repr() of the
# tuple (self.store.block_number, ret).
# NOTE(review): the "session not found" message suggests a guard for an unknown
# session_id; the condition and what that path returns are not visible in this
# excerpt.
44 def poll_session(self, session_id):
# .get() yields None instead of raising when the id is unknown.
45 session = self.sessions.get(session_id)
47 print time.asctime(), "session not found", session_id
50 self.sessions[session_id]['last_time'] = time.time()
51 ret, addresses = self.modified_addresses(session)
# Write the updated address map back only when `ret` is truthy.
52 if ret: self.sessions[session_id]['addresses'] = addresses
# The reply is the repr() string of the tuple, not the tuple itself.
53 return repr( (self.store.block_number,ret))
# add_address_to_session(session_id, address): registers `address` in the
# session as ("", status), where status is the store's current status for that
# address, and refreshes the session's 'last_time'.
# self.sessions is indexed directly, so an unknown session_id is not handled in
# the visible lines.
# NOTE(review): the return statement is not visible in this excerpt; do_command
# uses the result as its reply.
56 def add_address_to_session(self, session_id, address):
57 status = self.store.get_status(address)
# Same (msg_id, status) layout that modified_addresses unpacks.
58 self.sessions[session_id]['addresses'][address] = ("", status)
59 self.sessions[session_id]['last_time'] = time.time()
# new_session(version, addresses): creates a session entry in self.sessions
# under an id from random_string(10), stores `version` with it, and builds
# `out`, the repr() of (session_id, banner).
# NOTE(review): the loop header that binds `a` (presumably iterating
# `addresses`) and the return statement are not visible in this excerpt.
63 def new_session(self, version, addresses):
64 session_id = random_string(10)
65 self.sessions[session_id] = { 'addresses':{}, 'version':version }
# Initial entry is ('', ''): empty msg_id and empty status.
67 self.sessions[session_id]['addresses'][a] = ('','')
# Turn literal backslash-n sequences in the banner into real newlines.
68 out = repr( (session_id, self.banner.replace('\\n','\n') ) )
69 self.sessions[session_id]['last_time'] = time.time()
# update_session(session_id, addresses): resets the session's address map to
# an empty dict, stores '' per address, and refreshes the session's 'last_time'.
# NOTE(review): the values stored here are the bare string '', whereas
# modified_addresses unpacks each entry as (msg_id, last_status). Polling a
# session populated through this path would fail on that unpack -- confirm
# whether this deprecated command is still reachable (do_command still
# dispatches 'update_session').
# NOTE(review): the loop header that binds `a` (presumably iterating
# `addresses`) and the return statement are not visible in this excerpt.
73 def update_session(self, session_id,addresses):
74 """deprecated in 0.42, was replaced by add_address_to_session"""
75 self.sessions[session_id]['addresses'] = {}
77 self.sessions[session_id]['addresses'][a] = ''
78 self.sessions[session_id]['last_time'] = time.time()
# native_client_thread(ipaddr, conn): handler started in its own thread for
# each accepted connection, with the peer address from accept() as `ipaddr`.
# From the visible lines: the message is cut at the first '#', parsed into
# (cmd, data), passed to do_command, and a failure to send is reported.
# NOTE(review): the receive loop, the try/except structure and the send/close
# calls are not visible in this excerpt.
83 def native_client_thread(self, ipaddr,conn):
# Keep only the text before the first '#'.
93 msg = msg.split('#', 1)[0]
# ast.literal_eval accepts only Python literal syntax, so the client's text is
# parsed without being executed as code.
96 cmd, data = ast.literal_eval(msg)
98 print "syntax error", repr(msg), ipaddr
102 out = self.do_command(cmd, data, ipaddr)
104 #print ipaddr, cmd, len(out)
108 print "error, could not send"
# do_command(cmd, data, ipaddr): dispatches one client command and builds the
# reply in `out`. Visible branches: 'session'/'new_session',
# 'address.subscribe' and 'update_session'. Further branches, whose conditions
# are not visible, call poll_session, store.get_history, store.send_tx and
# irc.get_peers.
# NOTE(review): several if/elif conditions, the try/except lines and the final
# return are not visible in this excerpt.
115 def do_command(self, cmd, data, ipaddr):
# Reply with block_number formatted as a decimal string.
118 out = "%d"%block_number
120 elif cmd in ['session','new_session']:
# Two payload shapes are parsed: a bare address collection (next line) or a
# (version, addresses) pair. The condition choosing between them is not visible.
123 addresses = ast.literal_eval(data)
126 version, addresses = ast.literal_eval(data)
# Version strings starting with "0" get a "v" prefix.
127 if version[0]=="0": version = "v" + version
# Log the first address (or the empty collection itself) and the address count.
131 print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
132 out = self.new_session(version, addresses)
134 elif cmd=='address.subscribe':
136 session_id, addr = ast.literal_eval(data)
# NOTE(review): `sys` is not in the visible import line -- presumably imported
# elsewhere in the file; confirm.
138 traceback.print_exc(file=sys.stdout)
141 out = self.add_address_to_session(session_id,addr)
143 elif cmd=='update_session':
145 session_id, addresses = ast.literal_eval(data)
147 traceback.print_exc(file=sys.stdout)
149 print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
150 out = self.update_session(session_id,addresses)
# `data` is passed through unparsed as the session id.
153 out = self.poll_session(data)
157 out = repr( self.store.get_history( address ) )
# The raw `data` string is handed to the store; its result is logged and
# returned as the reply.
160 out = self.store.send_tx(data)
161 print timestr(), "sent tx:", ipaddr, out
164 out = repr(self.irc.get_peers())
# clean_session_thread(): loop that runs until self.shared.stopped() returns
# true, scanning self.sessions and printing "lost session" with the session id.
# Sessions whose 'type' is 'persistent' are skipped.
# NOTE(review): the expiry condition, the removal of the session and any sleep
# between scans are not visible in this excerpt.
172 def clean_session_thread(self):
173 while not self.shared.stopped():
# In Python 2, .items() returns a list snapshot of the dict's entries.
176 for k,s in self.sessions.items():
177 if s.get('type') == 'persistent': continue
181 print "lost session", k
# NOTE(review): presumably the body of the thread's run() method -- the `def`
# line is not part of this excerpt; confirm.
# Start the session-cleaning loop in its own thread.
184 thread.start_new_thread(self.clean_session_thread, ())
# TCP socket bound to (self.host, self.port); SO_REUSEADDR is set before bind.
186 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
187 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
188 s.bind((self.host, self.port))
# Accept loop: runs until self.shared.stopped() returns true and hands each
# accepted connection to native_client_thread in a new thread.
190 while not self.shared.stopped():
191 conn, addr = s.accept()
193 thread.start_new_thread(self.native_client_thread, (addr, conn,))
195 # can't start new thread if there is no memory..
196 traceback.print_exc(file=sys.stdout)