rename do_send as send. fixes #645
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import time
21 import sys
22 import os
23 import threading
24 import traceback
25 import json
26 import Queue
27 from network import Network
28 from util import print_msg
29 from simple_config import SimpleConfig
30
31
class NetworkProxy(threading.Thread):
    """Client-side connection to the network daemon.

    Requests are sent to the daemon as newline-delimited JSON over a TCP
    socket on ``daemon_port``.  The thread body (run) reads the responses
    and invokes the callback that was registered for each request id.
    """

    def __init__(self, config=None):
        """Prepare the proxy; nothing is connected until start() is called.

        config -- either a plain dict of options (it is wrapped in a
                  SimpleConfig) or an already-built configuration object
                  exposing get().
        """
        threading.Thread.__init__(self)
        self.daemon = True
        # Use a fresh dict per call rather than a shared mutable default.
        if config is None:
            config = {}
        self.config = SimpleConfig(config) if isinstance(config, dict) else config
        self.socket = self._new_socket()
        # Look the port up through self.config: this is the same object that
        # start() passes to NetworkServer, so both ends resolve the port the
        # same way.
        self.daemon_port = self.config.get('daemon_port', 8000)
        self.message_id = 0
        self.unanswered_requests = {}
        self.subscriptions = {}
        self.debug = False
        # Guards message_id, unanswered_requests and subscriptions.
        self.lock = threading.Lock()

    def _new_socket(self):
        """Return a new, unconnected TCP socket configured like the original."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s

    def start(self, start_daemon=False):
        """Connect to the daemon and start the reader thread.

        Returns True once connected.  When the connection fails, returns
        False unless start_daemon is set; in that case a daemon process is
        forked once and the connection is retried every 0.1 s until it
        succeeds (there is no upper bound on the wait).
        """
        daemon_started = False
        while True:
            try:
                # Explicit loopback address; connecting to '' (INADDR_ANY)
                # is not portable.
                self.socket.connect(('127.0.0.1', self.daemon_port))
            except socket.error:
                # The state of a socket after a failed connect() is
                # unspecified, so retry with a fresh one.
                self.socket.close()
                self.socket = self._new_socket()

                if not start_daemon:
                    return False

                if not daemon_started:
                    print("Starting daemon [%s]" % self.config.get('server'))
                    daemon_started = True
                    self._spawn_daemon()
                else:
                    time.sleep(0.1)
            else:
                threading.Thread.start(self)
                return True

    def _spawn_daemon(self):
        """Fork a detached daemon (double fork); returns only in the caller's process."""
        # Flush first so buffered output is not emitted again by a child.
        sys.stdout.flush()
        pid = os.fork()
        if pid != 0:
            # Parent: reap the short-lived first child so it does not stay
            # around as a zombie.
            try:
                os.waitpid(pid, 0)
            except OSError:
                pass
            return

        # First child: detach from the caller's session, then fork again.
        try:
            os.chdir("/")
            os.setsid()
            os.umask(0)
            pid2 = os.fork()
        except OSError:
            traceback.print_exc()
            os._exit(1)
        if pid2 != 0:
            # os._exit, not sys.exit: leave immediately without unwinding the
            # caller's stack or running its cleanup handlers in this copy.
            os._exit(0)

        # Second child: run the daemon until its main loop ends.
        server = NetworkServer(self.config)
        try:
            server.main_loop()
        except KeyboardInterrupt:
            print("Ctrl C - Stopping server")
        sys.exit(1)

    def parse_json(self, message):
        """Split one newline-terminated JSON document off the front of message.

        Returns (decoded_object, remaining_text), or (None, message) when no
        complete line has been buffered yet.
        """
        s = message.find('\n')
        if s == -1:
            return None, message
        j = json.loads(message[0:s])
        return j, message[s+1:]

    def run(self):
        """Thread body: read responses and trigger callbacks until the socket closes."""
        message = ''
        while True:
            try:
                data = self.socket.recv(1024)
            except socket.error:
                # Treat a socket failure like an orderly close.
                data = ''
            if not data:
                break

            message += data
            while True:
                response, message = self.parse_json(message)
                if response is None:
                    break
                self.process(response)

        print("NetworkProxy: exiting")

    def process(self, response):
        """Run the callback registered for the request this response answers.

        The callback is called as callback(None, info) where info holds
        'method', 'params', 'result' and 'id', plus 'error' when the daemon
        reported one.  Responses whose id matches no pending request are
        dropped instead of raising, so they cannot kill the reader thread.
        """
        if self.debug:
            print("<-- %s" % (response,))

        msg_id = response.get('id')
        with self.lock:
            pending = self.unanswered_requests.pop(msg_id, None)
        if pending is None:
            if self.debug:
                print("NetworkProxy: no pending request for id %s" % (msg_id,))
            return

        method, params, callback = pending
        info = {'method': method, 'params': params,
                'result': response.get('result'), 'id': msg_id}
        error = response.get('error')
        if error is not None:
            info['error'] = error
        callback(None, info)

    def subscribe(self, messages, callback):
        """Record messages as subscriptions of callback, then send them."""
        with self.lock:
            subscribed = self.subscriptions.setdefault(callback, [])
            for message in messages:
                if message not in subscribed:
                    subscribed.append(message)

        self.send(messages, callback)

    def send(self, messages, callback):
        """return the ids of the requests that we sent"""
        lines = []
        ids = []
        # Allocate ids and register callbacks under the lock: send() may be
        # called from any thread while the reader thread pops entries.
        with self.lock:
            for method, params in messages:
                msg_id = self.message_id
                self.message_id += 1
                request = json.dumps({'id': msg_id, 'method': method, 'params': params})
                self.unanswered_requests[msg_id] = method, params, callback
                ids.append(msg_id)
                if self.debug:
                    print("--> %s" % request)
                lines.append(request + '\n')
        out = ''.join(lines)
        if out:
            self.socket.sendall(out)
        return ids

    def synchronous_get(self, requests, timeout=100000000):
        """Send requests and block until every one of them has been answered.

        Returns the results in the order of requests.  Queue.Empty is raised
        if no response arrives within timeout seconds.
        """
        queue = Queue.Queue()
        ids = self.send(requests, lambda i, x: queue.put(x))
        ordered_ids = ids[:]
        res = {}
        while ids:
            r = queue.get(True, timeout)
            _id = r.get('id')
            if _id in ids:
                ids.remove(_id)
                res[_id] = r.get('result')
        return [res[_id] for _id in ordered_ids]

    def get_servers(self):
        return self.synchronous_get([('network.get_servers', [])])[0]

    def get_header(self, height):
        return self.synchronous_get([('network.get_header', [height])])[0]

    def get_local_height(self):
        return self.synchronous_get([('network.get_local_height', [])])[0]

    def is_connected(self):
        return self.synchronous_get([('network.is_connected', [])])[0]

    def is_up_to_date(self):
        return self.synchronous_get([('network.is_up_to_date', [])])[0]

    def main_server(self):
        return self.synchronous_get([('network.main_server', [])])[0]

    def stop(self):
        """Ask the daemon to shut down; returns the daemon's reply."""
        return self.synchronous_get([('daemon.shutdown', [])])[0]

    def trigger_callback(self, cb):
        """No-op in the proxy."""
        pass
197
198
199
200
201
202
class ClientThread(threading.Thread):
    """Serve one client connection of the daemon.

    Reads newline-delimited JSON requests from the client socket, forwards
    them to the Network object, and writes the responses back on the same
    socket.
    """

    def __init__(self, server, network, socket):
        """
        server  -- the owning NetworkServer; its ``running`` flag is cleared
                   on a 'daemon.shutdown' request
        network -- object that answers 'network.*' calls and whose
                   ``interface`` forwards every other request
        socket  -- the accepted client connection
        """
        threading.Thread.__init__(self)
        self.server = server
        self.daemon = True
        self.s = socket
        # Short timeout so run() can alternate between reading and writing.
        self.s.settimeout(0.1)
        self.network = network
        self.queue = Queue.Queue()
        # No longer used by process(); kept so the attribute remains available.
        self.unanswered_requests = {}
        self.debug = False

    def run(self):
        """Thread body: pump requests and responses until the client disconnects."""
        message = ''
        try:
            while True:
                self.send_responses()
                try:
                    data = self.s.recv(1024)
                except socket.timeout:
                    continue

                if not data:
                    break
                message += data

                while True:
                    try:
                        cmd, message = self.parse_json(message)
                    except ValueError:
                        # Malformed line: discard it and keep serving.
                        message = message[message.find('\n') + 1:]
                        continue
                    # Only None means "no complete line yet"; a falsy JSON
                    # value must not stall the requests buffered behind it.
                    if cmd is None:
                        break
                    self.process(cmd)
        except socket.error:
            # The connection failed; nothing more can be exchanged.
            if self.debug:
                print("ClientThread: connection lost")
        finally:
            self.s.close()

    def parse_json(self, message):
        """Split one newline-terminated JSON document off the front of message.

        Returns (decoded_object, remaining_text), or (None, message) when no
        complete line has been buffered yet.
        """
        n = message.find('\n')
        if n == -1:
            return None, message
        j = json.loads(message[0:n])
        return j, message[n+1:]

    def process(self, request):
        """Dispatch one decoded request; responses are queued for send_responses().

        'network.<name>' calls the public attribute <name> of the network
        object, 'daemon.shutdown' stops the server loop, and anything else is
        forwarded through network.interface.  Requests that are not a dict
        with 'method', 'params' and 'id' are ignored.
        """
        if self.debug:
            print("<-- %s" % (request,))
        if not isinstance(request, dict):
            return
        try:
            method = request['method']
            params = request['params']
            _id = request['id']
        except KeyError:
            return
        if not isinstance(method, (str, type(u''))):
            return

        if method.startswith('network.'):
            name = method[8:]
            out = {'id': _id}
            try:
                # Requests arrive over a socket: never expose private or
                # special attributes of the network object.
                if name.startswith('_'):
                    raise AttributeError(name)
                f = getattr(self.network, name)
            except (AttributeError, UnicodeError):
                out['error'] = "unknown method"
            else:
                # Only call f when the lookup succeeded, so "unknown method"
                # is not overwritten by an unbound-variable error.
                try:
                    out['result'] = f(*params)
                except BaseException as e:
                    out['error'] = str(e)
            self.queue.put(out)
            return

        if method == 'daemon.shutdown':
            self.server.running = False
            self.queue.put({'id': _id, 'result': True})
            return

        def cb(i, r):
            # This callback belongs to exactly one forwarded request, so the
            # client's id is taken from the enclosing scope.  Looking it up
            # in a table filled after send() returned raced with fast
            # responses.  Messages without an id pass through unchanged.
            if r.get('id') is not None:
                r['id'] = _id
            self.queue.put(r)

        self.network.interface.send([(method, params)], cb)

    def send_responses(self):
        """Write every queued response to the client as one JSON line each."""
        while True:
            try:
                r = self.queue.get_nowait()
            except Queue.Empty:
                break
            out = json.dumps(r) + '\n'
            while out:
                n = self.s.send(out)
                out = out[n:]
            if self.debug:
                print("--> %s" % (r,))
295         
296
297
298
class NetworkServer(object):
    """The daemon: owns the Network object and accepts local client connections."""

    def __init__(self, config):
        """Start the network, then listen for clients on ``daemon_port``.

        config -- configuration object exposing get(); 'daemon_port'
                  (default 8000) and 'daemon_timeout' (default 60) are read.

        Exits the process with status 1 when the network fails to start.
        """
        network = Network(config)
        if not network.start(wait=True):
            print_msg("Not connected, aborting.")
            sys.exit(1)
        self.network = network
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.daemon_port = config.get('daemon_port', 8000)
        # Listen on the loopback interface only: the protocol is
        # unauthenticated (any peer may call network methods or send
        # 'daemon.shutdown'), so it must not be reachable from other hosts.
        self.server.bind(('127.0.0.1', self.daemon_port))
        self.server.listen(5)
        # accept() wakes up every second so main_loop can re-check its
        # running flag and the idle timeout.
        self.server.settimeout(1)
        self.running = False
        self.timeout = config.get('daemon_timeout', 60)

    def main_loop(self):
        """Accept clients until stopped or idle, then close the listening socket.

        Each accepted connection is served by its own ClientThread.  The loop
        ends when ``running`` is cleared, or when no new connection has been
        accepted for more than ``timeout`` seconds.
        """
        self.running = True
        last_accept = time.time()
        try:
            while self.running:
                try:
                    connection, address = self.server.accept()
                except socket.timeout:
                    if time.time() - last_accept > self.timeout:
                        break
                    continue
                last_accept = time.time()
                client = ClientThread(self, self.network, connection)
                client.start()
        finally:
            # Release the port as soon as the loop ends, whatever the reason.
            self.server.close()
330
331
332
if __name__ == '__main__':
    # Run a stand-alone daemon in the foreground against a fixed server.
    import simple_config
    options = {'verbose': True, 'server': 'ecdsa.net:50002:s'}
    config = simple_config.SimpleConfig(options)
    server = NetworkServer(config)
    try:
        server.main_loop()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop a foreground daemon.
        print("Ctrl C - Stopping server")
        sys.exit(1)