add missing Network methods to NetworkProxy
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import select
21 import time
22 import sys
23 import os
24 import threading
25 import traceback
26 import json
27 import Queue
28 from network import Network
29
30
31
class NetworkProxy(threading.Thread):
    """Client-side proxy that talks to the daemon over a TCP socket.

    Requests are written to the socket as newline-terminated JSON objects.
    The thread body (run) reads the replies and invokes the callback that
    was registered for each request id.
    """

    def __init__(self, config):
        """Connect to the daemon on port 8000 and set up the request tables.

        config is stored on the instance; this class does not read it.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.config = config
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.connect(('', 8000))
        # next request id to hand out
        self.message_id = 0
        # request id -> (method, params, callback), until the reply arrives
        self.unanswered_requests = {}
        # callback -> list of (method, params) messages it subscribed to
        self.subscriptions = {}
        self.debug = True
        # guards message_id, unanswered_requests and subscriptions
        self.lock = threading.Lock()

    def parse_json(self, message):
        """Split the first newline-terminated JSON document off message.

        Returns (obj, rest). obj is None when message does not contain a
        complete line yet, in which case rest is message unchanged.
        """
        s = message.find('\n')
        if s == -1:
            return None, message
        return json.loads(message[:s]), message[s + 1:]

    def run(self):
        """Read responses from the daemon and trigger callbacks.

        Returns when the socket is closed or a receive fails.
        """
        message = ''
        while True:
            try:
                data = self.socket.recv(1024)
            except Exception:
                # any receive failure is treated like a closed connection
                data = ''
            if not data:
                break

            message += data
            # a single recv may carry several complete responses
            while True:
                response, message = self.parse_json(message)
                if response is None:
                    break
                self.process(response)

        print("NetworkProxy: exiting")

    def process(self, response):
        """Run the callback registered for response['id'].

        The callback is called as callback(None, d) where d holds the
        original method and params together with the result and id.
        Responses whose id is not pending (unknown or already answered) are
        dropped, so that a stray reply cannot terminate the reader thread.
        """
        msg_id = response.get('id')
        with self.lock:
            pending = self.unanswered_requests.pop(msg_id, None)
        if pending is None:
            return

        method, params, callback = pending
        result = response.get('result')
        callback(None, {'method': method, 'params': params,
                        'result': result, 'id': msg_id})

    def subscribe(self, messages, callback):
        """Record messages as subscriptions of callback, then send them."""
        with self.lock:
            if self.subscriptions.get(callback) is None:
                self.subscriptions[callback] = []
            for message in messages:
                if message not in self.subscriptions[callback]:
                    self.subscriptions[callback].append(message)

        # the lock is released here: do_send acquires it itself
        self.do_send(messages, callback)

    def do_send(self, messages, callback):
        """Send (method, params) pairs and return the ids of the requests.

        Each request is registered in unanswered_requests with callback
        before anything is written to the socket.
        """
        ids = []
        lines = []
        # Ids are allocated and registered under the lock: several threads
        # may send while the reader thread pops entries in process().
        with self.lock:
            for method, params in messages:
                msg_id = self.message_id
                self.message_id += 1
                self.unanswered_requests[msg_id] = method, params, callback
                ids.append(msg_id)
                request = json.dumps({'id': msg_id, 'method': method,
                                      'params': params})
                lines.append(request + '\n')

        # The write happens outside the lock so that a slow socket cannot
        # block the reader thread in process().
        out = ''.join(lines)
        while out:
            sent = self.socket.send(out)
            out = out[sent:]
        return ids

    def synchronous_get(self, requests, timeout=100000000):
        """Send requests and block until every one of them is answered.

        Returns the results in the order of requests. Queue.Empty is raised
        if no response arrives within timeout seconds.
        """
        queue = Queue.Queue()
        ids = self.do_send(requests, lambda i, x: queue.put(x))
        pending = ids[:]
        res = {}
        while pending:
            r = queue.get(True, timeout)
            _id = r.get('id')
            if _id in pending:
                pending.remove(_id)
                res[_id] = r.get('result')
        return [res[_id] for _id in ids]

    def get_servers(self):
        """Return the daemon's answer to network.get_servers."""
        return self.synchronous_get([('network.get_servers', [])])[0]

    def get_header(self, height):
        """Return the daemon's answer to network.get_header for height."""
        return self.synchronous_get([('network.get_header', [height])])[0]

    def get_local_height(self):
        """Return the daemon's answer to network.get_local_height."""
        return self.synchronous_get([('network.get_local_height', [])])[0]

    def is_connected(self):
        """Return the daemon's answer to network.is_connected."""
        return self.synchronous_get([('network.is_connected', [])])[0]

    def is_up_to_date(self):
        """Return the daemon's answer to network.is_up_to_date."""
        return self.synchronous_get([('network.is_up_to_date', [])])[0]

    def main_server(self):
        """Return the daemon's answer to network.main_server."""
        return self.synchronous_get([('network.main_server', [])])[0]

    def stop(self):
        """Ask the daemon to shut down and return its reply."""
        return self.synchronous_get([('daemon.shutdown', [])])[0]

    def trigger_callback(self, cb):
        """Do nothing; placeholder."""
        pass
163
164
165
166
167
168
class ClientThread(threading.Thread):
    """Serve one client connection of the daemon.

    Reads newline-terminated JSON requests from the client socket, answers
    'network.*' and 'daemon.shutdown' locally, forwards everything else
    through network.interface, and writes the responses back on the same
    socket.
    """

    def __init__(self, server, network, socket):
        """Store the collaborators and put the client socket in timeout mode.

        server  -- object whose 'running' flag is cleared on daemon.shutdown
        network -- object that serves 'network.*' calls and owns 'interface'
        socket  -- connected client socket (the name shadows the socket
                   module inside this method only)
        """
        threading.Thread.__init__(self)
        self.server = server
        self.daemon = True
        self.s = socket
        # short timeout so run() can alternate between reading and writing
        self.s.settimeout(0.1)
        self.network = network
        # responses waiting to be written to the client
        self.queue = Queue.Queue()
        # id used towards network.interface -> id chosen by the client
        self.unanswered_requests = {}

    def run(self):
        """Alternate between flushing responses and reading requests.

        Returns when the client closes the connection.
        """
        message = ''
        while True:
            self.send_responses()
            try:
                data = self.s.recv(1024)
            except socket.timeout:
                continue

            if not data:
                break
            message += data

            # a single recv may carry several complete requests
            while True:
                cmd, message = self.parse_json(message)
                if not cmd:
                    break
                self.process(cmd)

    def parse_json(self, message):
        """Split the first newline-terminated JSON document off message.

        Returns (obj, rest). obj is None when message does not contain a
        complete line yet, in which case rest is message unchanged.
        """
        n = message.find('\n')
        if n == -1:
            return None, message
        return json.loads(message[:n]), message[n + 1:]

    def process(self, request):
        """Handle one decoded request (a dict with method, params and id).

        'network.<name>' calls self.network.<name>(*params) and queues
        {'id', 'result'} or {'id', 'error'}. 'daemon.shutdown' clears
        server.running and queues a True result. Any other method is
        forwarded through network.interface.send; its response is queued
        with the client's own id restored.
        """
        method = request['method']
        params = request['params']
        _id = request['id']

        if method.startswith('network.'):
            out = {'id': _id}
            name = method[len('network.'):]
            # NOTE(review): the attribute name comes straight from the
            # client. Underscore-prefixed names are refused, but every other
            # attribute of the network object stays callable; a whitelist
            # would be safer.
            f = None if name.startswith('_') else getattr(self.network, name, None)
            if f is None:
                out['error'] = "unknown method"
            else:
                try:
                    out['result'] = f(*params)
                except BaseException as e:
                    # deliberately broad: any failure of the call is
                    # reported to the client as an error string
                    out['error'] = str(e)
            self.queue.put(out)
            # answered locally: must not also be forwarded to the server
            return

        if method == 'daemon.shutdown':
            self.server.running = False
            self.queue.put({'id': _id, 'result': True})
            return

        def cb(i, r):
            # translate the forwarded request's id back to the client's id
            remote_id = r.get('id')
            if remote_id is not None:
                r['id'] = self.unanswered_requests.pop(remote_id)
            self.queue.put(r)

        # NOTE(review): the id mapping is recorded only after send()
        # returns; if cb can fire before that, its pop() raises KeyError.
        new_id = self.network.interface.send([(method, params)], cb)[0]
        self.unanswered_requests[new_id] = _id

    def send_responses(self):
        """Write every queued response to the client, one JSON line each."""
        while True:
            try:
                r = self.queue.get_nowait()
            except Queue.Empty:
                break
            out = json.dumps(r) + '\n'
            while out:
                n = self.s.send(out)
                out = out[n:]
259         
260
261
262
class NetworkServer:
    """Daemon side: owns the Network object and accepts client connections.

    Each accepted connection is handed to its own ClientThread.
    """

    def __init__(self, config):
        """Start the network and listen on TCP port 8000.

        Exits the process with status 1 when network.start(wait=True)
        reports failure.
        """
        network = Network(config)
        if not network.start(wait=True):
            # written directly: this module neither defines nor imports a
            # print_msg helper
            sys.stdout.write("Not connected, aborting.\n")
            sys.exit(1)
        self.network = network
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # NOTE(review): '' binds on every interface, so the daemon is
        # reachable from other hosts; consider binding to localhost only.
        self.server.bind(('', 8000))
        self.server.listen(5)
        # accept() wakes up every second so main_loop can re-check its flags
        self.server.settimeout(1)
        self.running = False
        # seconds without a new connection after which main_loop returns
        self.timeout = 60

    def main_loop(self):
        """Accept clients until 'running' is cleared or the server is idle.

        The loop ends when self.running becomes False, or when no new
        connection has been accepted for more than self.timeout seconds.
        """
        self.running = True
        t = time.time()
        while self.running:
            try:
                connection, address = self.server.accept()
            except socket.timeout:
                if time.time() - t > self.timeout:
                    break
                continue
            # a new client resets the idle timer
            t = time.time()
            client = ClientThread(self, self.network, connection)
            client.start()
293
294
295
def start_daemon(config):
    """Launch a NetworkServer in a detached process using a double fork.

    The calling process returns after a fixed two-second pause; it never
    runs the server itself. The second child builds NetworkServer(config),
    runs its main loop and then exits.
    """
    pid = os.fork()
    if pid == 0:  # The first child.
        os.chdir("/")
        os.setsid()
        os.umask(0)
        pid2 = os.fork()
        if pid2 == 0:  # Second child
            server = NetworkServer(config)
            try:
                server.main_loop()
            except KeyboardInterrupt:
                print("Ctrl C - Stopping server")
            sys.exit(1)

        # The intermediate child leaves immediately. os._exit avoids
        # unwinding the caller's stack and re-flushing stdio buffers
        # inherited from the parent.
        os._exit(0)

    # Reap the intermediate child so that it does not stay around as a
    # zombie for the lifetime of the calling process.
    try:
        os.waitpid(pid, 0)
    except OSError:
        pass

    # should use a signal
    time.sleep(2)
315
316
317
if __name__ == '__main__':
    # Stand-alone mode: run the server in the foreground with a fixed
    # configuration until it stops by itself or is interrupted.
    import simple_config

    options = {'verbose': True, 'server': 'ecdsa.net:50002:s'}
    config = simple_config.SimpleConfig(options)
    server = NetworkServer(config)
    try:
        server.main_loop()
    except KeyboardInterrupt:
        print("Ctrl C - Stopping server")
        sys.exit(1)