daemon; initial commit
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import select
21 import time
22 import sys
23 import os
24 import threading
25 import traceback
26 import json
27 import Queue
28 from network import Network
29
30
31
32 class NetworkProxy(threading.Thread):
33     # connects to daemon
34     # sends requests, runs callbacks
35
36     def __init__(self, config):
37         threading.Thread.__init__(self)
38         self.daemon = True
39         self.config = config
40         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
41         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
42         self.socket.connect(('', 8000))
43         self.message_id = 0
44         self.unanswered_requests = {}
45         self.subscriptions = {}
46         self.debug = True
47         self.lock = threading.Lock()
48         
49
50     def parse_json(self, message):
51         s = message.find('\n')
52         if s==-1: 
53             return None, message
54         j = json.loads( message[0:s] )
55         return j, message[s+1:]
56
57
58     def run(self):
59         # read responses and trigger callbacks
60         message = ''
61         while True:
62             try:
63                 data = self.socket.recv(1024)
64             except:
65                 data = ''
66             if not data:
67                 break
68
69             message += data
70             while True:
71                 response, message = self.parse_json(message)
72                 if response is not None: 
73                     self.process(response)
74                 else:
75                     break
76
77         print "NetworkProxy: exiting"
78
79
80     def process(self, response):
81         # runs callbacks
82         #print "<--", response
83
84         msg_id = response.get('id')
85         with self.lock: 
86             method, params, callback = self.unanswered_requests.pop(msg_id)
87
88         result = response.get('result')
89         callback({'method':method, 'params':params, 'result':result, 'id':msg_id})
90
91
92     def send(self, messages, callback):
93         # detect if it is a subscription
94         with self.lock:
95             if self.subscriptions.get(callback) is None: 
96                 self.subscriptions[callback] = []
97             for message in messages:
98                 if message not in self.subscriptions[callback]:
99                     self.subscriptions[callback].append(message)
100
101         self.do_send( messages, callback )
102
103
104     def do_send(self, messages, callback):
105         """return the ids of the requests that we sent"""
106         out = ''
107         ids = []
108         for m in messages:
109             method, params = m 
110             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
111             self.unanswered_requests[self.message_id] = method, params, callback
112             ids.append(self.message_id)
113             # print "-->", request
114             self.message_id += 1
115             out += request + '\n'
116         while out:
117             sent = self.socket.send( out )
118             out = out[sent:]
119         return ids
120
121
122     def synchronous_get(self, requests, timeout=100000000):
123         queue = Queue.Queue()
124         ids = self.do_send(requests, queue.put)
125         id2 = ids[:]
126         res = {}
127         while ids:
128             r = queue.get(True, timeout)
129             _id = r.get('id')
130             if _id in ids:
131                 ids.remove(_id)
132                 res[_id] = r.get('result')
133         out = []
134         for _id in id2:
135             out.append(res[_id])
136         return out
137
138
139     def get_servers(self):
140         return self.synchronous_get([('network.getservers',[])])[0]
141
142     def stop(self):
143         return self.synchronous_get([('network.shutdown',[])])[0]
144
145
146
147
148
149
150 class ClientThread(threading.Thread):
151     # read messages from client (socket), and sends them to Network
152     # responses are sent back on the same socket
153
154     def __init__(self, server, network, socket):
155         threading.Thread.__init__(self)
156         self.server = server
157         self.daemon = True
158         self.s = socket
159         self.s.settimeout(0.1)
160         self.network = network
161         self.queue = Queue.Queue()
162         self.unanswered_requests = {}
163
164
165     def run(self):
166         message = ''
167         while True:
168             self.send_responses()
169             try:
170                 data = self.s.recv(1024)
171             except socket.timeout:
172                 continue
173
174             if not data:
175                 break
176             message += data
177
178             while True:
179                 cmd, message = self.parse_json(message)
180                 if not cmd:
181                     break
182                 self.process(cmd)
183
184         #print "client thread terminating"
185
186
187     def parse_json(self, message):
188         n = message.find('\n')
189         if n==-1: 
190             return None, message
191         j = json.loads( message[0:n] )
192         return j, message[n+1:]
193
194
195     def process(self, request):
196         #print "<--", request
197         method = request['method']
198         params = request['params']
199         _id = request['id']
200
201         if method.startswith('network.'):
202             if method == 'network.shutdown':
203                 self.server.running = False
204                 r = {'id':_id, 'result':True}
205             elif method == 'network.getservers':
206                 servers = self.network.get_servers()
207                 r = {'id':_id, 'result':servers}
208             else:
209                 r = {'id':_id, 'error':'unknown method'}
210             self.queue.put(r) 
211             return
212
213         def cb(i,r):
214             _id = r.get('id')
215             if _id is not None:
216                 my_id = self.unanswered_requests.pop(_id)
217                 r['id'] = my_id
218             self.queue.put(r)
219
220         new_id = self.network.interface.send([(method, params)], cb) [0]
221         self.unanswered_requests[new_id] = _id
222
223
224     def send_responses(self):
225         while True:
226             try:
227                 r = self.queue.get_nowait()
228             except Queue.Empty:
229                 break
230             out = json.dumps(r) + '\n'
231             while out:
232                 n = self.s.send(out)
233                 out = out[n:]
234             #print "-->", r
235         
236
237 #Server:
238 #   start network() object
239 #   accept connections, forward requests 
240
241
242 class NetworkServer:
243
244     def __init__(self, config):
245         network = Network(config)
246         if not network.start(wait=True):
247             print_msg("Not connected, aborting.")
248             sys.exit(1)
249         self.network = network
250         self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
251         self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
252         self.server.bind(('', 8000))
253         self.server.listen(5)
254         self.server.settimeout(1)
255         self.running = False
256         self.timeout = 60
257
258
259     def main_loop(self):
260         self.running = True
261         t = time.time()
262         while self.running:
263             try:
264                 connection, address = self.server.accept()
265             except socket.timeout:
266                 if time.time() - t > self.timeout:
267                     break
268                 continue
269             t = time.time()
270             client = ClientThread(self, self.network, connection)
271             client.start()
272         #print "Done."
273
274
275
276
277 def start_daemon(config):
278     pid = os.fork()
279     if (pid == 0): # The first child.
280         os.chdir("/")
281         os.setsid()
282         os.umask(0)
283         pid2 = os.fork()
284         if (pid2 == 0):  # Second child
285             server = NetworkServer(config)
286             try:
287                 server.main_loop()
288             except KeyboardInterrupt:
289                 print "Ctrl C - Stopping server"
290             sys.exit(1)
291
292         sys.exit(0)
293
294     # should use a signal
295     time.sleep(2)
296
297
298
299 if __name__ == '__main__':
300     import simple_config
301     config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
302     server = NetworkServer(config)
303     try:
304         server.main_loop()
305     except KeyboardInterrupt:
306         print "Ctrl C - Stopping server"
307         sys.exit(1)