2ed7ff92db091ff537b2b2b68e8aff343ff8a04e
[electrum-server.git] / transports / native.py
1 import thread, threading, time, socket, traceback, ast, sys
2
3
4
def random_string(N):
    """Return a string of N characters drawn at random from A-Z and 0-9.

    The result is used as a session id (see NativeServer.new_session), and
    anyone who knows a session id can poll or modify that session, so the
    characters come from the operating system's entropy source rather than
    the default, predictable Mersenne Twister generator.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    # SystemRandom is backed by os.urandom and cannot be seeded or replayed
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
8
def timestr():
    """Return the current local time as a bracketed [dd/mm/YYYY-HH:MM:SS] stamp."""
    stamp_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(stamp_format, time.localtime())
11
12
def new_to_old(s):
    """Rename new-style quoted keys inside the string s to their old names.

    Only single-quoted occurrences (as produced by repr of a dict) are
    rewritten; the replacements are applied in the order listed below.
    """
    renames = (
        ("'block_hash'", "'blk_hash'"),
        ("'index'", "'pos'"),
        ("'timestamp'", "'nTime'"),
        ("'is_input'", "'is_in'"),
        ("'raw_output_script'", "'raw_scriptPubKey'"),
    )
    for new_key, old_key in renames:
        s = s.replace(new_key, old_key)
    return s
20
21
class NativeServer(threading.Thread):
    """Threaded TCP server for the "native" request/reply protocol.

    Every connection carries exactly one request: the text of a Python
    literal ``(cmd, data)`` terminated by ``#``.  The server answers with a
    single string (or nothing at all when the request is invalid) and then
    closes the connection.

    Client state lives in ``self.sessions``, a dict mapping a session id to
    ``{'addresses': {addr: (msg_id, last_status)}, 'version': ...,
    'last_time': ...}``.  Sessions that are not marked ``'persistent'`` are
    dropped by a background thread once they have been idle for
    ``SESSION_TIMEOUT`` seconds.
    """

    # seconds of inactivity after which a session is discarded
    SESSION_TIMEOUT = 5 * 60
    # seconds between two passes of the session cleaner
    CLEAN_INTERVAL = 30

    def __init__(self, shared, abe, irc, banner, host, port):
        """Store the collaborators and the listening address.

        shared -- object whose stopped() tells the loops when to exit
        abe    -- provides .store (status/history/send_tx) and .block_number
        irc    -- provides get_peers()
        banner -- text returned to clients when they open a session
        host, port -- address the listening socket is bound to in run()
        """
        threading.Thread.__init__(self)
        self.shared = shared
        self.abe = abe
        self.store = abe.store
        self.irc = irc
        self.banner = banner
        self.host = host
        self.port = port
        self.sessions = {}
        self.daemon = True


    def _log(self, *parts):
        """Print parts separated by spaces on one line.

        The output is formatted like a multi-argument print statement; the
        single-argument form used here parses under Python 2 and Python 3.
        """
        print(" ".join("%s" % (p,) for p in parts))


    def _get_session(self, session_id):
        """Return the session dict for session_id, or None if there is none."""
        try:
            return self.sessions.get(session_id)
        except TypeError:
            # session ids arrive from the network; an unhashable value
            # (e.g. a list) can never be a key, so it is an unknown session
            return None


    def modified_addresses(self, a_session):
        """Find the addresses of a session whose status has changed.

        Returns ``(ret, addresses)``: ret maps each changed address to its
        new status, addresses is an updated copy of the session's address
        table.  a_session itself is left untouched so that the caller
        decides whether to commit the new table.
        """
        import copy
        # only the address table is needed; work on a private copy of it
        addresses = copy.deepcopy(a_session['addresses'])
        ret = {}
        for addr, (msg_id, last_status) in list(addresses.items()):
            status = self.store.get_status(addr)
            if last_status != status:
                addresses[addr] = msg_id, status
                ret[addr] = status

        return ret, addresses


    def poll_session(self, session_id):
        """Return repr((block_number, changed)) for a session.

        changed maps every address whose status differs from the one last
        reported to its new status.  For an unknown session the reply is
        repr((-1, {})), so the caller always gets a string it can send.
        """
        session = self._get_session(session_id)
        if session is None:
            self._log(time.asctime(), "session not found", session_id)
            # must be a string like every other reply: a raw tuple cannot
            # be written to the socket and the client would get no answer
            return repr((-1, {}))

        session['last_time'] = time.time()
        ret, addresses = self.modified_addresses(session)
        if ret:
            session['addresses'] = addresses
        return repr((self.abe.block_number, ret))


    def add_address_to_session(self, session_id, address):
        """Subscribe a session to address and return the address status.

        Returns None (after logging) when the session does not exist.
        """
        session = self._get_session(session_id)
        if session is None:
            self._log(time.asctime(), "session not found", session_id)
            return None
        status = self.store.get_status(address)
        session['addresses'][address] = ("", status)
        session['last_time'] = time.time()
        return status


    def new_session(self, version, addresses):
        """Create a session watching addresses; return repr((id, banner)).

        Literal backslash-n sequences in the banner are turned into real
        newlines before it is sent.
        """
        session_id = random_string(10)
        # build the complete record before publishing it, so the cleaner
        # thread never sees a session without 'last_time'
        self.sessions[session_id] = {
            'addresses': dict((a, ('', '')) for a in addresses),
            'version': version,
            'last_time': time.time(),
        }
        return repr((session_id, self.banner.replace('\\n', '\n')))


    def update_session(self, session_id, addresses):
        """deprecated in 0.42, was replaced by add_address_to_session

        Replace the address table of a session with addresses and return
        'ok'.  Returns None (after logging) when the session does not exist.
        """
        session = self._get_session(session_id)
        if session is None:
            self._log(time.asctime(), "session not found", session_id)
            return None
        session['addresses'] = dict((a, ('', '')) for a in addresses)
        session['last_time'] = time.time()
        return 'ok'


    def native_client_thread(self, ipaddr, conn):
        """Serve one connection: read a request, reply, close.

        ipaddr -- the (host, port) pair returned by accept(); only the host
                  is used, for logging
        conn   -- the connected socket; it is always closed on return
        """
        try:
            ipaddr = ipaddr[0]
            msg = ''
            while True:
                d = conn.recv(1024)
                if not d:
                    # peer closed before sending the terminator
                    break
                msg += d
                if '#' in d:
                    # '#' ends the request; anything after it is ignored
                    msg = msg.split('#', 1)[0]
                    break
            try:
                cmd, data = ast.literal_eval(msg)
            except Exception:
                self._log("syntax error", repr(msg), ipaddr)
                return

            out = self.do_command(cmd, data, ipaddr)
            if out:
                try:
                    # sendall keeps writing until the whole reply is out;
                    # send() may stop after a partial write on long replies
                    conn.sendall(out)
                except Exception:
                    self._log("error, could not send")

        finally:
            conn.close()


    def do_command(self, cmd, data, ipaddr):
        """Execute one request and return the reply string, or None.

        Supported commands:
          'b'                 -- current block number
          'session'           -- open a session, data is a literal list of
                                 addresses (version recorded as "old")
          'new_session'       -- open a session, data is a literal
                                 (version, addresses)
          'address.subscribe' -- data is a literal (session_id, address)
          'update_session'    -- data is a literal (session_id, addresses)
          'poll'              -- data is the session id
          'h'                 -- history of the address in data
          'tx'                -- hand data to store.send_tx
          'peers'             -- repr of irc.get_peers()

        None is returned for unknown commands and for malformed data, in
        which case nothing is sent back to the client.
        """
        if cmd == 'b':
            out = "%d" % self.abe.block_number

        elif cmd in ['session', 'new_session']:
            try:
                if cmd == 'session':
                    addresses = ast.literal_eval(data)
                    version = "old"
                else:
                    version, addresses = ast.literal_eval(data)
                    if version[0] == "0":
                        version = "v" + version
            except Exception:
                self._log("error", data)
                return None
            self._log(timestr(), "new session", ipaddr,
                      addresses[0] if addresses else addresses,
                      len(addresses), version)
            out = self.new_session(version, addresses)

        elif cmd == 'address.subscribe':
            try:
                session_id, addr = ast.literal_eval(data)
            except Exception:
                traceback.print_exc(file=sys.stdout)
                self._log(data)
                return None
            out = self.add_address_to_session(session_id, addr)

        elif cmd == 'update_session':
            try:
                session_id, addresses = ast.literal_eval(data)
            except Exception:
                traceback.print_exc(file=sys.stdout)
                return None
            self._log(timestr(), "update session", ipaddr,
                      addresses[0] if addresses else addresses,
                      len(addresses))
            out = self.update_session(session_id, addresses)

        elif cmd == 'poll':
            out = self.poll_session(data)

        elif cmd == 'h':
            address = data
            out = repr(self.store.get_history(address))
            # rename the keys in the reply to their old names
            out = new_to_old(out)

        elif cmd == 'tx':
            out = self.store.send_tx(data)
            self._log(timestr(), "sent tx:", ipaddr, out)

        elif cmd == 'peers':
            out = repr(self.irc.get_peers())

        else:
            out = None

        return out


    def clean_session_thread(self):
        """Periodically drop sessions that have been idle for too long.

        Runs until shared.stopped() is true.  Sessions whose 'type' is
        'persistent' are never dropped.
        """
        while not self.shared.stopped():
            time.sleep(self.CLEAN_INTERVAL)
            now = time.time()
            # iterate over a snapshot: entries are removed inside the loop
            for k, s in list(self.sessions.items()):
                if s.get('type') == 'persistent':
                    continue
                # a record without 'last_time' is treated as just used
                if now - s.get('last_time', now) > self.SESSION_TIMEOUT:
                    self.sessions.pop(k, None)
                    self._log("lost session", k)


    def run(self):
        """Start the session cleaner, then accept clients until stopped.

        Each accepted connection is handled in its own thread by
        native_client_thread.
        """
        thread.start_new_thread(self.clean_session_thread, ())

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((self.host, self.port))
            s.listen(1)
            while not self.shared.stopped():
                conn, addr = s.accept()
                try:
                    thread.start_new_thread(self.native_client_thread, (addr, conn,))
                except Exception:
                    # can't start new thread if there is no memory..
                    traceback.print_exc(file=sys.stdout)
                    # nobody will serve this client: do not leak its socket
                    conn.close()
        finally:
            s.close()
208