8ed0a3644f874d29bd65234a611ba9a4d521e1b4
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import time
21 import sys
22 import os
23 import threading
24 import traceback
25 import json
26 import Queue
27 from network import Network
28 from util import print_msg, print_stderr
29 from simple_config import SimpleConfig
30
31 DAEMON_PORT=8001
32
33 class NetworkProxy(threading.Thread):
34     # connects to daemon
35     # sends requests, runs callbacks
36
37     def __init__(self, config = {}):
38         threading.Thread.__init__(self)
39         self.daemon = True
40         self.config = SimpleConfig(config) if type(config) == type({}) else config
41         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
42         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
43         self.daemon_port = config.get('daemon_port', DAEMON_PORT)
44         self.message_id = 0
45         self.unanswered_requests = {}
46         self.subscriptions = {}
47         self.debug = False
48         self.lock = threading.Lock()
49         self.pending_transactions_for_notifications = []
50
51
52     def start(self, start_daemon=False):
53         daemon_started = False
54         while True:
55             try:
56                 self.socket.connect(('', self.daemon_port))
57                 threading.Thread.start(self)
58                 return True
59
60             except socket.error:
61                 if not start_daemon:
62                     return False
63
64                 elif not daemon_started:
65                     print_stderr( "Starting daemon [%s]"%self.config.get('server'))
66                     daemon_started = True
67                     pid = os.fork()
68                     if (pid == 0): # The first child.
69                         os.chdir("/")
70                         os.setsid()
71                         os.umask(0)
72                         pid2 = os.fork()
73                         if (pid2 == 0):  # Second child
74                             server = NetworkServer(self.config)
75                             try:
76                                 server.main_loop()
77                             except KeyboardInterrupt:
78                                 print "Ctrl C - Stopping server"
79                             sys.exit(1)
80                         sys.exit(0)
81                 else:
82                     time.sleep(0.1)
83
84
85
86     def parse_json(self, message):
87         s = message.find('\n')
88         if s==-1: 
89             return None, message
90         j = json.loads( message[0:s] )
91         return j, message[s+1:]
92
93
94     def run(self):
95         # read responses and trigger callbacks
96         message = ''
97         while True:
98             try:
99                 data = self.socket.recv(1024)
100             except:
101                 data = ''
102             if not data:
103                 break
104
105             message += data
106             while True:
107                 response, message = self.parse_json(message)
108                 if response is not None: 
109                     self.process(response)
110                 else:
111                     break
112
113         print "NetworkProxy: exiting"
114
115
116     def process(self, response):
117         # runs callbacks
118         if self.debug: print "<--", response
119
120         msg_id = response.get('id')
121         with self.lock: 
122             method, params, callback = self.unanswered_requests.pop(msg_id)
123
124         result = response.get('result')
125         callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
126
127
128     def subscribe(self, messages, callback):
129         # detect if it is a subscription
130         with self.lock:
131             if self.subscriptions.get(callback) is None: 
132                 self.subscriptions[callback] = []
133             for message in messages:
134                 if message not in self.subscriptions[callback]:
135                     self.subscriptions[callback].append(message)
136
137         self.send( messages, callback )
138
139
140     def send(self, messages, callback):
141         """return the ids of the requests that we sent"""
142         out = ''
143         ids = []
144         for m in messages:
145             method, params = m 
146             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
147             self.unanswered_requests[self.message_id] = method, params, callback
148             ids.append(self.message_id)
149             if self.debug: print "-->", request
150             self.message_id += 1
151             out += request + '\n'
152         while out:
153             sent = self.socket.send( out )
154             out = out[sent:]
155         return ids
156
157
158     def synchronous_get(self, requests, timeout=100000000):
159         queue = Queue.Queue()
160         ids = self.send(requests, lambda i,x: queue.put(x))
161         id2 = ids[:]
162         res = {}
163         while ids:
164             r = queue.get(True, timeout)
165             _id = r.get('id')
166             if _id in ids:
167                 ids.remove(_id)
168                 res[_id] = r.get('result')
169         out = []
170         for _id in id2:
171             out.append(res[_id])
172         return out
173
174
175     def get_servers(self):
176         return self.synchronous_get([('network.get_servers',[])])[0]
177
178     def get_header(self, height):
179         return self.synchronous_get([('network.get_header',[height])])[0]
180
181     def get_local_height(self):
182         return self.synchronous_get([('network.get_local_height',[])])[0]
183
184     def is_connected(self):
185         return self.synchronous_get([('network.is_connected',[])])[0]
186
187     def is_up_to_date(self):
188         return self.synchronous_get([('network.is_up_to_date',[])])[0]
189
190     def main_server(self):
191         return self.synchronous_get([('network.main_server',[])])[0]
192
193     def stop(self):
194         return self.synchronous_get([('daemon.shutdown',[])])[0]
195
196
197     def trigger_callback(self, cb):
198         pass
199
200
201
202
203
204
205 class ClientThread(threading.Thread):
206     # read messages from client (socket), and sends them to Network
207     # responses are sent back on the same socket
208
209     def __init__(self, server, network, socket):
210         threading.Thread.__init__(self)
211         self.server = server
212         self.daemon = True
213         self.s = socket
214         self.s.settimeout(0.1)
215         self.network = network
216         self.queue = Queue.Queue()
217         self.unanswered_requests = {}
218         self.debug = False
219
220
221     def run(self):
222         message = ''
223         while True:
224             self.send_responses()
225             try:
226                 data = self.s.recv(1024)
227             except socket.timeout:
228                 continue
229
230             if not data:
231                 break
232             message += data
233
234             while True:
235                 cmd, message = self.parse_json(message)
236                 if not cmd:
237                     break
238                 self.process(cmd)
239
240         #print "client thread terminating"
241
242
243     def parse_json(self, message):
244         n = message.find('\n')
245         if n==-1: 
246             return None, message
247         j = json.loads( message[0:n] )
248         return j, message[n+1:]
249
250
251     def process(self, request):
252         if self.debug: print "<--", request
253         method = request['method']
254         params = request['params']
255         _id = request['id']
256
257         if method.startswith('network.'):
258             out = {'id':_id}
259             try:
260                 f = getattr(self.network, method[8:])
261             except AttributeError:
262                 out['error'] = "unknown method"
263             try:
264                 out['result'] = f(*params)
265             except BaseException as e:
266                 out['error'] =str(e)
267             self.queue.put(out) 
268             return
269
270         if method == 'daemon.shutdown':
271             self.server.running = False
272             self.queue.put({'id':_id, 'result':True})
273             return
274
275         def cb(i,r):
276             _id = r.get('id')
277             if _id is not None:
278                 my_id = self.unanswered_requests.pop(_id)
279                 r['id'] = my_id
280             self.queue.put(r)
281
282         new_id = self.network.interface.send([(method, params)], cb) [0]
283         self.unanswered_requests[new_id] = _id
284
285
286     def send_responses(self):
287         while True:
288             try:
289                 r = self.queue.get_nowait()
290             except Queue.Empty:
291                 break
292             out = json.dumps(r) + '\n'
293             while out:
294                 n = self.s.send(out)
295                 out = out[n:]
296             if self.debug: print "-->", r
297         
298
299
300
301 class NetworkServer:
302
303     def __init__(self, config):
304         network = Network(config)
305         if not network.start(wait=True):
306             print_msg("Not connected, aborting.")
307             sys.exit(1)
308         self.network = network
309         self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
310         self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
311         self.daemon_port = config.get('daemon_port', DAEMON_PORT)
312         self.server.bind(('', self.daemon_port))
313         self.server.listen(5)
314         self.server.settimeout(1)
315         self.running = False
316         self.timeout = config.get('daemon_timeout', 60)
317
318
319     def main_loop(self):
320         self.running = True
321         t = time.time()
322         while self.running:
323             try:
324                 connection, address = self.server.accept()
325             except socket.timeout:
326                 if time.time() - t > self.timeout:
327                     break
328                 continue
329             t = time.time()
330             client = ClientThread(self, self.network, connection)
331             client.start()
332
333
334
335 if __name__ == '__main__':
336     import simple_config
337     config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
338     server = NetworkServer(config)
339     try:
340         server.main_loop()
341     except KeyboardInterrupt:
342         print "Ctrl C - Stopping server"
343         sys.exit(1)