# Electrum - lightweight Bitcoin client
# Copyright (C) 2014 Thomas Voegtlin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
27 from network import Network
28 from util import print_msg, print_stderr
29 from simple_config import SimpleConfig
33 class NetworkProxy(threading.Thread):
35 # sends requests, runs callbacks
37 def __init__(self, config=None):
39 config = {} # Do not use mutables as default arguments!
40 threading.Thread.__init__(self)
42 self.config = SimpleConfig(config) if type(config) == type({}) else config
43 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
44 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
45 self.daemon_port = config.get('daemon_port', DAEMON_PORT)
47 self.unanswered_requests = {}
48 self.subscriptions = {}
50 self.lock = threading.Lock()
51 self.pending_transactions_for_notifications = []
54 def start(self, start_daemon=False):
55 daemon_started = False
58 self.socket.connect(('', self.daemon_port))
59 threading.Thread.start(self)
66 elif not daemon_started:
67 print_stderr( "Starting daemon [%s]"%self.config.get('server'))
70 if (pid == 0): # The first child.
75 if (pid2 == 0): # Second child
76 server = NetworkServer(self.config)
79 except KeyboardInterrupt:
80 print "Ctrl C - Stopping server"
def parse_json(self, message):
    """Split the first newline-terminated JSON document off ``message``.

    Returns a ``(obj, rest)`` pair: ``obj`` is the decoded text that
    precedes the first newline and ``rest`` is everything after that
    newline.  When ``message`` holds no newline yet, ``(None, message)``
    is returned with the buffer untouched, so the caller can test the
    first element against ``None`` and keep accumulating data.

    Raises whatever ``json.loads`` raises if the first line is not valid
    JSON.
    """
    head, newline, rest = message.partition('\n')
    if not newline:
        # Without this guard ``find`` would yield -1 and the slice
        # ``message[0:-1]`` would try to decode a truncated document.
        return None, message
    return json.loads(head), rest
97 # read responses and trigger callbacks
101 data = self.socket.recv(1024)
109 response, message = self.parse_json(message)
110 if response is not None:
111 self.process(response)
115 print "NetworkProxy: exiting"
118 def process(self, response):
120 if self.debug: print "<--", response
122 msg_id = response.get('id')
124 method, params, callback = self.unanswered_requests.pop(msg_id)
126 result = response.get('result')
127 callback(None, {'method':method, 'params':params, 'result':result, 'id':msg_id})
130 def subscribe(self, messages, callback):
131 # detect if it is a subscription
133 if self.subscriptions.get(callback) is None:
134 self.subscriptions[callback] = []
135 for message in messages:
136 if message not in self.subscriptions[callback]:
137 self.subscriptions[callback].append(message)
139 self.send( messages, callback )
142 def send(self, messages, callback):
143 """return the ids of the requests that we sent"""
148 request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
149 self.unanswered_requests[self.message_id] = method, params, callback
150 ids.append(self.message_id)
151 if self.debug: print "-->", request
153 out += request + '\n'
155 sent = self.socket.send( out )
160 def synchronous_get(self, requests, timeout=100000000):
161 queue = Queue.Queue()
162 ids = self.send(requests, lambda i,x: queue.put(x))
166 r = queue.get(True, timeout)
170 res[_id] = r.get('result')
def get_servers(self):
    """Issue one ``network.get_servers`` request and return its result.

    The request (with an empty parameter list) is passed to
    ``self.synchronous_get``; element 0 of what that call returns is
    handed back to the caller.
    """
    return self.synchronous_get([('network.get_servers', [])])[0]
def get_header(self, height):
    """Issue one ``network.get_header`` request and return its result.

    ``height`` is sent as the request's only parameter.  The request is
    passed to ``self.synchronous_get`` and element 0 of what that call
    returns is handed back to the caller.
    """
    return self.synchronous_get([('network.get_header', [height])])[0]
def get_local_height(self):
    """Issue one ``network.get_local_height`` request and return its result.

    The request (with an empty parameter list) is passed to
    ``self.synchronous_get``; element 0 of what that call returns is
    handed back to the caller.
    """
    return self.synchronous_get([('network.get_local_height', [])])[0]
def is_connected(self):
    """Issue one ``network.is_connected`` request and return its result.

    The request (with an empty parameter list) is passed to
    ``self.synchronous_get``; element 0 of what that call returns is
    handed back to the caller.
    """
    return self.synchronous_get([('network.is_connected', [])])[0]
def is_up_to_date(self):
    """Issue one ``network.is_up_to_date`` request and return its result.

    The request (with an empty parameter list) is passed to
    ``self.synchronous_get``; element 0 of what that call returns is
    handed back to the caller.
    """
    return self.synchronous_get([('network.is_up_to_date', [])])[0]
def main_server(self):
    """Issue one ``network.main_server`` request and return its result.

    The request (with an empty parameter list) is passed to
    ``self.synchronous_get``; element 0 of what that call returns is
    handed back to the caller.
    """
    return self.synchronous_get([('network.main_server', [])])[0]
196 return self.synchronous_get([('daemon.shutdown',[])])[0]
199 def trigger_callback(self, cb):
207 class ClientThread(threading.Thread):
208 # read messages from client (socket), and sends them to Network
209 # responses are sent back on the same socket
211 def __init__(self, server, network, socket):
212 threading.Thread.__init__(self)
216 self.s.settimeout(0.1)
217 self.network = network
218 self.queue = Queue.Queue()
219 self.unanswered_requests = {}
226 self.send_responses()
228 data = self.s.recv(1024)
229 except socket.timeout:
237 cmd, message = self.parse_json(message)
242 #print "client thread terminating"
def parse_json(self, message):
    """Split the first newline-terminated JSON document off ``message``.

    Returns a ``(obj, rest)`` pair: ``obj`` is the decoded text that
    precedes the first newline and ``rest`` is everything after that
    newline.  When ``message`` holds no newline yet, ``(None, message)``
    is returned with the buffer untouched so more data can be appended
    before the next attempt.

    Raises whatever ``json.loads`` raises if the first line is not valid
    JSON.
    """
    head, newline, rest = message.partition('\n')
    if not newline:
        # Without this guard ``find`` would yield -1 and the slice
        # ``message[0:-1]`` would try to decode a truncated document.
        return None, message
    return json.loads(head), rest
253 def process(self, request):
254 if self.debug: print "<--", request
255 method = request['method']
256 params = request['params']
259 if method.startswith('network.'):
262 f = getattr(self.network, method[8:])
263 except AttributeError:
264 out['error'] = "unknown method"
266 out['result'] = f(*params)
267 except BaseException as e:
272 if method == 'daemon.shutdown':
273 self.server.running = False
274 self.queue.put({'id':_id, 'result':True})
280 my_id = self.unanswered_requests.pop(_id)
284 new_id = self.network.interface.send([(method, params)], cb) [0]
285 self.unanswered_requests[new_id] = _id
288 def send_responses(self):
291 r = self.queue.get_nowait()
294 out = json.dumps(r) + '\n'
298 if self.debug: print "-->", r
305 def __init__(self, config):
306 network = Network(config)
307 if not network.start(wait=True):
308 print_msg("Not connected, aborting.")
310 self.network = network
311 self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
312 self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
313 self.daemon_port = config.get('daemon_port', DAEMON_PORT)
314 self.server.bind(('', self.daemon_port))
315 self.server.listen(5)
316 self.server.settimeout(1)
318 self.timeout = config.get('daemon_timeout', 60)
326 connection, address = self.server.accept()
327 except socket.timeout:
328 if time.time() - t > self.timeout:
332 client = ClientThread(self, self.network, connection)
337 if __name__ == '__main__':
339 config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'})
340 server = NetworkServer(config)
343 except KeyboardInterrupt:
344 print "Ctrl C - Stopping server"