New port numbers
[electrum-nvc.git] / lib / network.py
1 import threading, time, Queue, os, sys, shutil, random
2 from util import user_dir, appdata_dir, print_error, print_msg
3 from bitcoin import *
4 import interface
5 from blockchain import Blockchain
6
7 DEFAULT_PORTS = {'t':'40001', 's':'40002', 'h':'7081', 'g':'7082'}
8
9 DEFAULT_SERVERS = {
10 #    '127.0.0.1': DEFAULT_PORTS,
11     '193.23.181.148': DEFAULT_PORTS,
12     '91.235.143.61': DEFAULT_PORTS,
13 }
14
15
16 def parse_servers(result):
17     """ parse servers list into dict format"""
18     from version import PROTOCOL_VERSION
19     servers = {}
20     for item in result:
21         host = item[1]
22         out = {}
23         version = None
24         pruning_level = '-'
25         if len(item) > 2:
26             for v in item[2]:
27                 if re.match("[stgh]\d*", v):
28                     protocol, port = v[0], v[1:]
29                     if port == '': port = DEFAULT_PORTS[protocol]
30                     out[protocol] = port
31                 elif re.match("v(.?)+", v):
32                     version = v[1:]
33                 elif re.match("p\d*", v):
34                     pruning_level = v[1:]
35                 if pruning_level == '': pruning_level = '0'
36         try: 
37             is_recent = float(version)>=float(PROTOCOL_VERSION)
38         except Exception:
39             is_recent = False
40
41         if out and is_recent:
42             out['pruning'] = pruning_level
43             servers[host] = out
44
45     return servers
46
47
48
49 def filter_protocol(servers, p):
50     l = []
51     for k, protocols in servers.items():
52         if p in protocols:
53             l.append( ':'.join([k, protocols[p], p]) )
54     return l
55     
56
57 def pick_random_server(p='s'):
58     return random.choice( filter_protocol(DEFAULT_SERVERS,p) )
59
60 from simple_config import SimpleConfig
61
62 class Network(threading.Thread):
63
64     def __init__(self, config=None):
65         if config is None:
66             config = {}  # Do not use mutables as default values!
67         threading.Thread.__init__(self)
68         self.daemon = True
69         self.config = SimpleConfig(config) if type(config) == type({}) else config
70         self.lock = threading.Lock()
71         self.num_server = 8 if not self.config.get('oneserver') else 0
72         self.blockchain = Blockchain(self.config, self)
73         self.interfaces = {}
74         self.queue = Queue.Queue()
75         self.callbacks = {}
76         self.protocol = self.config.get('protocol','s')
77         self.running = False
78
79         # Server for addresses and transactions
80         self.default_server = self.config.get('server')
81         if not self.default_server:
82             self.default_server = pick_random_server(self.protocol)
83
84         self.irc_servers = [] # returned by interface (list from irc)
85         self.pending_servers = set([])
86         self.disconnected_servers = set([])
87         self.recent_servers = self.config.get('recent_servers',[]) # successful connections
88
89         self.banner = ''
90         self.interface = None
91         self.proxy = self.config.get('proxy')
92         self.heights = {}
93         self.merkle_roots = {}
94         self.utxo_roots = {}
95         self.server_lag = 0
96
97         dir_path = os.path.join( self.config.path, 'certs')
98         if not os.path.exists(dir_path):
99             os.mkdir(dir_path)
100
101         # default subscriptions
102         self.subscriptions = {}
103         self.subscriptions[self.on_banner] = [('server.banner',[])]
104         self.subscriptions[self.on_peers] = [('server.peers.subscribe',[])]
105         self.pending_transactions_for_notifications = []
106
107
108     def is_connected(self):
109         return self.interface and self.interface.is_connected
110
111
112     def is_up_to_date(self):
113         return self.interface.is_up_to_date()
114
115
116     def main_server(self):
117         return self.interface.server
118
119
120     def send_subscriptions(self):
121         for cb, sub in self.subscriptions.items():
122             self.interface.send(sub, cb)
123
124
125     def subscribe(self, messages, callback):
126         with self.lock:
127             if self.subscriptions.get(callback) is None: 
128                 self.subscriptions[callback] = []
129             for message in messages:
130                 if message not in self.subscriptions[callback]:
131                     self.subscriptions[callback].append(message)
132
133         if self.is_connected():
134             self.interface.send( messages, callback )
135
136
137     def send(self, messages, callback):
138         if self.is_connected():
139             self.interface.send( messages, callback )
140             return True
141         else:
142             return False
143
144
145     def register_callback(self, event, callback):
146         with self.lock:
147             if not self.callbacks.get(event):
148                 self.callbacks[event] = []
149             self.callbacks[event].append(callback)
150
151
152     def trigger_callback(self, event):
153         with self.lock:
154             callbacks = self.callbacks.get(event,[])[:]
155         if callbacks:
156             [callback() for callback in callbacks]
157
158
159     def random_server(self):
160         choice_list = []
161         l = filter_protocol(self.get_servers(), self.protocol)
162         for s in l:
163             if s in self.pending_servers or s in self.disconnected_servers or s in self.interfaces.keys():
164                 continue
165             else:
166                 choice_list.append(s)
167         
168         if not choice_list: 
169             if not self.interfaces:
170                 # we are probably offline, retry later
171                 self.disconnected_servers = set([])
172             return
173         
174         server = random.choice( choice_list )
175         return server
176
177
178     def get_servers(self):
179         if self.irc_servers:
180             out = self.irc_servers  
181         else:
182             out = DEFAULT_SERVERS
183             for s in self.recent_servers:
184                 host, port, protocol = s.split(':')
185                 if host not in out:
186                     out[host] = { protocol:port }
187         return out
188
189     def start_interface(self, server):
190         if server in self.interfaces.keys():
191             return
192         i = interface.Interface(server, self.config)
193         self.pending_servers.add(server)
194         i.start(self.queue)
195         return i 
196
197     def start_random_interface(self):
198         server = self.random_server()
199         if server:
200             self.start_interface(server)
201
202     def start_interfaces(self):
203         self.interface = self.start_interface(self.default_server)
204
205         for i in range(self.num_server):
206             self.start_random_interface()
207             
208
209     def start(self, wait=False):
210         self.start_interfaces()
211         threading.Thread.start(self)
212         if wait:
213             return self.wait_until_connected()
214
215     def wait_until_connected(self):
216         "wait until connection status is known"
217         if self.config.get('auto_cycle'): 
218             # self.random_server() returns None if all servers have been tried
219             while not self.is_connected() and self.random_server():
220                 time.sleep(0.1)
221         else:
222             self.interface.connect_event.wait()
223
224         return self.interface.is_connected
225
226
227     def set_parameters(self, host, port, protocol, proxy, auto_connect):
228
229         self.config.set_key('auto_cycle', auto_connect, True)
230         self.config.set_key("proxy", proxy, True)
231         self.config.set_key("protocol", protocol, True)
232         server = ':'.join([ host, port, protocol ])
233         self.config.set_key("server", server, True)
234
235         if self.proxy != proxy or self.protocol != protocol:
236             self.proxy = proxy
237             self.protocol = protocol
238             for i in self.interfaces.values(): i.stop()
239             if auto_connect:
240                 #self.interface = None
241                 return
242
243         if auto_connect:
244             if not self.interface.is_connected:
245                 self.switch_to_random_interface()
246             else:
247                 if self.server_lag > 0:
248                     self.stop_interface()
249         else:
250             self.set_server(server)
251
252
253     def switch_to_random_interface(self):
254         if self.interfaces:
255             self.switch_to_interface(random.choice(self.interfaces.values()))
256
257     def switch_to_interface(self, interface):
258         assert not self.interface.is_connected
259         server = interface.server
260         print_error("switching to", server)
261         self.interface = interface
262         h =  self.heights.get(server)
263         if h:
264             self.server_lag = self.blockchain.height() - h
265         self.config.set_key('server', server, False)
266         self.default_server = server
267         self.send_subscriptions()
268         self.trigger_callback('connected')
269
270
271     def stop_interface(self):
272         self.interface.stop() 
273
274
275     def set_server(self, server):
276         if self.default_server == server and self.interface.is_connected:
277             return
278
279         if self.protocol != server.split(':')[2]:
280             return
281
282         # stop the interface in order to terminate subscriptions
283         if self.interface.is_connected:
284             self.stop_interface()
285
286         # notify gui
287         self.trigger_callback('disconnecting')
288         # start interface
289         self.default_server = server
290         self.config.set_key("server", server, True)
291
292         if server in self.interfaces.keys():
293             self.switch_to_interface( self.interfaces[server] )
294         else:
295             self.interface = self.start_interface(server)
296         
297
298     def add_recent_server(self, i):
299         # list is ordered
300         s = i.server
301         if s in self.recent_servers:
302             self.recent_servers.remove(s)
303         self.recent_servers.insert(0,s)
304         self.recent_servers = self.recent_servers[0:20]
305         self.config.set_key('recent_servers', self.recent_servers)
306
307
308     def new_blockchain_height(self, blockchain_height, i):
309         if self.is_connected():
310             h = self.heights.get(self.interface.server)
311             if h:
312                 self.server_lag = blockchain_height - h
313                 if self.server_lag > 1:
314                     print_error( "Server is lagging", blockchain_height, h)
315                     if self.config.get('auto_cycle'):
316                         self.set_server(i.server)
317             else:
318                 print_error('no height for main interface')
319         
320         self.trigger_callback('updated')
321
322
323     def run(self):
324         self.blockchain.start()
325
326         with self.lock:
327             self.running = True
328
329         while self.is_running():
330             try:
331                 i = self.queue.get(timeout = 30 if self.interfaces else 3)
332             except Queue.Empty:
333                 if len(self.interfaces) < self.num_server:
334                     self.start_random_interface()
335                 continue
336
337             if i.server in self.pending_servers:
338                 self.pending_servers.remove(i.server)
339
340             if i.is_connected:
341                 #if i.server in self.interfaces: raise
342                 self.interfaces[i.server] = i
343                 self.add_recent_server(i)
344                 i.send([ ('blockchain.headers.subscribe',[])], self.on_header)
345                 if i == self.interface:
346                     print_error('sending subscriptions to', self.interface.server)
347                     self.send_subscriptions()
348                     self.trigger_callback('connected')
349             else:
350                 self.disconnected_servers.add(i.server)
351                 if i.server in self.interfaces:
352                     self.interfaces.pop(i.server)
353                 if i.server in self.heights:
354                     self.heights.pop(i.server)
355                 if i == self.interface:
356                     #self.interface = None
357                     self.trigger_callback('disconnected')
358
359             if not self.interface.is_connected and self.config.get('auto_cycle'):
360                 self.switch_to_random_interface()
361
362
363     def on_header(self, i, r):
364         result = r.get('result')
365         if not result:
366             return
367         height = result.get('block_height')
368         if not height:
369             return
370         self.heights[i.server] = height
371         self.merkle_roots[i.server] = result.get('merkle_root')
372         self.utxo_roots[i.server] = result.get('utxo_root')
373         # notify blockchain about the new height
374         self.blockchain.queue.put((i,result))
375
376         if i == self.interface:
377             self.server_lag = self.blockchain.height() - height
378             if self.server_lag > 1 and self.config.get('auto_cycle'):
379                 print_error( "Server lagging, stopping interface")
380                 self.stop_interface()
381
382             self.trigger_callback('updated')
383
384
385     def on_peers(self, i, r):
386         if not r: return
387         self.irc_servers = parse_servers(r.get('result'))
388         self.trigger_callback('peers')
389
390     def on_banner(self, i, r):
391         self.banner = r.get('result')
392         self.trigger_callback('banner')
393
394     def stop(self):
395         with self.lock: self.running = False
396
397     def is_running(self):
398         with self.lock: return self.running
399
400     
401     def synchronous_get(self, requests, timeout=100000000):
402         return self.interface.synchronous_get(requests)
403
404
405     def get_header(self, tx_height):
406         return self.blockchain.read_header(tx_height)
407
408     def get_local_height(self):
409         return self.blockchain.height()
410
411
412
413     #def retrieve_transaction(self, tx_hash, tx_height=0):
414     #    import transaction
415     #    r = self.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0]
416     #    if r:
417     #        return transaction.Transaction(r)
418
419
420
421
422
423 if __name__ == "__main__":
424     network = NetworkProxy({})
425     network.start()
426     print network.get_servers()
427
428     q = Queue.Queue()
429     network.send([('blockchain.headers.subscribe',[])], q.put)
430     while True:
431         r = q.get(timeout=10000)
432         print r
433