--- /dev/null
+import thread, threading, time, socket, traceback, ast
+
+
+
+def random_string(N):
+ import random, string
+ return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
+
+def timestr():
+ return time.strftime("[%d/%m/%Y-%H:%M:%S]")
+
+
+class NativeServer(threading.Thread):
+
+ def __init__(self, shared, store, irc, banner, host, port):
+ threading.Thread.__init__(self)
+ self.banner = banner
+ self.store = store
+ self.irc = irc
+ self.sessions = {}
+ self.host = host
+ self.port = port
+ self.daemon = True
+ self.shared = shared
+
+
+ def modified_addresses(self,a_session):
+ import copy
+ session = copy.deepcopy(a_session)
+ addresses = session['addresses']
+ session['last_time'] = time.time()
+ ret = {}
+ k = 0
+ for addr in addresses:
+ status = self.store.get_status( addr )
+ msg_id, last_status = addresses.get( addr )
+ if last_status != status:
+ addresses[addr] = msg_id, status
+ ret[addr] = status
+
+ return ret, addresses
+
+
+ def poll_session(self, session_id):
+ session = self.sessions.get(session_id)
+ if session is None:
+ print time.asctime(), "session not found", session_id
+ return -1, {}
+ else:
+ self.sessions[session_id]['last_time'] = time.time()
+ ret, addresses = self.modified_addresses(session)
+ if ret: self.sessions[session_id]['addresses'] = addresses
+ return repr( (self.store.block_number,ret))
+
+
+ def add_address_to_session(self, session_id, address):
+ status = self.store.get_status(address)
+ self.sessions[session_id]['addresses'][address] = ("", status)
+ self.sessions[session_id]['last_time'] = time.time()
+ return status
+
+
+ def new_session(self, version, addresses):
+ session_id = random_string(10)
+ self.sessions[session_id] = { 'addresses':{}, 'version':version }
+ for a in addresses:
+ self.sessions[session_id]['addresses'][a] = ('','')
+ out = repr( (session_id, self.banner.replace('\\n','\n') ) )
+ self.sessions[session_id]['last_time'] = time.time()
+ return out
+
+
+ def update_session(self, session_id,addresses):
+ """deprecated in 0.42, wad replaced by add_address_to_session"""
+ self.sessions[session_id]['addresses'] = {}
+ for a in addresses:
+ self.sessions[session_id]['addresses'][a] = ''
+ self.sessions[session_id]['last_time'] = time.time()
+ return 'ok'
+
+
+
+ def native_client_thread(self, ipaddr,conn):
+ try:
+ ipaddr = ipaddr[0]
+ msg = ''
+ while 1:
+ d = conn.recv(1024)
+ msg += d
+ if not d:
+ break
+ if '#' in msg:
+ msg = msg.split('#', 1)[0]
+ break
+ try:
+ cmd, data = ast.literal_eval(msg)
+ except:
+ print "syntax error", repr(msg), ipaddr
+ conn.close()
+ return
+
+ out = self.do_command(cmd, data, ipaddr)
+ if out:
+ #print ipaddr, cmd, len(out)
+ try:
+ conn.send(out)
+ except:
+ print "error, could not send"
+
+ finally:
+ conn.close()
+
+
+
+ def do_command(self, cmd, data, ipaddr):
+
+ if cmd=='b':
+ out = "%d"%block_number
+
+ elif cmd in ['session','new_session']:
+ try:
+ if cmd == 'session':
+ addresses = ast.literal_eval(data)
+ version = "old"
+ else:
+ version, addresses = ast.literal_eval(data)
+ if version[0]=="0": version = "v" + version
+ except:
+ print "error", data
+ return None
+ print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
+ out = self.new_session(version, addresses)
+
+ elif cmd=='address.subscribe':
+ try:
+ session_id, addr = ast.literal_eval(data)
+ except:
+ traceback.print_exc(file=sys.stdout)
+ print data
+ return None
+ out = self.add_address_to_session(session_id,addr)
+
+ elif cmd=='update_session':
+ try:
+ session_id, addresses = ast.literal_eval(data)
+ except:
+ traceback.print_exc(file=sys.stdout)
+ return None
+ print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
+ out = self.update_session(session_id,addresses)
+
+ elif cmd=='poll':
+ out = self.poll_session(data)
+
+ elif cmd == 'h':
+ address = data
+ out = repr( self.store.get_history( address ) )
+
+ elif cmd =='tx':
+ out = self.store.send_tx(data)
+ print timestr(), "sent tx:", ipaddr, out
+
+ elif cmd == 'peers':
+ out = repr(self.irc.get_peers())
+
+ else:
+ out = None
+
+ return out
+
+
+ def clean_session_thread(self):
+ while not self.shared.stopped():
+ time.sleep(30)
+ t = time.time()
+ for k,s in self.sessions.items():
+ if s.get('type') == 'persistent': continue
+ t0 = s['last_time']
+ if t - t0 > 5*60:
+ self.sessions.pop(k)
+ print "lost session", k
+
+ def run(self):
+ thread.start_new_thread(self.clean_session_thread, ())
+
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind((self.host, self.port))
+ s.listen(1)
+ while not self.shared.stopped():
+ conn, addr = s.accept()
+ try:
+ thread.start_new_thread(self.native_client_thread, (addr, conn,))
+ except:
+ # can't start new thread if there is no memory..
+ traceback.print_exc(file=sys.stdout)
+