X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=lib%2Fnetwork.py;h=114edc62ae67a4a37ed17f3fae512c08ba5a9518;hb=HEAD;hp=82bdcb42b2973f5c8466e743b6e63d3bec523e63;hpb=ba9782eec66ec887ced6932d279c81a3f9048b06;p=electrum-nvc.git diff --git a/lib/network.py b/lib/network.py index 82bdcb4..114edc6 100644 --- a/lib/network.py +++ b/lib/network.py @@ -4,25 +4,46 @@ from bitcoin import * import interface from blockchain import Blockchain -DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} +DEFAULT_PORTS = {'t':'40001', 's':'40002', 'h':'7081', 'g':'7082'} DEFAULT_SERVERS = { - #'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.hachre.de': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - #'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'}, - 'electrum.drollette.com': {'h': '5000', 's': '50002', 't': '50001', 'g': '8082'}, - 'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'}, - 'btc.medoix.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.stupidfoot.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - #'electrum.pdmc.net': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}, - 'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'} +# '127.0.0.1': DEFAULT_PORTS, + '193.23.181.148': DEFAULT_PORTS, + '91.235.143.61': DEFAULT_PORTS, } -NUM_SERVERS = 8 +def parse_servers(result): + """ parse servers list into dict format""" + from version import PROTOCOL_VERSION + servers = {} + for item in result: + host = item[1] + out = {} + version = None + pruning_level = '-' + if len(item) > 2: + for v in item[2]: + if re.match("[stgh]\d*", v): + protocol, port = v[0], v[1:] + if port == '': port = DEFAULT_PORTS[protocol] + out[protocol] = port + elif re.match("v(.?)+", v): + version = v[1:] + elif re.match("p\d*", v): + pruning_level = v[1:] + if pruning_level == '': pruning_level = '0' + try: + is_recent = float(version)>=float(PROTOCOL_VERSION) + except Exception: + is_recent = False + + if out and is_recent: + out['pruning'] = pruning_level + servers[host] = out + + return servers + def filter_protocol(servers, p): @@ -40,16 +61,20 @@ from simple_config import SimpleConfig class Network(threading.Thread): - def __init__(self, config = {}): + def __init__(self, config=None): + if config is None: + config = {} # Do not use mutables as default values! threading.Thread.__init__(self) self.daemon = True self.config = SimpleConfig(config) if type(config) == type({}) else config self.lock = threading.Lock() + self.num_server = 8 if not self.config.get('oneserver') else 0 self.blockchain = Blockchain(self.config, self) self.interfaces = {} self.queue = Queue.Queue() self.callbacks = {} self.protocol = self.config.get('protocol','s') + self.running = False # Server for addresses and transactions self.default_server = self.config.get('server') @@ -57,13 +82,16 @@ class Network(threading.Thread): self.default_server = pick_random_server(self.protocol) self.irc_servers = [] # returned by interface (list from irc) - self.disconnected_servers = [] + self.pending_servers = set([]) + self.disconnected_servers = set([]) self.recent_servers = self.config.get('recent_servers',[]) # successful connections self.banner = '' self.interface = None self.proxy = self.config.get('proxy') self.heights = {} + self.merkle_roots = {} + self.utxo_roots = {} self.server_lag = 0 dir_path = os.path.join( self.config.path, 'certs') @@ -74,12 +102,21 @@ class Network(threading.Thread): self.subscriptions = {} self.subscriptions[self.on_banner] = [('server.banner',[])] self.subscriptions[self.on_peers] = [('server.peers.subscribe',[])] + self.pending_transactions_for_notifications = [] def is_connected(self): return self.interface and self.interface.is_connected + def is_up_to_date(self): + return self.interface.is_up_to_date() + + + def main_server(self): + return self.interface.server + + def send_subscriptions(self): for cb, sub in self.subscriptions.items(): self.interface.send(sub, cb) @@ -93,8 +130,16 @@ class Network(threading.Thread): if message not in self.subscriptions[callback]: self.subscriptions[callback].append(message) - if self.interface and self.interface.is_connected: + if self.is_connected(): + self.interface.send( messages, callback ) + + + def send(self, messages, callback): + if self.is_connected(): self.interface.send( messages, callback ) + return True + else: + return False def register_callback(self, event, callback): @@ -115,7 +160,7 @@ class Network(threading.Thread): choice_list = [] l = filter_protocol(self.get_servers(), self.protocol) for s in l: - if s in self.disconnected_servers or s in self.interfaces.keys(): + if s in self.pending_servers or s in self.disconnected_servers or s in self.interfaces.keys(): continue else: choice_list.append(s) @@ -123,7 +168,7 @@ class Network(threading.Thread): if not choice_list: if not self.interfaces: # we are probably offline, retry later - self.disconnected_servers = [] + self.disconnected_servers = set([]) return server = random.choice( choice_list ) @@ -131,19 +176,23 @@ class Network(threading.Thread): def get_servers(self): - out = self.irc_servers if self.irc_servers else DEFAULT_SERVERS - for s in self.recent_servers: - host, port, protocol = s.split(':') - if host not in out: - out[host] = { protocol:port } + if self.irc_servers: + out = self.irc_servers + else: + out = DEFAULT_SERVERS + for s in self.recent_servers: + host, port, protocol = s.split(':') + if host not in out: + out[host] = { protocol:port } return out def start_interface(self, server): if server in self.interfaces.keys(): return i = interface.Interface(server, self.config) - self.interfaces[server] = i + self.pending_servers.add(server) i.start(self.queue) + return i def start_random_interface(self): server = self.random_server() @@ -151,28 +200,28 @@ class Network(threading.Thread): self.start_interface(server) def start_interfaces(self): - self.start_interface(self.default_server) - self.interface = self.interfaces[self.default_server] + self.interface = self.start_interface(self.default_server) - for i in range(NUM_SERVERS): + for i in range(self.num_server): self.start_random_interface() - if not self.interface: - self.interface = self.interfaces.values()[0] - def start(self, wait=False): self.start_interfaces() threading.Thread.start(self) if wait: - self.interface.connect_event.wait() - return self.interface.is_connected - + return self.wait_until_connected() def wait_until_connected(self): - while not self.interface: - time.sleep(1) - self.interface.connect_event.wait() + "wait until connection status is known" + if self.config.get('auto_cycle'): + # self.random_server() returns None if all servers have been tried + while not self.is_connected() and self.random_server(): + time.sleep(0.1) + else: + self.interface.connect_event.wait() + + return self.interface.is_connected def set_parameters(self, host, port, protocol, proxy, auto_connect): @@ -188,11 +237,11 @@ class Network(threading.Thread): self.protocol = protocol for i in self.interfaces.values(): i.stop() if auto_connect: - self.interface = None + #self.interface = None return if auto_connect: - if not self.interface: + if not self.interface.is_connected: self.switch_to_random_interface() else: if self.server_lag > 0: @@ -206,7 +255,7 @@ class Network(threading.Thread): self.switch_to_interface(random.choice(self.interfaces.values())) def switch_to_interface(self, interface): - assert self.interface is None + assert not self.interface.is_connected server = interface.server print_error("switching to", server) self.interface = interface @@ -221,17 +270,17 @@ class Network(threading.Thread): def stop_interface(self): self.interface.stop() - self.interface = None + def set_server(self, server): - if self.default_server == server and self.interface: + if self.default_server == server and self.interface.is_connected: return if self.protocol != server.split(':')[2]: return # stop the interface in order to terminate subscriptions - if self.interface: + if self.interface.is_connected: self.stop_interface() # notify gui @@ -243,8 +292,7 @@ class Network(threading.Thread): if server in self.interfaces.keys(): self.switch_to_interface( self.interfaces[server] ) else: - self.start_interface(server) - self.interface = self.interfaces[server] + self.interface = self.start_interface(server) def add_recent_server(self, i): @@ -282,11 +330,16 @@ class Network(threading.Thread): try: i = self.queue.get(timeout = 30 if self.interfaces else 3) except Queue.Empty: - if len(self.interfaces) < NUM_SERVERS: + if len(self.interfaces) < self.num_server: self.start_random_interface() continue + if i.server in self.pending_servers: + self.pending_servers.remove(i.server) + if i.is_connected: + #if i.server in self.interfaces: raise + self.interfaces[i.server] = i self.add_recent_server(i) i.send([ ('blockchain.headers.subscribe',[])], self.on_header) if i == self.interface: @@ -294,23 +347,29 @@ class Network(threading.Thread): self.send_subscriptions() self.trigger_callback('connected') else: - self.disconnected_servers.append(i.server) - self.interfaces.pop(i.server) + self.disconnected_servers.add(i.server) + if i.server in self.interfaces: + self.interfaces.pop(i.server) if i.server in self.heights: self.heights.pop(i.server) if i == self.interface: - self.interface = None + #self.interface = None self.trigger_callback('disconnected') - if self.interface is None and self.config.get('auto_cycle'): + if not self.interface.is_connected and self.config.get('auto_cycle'): self.switch_to_random_interface() def on_header(self, i, r): result = r.get('result') - if not result: return + if not result: + return height = result.get('block_height') + if not height: + return self.heights[i.server] = height + self.merkle_roots[i.server] = result.get('merkle_root') + self.utxo_roots[i.server] = result.get('utxo_root') # notify blockchain about the new height self.blockchain.queue.put((i,result)) @@ -325,7 +384,7 @@ class Network(threading.Thread): def on_peers(self, i, r): if not r: return - self.irc_servers = self.parse_servers(r.get('result')) + self.irc_servers = parse_servers(r.get('result')) self.trigger_callback('peers') def on_banner(self, i, r): @@ -340,71 +399,35 @@ class Network(threading.Thread): def synchronous_get(self, requests, timeout=100000000): - queue = Queue.Queue() - ids = self.interface.send(requests, lambda i,r: queue.put(r)) - id2 = ids[:] - res = {} - while ids: - r = queue.get(True, timeout) - _id = r.get('id') - if _id in ids: - ids.remove(_id) - res[_id] = r.get('result') - out = [] - for _id in id2: - out.append(res[_id]) - return out + return self.interface.synchronous_get(requests) - def retrieve_transaction(self, tx_hash, tx_height=0): - import transaction - r = self.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0] - if r: - return transaction.Transaction(r) + def get_header(self, tx_height): + return self.blockchain.read_header(tx_height) + def get_local_height(self): + return self.blockchain.height() - def parse_servers(self, result): - """ parse servers list into dict format""" - from version import PROTOCOL_VERSION - servers = {} - for item in result: - host = item[1] - out = {} - version = None - pruning_level = '-' - if len(item) > 2: - for v in item[2]: - if re.match("[stgh]\d*", v): - protocol, port = v[0], v[1:] - if port == '': port = DEFAULT_PORTS[protocol] - out[protocol] = port - elif re.match("v(.?)+", v): - version = v[1:] - elif re.match("p\d*", v): - pruning_level = v[1:] - if pruning_level == '': pruning_level = '0' - try: - is_recent = float(version)>=float(PROTOCOL_VERSION) - except: - is_recent = False - if out and is_recent: - out['pruning'] = pruning_level - servers[host] = out - return servers + #def retrieve_transaction(self, tx_hash, tx_height=0): + # import transaction + # r = self.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0] + # if r: + # return transaction.Transaction(r) + if __name__ == "__main__": - import simple_config - config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'}) - network = Network(config) + network = NetworkProxy({}) network.start() + print network.get_servers() - while 1: - time.sleep(1) - - + q = Queue.Queue() + network.send([('blockchain.headers.subscribe',[])], q.put) + while True: + r = q.get(timeout=10000) + print r