fix return, debug flags
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import select
21 import time
22 import sys
23 import os
24 import threading
25 import traceback
26 import json
27 import Queue
28 from network import Network
29 from util import print_msg
30
31
class NetworkProxy(threading.Thread):
    """Client-side proxy that talks to the network daemon over TCP.

    Requests are sent as newline-delimited JSON objects of the form
    ``{'id': ..., 'method': ..., 'params': ...}``.  When started as a
    thread, the proxy reads newline-delimited JSON responses from the same
    socket and invokes the callback that was registered for each id.
    """

    def __init__(self, config):
        """Connect to the daemon on port 8000 and initialise bookkeeping.

        config is stored on the instance but not otherwise used here.
        Raises socket.error if the connection cannot be established.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.config = config
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.connect(('', 8000))
        # id that will be assigned to the next outgoing request
        self.message_id = 0
        # request id -> (method, params, callback), until the reply arrives
        self.unanswered_requests = {}
        # callback -> list of (method, params) messages it subscribed to
        self.subscriptions = {}
        self.debug = False
        # guards message_id, unanswered_requests and subscriptions
        self.lock = threading.Lock()

    def parse_json(self, message):
        """Split the first newline-terminated JSON document off message.

        Returns (decoded_object, remaining_text).  When message holds no
        complete line yet, returns (None, message) unchanged.  Raises
        ValueError if the line is not valid JSON.
        """
        line, newline, rest = message.partition('\n')
        if not newline:
            return None, message
        return json.loads(line), rest

    def run(self):
        """Read responses from the daemon and trigger callbacks.

        The loop ends when the daemon closes the connection or a socket
        error occurs.
        """
        message = ''
        while True:
            try:
                data = self.socket.recv(1024)
            except socket.error:
                # treat a failed read like a closed connection
                data = ''
            if not data:
                break

            message += data
            # a single read may contain several complete responses
            while True:
                response, message = self.parse_json(message)
                if response is None:
                    break
                self.process(response)

        print("NetworkProxy: exiting")

    def process(self, response):
        """Run the callback registered for the request this response answers.

        The callback is called as ``callback(None, reply)`` where reply has
        the keys 'method', 'params', 'result' and 'id', plus 'error' when
        the daemon reported one.  Responses whose id matches no pending
        request are ignored, so that they cannot kill the reader thread.
        """
        if self.debug:
            print("<-- %s" % (response,))

        msg_id = response.get('id')
        with self.lock:
            pending = self.unanswered_requests.pop(msg_id, None)
        if pending is None:
            if self.debug:
                print("NetworkProxy: no pending request for id %s" % (msg_id,))
            return

        method, params, callback = pending
        reply = {
            'method': method,
            'params': params,
            'result': response.get('result'),
            'id': msg_id,
        }
        if 'error' in response:
            # forward the daemon's error instead of silently dropping it
            reply['error'] = response['error']
        # the callback runs outside the lock so that it may send requests
        callback(None, reply)

    def subscribe(self, messages, callback):
        """Record messages as subscriptions of callback, then send them.

        messages is a list of (method, params) pairs.  A message already
        recorded for this callback is not recorded twice, but it is still
        sent again.
        """
        with self.lock:
            recorded = self.subscriptions.setdefault(callback, [])
            for message in messages:
                if message not in recorded:
                    recorded.append(message)

        self.do_send(messages, callback)

    def do_send(self, messages, callback):
        """Send messages to the daemon; return the ids of the requests sent.

        messages is a list of (method, params) pairs.  callback is
        registered for every request and is invoked by process() when the
        matching response arrives.
        """
        ids = []
        lines = []
        # Ids are allocated and registered under the lock: process() pops
        # from unanswered_requests on the reader thread, and several
        # threads may call do_send() concurrently.
        with self.lock:
            for method, params in messages:
                msg_id = self.message_id
                request = json.dumps(
                    {'id': msg_id, 'method': method, 'params': params})
                self.unanswered_requests[msg_id] = method, params, callback
                ids.append(msg_id)
                if self.debug:
                    print("--> %s" % request)
                self.message_id = msg_id + 1
                lines.append(request + '\n')

        out = ''.join(lines)
        # send() may accept only part of the buffer; loop until it is empty
        while out:
            sent = self.socket.send(out)
            out = out[sent:]
        return ids

    def synchronous_get(self, requests, timeout=100000000):
        """Send requests and block until every one of them is answered.

        Returns the list of 'result' values in the order of requests.
        Raises Queue.Empty if no response arrives within timeout seconds.
        """
        replies = Queue.Queue()
        ids = self.do_send(requests, lambda i, x: replies.put(x))
        pending = set(ids)
        results = {}
        while pending:
            r = replies.get(True, timeout)
            _id = r.get('id')
            if _id in pending:
                pending.discard(_id)
                results[_id] = r.get('result')
        return [results[_id] for _id in ids]

    def get_servers(self):
        """Return the daemon's answer to 'network.get_servers'."""
        return self.synchronous_get([('network.get_servers', [])])[0]

    def get_header(self, height):
        """Return the daemon's answer to 'network.get_header' for height."""
        return self.synchronous_get([('network.get_header', [height])])[0]

    def get_local_height(self):
        """Return the daemon's answer to 'network.get_local_height'."""
        return self.synchronous_get([('network.get_local_height', [])])[0]

    def is_connected(self):
        """Return the daemon's answer to 'network.is_connected'."""
        return self.synchronous_get([('network.is_connected', [])])[0]

    def is_up_to_date(self):
        """Return the daemon's answer to 'network.is_up_to_date'."""
        return self.synchronous_get([('network.is_up_to_date', [])])[0]

    def main_server(self):
        """Return the daemon's answer to 'network.main_server'."""
        return self.synchronous_get([('network.main_server', [])])[0]

    def stop(self):
        """Ask the daemon to shut down; return its answer."""
        return self.synchronous_get([('daemon.shutdown', [])])[0]

    def trigger_callback(self, cb):
        """Placeholder; does nothing."""
        pass
163
164
165
166
167
168
class ClientThread(threading.Thread):
    """Serve one client connection of the daemon.

    Reads newline-delimited JSON requests from the client socket, hands
    them to the Network object, and writes the responses back on the same
    socket.
    """

    def __init__(self, server, network, socket):
        """Store the collaborators and put the socket in short-timeout mode.

        server is the object whose ``running`` flag is cleared on
        'daemon.shutdown'; network receives the requests; socket is the
        accepted client connection.
        """
        threading.Thread.__init__(self)
        self.server = server
        self.daemon = True
        self.s = socket
        # short timeout so that run() can alternate between reading
        # requests and flushing queued responses
        self.s.settimeout(0.1)
        self.network = network
        # responses waiting to be written to the client
        self.queue = Queue.Queue()
        # id used towards the network interface -> id chosen by the client
        self.unanswered_requests = {}
        self.debug = False

    def run(self):
        """Alternate between flushing responses and reading requests.

        Ends when the client disconnects or the socket fails; the socket
        is closed in either case.
        """
        message = ''
        try:
            while True:
                self.send_responses()
                try:
                    data = self.s.recv(1024)
                except socket.timeout:
                    continue
                except socket.error:
                    # connection reset or similar: treat as a disconnect
                    break

                if not data:
                    break
                message += data

                while True:
                    cmd, message = self.parse_json(message)
                    if cmd is None:
                        # no complete line buffered yet
                        break
                    if not cmd:
                        # Empty JSON value: nothing to dispatch.  Keep
                        # parsing so requests already buffered behind it
                        # are not delayed until the next read.
                        continue
                    self.process(cmd)
        finally:
            self.s.close()

        #print "client thread terminating"

    def parse_json(self, message):
        """Split the first newline-terminated JSON document off message.

        Returns (decoded_object, remaining_text), or (None, message) when
        no complete line is available.  Raises ValueError on invalid JSON.
        """
        line, newline, rest = message.partition('\n')
        if not newline:
            return None, message
        return json.loads(line), rest

    def process(self, request):
        """Dispatch one decoded request.

        'network.<name>' calls ``self.network.<name>(*params)`` and queues
        its result, or an 'error' string when the attribute does not exist
        or the call raises.  'daemon.shutdown' clears the server's running
        flag.  Any other method is forwarded through
        ``self.network.interface.send`` and answered asynchronously.

        Raises KeyError if request lacks 'method', 'params' or 'id'.
        """
        if self.debug:
            print("<-- %s" % (request,))
        method = request['method']
        params = request['params']
        _id = request['id']

        if method.startswith('network.'):
            out = {'id': _id}
            # NOTE(review): any attribute of the Network object, including
            # private ones, can be called by a connected client.
            try:
                f = getattr(self.network, method[8:])
            except AttributeError:
                out['error'] = "unknown method"
            else:
                # only call f when the lookup succeeded, so the
                # "unknown method" message is not clobbered
                try:
                    out['result'] = f(*params)
                except Exception as e:
                    out['error'] = str(e)
            self.queue.put(out)
            return

        if method == 'daemon.shutdown':
            self.server.running = False
            self.queue.put({'id': _id, 'result': True})
            return

        def cb(i, r):
            # translate the interface's id back to the client's own id
            interface_id = r.get('id')
            if interface_id is not None:
                client_id = self.unanswered_requests.pop(interface_id)
                r['id'] = client_id
            self.queue.put(r)

        # NOTE(review): the id mapping is stored only after send() returns;
        # presumably cb cannot fire before that -- confirm against the
        # interface implementation.
        new_id = self.network.interface.send([(method, params)], cb)[0]
        self.unanswered_requests[new_id] = _id

    def send_responses(self):
        """Write every queued response to the client as one JSON line each."""
        while True:
            try:
                r = self.queue.get_nowait()
            except Queue.Empty:
                break
            out = json.dumps(r) + '\n'
            # send() may accept only part of the buffer
            while out:
                n = self.s.send(out)
                out = out[n:]
            if self.debug:
                print("--> %s" % (r,))
261         
262
263
264
class NetworkServer:
    """Daemon side: owns the Network object and accepts client connections.

    Each accepted connection is handed to its own ClientThread.
    """

    def __init__(self, config):
        """Start the network and open the listening socket on port 8000.

        Exits the process with status 1 if ``network.start(wait=True)``
        reports failure.  Raises socket.error if the port cannot be bound.
        """
        network = Network(config)
        if not network.start(wait=True):
            print_msg("Not connected, aborting.")
            sys.exit(1)
        self.network = network
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # NOTE(review): '' binds to all interfaces, not only loopback.
        self.server.bind(('', 8000))
        self.server.listen(5)
        # accept() wakes up every second so the loop can check `running`
        self.server.settimeout(1)
        self.running = False
        # seconds without a new connection after which main_loop() returns
        self.timeout = 60

    def main_loop(self):
        """Accept clients until shut down or idle for too long.

        Returns when ``self.running`` is cleared (as a client's
        'daemon.shutdown' request does) or when no new connection has been
        accepted for more than ``self.timeout`` seconds.  The idle timer is
        reset only by new connections, not by traffic on existing ones.
        The listening socket is closed on the way out.
        """
        self.running = True
        last_accept = time.time()
        try:
            while self.running:
                try:
                    connection, address = self.server.accept()
                except socket.timeout:
                    if time.time() - last_accept > self.timeout:
                        break
                    continue
                last_accept = time.time()
                client = ClientThread(self, self.network, connection)
                client.start()
        finally:
            # release the port as soon as the server stops accepting
            self.server.close()
295
296
297
def start_daemon(config):
    """Fork a detached process that runs a NetworkServer, then return.

    Uses a double fork: the first child starts a new session and forks
    again; the second child builds a NetworkServer from config and runs
    its main loop.  The calling process waits two seconds before returning
    to give the server time to start.
    """
    pid = os.fork()
    if pid == 0:  # The first child.
        os.chdir("/")
        os.setsid()
        os.umask(0)
        pid2 = os.fork()
        if pid2 == 0:  # Second child
            try:
                server = NetworkServer(config)
                server.main_loop()
            except KeyboardInterrupt:
                print("Ctrl C - Stopping server")
            except Exception:
                # never let an error unwind into the caller's code inside
                # the forked process; report it and exit instead
                traceback.print_exc()
            sys.exit(1)

        # The first child has nothing left to do.  os._exit() skips the
        # cleanup inherited from the parent (SystemExit handlers, a second
        # flush of the parent's stdio buffers).
        os._exit(0)

    # Reap the first child so that it does not linger as a zombie.
    try:
        os.waitpid(pid, 0)
    except OSError:
        # e.g. the child was already reaped because SIGCHLD is ignored
        pass

    # should use a signal
    time.sleep(2)
317
318
319
if __name__ == '__main__':
    # Run the server in the foreground (no fork) with a fixed test
    # configuration.
    import simple_config

    options = {'verbose': True, 'server': 'ecdsa.net:50002:s'}
    config = simple_config.SimpleConfig(options)
    server = NetworkServer(config)
    try:
        server.main_loop()
    except KeyboardInterrupt:
        # exit status 1 only when interrupted from the keyboard
        print("Ctrl C - Stopping server")
        sys.exit(1)