X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=lib%2Fnetwork.py;h=114edc62ae67a4a37ed17f3fae512c08ba5a9518;hb=HEAD;hp=7583a70dcca3dc67d68b091f5b8ee48158657d55;hpb=40e393187a96c8504b0c5e364af1e45251b36547;p=electrum-nvc.git diff --git a/lib/network.py b/lib/network.py index 7583a70..114edc6 100644 --- a/lib/network.py +++ b/lib/network.py @@ -4,26 +4,47 @@ from bitcoin import * import interface from blockchain import Blockchain -DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} +DEFAULT_PORTS = {'t':'40001', 's':'40002', 'h':'7081', 'g':'7082'} DEFAULT_SERVERS = { - 'the9ull.homelinux.org': {'h': '8082', 't': '50001'}, - 'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.dynaloop.net': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.koh.ms': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.mooo.com': {'h': '8081', 't': '50001'}, - 'electrum.bitcoins.sk': {'h': '8081', 's': '50002', 't': '50001', 'g': '8'}, - 'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'}, - 'electrum.drollette.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'}, - 'electrum.yacoin.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'} +# '127.0.0.1': DEFAULT_PORTS, + '193.23.181.148': DEFAULT_PORTS, + '91.235.143.61': DEFAULT_PORTS, } +def parse_servers(result): + """ parse servers list into dict format""" + from version import PROTOCOL_VERSION + servers = {} + for item in result: + host = item[1] + out = {} + version = None + pruning_level = '-' + if len(item) > 2: + for v in item[2]: + if re.match("[stgh]\d*", v): + protocol, port = v[0], v[1:] + if port == '': port = DEFAULT_PORTS[protocol] + out[protocol] = port + elif re.match("v(.?)+", v): + version = v[1:] + elif re.match("p\d*", v): + pruning_level = v[1:] + if pruning_level == '': pruning_level = '0' + try: + is_recent = float(version)>=float(PROTOCOL_VERSION) + except Exception: + is_recent = False + + if out and is_recent: + out['pruning'] = pruning_level + servers[host] = out + + return servers + + def filter_protocol(servers, p): l = [] @@ -33,38 +54,67 @@ def filter_protocol(servers, p): return l -def pick_random_server(): - return random.choice( filter_protocol(DEFAULT_SERVERS,'s') ) +def pick_random_server(p='s'): + return random.choice( filter_protocol(DEFAULT_SERVERS,p) ) +from simple_config import SimpleConfig class Network(threading.Thread): - def __init__(self, config): + def __init__(self, config=None): + if config is None: + config = {} # Do not use mutables as default values! threading.Thread.__init__(self) self.daemon = True - self.config = config + self.config = SimpleConfig(config) if type(config) == type({}) else config self.lock = threading.Lock() - self.blockchain = Blockchain(config, self) + self.num_server = 8 if not self.config.get('oneserver') else 0 + self.blockchain = Blockchain(self.config, self) self.interfaces = {} self.queue = Queue.Queue() - self.default_server = self.config.get('server') - self.disconnected_servers = [] self.callbacks = {} - self.servers = [] + self.protocol = self.config.get('protocol','s') + self.running = False + + # Server for addresses and transactions + self.default_server = self.config.get('server') + if not self.default_server: + self.default_server = pick_random_server(self.protocol) + + self.irc_servers = [] # returned by interface (list from irc) + self.pending_servers = set([]) + self.disconnected_servers = set([]) + self.recent_servers = self.config.get('recent_servers',[]) # successful connections + self.banner = '' self.interface = None self.proxy = self.config.get('proxy') self.heights = {} + self.merkle_roots = {} + self.utxo_roots = {} + self.server_lag = 0 dir_path = os.path.join( self.config.path, 'certs') if not os.path.exists(dir_path): os.mkdir(dir_path) - # default subscriptions self.subscriptions = {} self.subscriptions[self.on_banner] = [('server.banner',[])] self.subscriptions[self.on_peers] = [('server.peers.subscribe',[])] + self.pending_transactions_for_notifications = [] + + + def is_connected(self): + return self.interface and self.interface.is_connected + + + def is_up_to_date(self): + return self.interface.is_up_to_date() + + + def main_server(self): + return self.interface.server def send_subscriptions(self): @@ -80,10 +130,18 @@ class Network(threading.Thread): if message not in self.subscriptions[callback]: self.subscriptions[callback].append(message) - if self.interface and self.interface.is_connected: + if self.is_connected(): self.interface.send( messages, callback ) + def send(self, messages, callback): + if self.is_connected(): + self.interface.send( messages, callback ) + return True + else: + return False + + def register_callback(self, event, callback): with self.lock: if not self.callbacks.get(event): @@ -100,32 +158,41 @@ class Network(threading.Thread): def random_server(self): choice_list = [] - l = filter_protocol(self.get_servers(), 's') + l = filter_protocol(self.get_servers(), self.protocol) for s in l: - if s in self.disconnected_servers or s in self.interfaces.keys(): + if s in self.pending_servers or s in self.disconnected_servers or s in self.interfaces.keys(): continue else: choice_list.append(s) - if not choice_list: return + if not choice_list: + if not self.interfaces: + # we are probably offline, retry later + self.disconnected_servers = set([]) + return server = random.choice( choice_list ) return server def get_servers(self): - if not self.servers: - return DEFAULT_SERVERS + if self.irc_servers: + out = self.irc_servers else: - return self.servers - + out = DEFAULT_SERVERS + for s in self.recent_servers: + host, port, protocol = s.split(':') + if host not in out: + out[host] = { protocol:port } + return out def start_interface(self, server): if server in self.interfaces.keys(): return - i = interface.Interface({'server':server, 'path':self.config.path, 'proxy':self.proxy}) - self.interfaces[server] = i + i = interface.Interface(server, self.config) + self.pending_servers.add(server) i.start(self.queue) + return i def start_random_interface(self): server = self.random_server() @@ -133,49 +200,124 @@ class Network(threading.Thread): self.start_interface(server) def start_interfaces(self): - if self.default_server: - self.start_interface(self.default_server) - self.interface = self.interfaces[self.default_server] + self.interface = self.start_interface(self.default_server) - for i in range(8): + for i in range(self.num_server): self.start_random_interface() - if not self.interface: - self.interface = self.interfaces.values()[0] - def start(self, wait=False): self.start_interfaces() threading.Thread.start(self) if wait: + return self.wait_until_connected() + + def wait_until_connected(self): + "wait until connection status is known" + if self.config.get('auto_cycle'): + # self.random_server() returns None if all servers have been tried + while not self.is_connected() and self.random_server(): + time.sleep(0.1) + else: self.interface.connect_event.wait() - return self.interface.is_connected + return self.interface.is_connected + + + def set_parameters(self, host, port, protocol, proxy, auto_connect): - def set_proxy(self, proxy): - self.proxy = proxy + self.config.set_key('auto_cycle', auto_connect, True) + self.config.set_key("proxy", proxy, True) + self.config.set_key("protocol", protocol, True) + server = ':'.join([ host, port, protocol ]) + self.config.set_key("server", server, True) + + if self.proxy != proxy or self.protocol != protocol: + self.proxy = proxy + self.protocol = protocol + for i in self.interfaces.values(): i.stop() + if auto_connect: + #self.interface = None + return + + if auto_connect: + if not self.interface.is_connected: + self.switch_to_random_interface() + else: + if self.server_lag > 0: + self.stop_interface() + else: + self.set_server(server) + + + def switch_to_random_interface(self): + if self.interfaces: + self.switch_to_interface(random.choice(self.interfaces.values())) + + def switch_to_interface(self, interface): + assert not self.interface.is_connected + server = interface.server + print_error("switching to", server) + self.interface = interface + h = self.heights.get(server) + if h: + self.server_lag = self.blockchain.height() - h + self.config.set_key('server', server, False) + self.default_server = server + self.send_subscriptions() + self.trigger_callback('connected') + + + def stop_interface(self): + self.interface.stop() def set_server(self, server): - if self.default_server == server: + if self.default_server == server and self.interface.is_connected: + return + + if self.protocol != server.split(':')[2]: return # stop the interface in order to terminate subscriptions - self.interface.stop() + if self.interface.is_connected: + self.stop_interface() + # notify gui self.trigger_callback('disconnecting') # start interface self.default_server = server + self.config.set_key("server", server, True) if server in self.interfaces.keys(): - self.interface = self.interfaces[server] - self.send_subscriptions() + self.switch_to_interface( self.interfaces[server] ) else: - self.start_interface(server) - self.interface = self.interfaces[server] + self.interface = self.start_interface(server) - + def add_recent_server(self, i): + # list is ordered + s = i.server + if s in self.recent_servers: + self.recent_servers.remove(s) + self.recent_servers.insert(0,s) + self.recent_servers = self.recent_servers[0:20] + self.config.set_key('recent_servers', self.recent_servers) + + + def new_blockchain_height(self, blockchain_height, i): + if self.is_connected(): + h = self.heights.get(self.interface.server) + if h: + self.server_lag = blockchain_height - h + if self.server_lag > 1: + print_error( "Server is lagging", blockchain_height, h) + if self.config.get('auto_cycle'): + self.set_server(i.server) + else: + print_error('no height for main interface') + + self.trigger_callback('updated') def run(self): @@ -185,34 +327,64 @@ class Network(threading.Thread): self.running = True while self.is_running(): - i = self.queue.get() + try: + i = self.queue.get(timeout = 30 if self.interfaces else 3) + except Queue.Empty: + if len(self.interfaces) < self.num_server: + self.start_random_interface() + continue + + if i.server in self.pending_servers: + self.pending_servers.remove(i.server) if i.is_connected: + #if i.server in self.interfaces: raise + self.interfaces[i.server] = i + self.add_recent_server(i) i.send([ ('blockchain.headers.subscribe',[])], self.on_header) if i == self.interface: + print_error('sending subscriptions to', self.interface.server) self.send_subscriptions() self.trigger_callback('connected') else: - self.disconnected_servers.append(i.server) - self.interfaces.pop(i.server) - self.start_random_interface() - + self.disconnected_servers.add(i.server) + if i.server in self.interfaces: + self.interfaces.pop(i.server) + if i.server in self.heights: + self.heights.pop(i.server) if i == self.interface: - if self.config.get('auto_cycle'): - self.interface = random.choice(self.interfaces.values()) - self.config.set_key('server', self.interface.server, False) - else: - self.trigger_callback('disconnected') - + #self.interface = None + self.trigger_callback('disconnected') + + if not self.interface.is_connected and self.config.get('auto_cycle'): + self.switch_to_random_interface() + + def on_header(self, i, r): result = r.get('result') - if not result: return - self.heights[i.server] = result.get('block_height') + if not result: + return + height = result.get('block_height') + if not height: + return + self.heights[i.server] = height + self.merkle_roots[i.server] = result.get('merkle_root') + self.utxo_roots[i.server] = result.get('utxo_root') + # notify blockchain about the new height self.blockchain.queue.put((i,result)) + if i == self.interface: + self.server_lag = self.blockchain.height() - height + if self.server_lag > 1 and self.config.get('auto_cycle'): + print_error( "Server lagging, stopping interface") + self.stop_interface() + + self.trigger_callback('updated') + + def on_peers(self, i, r): if not r: return - self.servers = self.parse_servers(r.get('result')) + self.irc_servers = parse_servers(r.get('result')) self.trigger_callback('peers') def on_banner(self, i, r): @@ -226,55 +398,36 @@ class Network(threading.Thread): with self.lock: return self.running - def retrieve_transaction(self, tx_hash, tx_height=0): - import transaction - r = self.interface.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0] - if r: - return transaction.Transaction(r) - - - def parse_servers(self, result): - """ parse servers list into dict format""" - from version import PROTOCOL_VERSION - servers = {} - for item in result: - host = item[1] - out = {} - version = None - pruning_level = '-' - if len(item) > 2: - for v in item[2]: - if re.match("[stgh]\d*", v): - protocol, port = v[0], v[1:] - if port == '': port = DEFAULT_PORTS[protocol] - out[protocol] = port - elif re.match("v(.?)+", v): - version = v[1:] - elif re.match("p\d*", v): - pruning_level = v[1:] - if pruning_level == '': pruning_level = '0' - try: - is_recent = float(version)>=float(PROTOCOL_VERSION) - except: - is_recent = False - - if out and is_recent: - out['pruning'] = pruning_level - servers[host] = out - - return servers + def synchronous_get(self, requests, timeout=100000000): + return self.interface.synchronous_get(requests) + def get_header(self, tx_height): + return self.blockchain.read_header(tx_height) + def get_local_height(self): + return self.blockchain.height() -if __name__ == "__main__": - import simple_config - config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'}) - network = Network(config) - network.start() - while 1: - time.sleep(1) + #def retrieve_transaction(self, tx_hash, tx_height=0): + # import transaction + # r = self.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0] + # if r: + # return transaction.Transaction(r) + + + + + +if __name__ == "__main__": + network = NetworkProxy({}) + network.start() + print network.get_servers() + q = Queue.Queue() + network.send([('blockchain.headers.subscribe',[])], q.put) + while True: + r = q.get(timeout=10000) + print r