1 import thread, threading, time, socket, traceback, ast, sys
4 from processor import timestr, random_string
# Rename quoted dictionary keys inside the string `s` from one naming scheme
# to another ('block_hash' -> 'blk_hash', 'index' -> 'pos', and so on).
# Only the exact single-quoted key text is matched, so `s` is expected to be
# a repr()-style string rather than a dict.
# NOTE(review): the enclosing `def` is not visible in this excerpt; this is
# presumably the body of new_to_old(), which is applied to the repr() of
# store.get_history() further down -- confirm.
9 s = s.replace("'block_hash'", "'blk_hash'")
10 s = s.replace("'index'", "'pos'")
11 s = s.replace("'timestamp'", "'nTime'")
12 s = s.replace("'is_input'", "'is_in'")
13 s = s.replace("'raw_output_script'","'raw_scriptPubKey'")
# Thread subclass that keeps per-client sessions and answers commands
# received over a TCP socket (see the accept loop at the end of the class).
17 class NativeServer(threading.Thread):
# Constructor.  Parameters: shared, abe, irc, banner, host, port.
# NOTE(review): only part of the attribute setup is visible in this excerpt;
# other methods read self.shared, self.abe, self.irc, self.banner,
# self.sessions, self.host and self.port, so those are presumably assigned
# here as well -- confirm.
19 def __init__(self, shared, abe, irc, banner, host, port):
# Initialise the Thread base class before touching instance state.
20 threading.Thread.__init__(self)
# Keep a direct reference to abe's store; the methods below call
# get_status(), get_history() and send_tx() on it.
23 self.store = abe.store
# Compare the stored status of every address in a session with the status
# currently reported by the store, updating a private copy of the session.
# The caller (poll_session) unpacks the result as `ret, addresses`.
# NOTE(review): the line(s) that build and return `ret` are not visible in
# this excerpt.
32 def modified_addresses(self,a_session):
# Work on a deep copy so the caller's session dict is not mutated here.
# NOTE(review): `copy` is not in the import line at the top of the file;
# confirm it is imported somewhere before this call.
34 session = copy.deepcopy(a_session)
35 addresses = session['addresses']
# This timestamp is written to the copy only, not to the live session.
36 session['last_time'] = time.time()
39 for addr in addresses.keys():
40 status = self.store.get_status( addr )
# Each entry is a (msg_id, last_status) pair.
41 msg_id, last_status = addresses[addr]
42 if last_status != status:
# Remember the new status, keeping the original msg_id.
43 addresses[addr] = msg_id, status
# Report which addresses of a session changed status since the last poll.
# Returns repr((self.abe.block_number, ret)), where `ret` is the first value
# returned by modified_addresses().
49 def poll_session(self, session_id):
# .get() is used so an unknown id yields None instead of raising KeyError.
50 session = self.sessions.get(session_id)
# NOTE(review): the condition guarding this message and whatever is returned
# in that case are not visible in this excerpt.
52 print time.asctime(), "session not found", session_id
# Record the time of this poll on the live session.
55 self.sessions[session_id]['last_time'] = time.time()
56 ret, addresses = self.modified_addresses(session)
# Store the refreshed address table only when something changed.
57 if ret: self.sessions[session_id]['addresses'] = addresses
58 return repr( (self.abe.block_number,ret))
# Register one address in an existing session, recording its current status.
# The session is indexed directly, so an unknown session_id raises KeyError
# (assuming self.sessions is a plain dict -- confirm).
# NOTE(review): the return statement is not visible in this excerpt.
61 def add_address_to_session(self, session_id, address):
62 status = self.store.get_status(address)
# Entries are (msg_id, status) pairs; the msg_id is left empty here.
63 self.sessions[session_id]['addresses'][address] = ("", status)
64 self.sessions[session_id]['last_time'] = time.time()
# Create a session for the given client version and register its addresses.
# Builds `out`, the repr() of a (session_id, banner) tuple.
# NOTE(review): the loop header that binds `a` and the return statement are
# not visible in this excerpt; `out` is presumably what gets returned.
68 def new_session(self, version, addresses):
# The session id comes from random_string(10), imported from `processor`.
69 session_id = random_string(10)
70 self.sessions[session_id] = { 'addresses':{}, 'version':version }
# Each address starts with an empty (msg_id, status) pair.
72 self.sessions[session_id]['addresses'][a] = ('','')
# Turn the two-character sequence backslash + 'n' in the banner into a real
# newline before sending it to the client.
73 out = repr( (session_id, self.banner.replace('\\n','\n') ) )
74 self.sessions[session_id]['last_time'] = time.time()
# Replace the whole address table of an existing session.
# The session is indexed directly, so an unknown session_id raises KeyError
# (assuming self.sessions is a plain dict -- confirm).
# NOTE(review): the loop header that binds `a` and the return statement are
# not visible in this excerpt.
78 def update_session(self, session_id,addresses):
79 """deprecated in 0.42, wad replaced by add_address_to_session"""
# Drop every previously registered address.
80 self.sessions[session_id]['addresses'] = {}
# Each address restarts with an empty (msg_id, status) pair.
82 self.sessions[session_id]['addresses'][a] = ('','')
83 self.sessions[session_id]['last_time'] = time.time()
# Handle one client connection: parse a single request, run it through
# do_command() and send the result back.
# NOTE(review): the code that reads `msg` from `conn` and the surrounding
# try/except structure are not visible in this excerpt.
88 def native_client_thread(self, ipaddr,conn):
# Keep only the text before the first '#'; anything after it is discarded.
98 msg = msg.split('#', 1)[0]
# The request is a Python literal holding a (cmd, data) pair.
# literal_eval only accepts literals, so it cannot run arbitrary code.
101 cmd, data = ast.literal_eval(msg)
# Printed when the request cannot be parsed (presumably in an except branch).
103 print "syntax error", repr(msg), ipaddr
107 out = self.do_command(cmd, data, ipaddr)
109 #print ipaddr, cmd, len(out)
# Printed when sending fails (presumably in an except branch).
113 print "error, could not send"
# Dispatch one client command and build the reply in `out`.
# `cmd` is the command name, `data` its payload (often a string holding a
# Python literal), and `ipaddr` is used for log messages only.
# NOTE(review): many branch headers, except clauses and the final return are
# not visible in this excerpt; comments below describe visible lines only.
120 def do_command(self, cmd, data, ipaddr):
# Current block number, formatted as a decimal string.
123 out = "%d"%self.abe.block_number
125 elif cmd in ['session','new_session']:
# Two payload shapes are accepted: a bare address list, or a
# (version, addresses) pair.  Which command uses which is not visible here.
128 addresses = ast.literal_eval(data)
131 version, addresses = ast.literal_eval(data)
# Version strings that start with "0" get a "v" prefix.
132 if version[0]=="0": version = "v" + version
# Log the first address only; the conditional avoids indexing an empty list.
136 print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
137 out = self.new_session(version, addresses)
139 elif cmd=='address.subscribe':
# Payload is a (session_id, addr) pair.
141 session_id, addr = ast.literal_eval(data)
143 traceback.print_exc(file=sys.stdout)
146 out = self.add_address_to_session(session_id,addr)
148 elif cmd=='update_session':
# Payload is a (session_id, addresses) pair.
150 session_id, addresses = ast.literal_eval(data)
152 traceback.print_exc(file=sys.stdout)
154 print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
155 out = self.update_session(session_id,addresses)
# Here `data` is passed through unparsed as the session id.
158 out = self.poll_session(data)
# The history is serialised with repr() and its key names are then rewritten
# by new_to_old().
162 out = repr( self.store.get_history( address ) )
163 out = new_to_old(out)
# Hand `data` to the store's send_tx() and log the result.
166 out = self.store.send_tx(data)
167 print timestr(), "sent tx:", ipaddr, out
# Peer list as reported by the irc object.
170 out = repr(self.irc.get_peers())
# Background loop over the session table; runs until self.shared.stopped()
# returns true.
# NOTE(review): the expiry test, the removal of the session and any sleep
# between passes are not visible in this excerpt.
178 def clean_session_thread(self):
179 while not self.shared.stopped():
# In Python 2, .items() returns a list, so the loop iterates over a snapshot
# of the table.
182 for k,s in self.sessions.items():
# Sessions marked with type 'persistent' are skipped.
183 if s.get('type') == 'persistent': continue
187 print "lost session", k
# Start the session cleaner, then accept TCP connections and serve each one
# in its own thread until self.shared.stopped() returns true.
# NOTE(review): the enclosing `def` is not visible in this excerpt; since the
# class derives from threading.Thread this is presumably run() -- confirm.
# The listen() call and the try/except around thread creation are also not
# visible.
190 thread.start_new_thread(self.clean_session_thread, ())
192 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow the address to be re-bound without waiting for old sockets to expire.
193 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
194 s.bind((self.host, self.port))
196 while not self.shared.stopped():
# accept() blocks, so the stop flag is only re-checked after a connection
# arrives.
197 conn, addr = s.accept()
# The handler receives the peer address first, then the connection.
199 thread.start_new_thread(self.native_client_thread, (addr, conn,))
201 # can't start new thread if there is no memory..
202 traceback.print_exc(file=sys.stdout)