fixes, resend_subscriptions
[electrum-nvc.git] / lib / network.py
1 import threading, time, Queue, os, sys, shutil, random
2 from util import user_dir, appdata_dir, print_error, print_msg
3 from bitcoin import *
4 import interface
5 from blockchain import Blockchain
6
7
class Network(threading.Thread):
    """Daemon thread that maintains a pool of server interfaces.

    One entry of ``self.interfaces`` is designated the main interface
    (``self.interface``).  ``run()`` consumes interface objects from
    ``self.queue`` (the queue handed to every interface in
    ``start_interface``; presumably each interface posts itself there when
    its connection state changes -- confirm against ``interface.Interface``)
    and reacts to them being connected or disconnected.
    """

    def __init__(self, config):
        """Set up the network state; no connection is opened here.

        config -- configuration object; this class calls ``get(key)`` and
                  ``set_key(key, value, save)`` on it.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.config = config
        self.lock = threading.Lock()
        # Main interface.  Defined up front so that start_interfaces(),
        # start() and set_server() can test it before one has been chosen.
        self.interface = None
        # Flipped to True by run(); defined here so that is_running() and
        # stop() are safe to call before the thread has started.
        self.running = False
        self.blockchain = Blockchain(config, self)
        self.interfaces = {}
        self.queue = Queue.Queue()
        self.default_server = self.config.get('server')
        self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s')
        self.callbacks = {}

    def register_callback(self, event, callback):
        """Register a zero-argument callable to be invoked on ``event``."""
        with self.lock:
            self.callbacks.setdefault(event, []).append(callback)

    def trigger_callback(self, event):
        """Invoke every callback registered for ``event``, in order."""
        # Copy under the lock, call outside it, so a callback may itself
        # register further callbacks without deadlocking.
        with self.lock:
            callbacks = self.callbacks.get(event, [])[:]
        for callback in callbacks:
            callback()

    def random_server(self):
        """Return a random known server that has no interface yet.

        Returns None when every entry of ``self.servers_list`` already has
        an interface.  Membership is checked per server, so interfaces for
        servers outside ``servers_list`` do not hide unused candidates.
        """
        candidates = [s for s in self.servers_list if s not in self.interfaces]
        if not candidates:
            return None
        return random.choice(candidates)

    def start_interface(self, server):
        """Create and start an interface for ``server`` unless one exists."""
        if server in self.interfaces:
            return
        iface = interface.Interface({'server':server})
        iface.network = self # fixme
        self.interfaces[server] = iface
        iface.start(self.queue)

    def start_random_interface(self):
        """Start an interface for a random unused server, if any is left."""
        server = self.random_server()
        if server:
            self.start_interface(server)

    def start_interfaces(self):
        """Start the default server plus up to 8 random ones.

        The default server, when configured, becomes the main interface;
        otherwise the first started interface is used.  If nothing could be
        started, ``self.interface`` is left as None.
        """
        if self.default_server:
            self.start_interface(self.default_server)
            self.interface = self.interfaces[self.default_server]

        for _ in range(8):
            self.start_random_interface()

        if self.interface is None and self.interfaces:
            self.interface = list(self.interfaces.values())[0]

    def start(self, wait=False):
        """Start the interfaces and then this thread.

        wait -- when true, block on the main interface's ``connect_event``
                and return its ``is_connected`` value (False when there is
                no main interface).  When false, return None immediately.
        """
        self.start_interfaces()
        threading.Thread.start(self)
        if wait:
            if self.interface is None:
                return False
            self.interface.connect_event.wait()
            return self.interface.is_connected

    def set_server(self, server, proxy):
        """Make ``server`` the main interface and replay subscriptions.

        The subscriptions held by the previous main interface are sent again
        on the new one, then the 'disconnecting' callbacks are fired.
        ``proxy`` is accepted but not used by this method.
        """
        if self.interface is not None:
            subscriptions = self.interface.subscriptions
        else:
            subscriptions = {}
        self.default_server = server
        self.start_interface(server)
        self.interface = self.interfaces[server]
        self.resend_subscriptions(subscriptions)
        self.trigger_callback('disconnecting') # for actively disconnecting

    def run(self):
        """Thread body: start the blockchain, then process interface events.

        Each item taken from ``self.queue`` is an interface object; it is
        dispatched on its ``is_connected`` flag.  ``queue.get()`` blocks, so
        after ``stop()`` the loop only exits once one more item arrives.
        """
        self.blockchain.start()

        with self.lock:
            self.running = True

        while self.is_running():
            iface = self.queue.get()
            if iface.is_connected:
                self._on_interface_connected(iface)
            else:
                self._on_interface_disconnected(iface)

    def _on_interface_connected(self, iface):
        """Register channels and send the initial requests on ``iface``."""
        iface.register_channel('verifier', self.blockchain.queue)
        iface.register_channel('get_header')
        iface.send([ ('blockchain.headers.subscribe',[])], 'verifier')

        # Banner and peer list are only requested from the main interface.
        if iface == self.interface:
            iface.send([('server.banner',[])])
            iface.send([('server.peers.subscribe',[])])

    def _on_interface_disconnected(self, iface):
        """Drop ``iface`` from the pool, start a replacement, handle main loss."""
        # The server may be absent from servers_list (e.g. a configured
        # server outside the defaults); an unguarded remove()/pop() would
        # raise and kill this thread.
        if iface.server in self.servers_list:
            self.servers_list.remove(iface.server)
        self.interfaces.pop(iface.server, None)
        self.start_random_interface()

        if iface == self.interface:
            # Auto-cycling needs at least one remaining interface; with an
            # empty pool, fall back to reporting the disconnection.
            if self.config.get('auto_cycle') and self.interfaces:
                self.interface = random.choice(list(self.interfaces.values()))
                self.config.set_key('server', self.interface.server, False)
            else:
                self.trigger_callback('disconnected')

    def on_peers(self, result):
        """Placeholder for handling a peers result; currently does nothing."""
        # populate servers list here
        pass

    def on_banner(self, result):
        """Placeholder for handling a banner result; currently does nothing."""
        pass

    def stop(self):
        """Ask the run() loop to exit."""
        with self.lock:
            self.running = False

    def is_running(self):
        """Return the running flag (True between run() starting and stop())."""
        with self.lock:
            return self.running

    def resend_subscriptions(self, subscriptions):
        """Send each non-empty message list on the main interface.

        subscriptions -- mapping of channel to a list of messages.
        """
        for channel, messages in subscriptions.items():
            if messages:
                self.interface.send(messages, channel)
138
139
140
141
if __name__ == "__main__":
    # Manual smoke test: bring the network up with verbose output enabled.
    import simple_config

    config = simple_config.SimpleConfig({'verbose':True})
    network = Network(config)
    network.start()

    # Network marks itself as a daemon thread, so the main thread has to stay
    # alive for it to keep running; idle here until the process is killed.
    while True:
        time.sleep(1)
150
151
152