move synchronous_get to network.py, fix get_balance script
[electrum-nvc.git] / lib / network.py
index bf75137..02cf727 100644 (file)
@@ -1,8 +1,40 @@
+import threading, time, Queue, os, sys, shutil, random
+from util import user_dir, appdata_dir, print_error, print_msg
+from bitcoin import *
 import interface
 from blockchain import Blockchain
-import threading, time, Queue, os, sys, shutil
-from util import user_dir, appdata_dir, print_error
-from bitcoin import *
+
+DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'}
+
+DEFAULT_SERVERS = {
+    'the9ull.homelinux.org': {'h': '8082', 't': '50001'},
+    'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.hachre.de': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.koh.ms': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.mooo.com': {'h': '8081', 't': '50001'},
+    'electrum.bitcoins.sk': {'h': '8081', 's': '50002', 't': '50001', 'g': '8'},
+    'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'},
+    'electrum.drollette.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'},
+    'electrum.yacoin.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}
+}
+
+
+
+def filter_protocol(servers, p):
+    l = []
+    for k, protocols in servers.items():
+        if p in protocols:
+            l.append( ':'.join([k, protocols[p], p]) )
+    return l
+    
+
+def pick_random_server():
+    return random.choice( filter_protocol(DEFAULT_SERVERS,'s') )
 
 
 class Network(threading.Thread):
@@ -12,12 +44,44 @@ class Network(threading.Thread):
         self.daemon = True
         self.config = config
         self.lock = threading.Lock()
-        self.blockchain = Blockchain(config)
+        self.blockchain = Blockchain(config, self)
         self.interfaces = {}
         self.queue = Queue.Queue()
         self.default_server = self.config.get('server')
-        self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s')
+        self.disconnected_servers = []
         self.callbacks = {}
+        self.servers = []
+        self.banner = ''
+        self.interface = None
+        self.proxy = self.config.get('proxy')
+        self.heights = {}
+
+        dir_path = os.path.join( self.config.path, 'certs')
+        if not os.path.exists(dir_path):
+            os.mkdir(dir_path)
+
+
+        # default subscriptions
+        self.subscriptions = {}
+        self.subscriptions[self.on_banner] = [('server.banner',[])]
+        self.subscriptions[self.on_peers] = [('server.peers.subscribe',[])]
+
+
+    def send_subscriptions(self):
+        for cb, sub in self.subscriptions.items():
+            self.interface.send(sub, cb)
+
+
+    def subscribe(self, messages, callback):
+        with self.lock:
+            if self.subscriptions.get(callback) is None: 
+                self.subscriptions[callback] = []
+            for message in messages:
+                if message not in self.subscriptions[callback]:
+                    self.subscriptions[callback].append(message)
+
+        if self.interface and self.interface.is_connected:
+            self.interface.send( messages, callback )
 
 
     def register_callback(self, event, callback):
@@ -34,25 +98,53 @@ class Network(threading.Thread):
             [callback() for callback in callbacks]
 
 
-    def start_interfaces(self):
+    def random_server(self):
+        choice_list = []
+        l = filter_protocol(self.get_servers(), 's')
+        for s in l:
+            if s in self.disconnected_servers or s in self.interfaces.keys():
+                continue
+            else:
+                choice_list.append(s)
+        
+        if not choice_list: return
+        
+        server = random.choice( choice_list )
+        return server
 
-        for server in self.servers_list:
-            self.interfaces[server] = interface.Interface({'server':server})
 
-        for i in self.interfaces.values():
-            i.network = self # fixme
-            i.start(self.queue)
+    def get_servers(self):
+        if not self.servers:
+            return DEFAULT_SERVERS
+        else:
+            return self.servers
+
 
+    def start_interface(self, server):
+        if server in self.interfaces.keys():
+            return
+        i = interface.Interface({'server':server, 'path':self.config.path, 'proxy':self.proxy})
+        self.interfaces[server] = i
+        i.start(self.queue)
+
+    def start_random_interface(self):
+        server = self.random_server()
+        if server:
+            self.start_interface(server)
+
+    def start_interfaces(self):
         if self.default_server:
-            self.interface = interface.Interface({'server':self.default_server})
-            self.interface.network = self # fixme
-            self.interface.start(self.queue)
-        else:
-            self.interface = self.interfaces[0]
+            self.start_interface(self.default_server)
+            self.interface = self.interfaces[self.default_server]
 
+        for i in range(8):
+            self.start_random_interface()
+            
+        if not self.interface:
+            self.interface = self.interfaces.values()[0]
 
-    def start(self, wait=False):
 
+    def start(self, wait=False):
         self.start_interfaces()
         threading.Thread.start(self)
         if wait:
@@ -60,6 +152,31 @@ class Network(threading.Thread):
             return self.interface.is_connected
 
 
+    def set_proxy(self, proxy):
+        self.proxy = proxy
+
+
+    def set_server(self, server):
+        if self.default_server == server:
+            return
+
+        # stop the interface in order to terminate subscriptions
+        self.interface.stop() 
+        # notify gui
+        self.trigger_callback('disconnecting')
+        # start interface
+        self.default_server = server
+
+        if server in self.interfaces.keys():
+            self.interface = self.interfaces[server]
+            self.send_subscriptions()
+        else:
+            self.start_interface(server)
+            self.interface = self.interfaces[server]
+        
+
+
+
 
     def run(self):
         self.blockchain.start()
@@ -71,28 +188,36 @@ class Network(threading.Thread):
             i = self.queue.get()
 
             if i.is_connected:
-                i.register_channel('verifier', self.blockchain.queue)
-                i.register_channel('get_header')
-                i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
+                i.send([ ('blockchain.headers.subscribe',[])], self.on_header)
                 if i == self.interface:
-                    i.send([('server.banner',[])])
-                    i.send([('server.peers.subscribe',[])])
+                    self.send_subscriptions()
+                    self.trigger_callback('connected')
             else:
+                self.disconnected_servers.append(i.server)
                 self.interfaces.pop(i.server)
+                self.start_random_interface()
+                
                 if i == self.interface:
-                    if self.default_server is None:
-                        print_msg("Using random server...")
-                        server = random.choice( self.servers_list )
-                        self.interface = interface.Interface({'server':self.default_server})
+                    if self.config.get('auto_cycle'):
+                        self.interface = random.choice(self.interfaces.values())
+                        self.config.set_key('server', self.interface.server, False)
                     else:
-                        #i.trigger_callback('disconnected')
-                        pass
-
-    def on_peers(self, resut):
-        pass
-
-    def on_banner(self, result):
-        pass
+                        self.trigger_callback('disconnected')
+                
+    def on_header(self, i, r):
+        result = r.get('result')
+        if not result: return
+        self.heights[i.server] = result.get('block_height')
+        self.blockchain.queue.put((i,result))
+
+    def on_peers(self, i, r):
+        if not r: return
+        self.servers = self.parse_servers(r.get('result'))
+        self.trigger_callback('peers')
+
+    def on_banner(self, i, r):
+        self.banner = r.get('result')
+        self.trigger_callback('banner')
 
     def stop(self):
         with self.lock: self.running = False
@@ -100,38 +225,68 @@ class Network(threading.Thread):
     def is_running(self):
         with self.lock: return self.running
 
-
-    def resend_subscriptions(self):
-        for channel, messages in self.subscriptions.items():
-            if messages:
-                self.send(messages, channel)
-
-
-    def auto_cycle(self):
-        if not self.is_connected and self.config.get('auto_cycle'):
-            print_msg("Using random server...")
-            servers = filter_protocol(DEFAULT_SERVERS, 's')
-            while servers:
-                server = random.choice( servers )
-                servers.remove(server)
-                print server
-                self.config.set_key('server', server, False)
-                self.init_with_server(self.config)
-                if self.is_connected: break
-
-            if not self.is_connected:
-                print 'no server available'
-                self.connect_event.set() # to finish start
-                self.server = 'ecdsa.org:50001:t'
-                self.proxy = None
-                return
+    
+    def synchronous_get(self, requests, timeout=100000000):
+        queue = Queue.Queue()
+        ids = self.interface.send(requests, lambda i,r: queue.put(r))
+        id2 = ids[:]
+        res = {}
+        while ids:
+            r = queue.get(True, timeout)
+            _id = r.get('id')
+            if _id in ids:
+                ids.remove(_id)
+                res[_id] = r.get('result')
+        out = []
+        for _id in id2:
+            out.append(res[_id])
+        return out
+
+
+    def retrieve_transaction(self, tx_hash, tx_height=0):
+        import transaction
+        r = self.synchronous_get([ ('blockchain.transaction.get',[tx_hash, tx_height]) ])[0]
+        if r:
+            return transaction.Transaction(r)
+
+
+    def parse_servers(self, result):
+        """ parse servers list into dict format"""
+        from version import PROTOCOL_VERSION
+        servers = {}
+        for item in result:
+            host = item[1]
+            out = {}
+            version = None
+            pruning_level = '-'
+            if len(item) > 2:
+                for v in item[2]:
+                    if re.match("[stgh]\d*", v):
+                        protocol, port = v[0], v[1:]
+                        if port == '': port = DEFAULT_PORTS[protocol]
+                        out[protocol] = port
+                    elif re.match("v(.?)+", v):
+                        version = v[1:]
+                    elif re.match("p\d*", v):
+                        pruning_level = v[1:]
+                    if pruning_level == '': pruning_level = '0'
+            try: 
+                is_recent = float(version)>=float(PROTOCOL_VERSION)
+            except:
+                is_recent = False
+
+            if out and is_recent:
+                out['pruning'] = pruning_level
+                servers[host] = out
+
+        return servers
 
 
 
 
 if __name__ == "__main__":
     import simple_config
-    config = simple_config.SimpleConfig({'verbose':True})
+    config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'})
     network = Network(config)
     network.start()