add daemon port to config
[electrum-nvc.git] / lib / daemon.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import socket
20 import select
21 import time
22 import sys
23 import os
24 import threading
25 import traceback
26 import json
27 import Queue
28 from network import Network
29 from util import print_msg
30
31
32 class NetworkProxy(threading.Thread):
33     # connects to daemon
34     # sends requests, runs callbacks
35
36     def __init__(self, config):
37         threading.Thread.__init__(self)
38         self.daemon = True
39         self.config = config
40         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
41         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
42         self.daemon_port = config.get('daemon_port', 8000)
43         self.socket.connect(('', self.daemon_port))
44         self.message_id = 0
45         self.unanswered_requests = {}
46         self.subscriptions = {}
47         self.debug = False
48         self.lock = threading.Lock()
49         
50
51     def parse_json(self, message):
52         s = message.find('\n')
53         if s==-1: 
54             return None, message
55         j = json.loads( message[0:s] )
56         return j, message[s+1:]
57
58
59     def run(self):
60         # read responses and trigger callbacks
61         message = ''
62         while True:
63             try:
64                 data = self.socket.recv(1024)
65             except:
66                 data = ''
67             if not data:
68                 break
69
70             message += data
71             while True:
72                 response, message = self.parse_json(message)
73                 if response is not None: 
74                     self.process(response)
75                 else:
76                     break
77
78         print "NetworkProxy: exiting"
79
80
81     def process(self, response):
82         # runs callbacks
83         if self.debug: print "<--", response
84
85         msg_id = response.get('id')
86         with self.lock: 
87             method, params, callback = self.unanswered_requests.pop(msg_id)
88
89         result = response.get('result')
90         callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
91
92
93     def subscribe(self, messages, callback):
94         # detect if it is a subscription
95         with self.lock:
96             if self.subscriptions.get(callback) is None: 
97                 self.subscriptions[callback] = []
98             for message in messages:
99                 if message not in self.subscriptions[callback]:
100                     self.subscriptions[callback].append(message)
101
102         self.do_send( messages, callback )
103
104
105     def do_send(self, messages, callback):
106         """return the ids of the requests that we sent"""
107         out = ''
108         ids = []
109         for m in messages:
110             method, params = m 
111             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
112             self.unanswered_requests[self.message_id] = method, params, callback
113             ids.append(self.message_id)
114             if self.debug: print "-->", request
115             self.message_id += 1
116             out += request + '\n'
117         while out:
118             sent = self.socket.send( out )
119             out = out[sent:]
120         return ids
121
122
123     def synchronous_get(self, requests, timeout=100000000):
124         queue = Queue.Queue()
125         ids = self.do_send(requests, lambda i,x: queue.put(x))
126         id2 = ids[:]
127         res = {}
128         while ids:
129             r = queue.get(True, timeout)
130             _id = r.get('id')
131             if _id in ids:
132                 ids.remove(_id)
133                 res[_id] = r.get('result')
134         out = []
135         for _id in id2:
136             out.append(res[_id])
137         return out
138
139
140     def get_servers(self):
141         return self.synchronous_get([('network.get_servers',[])])[0]
142
143     def get_header(self, height):
144         return self.synchronous_get([('network.get_header',[height])])[0]
145
146     def get_local_height(self):
147         return self.synchronous_get([('network.get_local_height',[])])[0]
148
149     def is_connected(self):
150         return self.synchronous_get([('network.is_connected',[])])[0]
151
152     def is_up_to_date(self):
153         return self.synchronous_get([('network.is_up_to_date',[])])[0]
154
155     def main_server(self):
156         return self.synchronous_get([('network.main_server',[])])[0]
157
158     def stop(self):
159         return self.synchronous_get([('daemon.shutdown',[])])[0]
160
161
162     def trigger_callback(self, cb):
163         pass
164
165
166
167
168
169
170 class ClientThread(threading.Thread):
171     # read messages from client (socket), and sends them to Network
172     # responses are sent back on the same socket
173
174     def __init__(self, server, network, socket):
175         threading.Thread.__init__(self)
176         self.server = server
177         self.daemon = True
178         self.s = socket
179         self.s.settimeout(0.1)
180         self.network = network
181         self.queue = Queue.Queue()
182         self.unanswered_requests = {}
183         self.debug = False
184
185
186     def run(self):
187         message = ''
188         while True:
189             self.send_responses()
190             try:
191                 data = self.s.recv(1024)
192             except socket.timeout:
193                 continue
194
195             if not data:
196                 break
197             message += data
198
199             while True:
200                 cmd, message = self.parse_json(message)
201                 if not cmd:
202                     break
203                 self.process(cmd)
204
205         #print "client thread terminating"
206
207
208     def parse_json(self, message):
209         n = message.find('\n')
210         if n==-1: 
211             return None, message
212         j = json.loads( message[0:n] )
213         return j, message[n+1:]
214
215
216     def process(self, request):
217         if self.debug: print "<--", request
218         method = request['method']
219         params = request['params']
220         _id = request['id']
221
222         if method.startswith('network.'):
223             out = {'id':_id}
224             try:
225                 f = getattr(self.network, method[8:])
226             except AttributeError:
227                 out['error'] = "unknown method"
228             try:
229                 out['result'] = f(*params)
230             except BaseException as e:
231                 out['error'] =str(e)
232             self.queue.put(out) 
233             return
234
235         if method == 'daemon.shutdown':
236             self.server.running = False
237             self.queue.put({'id':_id, 'result':True})
238             return
239
240         def cb(i,r):
241             _id = r.get('id')
242             if _id is not None:
243                 my_id = self.unanswered_requests.pop(_id)
244                 r['id'] = my_id
245             self.queue.put(r)
246
247         new_id = self.network.interface.send([(method, params)], cb) [0]
248         self.unanswered_requests[new_id] = _id
249
250
251     def send_responses(self):
252         while True:
253             try:
254                 r = self.queue.get_nowait()
255             except Queue.Empty:
256                 break
257             out = json.dumps(r) + '\n'
258             while out:
259                 n = self.s.send(out)
260                 out = out[n:]
261             if self.debug: print "-->", r
262         
263
264
265
266 class NetworkServer:
267
268     def __init__(self, config):
269         network = Network(config)
270         if not network.start(wait=True):
271             print_msg("Not connected, aborting.")
272             sys.exit(1)
273         self.network = network
274         self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
275         self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
276         self.daemon_port = config.get('daemon_port', 8000)
277         self.server.bind(('', self.daemon_port))
278         self.server.listen(5)
279         self.server.settimeout(1)
280         self.running = False
281         self.timeout = config.get('daemon_timeout', 60)
282
283
284     def main_loop(self):
285         self.running = True
286         t = time.time()
287         while self.running:
288             try:
289                 connection, address = self.server.accept()
290             except socket.timeout:
291                 if time.time() - t > self.timeout:
292                     break
293                 continue
294             t = time.time()
295             client = ClientThread(self, self.network, connection)
296             client.start()
297
298
299
300 if __name__ == '__main__':
301     import simple_config
302     config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
303     server = NetworkServer(config)
304     try:
305         server.main_loop()
306     except KeyboardInterrupt:
307         print "Ctrl C - Stopping server"
308         sys.exit(1)