1 import thread, threading, time, socket, traceback, ast, sys
7 return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N))
10 return time.strftime("[%d/%m/%Y-%H:%M:%S]")
14 s = s.replace("'block_hash'", "'blk_hash'")
15 s = s.replace("'index'", "'pos'")
16 s = s.replace("'timestamp'", "'nTime'")
17 s = s.replace("'is_input'", "'is_in'")
18 s = s.replace("'raw_output_script'","'raw_scriptPubKey'")
22 class NativeServer(threading.Thread):
24 def __init__(self, shared, abe, irc, banner, host, port):
25 threading.Thread.__init__(self)
28 self.store = abe.store
37 def modified_addresses(self,a_session):
39 session = copy.deepcopy(a_session)
40 addresses = session['addresses']
41 session['last_time'] = time.time()
44 for addr in addresses.keys():
45 status = self.store.get_status( addr )
46 msg_id, last_status = addresses[addr]
47 if last_status != status:
48 addresses[addr] = msg_id, status
54 def poll_session(self, session_id):
55 session = self.sessions.get(session_id)
57 print time.asctime(), "session not found", session_id
60 self.sessions[session_id]['last_time'] = time.time()
61 ret, addresses = self.modified_addresses(session)
62 if ret: self.sessions[session_id]['addresses'] = addresses
63 return repr( (self.abe.block_number,ret))
66 def add_address_to_session(self, session_id, address):
67 status = self.store.get_status(address)
68 self.sessions[session_id]['addresses'][address] = ("", status)
69 self.sessions[session_id]['last_time'] = time.time()
73 def new_session(self, version, addresses):
74 session_id = random_string(10)
75 self.sessions[session_id] = { 'addresses':{}, 'version':version }
77 self.sessions[session_id]['addresses'][a] = ('','')
78 out = repr( (session_id, self.banner.replace('\\n','\n') ) )
79 self.sessions[session_id]['last_time'] = time.time()
83 def update_session(self, session_id,addresses):
84 """deprecated in 0.42, wad replaced by add_address_to_session"""
85 self.sessions[session_id]['addresses'] = {}
87 self.sessions[session_id]['addresses'][a] = ('','')
88 self.sessions[session_id]['last_time'] = time.time()
93 def native_client_thread(self, ipaddr,conn):
103 msg = msg.split('#', 1)[0]
106 cmd, data = ast.literal_eval(msg)
108 print "syntax error", repr(msg), ipaddr
112 out = self.do_command(cmd, data, ipaddr)
114 #print ipaddr, cmd, len(out)
118 print "error, could not send"
125 def do_command(self, cmd, data, ipaddr):
128 out = "%d"%self.abe.block_number
130 elif cmd in ['session','new_session']:
133 addresses = ast.literal_eval(data)
136 version, addresses = ast.literal_eval(data)
137 if version[0]=="0": version = "v" + version
141 print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
142 out = self.new_session(version, addresses)
144 elif cmd=='address.subscribe':
146 session_id, addr = ast.literal_eval(data)
148 traceback.print_exc(file=sys.stdout)
151 out = self.add_address_to_session(session_id,addr)
153 elif cmd=='update_session':
155 session_id, addresses = ast.literal_eval(data)
157 traceback.print_exc(file=sys.stdout)
159 print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
160 out = self.update_session(session_id,addresses)
163 out = self.poll_session(data)
167 out = repr( self.store.get_history( address ) )
168 out = new_to_old(out)
171 out = self.store.send_tx(data)
172 print timestr(), "sent tx:", ipaddr, out
175 out = repr(self.irc.get_peers())
183 def clean_session_thread(self):
184 while not self.shared.stopped():
187 for k,s in self.sessions.items():
188 if s.get('type') == 'persistent': continue
192 print "lost session", k
195 thread.start_new_thread(self.clean_session_thread, ())
197 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
198 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
199 s.bind((self.host, self.port))
201 while not self.shared.stopped():
202 conn, addr = s.accept()
204 thread.start_new_thread(self.native_client_thread, (addr, conn,))
206 # can't start new thread if there is no memory..
207 traceback.print_exc(file=sys.stdout)