New port numbers
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import time
21 import sys
22 import os
23 import threading
24 import traceback
25 import json
26 import Queue
27 from network import Network
28 from util import print_msg, print_stderr
29 from simple_config import SimpleConfig
30
31 DAEMON_PORT=8001
32
33 class NetworkProxy(threading.Thread):
34     # connects to daemon
35     # sends requests, runs callbacks
36
37     def __init__(self, config=None):
38         if config is None:
39             config = {}  # Do not use mutables as default arguments!
40         threading.Thread.__init__(self)
41         self.daemon = True
42         self.config = SimpleConfig(config) if type(config) == type({}) else config
43         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
44         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
45         self.daemon_port = config.get('daemon_port', DAEMON_PORT)
46         self.message_id = 0
47         self.unanswered_requests = {}
48         self.subscriptions = {}
49         self.debug = False
50         self.lock = threading.Lock()
51         self.pending_transactions_for_notifications = []
52
53
54     def start(self, start_daemon=False):
55         daemon_started = False
56         while True:
57             try:
58                 self.socket.connect(('', self.daemon_port))
59                 threading.Thread.start(self)
60                 return True
61
62             except socket.error:
63                 if not start_daemon:
64                     return False
65
66                 elif not daemon_started:
67                     print_stderr( "Starting daemon [%s]"%self.config.get('server'))
68                     daemon_started = True
69                     pid = os.fork()
70                     if (pid == 0): # The first child.
71                         os.chdir("/")
72                         os.setsid()
73                         os.umask(0)
74                         pid2 = os.fork()
75                         if (pid2 == 0):  # Second child
76                             server = NetworkServer(self.config)
77                             try:
78                                 server.main_loop()
79                             except KeyboardInterrupt:
80                                 print "Ctrl C - Stopping server"
81                             sys.exit(1)
82                         sys.exit(0)
83                 else:
84                     time.sleep(0.1)
85
86
87
88     def parse_json(self, message):
89         s = message.find('\n')
90         if s==-1: 
91             return None, message
92         j = json.loads( message[0:s] )
93         return j, message[s+1:]
94
95
96     def run(self):
97         # read responses and trigger callbacks
98         message = ''
99         while True:
100             try:
101                 data = self.socket.recv(1024)
102             except:
103                 data = ''
104             if not data:
105                 break
106
107             message += data
108             while True:
109                 response, message = self.parse_json(message)
110                 if response is not None: 
111                     self.process(response)
112                 else:
113                     break
114
115         print "NetworkProxy: exiting"
116
117
118     def process(self, response):
119         # runs callbacks
120         if self.debug: print "<--", response
121
122         msg_id = response.get('id')
123         with self.lock: 
124             method, params, callback = self.unanswered_requests.pop(msg_id)
125
126         result = response.get('result')
127         callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
128
129
130     def subscribe(self, messages, callback):
131         # detect if it is a subscription
132         with self.lock:
133             if self.subscriptions.get(callback) is None: 
134                 self.subscriptions[callback] = []
135             for message in messages:
136                 if message not in self.subscriptions[callback]:
137                     self.subscriptions[callback].append(message)
138
139         self.send( messages, callback )
140
141
142     def send(self, messages, callback):
143         """return the ids of the requests that we sent"""
144         out = ''
145         ids = []
146         for m in messages:
147             method, params = m 
148             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
149             self.unanswered_requests[self.message_id] = method, params, callback
150             ids.append(self.message_id)
151             if self.debug: print "-->", request
152             self.message_id += 1
153             out += request + '\n'
154         while out:
155             sent = self.socket.send( out )
156             out = out[sent:]
157         return ids
158
159
160     def synchronous_get(self, requests, timeout=100000000):
161         queue = Queue.Queue()
162         ids = self.send(requests, lambda i,x: queue.put(x))
163         id2 = ids[:]
164         res = {}
165         while ids:
166             r = queue.get(True, timeout)
167             _id = r.get('id')
168             if _id in ids:
169                 ids.remove(_id)
170                 res[_id] = r.get('result')
171         out = []
172         for _id in id2:
173             out.append(res[_id])
174         return out
175
176
177     def get_servers(self):
178         return self.synchronous_get([('network.get_servers',[])])[0]
179
180     def get_header(self, height):
181         return self.synchronous_get([('network.get_header',[height])])[0]
182
183     def get_local_height(self):
184         return self.synchronous_get([('network.get_local_height',[])])[0]
185
186     def is_connected(self):
187         return self.synchronous_get([('network.is_connected',[])])[0]
188
189     def is_up_to_date(self):
190         return self.synchronous_get([('network.is_up_to_date',[])])[0]
191
192     def main_server(self):
193         return self.synchronous_get([('network.main_server',[])])[0]
194
195     def stop(self):
196         return self.synchronous_get([('daemon.shutdown',[])])[0]
197
198
199     def trigger_callback(self, cb):
200         pass
201
202
203
204
205
206
207 class ClientThread(threading.Thread):
208     # read messages from client (socket), and sends them to Network
209     # responses are sent back on the same socket
210
211     def __init__(self, server, network, socket):
212         threading.Thread.__init__(self)
213         self.server = server
214         self.daemon = True
215         self.s = socket
216         self.s.settimeout(0.1)
217         self.network = network
218         self.queue = Queue.Queue()
219         self.unanswered_requests = {}
220         self.debug = False
221
222
223     def run(self):
224         message = ''
225         while True:
226             self.send_responses()
227             try:
228                 data = self.s.recv(1024)
229             except socket.timeout:
230                 continue
231
232             if not data:
233                 break
234             message += data
235
236             while True:
237                 cmd, message = self.parse_json(message)
238                 if not cmd:
239                     break
240                 self.process(cmd)
241
242         #print "client thread terminating"
243
244
245     def parse_json(self, message):
246         n = message.find('\n')
247         if n==-1: 
248             return None, message
249         j = json.loads( message[0:n] )
250         return j, message[n+1:]
251
252
253     def process(self, request):
254         if self.debug: print "<--", request
255         method = request['method']
256         params = request['params']
257         _id = request['id']
258
259         if method.startswith('network.'):
260             out = {'id':_id}
261             try:
262                 f = getattr(self.network, method[8:])
263             except AttributeError:
264                 out['error'] = "unknown method"
265             try:
266                 out['result'] = f(*params)
267             except BaseException as e:
268                 out['error'] =str(e)
269             self.queue.put(out) 
270             return
271
272         if method == 'daemon.shutdown':
273             self.server.running = False
274             self.queue.put({'id':_id, 'result':True})
275             return
276
277         def cb(i,r):
278             _id = r.get('id')
279             if _id is not None:
280                 my_id = self.unanswered_requests.pop(_id)
281                 r['id'] = my_id
282             self.queue.put(r)
283
284         new_id = self.network.interface.send([(method, params)], cb) [0]
285         self.unanswered_requests[new_id] = _id
286
287
288     def send_responses(self):
289         while True:
290             try:
291                 r = self.queue.get_nowait()
292             except Queue.Empty:
293                 break
294             out = json.dumps(r) + '\n'
295             while out:
296                 n = self.s.send(out)
297                 out = out[n:]
298             if self.debug: print "-->", r
299         
300
301
302
303 class NetworkServer:
304
305     def __init__(self, config):
306         network = Network(config)
307         if not network.start(wait=True):
308             print_msg("Not connected, aborting.")
309             sys.exit(1)
310         self.network = network
311         self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
312         self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
313         self.daemon_port = config.get('daemon_port', DAEMON_PORT)
314         self.server.bind(('', self.daemon_port))
315         self.server.listen(5)
316         self.server.settimeout(1)
317         self.running = False
318         self.timeout = config.get('daemon_timeout', 60)
319
320
321     def main_loop(self):
322         self.running = True
323         t = time.time()
324         while self.running:
325             try:
326                 connection, address = self.server.accept()
327             except socket.timeout:
328                 if time.time() - t > self.timeout:
329                     break
330                 continue
331             t = time.time()
332             client = ClientThread(self, self.network, connection)
333             client.start()
334
335
336
337 if __name__ == '__main__':
338     import simple_config
339     config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
340     server = NetworkServer(config)
341     try:
342         server.main_loop()
343     except KeyboardInterrupt:
344         print "Ctrl C - Stopping server"
345         sys.exit(1)