1b6a6ec08360eaf117f43455259310e06a333c50
[electrum-server.git] / native.py
import ast
import socket
import sys
import thread
import threading
import time
import traceback
2
3
4
def random_string(N):
    """Return a random string of N uppercase ASCII letters and digits.

    The result is used as a session identifier, which clients later present
    to poll or modify their session, so it must not be guessable.  Characters
    are therefore drawn from the operating system's entropy source rather
    than from the seedable default generator of the ``random`` module.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    # SystemRandom ignores random.seed() and is suitable for tokens.
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
8
def timestr():
    """Return the current local time as a ``[DD/MM/YYYY-HH:MM:SS]`` log prefix."""
    now = time.localtime()
    return time.strftime("[%d/%m/%Y-%H:%M:%S]", now)
11
12
class NativeServer(threading.Thread):
    """TCP server thread speaking the native, repr()-based client protocol.

    Every connection carries one request: a Python literal ``(cmd, data)``
    terminated by ``#``.  The request is parsed with ``ast.literal_eval``,
    dispatched by :meth:`do_command`, the reply (if any) is written back and
    the connection is closed.

    Session state is kept in ``self.sessions``, a dict mapping a session id
    to ``{'addresses': {addr: (msg_id, status)}, 'version': ...,
    'last_time': ...}``.  Sessions that are not marked ``'type':
    'persistent'`` and have been idle for more than five minutes are dropped
    by :meth:`clean_session_thread`.
    """

    def __init__(self, shared, store, irc, banner, host, port):
        """Store the collaborators and the listening address.

        shared -- object whose ``stopped()`` method ends the server loops
        store  -- backend providing ``get_status``, ``get_history``,
                  ``send_tx`` and ``block_number``
        irc    -- object providing ``get_peers()``
        banner -- text returned to clients when a session is created
        host, port -- address the listening socket is bound to
        """
        threading.Thread.__init__(self)
        self.banner = banner
        self.store = store
        self.irc = irc
        self.sessions = {}
        self.host = host
        self.port = port
        self.daemon = True
        self.shared = shared

    def modified_addresses(self, a_session):
        """Find the addresses of a session whose status has changed.

        ``a_session`` is not modified; the caller decides whether to keep the
        updated address table.

        Returns ``(ret, addresses)`` where ``ret`` maps every changed address
        to its new status and ``addresses`` is an updated copy of
        ``a_session['addresses']``.
        """
        # Entries are rebound, never mutated in place, so a shallow copy of
        # the address table is enough to leave the caller's session intact.
        addresses = dict(a_session['addresses'])
        ret = {}
        for addr, (msg_id, last_status) in list(addresses.items()):
            status = self.store.get_status(addr)
            if last_status != status:
                addresses[addr] = msg_id, status
                ret[addr] = status
        return ret, addresses

    def poll_session(self, session_id):
        """Return ``repr((block_number, changes))`` for a session.

        ``changes`` maps each address whose status changed since the last
        poll to its new status.  For an unknown session the reply is
        ``repr((-1, {}))``, so the result is always a string that can be
        written to the client socket.
        """
        session = self.sessions.get(session_id)
        if session is None:
            print("%s session not found %s" % (time.asctime(), session_id))
            return repr((-1, {}))

        # Work on the object fetched above: looking the id up again could
        # fail if the cleaner thread dropped the session in the meantime.
        session['last_time'] = time.time()
        ret, addresses = self.modified_addresses(session)
        if ret:
            session['addresses'] = addresses
        return repr((self.store.block_number, ret))

    def add_address_to_session(self, session_id, address):
        """Subscribe a session to an address and return its current status.

        Raises KeyError if ``session_id`` is not a known session.
        """
        status = self.store.get_status(address)
        session = self.sessions[session_id]
        session['addresses'][address] = ("", status)
        session['last_time'] = time.time()
        return status

    def new_session(self, version, addresses):
        """Create a session and return ``repr((session_id, banner))``.

        Every address starts with an empty status, so the first poll reports
        all of them.  A literal backslash-n sequence in the banner is turned
        into a real newline.
        """
        session_id = random_string(10)
        # Build the session completely before publishing it: the cleaner
        # thread reads 'last_time' from every session it finds.
        session = {
            'addresses': dict((a, ('', '')) for a in addresses),
            'version': version,
            'last_time': time.time(),
        }
        self.sessions[session_id] = session
        return repr((session_id, self.banner.replace('\\n', '\n')))

    def update_session(self, session_id, addresses):
        """deprecated in 0.42, was replaced by add_address_to_session

        Replace the address table of a session and return ``'ok'``.
        Raises KeyError if ``session_id`` is not a known session.
        """
        session = self.sessions[session_id]
        # Same (msg_id, status) shape as new_session so that polling can
        # unpack every entry.
        session['addresses'] = dict((a, ('', '')) for a in addresses)
        session['last_time'] = time.time()
        return 'ok'

    def native_client_thread(self, ipaddr, conn):
        """Serve one client connection: read a request, reply, close.

        ipaddr -- peer address as returned by ``accept()``; only its first
                  element is used
        conn   -- the connected socket, always closed before returning
        """
        try:
            ipaddr = ipaddr[0]
            msg = ''
            while 1:
                d = conn.recv(1024)
                msg += d
                if not d:
                    # peer closed the connection before sending '#'
                    break
                if '#' in msg:
                    msg = msg.split('#', 1)[0]
                    break
            try:
                # literal_eval only accepts Python literals, it does not
                # execute code from the (untrusted) client.
                cmd, data = ast.literal_eval(msg)
            except Exception:
                print("syntax error %s %s" % (repr(msg), ipaddr))
                return

            out = self.do_command(cmd, data, ipaddr)
            if out:
                try:
                    # sendall: a plain send() may transmit only part of a
                    # large reply.
                    conn.sendall(out)
                except Exception:
                    print("error, could not send")

        finally:
            conn.close()

    def do_command(self, cmd, data, ipaddr):
        """Execute one protocol command and return the reply.

        Known commands: ``b``, ``session``, ``new_session``,
        ``address.subscribe``, ``update_session``, ``poll``, ``h``, ``tx``
        and ``peers``.  Returns None for an unknown command or when ``data``
        cannot be parsed; in that case nothing is sent to the client.
        """
        if cmd == 'b':
            out = "%d" % self.store.block_number

        elif cmd in ['session', 'new_session']:
            try:
                if cmd == 'session':
                    addresses = ast.literal_eval(data)
                    version = "old"
                else:
                    version, addresses = ast.literal_eval(data)
                    if version[0] == "0":
                        version = "v" + version
            except Exception:
                print("error %s" % (data,))
                return None
            first = addresses[0] if addresses else addresses
            print("%s new session %s %s %s %s"
                  % (timestr(), ipaddr, first, len(addresses), version))
            out = self.new_session(version, addresses)

        elif cmd == 'address.subscribe':
            try:
                session_id, addr = ast.literal_eval(data)
            except Exception:
                traceback.print_exc(file=sys.stdout)
                print(data)
                return None
            out = self.add_address_to_session(session_id, addr)

        elif cmd == 'update_session':
            try:
                session_id, addresses = ast.literal_eval(data)
            except Exception:
                traceback.print_exc(file=sys.stdout)
                return None
            first = addresses[0] if addresses else addresses
            print("%s update session %s %s %s"
                  % (timestr(), ipaddr, first, len(addresses)))
            out = self.update_session(session_id, addresses)

        elif cmd == 'poll':
            out = self.poll_session(data)

        elif cmd == 'h':
            address = data
            out = repr(self.store.get_history(address))

        elif cmd == 'tx':
            out = self.store.send_tx(data)
            print("%s sent tx: %s %s" % (timestr(), ipaddr, out))

        elif cmd == 'peers':
            out = repr(self.irc.get_peers())

        else:
            out = None

        return out

    def clean_session_thread(self):
        """Every 30 seconds, drop sessions idle for more than five minutes.

        Sessions whose ``'type'`` is ``'persistent'`` are never dropped.
        Runs until ``self.shared.stopped()`` returns true.
        """
        while not self.shared.stopped():
            time.sleep(30)
            now = time.time()
            # Iterate over a snapshot: entries are removed inside the loop
            # and other threads add sessions concurrently.
            for k, s in list(self.sessions.items()):
                if s.get('type') == 'persistent':
                    continue
                if now - s['last_time'] > 5 * 60:
                    self.sessions.pop(k, None)
                    print("lost session %s" % (k,))

    def run(self):
        """Start the session cleaner, then accept clients until stopped.

        Each accepted connection is handed to its own
        :meth:`native_client_thread`.
        """
        thread.start_new_thread(self.clean_session_thread, ())

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.host, self.port))
            s.listen(1)
            while not self.shared.stopped():
                conn, addr = s.accept()
                try:
                    thread.start_new_thread(self.native_client_thread, (addr, conn,))
                except Exception:
                    # can't start new thread if there is no memory..
                    traceback.print_exc(file=sys.stdout)
                    # nobody else owns the connection, so release it here
                    conn.close()
        finally:
            s.close()
197