2cc43131e42a9a81199c8d4a5debf74e0497a90e
[electrum-server.git] / transports / native.py
1 import thread, threading, time, socket, traceback, ast, sys
2
3
4 from processor import timestr, random_string
5
6
7
8 def new_to_old(s):
9     s = s.replace("'block_hash'", "'blk_hash'")
10     s = s.replace("'index'", "'pos'")
11     s = s.replace("'timestamp'", "'nTime'")
12     s = s.replace("'is_input'", "'is_in'")
13     s = s.replace("'raw_output_script'","'raw_scriptPubKey'")
14     return s
15
16
17 class NativeServer(threading.Thread):
18
19     def __init__(self, shared, abe, irc, banner, host, port):
20         threading.Thread.__init__(self)
21         self.banner = banner
22         self.abe = abe
23         self.store = abe.store
24         self.irc = irc
25         self.sessions = {}
26         self.host = host
27         self.port = port
28         self.daemon = True
29         self.shared = shared
30
31
32     def modified_addresses(self,a_session):
33         import copy
34         session = copy.deepcopy(a_session)
35         addresses = session['addresses']
36         session['last_time'] = time.time()
37         ret = {}
38         k = 0
39         for addr in addresses.keys():
40             status = self.store.get_status( addr )
41             msg_id, last_status = addresses[addr]
42             if last_status != status:
43                 addresses[addr] = msg_id, status
44                 ret[addr] = status
45
46         return ret, addresses
47
48
49     def poll_session(self, session_id): 
50         session = self.sessions.get(session_id)
51         if session is None:
52             print time.asctime(), "session not found", session_id
53             return -1, {}
54         else:
55             self.sessions[session_id]['last_time'] = time.time()
56             ret, addresses = self.modified_addresses(session)
57             if ret: self.sessions[session_id]['addresses'] = addresses
58             return repr( (self.abe.block_number,ret))
59
60
61     def add_address_to_session(self, session_id, address):
62         status = self.store.get_status(address)
63         self.sessions[session_id]['addresses'][address] = ("", status)
64         self.sessions[session_id]['last_time'] = time.time()
65         return status
66
67
68     def new_session(self, version, addresses):
69         session_id = random_string(10)
70         self.sessions[session_id] = { 'addresses':{}, 'version':version }
71         for a in addresses:
72             self.sessions[session_id]['addresses'][a] = ('','')
73         out = repr( (session_id, self.banner.replace('\\n','\n') ) )
74         self.sessions[session_id]['last_time'] = time.time()
75         return out
76
77
78     def update_session(self, session_id,addresses):
79         """deprecated in 0.42, wad replaced by add_address_to_session"""
80         self.sessions[session_id]['addresses'] = {}
81         for a in addresses:
82             self.sessions[session_id]['addresses'][a] = ('','')
83         self.sessions[session_id]['last_time'] = time.time()
84         return 'ok'
85
86
87
88     def native_client_thread(self, ipaddr,conn):
89         try:
90             ipaddr = ipaddr[0]
91             msg = ''
92             while 1:
93                 d = conn.recv(1024)
94                 msg += d
95                 if not d: 
96                     break
97                 if '#' in msg:
98                     msg = msg.split('#', 1)[0]
99                     break
100             try:
101                 cmd, data = ast.literal_eval(msg)
102             except:
103                 print "syntax error", repr(msg), ipaddr
104                 conn.close()
105                 return
106
107             out = self.do_command(cmd, data, ipaddr)
108             if out:
109                 #print ipaddr, cmd, len(out)
110                 try:
111                     conn.send(out)
112                 except:
113                     print "error, could not send"
114
115         finally:
116             conn.close()
117
118
119
120     def do_command(self, cmd, data, ipaddr):
121
122         if cmd=='b':
123             out = "%d"%self.abe.block_number
124
125         elif cmd in ['session','new_session']:
126             try:
127                 if cmd == 'session':
128                     addresses = ast.literal_eval(data)
129                     version = "old"
130                 else:
131                     version, addresses = ast.literal_eval(data)
132                     if version[0]=="0": version = "v" + version
133             except:
134                 print "error", data
135                 return None
136             print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
137             out = self.new_session(version, addresses)
138
139         elif cmd=='address.subscribe':
140             try:
141                 session_id, addr = ast.literal_eval(data)
142             except:
143                 traceback.print_exc(file=sys.stdout)
144                 print data
145                 return None
146             out = self.add_address_to_session(session_id,addr)
147
148         elif cmd=='update_session':
149             try:
150                 session_id, addresses = ast.literal_eval(data)
151             except:
152                 traceback.print_exc(file=sys.stdout)
153                 return None
154             print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
155             out = self.update_session(session_id,addresses)
156             
157         elif cmd=='poll': 
158             out = self.poll_session(data)
159
160         elif cmd == 'h': 
161             address = data
162             out = repr( self.store.get_history( address ) )
163             out = new_to_old(out)
164
165         elif cmd =='tx':
166             out = self.store.send_tx(data)
167             print timestr(), "sent tx:", ipaddr, out
168
169         elif cmd == 'peers':
170             out = repr(self.irc.get_peers())
171
172         else:
173             out = None
174
175         return out
176
177
178     def clean_session_thread(self):
179         while not self.shared.stopped():
180             time.sleep(30)
181             t = time.time()
182             for k,s in self.sessions.items():
183                 if s.get('type') == 'persistent': continue
184                 t0 = s['last_time']
185                 if t - t0 > 5*60:
186                     self.sessions.pop(k)
187                     print "lost session", k
188             
189     def run(self):
190         thread.start_new_thread(self.clean_session_thread, ())
191
192         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
193         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
194         s.bind((self.host, self.port)) 
195         s.listen(1)
196         while not self.shared.stopped():
197             conn, addr = s.accept()
198             try:
199                 thread.start_new_thread(self.native_client_thread, (addr, conn,))
200             except:
201                 # can't start new thread if there is no memory..
202                 traceback.print_exc(file=sys.stdout)
203