create dispatcher class; redefine processors as threads
[electrum-server.git] / transports / native.py
1 import thread, threading, time, socket, traceback, ast, sys
2
3
4
def random_string(N):
    """Return a random string of N characters drawn from A-Z and 0-9.

    The result is used as a session identifier, so the characters are
    picked with random.SystemRandom (OS entropy) rather than the default
    pseudo-random generator, whose output is predictable.

    N <= 0 yields the empty string.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
8
def timestr():
    """Return the current local time formatted as [DD/MM/YYYY-HH:MM:SS]."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    now = time.localtime()
    return time.strftime(stamp_format, now)
11
12
class NativeServer(threading.Thread):
    """TCP transport for the legacy "native" request/response protocol.

    Every client connection carries exactly one request: the textual repr of
    a ``(cmd, data)`` tuple, optionally terminated by ``#``.  The reply (if
    any) is written back on the same connection, which is then closed.

    Session state lives in memory in ``self.sessions``, a dict mapping a
    session id to ``{'addresses': {addr: (msg_id, status)}, 'version': ...,
    'last_time': <time.time() of last activity>}``.  Sessions idle for more
    than SESSION_TIMEOUT seconds are dropped by ``clean_session_thread``.
    """

    # Seconds of inactivity after which a non-persistent session is dropped.
    SESSION_TIMEOUT = 5 * 60
    # Seconds between two passes of the session cleaner.
    CLEAN_INTERVAL = 30

    def __init__(self, shared, abe, irc, banner, host, port):
        """Store the collaborators; the socket is only opened in run().

        shared -- object exposing stopped(), polled to end the server loops
        abe    -- object exposing .store and .block_number
        irc    -- object exposing get_peers()
        banner -- text sent to clients when a session is created
        host, port -- address the listening socket binds to
        """
        threading.Thread.__init__(self)
        self.banner = banner
        self.abe = abe
        self.store = abe.store
        self.irc = irc
        self.sessions = {}
        self.host = host
        self.port = port
        self.daemon = True
        self.shared = shared

    def _spawn(self, target, *args):
        """Run target(*args) in a daemon thread."""
        worker = threading.Thread(target=target, args=args)
        worker.daemon = True
        worker.start()

    def modified_addresses(self, a_session):
        """Compute which addresses of a session changed status.

        Returns ``(ret, addresses)`` where ``ret`` maps every address whose
        status differs from the recorded one to its new status, and
        ``addresses`` is an updated copy of the session's address table.
        ``a_session`` itself is not modified; the caller decides whether to
        commit the copy.
        """
        # A shallow copy is enough: entries are replaced, never mutated.
        addresses = dict(a_session['addresses'])
        ret = {}
        for addr, (msg_id, last_status) in list(addresses.items()):
            status = self.store.get_status(addr)
            if last_status != status:
                addresses[addr] = msg_id, status
                ret[addr] = status

        return ret, addresses

    def poll_session(self, session_id):
        """Return the repr of ``(block_number, {addr: new_status})``.

        For an unknown session the reply is the repr of ``(-1, {})`` so the
        client always receives a string in the same format.
        """
        try:
            session = self.sessions.get(session_id)
        except TypeError:
            # An unhashable id cannot name a session.
            session = None
        if session is None:
            print("%s session not found %s" % (time.asctime(), session_id))
            return repr((-1, {}))

        # Work on the session object we already hold, so that a concurrent
        # expiry of the id cannot raise KeyError half-way through.
        session['last_time'] = time.time()
        ret, addresses = self.modified_addresses(session)
        if ret:
            session['addresses'] = addresses
        return repr((self.abe.block_number, ret))

    def add_address_to_session(self, session_id, address):
        """Subscribe a session to an address and return its current status.

        Raises KeyError if session_id is not a known session.
        """
        status = self.store.get_status(address)
        session = self.sessions[session_id]
        session['addresses'][address] = ("", status)
        session['last_time'] = time.time()
        return status

    def new_session(self, version, addresses):
        """Create a session and return the repr of ``(session_id, banner)``."""
        session_id = random_string(10)
        # Build the complete record before publishing it: the cleaner thread
        # reads 'last_time' and must never see a half-initialised session.
        session = {
            'addresses': dict((a, ('', '')) for a in addresses),
            'version': version,
            'last_time': time.time(),
        }
        self.sessions[session_id] = session
        return repr((session_id, self.banner.replace('\\n', '\n')))

    def update_session(self, session_id, addresses):
        """deprecated in 0.42, was replaced by add_address_to_session

        Replaces the session's address table and returns 'ok'.
        Raises KeyError if session_id is not a known session.
        """
        session = self.sessions[session_id]
        # Entries must be (msg_id, status) pairs: modified_addresses unpacks
        # them that way on the next poll.
        session['addresses'] = dict((a, ('', '')) for a in addresses)
        session['last_time'] = time.time()
        return 'ok'

    def native_client_thread(self, ipaddr, conn):
        """Serve one client connection: read a request, reply, close.

        ipaddr -- the (host, port) pair returned by accept()
        conn   -- the connected socket; always closed on return
        """
        try:
            ipaddr = ipaddr[0]
            msg = ''
            # Read until the peer closes or the '#' terminator shows up.
            while 1:
                d = conn.recv(1024)
                if not d:
                    break
                msg += d
                if '#' in msg:
                    msg = msg.split('#', 1)[0]
                    break
            try:
                # literal_eval only accepts Python literals, never code.
                cmd, data = ast.literal_eval(msg)
            except Exception:
                print("syntax error %r %s" % (msg, ipaddr))
                return

            out = self.do_command(cmd, data, ipaddr)
            if out:
                try:
                    # sendall: send() may transmit only part of a long reply.
                    conn.sendall(out)
                except Exception:
                    print("error, could not send")

        finally:
            conn.close()

    def do_command(self, cmd, data, ipaddr):
        """Execute one protocol command and return the reply, or None.

        None means "send nothing": it is returned for unknown commands,
        malformed arguments and requests that name an unknown session.
        """
        if cmd == 'b':
            out = "%d" % self.abe.block_number

        elif cmd in ('session', 'new_session'):
            try:
                if cmd == 'session':
                    addresses = ast.literal_eval(data)
                    version = "old"
                else:
                    version, addresses = ast.literal_eval(data)
                    if version[0] == "0":
                        version = "v" + version
                # The log line and new_session index and iterate addresses.
                if not isinstance(addresses, (list, tuple)):
                    raise ValueError("addresses must be a list or tuple")
            except Exception:
                print("error %s" % (data,))
                return None
            first = addresses[0] if addresses else addresses
            print("%s new session %s %s %s %s"
                  % (timestr(), ipaddr, first, len(addresses), version))
            out = self.new_session(version, addresses)

        elif cmd == 'address.subscribe':
            try:
                session_id, addr = ast.literal_eval(data)
                known = session_id in self.sessions
            except Exception:
                traceback.print_exc(file=sys.stdout)
                print(data)
                return None
            if not known:
                print("%s session not found %s" % (time.asctime(), session_id))
                return None
            out = self.add_address_to_session(session_id, addr)

        elif cmd == 'update_session':
            try:
                session_id, addresses = ast.literal_eval(data)
                known = session_id in self.sessions
                if not isinstance(addresses, (list, tuple)):
                    raise ValueError("addresses must be a list or tuple")
            except Exception:
                traceback.print_exc(file=sys.stdout)
                return None
            if not known:
                print("%s session not found %s" % (time.asctime(), session_id))
                return None
            first = addresses[0] if addresses else addresses
            print("%s update session %s %s %s"
                  % (timestr(), ipaddr, first, len(addresses)))
            out = self.update_session(session_id, addresses)

        elif cmd == 'poll':
            out = self.poll_session(data)

        elif cmd == 'h':
            address = data
            out = repr(self.store.get_history(address))

        elif cmd == 'tx':
            out = self.store.send_tx(data)
            print("%s sent tx: %s %s" % (timestr(), ipaddr, out))

        elif cmd == 'peers':
            out = repr(self.irc.get_peers())

        else:
            out = None

        return out

    def clean_session_thread(self):
        """Periodically drop sessions idle for more than SESSION_TIMEOUT.

        Sessions whose 'type' is 'persistent' are never dropped.  Runs until
        self.shared.stopped() returns true.
        """
        while not self.shared.stopped():
            time.sleep(self.CLEAN_INTERVAL)
            now = time.time()
            # Iterate over a snapshot: entries are removed while looping and
            # client threads may add sessions concurrently.
            for k, s in list(self.sessions.items()):
                if s.get('type') == 'persistent':
                    continue
                if now - s.get('last_time', now) > self.SESSION_TIMEOUT:
                    self.sessions.pop(k, None)
                    print("lost session %s" % (k,))

    def run(self):
        """Accept connections and serve each one in its own thread."""
        self._spawn(self.clean_session_thread)

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.host, self.port))
            s.listen(1)
            while not self.shared.stopped():
                conn, addr = s.accept()
                try:
                    self._spawn(self.native_client_thread, addr, conn)
                except Exception:
                    # can't start new thread if there is no memory..
                    traceback.print_exc(file=sys.stdout)
                    # Nobody will serve this client; do not leak the socket.
                    conn.close()
        finally:
            s.close()
198