id of imported account
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import time
21 import sys
22 import os
23 import threading
24 import traceback
25 import json
26 import Queue
27 from network import Network
28 from util import print_msg, print_stderr
29 from simple_config import SimpleConfig
30
31
class NetworkProxy(threading.Thread):
    """Client-side proxy for the daemon.

    Connects to the daemon's TCP port, sends requests as newline-terminated
    JSON objects and, from its own thread (see run()), reads the
    newline-terminated JSON responses and invokes the callback that was
    registered for each request id.
    """

    def __init__(self, config=None):
        """Prepare the proxy; no connection is made until start().

        config -- a plain dict (wrapped in a SimpleConfig) or an object
                  that already offers a get(key, default) method.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        if config is None:
            # a fresh dict per instance instead of a shared mutable default
            config = {}
        self.config = SimpleConfig(config) if isinstance(config, dict) else config
        self.socket = self._create_socket()
        # Read the port through self.config: that is the very object handed
        # to NetworkServer when start() spawns the daemon, so both ends are
        # guaranteed to look the port up the same way.
        self.daemon_port = self.config.get('daemon_port', 8000)
        self.message_id = 0
        # request id -> (method, params, callback); guarded by self.lock
        self.unanswered_requests = {}
        # callback -> list of (method, params) messages; guarded by self.lock
        self.subscriptions = {}
        self.debug = False
        self.lock = threading.Lock()
        self.pending_transactions_for_notifications = []


    def _create_socket(self):
        """Return a new, unconnected TCP socket with SO_REUSEADDR set."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s


    def _spawn_daemon(self):
        """Fork a detached daemon process (double fork).

        Returns only in the calling (parent) process. The intermediate
        child exits immediately; the grandchild runs NetworkServer and
        never returns from here.
        """
        pid = os.fork()
        if pid != 0:
            # Parent: reap the short-lived intermediate child so that it
            # does not linger as a zombie.
            try:
                os.waitpid(pid, 0)
            except OSError:
                # best effort: the child may already have been reaped
                pass
            return

        # The first child.
        os.chdir("/")
        os.setsid()
        os.umask(0)
        if os.fork() != 0:
            # Leave without running the parent's cleanup handlers or
            # unwinding the caller's stack in this throw-away process.
            os._exit(0)

        # Second child: this process is the daemon.
        server = NetworkServer(self.config)
        try:
            server.main_loop()
        except KeyboardInterrupt:
            print("Ctrl C - Stopping server")
        sys.exit(1)


    def start(self, start_daemon=False):
        """Connect to the daemon and start the reader thread.

        Returns True once connected. If the connection fails and
        start_daemon is false, returns False. Otherwise a daemon is forked
        once and the connection is retried (sleeping 0.1s between attempts)
        until it succeeds.
        """
        daemon_started = False
        while True:
            try:
                self.socket.connect(('', self.daemon_port))
            except socket.error:
                # The state of a socket is unspecified after a failed
                # connect(), so every retry uses a fresh one.
                self.socket.close()
                self.socket = self._create_socket()

                if not start_daemon:
                    return False

                if not daemon_started:
                    print_stderr("Starting daemon [%s]" % self.config.get('server'))
                    daemon_started = True
                    self._spawn_daemon()
                else:
                    time.sleep(0.1)
            else:
                threading.Thread.start(self)
                return True


    def parse_json(self, message):
        """Split the first newline-terminated JSON document off message.

        Returns (decoded_object, remainder), or (None, message) when no
        complete line is buffered yet.
        """
        s = message.find('\n')
        if s == -1:
            return None, message
        j = json.loads(message[0:s])
        return j, message[s+1:]


    def run(self):
        """Thread body: read responses and trigger callbacks until the
        connection is closed or a receive fails."""
        message = ''
        while True:
            try:
                data = self.socket.recv(1024)
            except Exception:
                # any receive failure is treated like a closed connection
                data = ''
            if not data:
                break

            message += data
            while True:
                response, message = self.parse_json(message)
                if response is None:
                    break
                self.process(response)

        print("NetworkProxy: exiting")


    def process(self, response):
        """Run the callback registered for response['id'].

        The callback is called as callback(None, d) where d holds 'method',
        'params', 'result' and 'id', plus 'error' when the response carries
        one. Responses whose id is unknown (or was already answered) are
        ignored instead of raising, so that they cannot kill the reader
        thread.
        """
        if self.debug:
            print("<-- %s" % (response,))

        msg_id = response.get('id')
        with self.lock:
            pending = self.unanswered_requests.pop(msg_id, None)
        if pending is None:
            return
        method, params, callback = pending

        reply = {'method': method, 'params': params,
                 'result': response.get('result'), 'id': msg_id}
        if 'error' in response:
            reply['error'] = response['error']
        callback(None, reply)


    def subscribe(self, messages, callback):
        """Remember messages under callback (without duplicates), then
        send them."""
        with self.lock:
            registered = self.subscriptions.setdefault(callback, [])
            for message in messages:
                if message not in registered:
                    registered.append(message)

        self.send(messages, callback)


    def send(self, messages, callback):
        """return the ids of the requests that we sent

        messages is an iterable of (method, params) pairs; callback is
        registered for every request so that process() can dispatch the
        matching response.
        """
        requests = []
        ids = []
        # Id allocation and registration share state with process(), which
        # runs on the reader thread, so they happen under the same lock.
        # The socket write is kept outside the lock so that the reader is
        # never blocked behind a slow send.
        with self.lock:
            for method, params in messages:
                msg_id = self.message_id
                request = json.dumps({'id': msg_id, 'method': method, 'params': params})
                self.unanswered_requests[msg_id] = method, params, callback
                ids.append(msg_id)
                if self.debug:
                    print("--> %s" % request)
                self.message_id += 1
                requests.append(request + '\n')

        out = ''.join(requests)
        while out:
            # send() may accept only part of the buffer
            sent = self.socket.send(out)
            out = out[sent:]
        return ids


    def synchronous_get(self, requests, timeout=100000000):
        """Send requests and block until every one of them is answered.

        Returns the list of results in the order of requests. Each wait on
        the internal queue uses timeout seconds; Queue.Empty is raised if
        no response arrives within that time.
        """
        queue = Queue.Queue()
        ids = self.send(requests, lambda i, x: queue.put(x))
        id2 = ids[:]
        res = {}
        while ids:
            r = queue.get(True, timeout)
            _id = r.get('id')
            if _id in ids:
                ids.remove(_id)
                res[_id] = r.get('result')
        return [res[_id] for _id in id2]


    def get_servers(self):
        """Return the daemon's answer to 'network.get_servers'."""
        return self.synchronous_get([('network.get_servers',[])])[0]

    def get_header(self, height):
        """Return the daemon's answer to 'network.get_header' for height."""
        return self.synchronous_get([('network.get_header',[height])])[0]

    def get_local_height(self):
        """Return the daemon's answer to 'network.get_local_height'."""
        return self.synchronous_get([('network.get_local_height',[])])[0]

    def is_connected(self):
        """Return the daemon's answer to 'network.is_connected'."""
        return self.synchronous_get([('network.is_connected',[])])[0]

    def is_up_to_date(self):
        """Return the daemon's answer to 'network.is_up_to_date'."""
        return self.synchronous_get([('network.is_up_to_date',[])])[0]

    def main_server(self):
        """Return the daemon's answer to 'network.main_server'."""
        return self.synchronous_get([('network.main_server',[])])[0]

    def stop(self):
        """Ask the daemon to shut down and return its answer."""
        return self.synchronous_get([('daemon.shutdown',[])])[0]


    def trigger_callback(self, cb):
        """No-op in the proxy."""
        pass
198
199
200
201
202
203
class ClientThread(threading.Thread):
    """Serve one client connection of the daemon.

    Reads newline-terminated JSON requests from the client socket,
    dispatches them (to the Network object, to the daemon itself, or to
    the network interface) and writes the queued responses back on the
    same socket.
    """

    def __init__(self, server, network, socket):
        """server  -- object whose 'running' flag is cleared on shutdown
        network -- target of 'network.*' calls and owner of the interface
        socket  -- connected client socket (given a 0.1s timeout here)
        """
        threading.Thread.__init__(self)
        self.server = server
        self.daemon = True
        self.s = socket
        # short timeout so that run() regularly gets to flush responses
        self.s.settimeout(0.1)
        self.network = network
        # responses waiting to be written to the client
        self.queue = Queue.Queue()
        # id assigned by the interface -> id chosen by the client
        self.unanswered_requests = {}
        self.debug = False


    def run(self):
        """Thread body: alternate between flushing queued responses and
        reading requests until the client disconnects."""
        message = ''
        try:
            while True:
                self.send_responses()
                try:
                    data = self.s.recv(1024)
                except socket.timeout:
                    continue
                except socket.error:
                    # e.g. connection reset: treat it as a disconnect
                    break

                if not data:
                    break
                message += data

                while True:
                    cmd, message = self.parse_json(message)
                    if not cmd:
                        break
                    self.process(cmd)
        finally:
            # release the connection whichever way the loop ended
            self.s.close()


    def parse_json(self, message):
        """Split the first newline-terminated JSON document off message.

        Returns (decoded_object, remainder), or (None, message) when no
        complete line is buffered yet.
        """
        n = message.find('\n')
        if n == -1:
            return None, message
        j = json.loads(message[0:n])
        return j, message[n+1:]


    def process(self, request):
        """Dispatch one decoded request (a dict with 'method', 'params'
        and 'id').

        'network.<name>' calls self.network.<name>(*params) and queues
          {'id', 'result'} or {'id', 'error'}.
        'daemon.shutdown' clears server.running and queues a True result.
        Anything else is forwarded through network.interface.send(); the
          reply is queued under the client's original id.
        """
        if self.debug:
            print("<-- %s" % (request,))
        method = request['method']
        params = request['params']
        _id = request['id']

        if method.startswith('network.'):
            out = {'id': _id}
            # NOTE(review): the attribute name comes straight from the
            # client, so any attribute of the network object is reachable.
            try:
                f = getattr(self.network, method[8:])
            except AttributeError:
                out['error'] = "unknown method"
            else:
                # Only call f when the lookup succeeded; otherwise the
                # "unknown method" error would be replaced by the message
                # of the unbound-local failure.
                try:
                    out['result'] = f(*params)
                except BaseException as e:
                    out['error'] = str(e)
            self.queue.put(out)
            return

        if method == 'daemon.shutdown':
            self.server.running = False
            self.queue.put({'id': _id, 'result': True})
            return

        def cb(i, r):
            # translate the interface's id back to the client's id
            _id = r.get('id')
            if _id is not None:
                my_id = self.unanswered_requests.pop(_id)
                r['id'] = my_id
            self.queue.put(r)

        # NOTE(review): the id mapping is stored only after send() returns;
        # presumably cb cannot fire before that -- confirm with the
        # interface implementation.
        new_id = self.network.interface.send([(method, params)], cb)[0]
        self.unanswered_requests[new_id] = _id


    def send_responses(self):
        """Write every queued response to the client as one JSON line."""
        while True:
            try:
                r = self.queue.get_nowait()
            except Queue.Empty:
                break
            out = json.dumps(r) + '\n'
            while out:
                # send() may accept only part of the buffer
                n = self.s.send(out)
                out = out[n:]
            if self.debug:
                print("--> %s" % (r,))
296         
297
298
299
class NetworkServer:
    """The daemon: owns the Network object and accepts client connections,
    handing each one to a ClientThread."""

    def __init__(self, config):
        """Start the network and open the listening socket.

        Exits the process with status 1 when network.start(wait=True)
        reports failure. The port comes from config 'daemon_port'
        (default 8000) and the idle timeout from 'daemon_timeout'
        (default 60).
        """
        net = Network(config)
        started = net.start(wait=True)
        if not started:
            print_msg("Not connected, aborting.")
            sys.exit(1)
        self.network = net

        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server = listener
        self.daemon_port = config.get('daemon_port', 8000)
        # NOTE(review): an empty host binds to every interface -- confirm
        # that this is intended.
        listener.bind(('', self.daemon_port))
        listener.listen(5)
        # accept() wakes up every second so main_loop can re-check its state
        listener.settimeout(1)

        self.running = False
        self.timeout = config.get('daemon_timeout', 60)


    def main_loop(self):
        """Accept clients until 'running' is cleared or no client has
        connected for more than self.timeout seconds."""
        self.running = True
        last_accept = time.time()
        while self.running:
            try:
                connection, address = self.server.accept()
            except socket.timeout:
                idle = time.time() - last_accept
                if idle > self.timeout:
                    break
                continue
            last_accept = time.time()
            ClientThread(self, self.network, connection).start()
331
332
333
if __name__ == '__main__':
    # Stand-alone mode: run the daemon in the foreground until interrupted.
    import simple_config
    config = simple_config.SimpleConfig({
        'verbose': True,
        'server': 'ecdsa.net:50002:s',
    })
    server = NetworkServer(config)
    try:
        server.main_loop()
    except KeyboardInterrupt:
        print("Ctrl C - Stopping server")
        sys.exit(1)