1 import thread, threading, time, socket, traceback, ast, sys
7 return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
10 return time.strftime("[%d/%m/%Y-%H:%M:%S]")
13 class NativeServer(threading.Thread):
15 def __init__(self, shared, abe, irc, banner, host, port):
16 threading.Thread.__init__(self)
19 self.store = abe.store
28 def modified_addresses(self,a_session):
30 session = copy.deepcopy(a_session)
31 addresses = session['addresses']
32 session['last_time'] = time.time()
35 for addr in addresses:
36 status = self.store.get_status( addr )
37 msg_id, last_status = addresses.get( addr )
38 if last_status != status:
39 addresses[addr] = msg_id, status
45 def poll_session(self, session_id):
46 session = self.sessions.get(session_id)
48 print time.asctime(), "session not found", session_id
51 self.sessions[session_id]['last_time'] = time.time()
52 ret, addresses = self.modified_addresses(session)
53 if ret: self.sessions[session_id]['addresses'] = addresses
54 return repr( (self.abe.block_number,ret))
57 def add_address_to_session(self, session_id, address):
58 status = self.store.get_status(address)
59 self.sessions[session_id]['addresses'][address] = ("", status)
60 self.sessions[session_id]['last_time'] = time.time()
64 def new_session(self, version, addresses):
65 session_id = random_string(10)
66 self.sessions[session_id] = { 'addresses':{}, 'version':version }
68 self.sessions[session_id]['addresses'][a] = ('','')
69 out = repr( (session_id, self.banner.replace('\\n','\n') ) )
70 self.sessions[session_id]['last_time'] = time.time()
74 def update_session(self, session_id,addresses):
75 """deprecated in 0.42, wad replaced by add_address_to_session"""
76 self.sessions[session_id]['addresses'] = {}
78 self.sessions[session_id]['addresses'][a] = ''
79 self.sessions[session_id]['last_time'] = time.time()
84 def native_client_thread(self, ipaddr,conn):
94 msg = msg.split('#', 1)[0]
97 cmd, data = ast.literal_eval(msg)
99 print "syntax error", repr(msg), ipaddr
103 out = self.do_command(cmd, data, ipaddr)
105 #print ipaddr, cmd, len(out)
109 print "error, could not send"
116 def do_command(self, cmd, data, ipaddr):
119 out = "%d"%self.abe.block_number
121 elif cmd in ['session','new_session']:
124 addresses = ast.literal_eval(data)
127 version, addresses = ast.literal_eval(data)
128 if version[0]=="0": version = "v" + version
132 print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
133 out = self.new_session(version, addresses)
135 elif cmd=='address.subscribe':
137 session_id, addr = ast.literal_eval(data)
139 traceback.print_exc(file=sys.stdout)
142 out = self.add_address_to_session(session_id,addr)
144 elif cmd=='update_session':
146 session_id, addresses = ast.literal_eval(data)
148 traceback.print_exc(file=sys.stdout)
150 print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
151 out = self.update_session(session_id,addresses)
154 out = self.poll_session(data)
158 out = repr( self.store.get_history( address ) )
161 out = self.store.send_tx(data)
162 print timestr(), "sent tx:", ipaddr, out
165 out = repr(self.irc.get_peers())
173 def clean_session_thread(self):
174 while not self.shared.stopped():
177 for k,s in self.sessions.items():
178 if s.get('type') == 'persistent': continue
182 print "lost session", k
185 thread.start_new_thread(self.clean_session_thread, ())
187 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
188 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
189 s.bind((self.host, self.port))
191 while not self.shared.stopped():
192 conn, addr = s.accept()
194 thread.start_new_thread(self.native_client_thread, (addr, conn,))
196 # can't start new thread if there is no memory..
197 traceback.print_exc(file=sys.stdout)