big refactoring of the interface
authorThomasV <thomasv@gitorious>
Sun, 21 Oct 2012 00:57:31 +0000 (02:57 +0200)
committerThomasV <thomasv@gitorious>
Sun, 21 Oct 2012 20:55:16 +0000 (22:55 +0200)
addition of the wallet verifier class for SPV

electrum
lib/__init__.py
lib/gui_lite.py
lib/gui_qt.py
lib/interface.py
lib/wallet.py
scripts/blocks
scripts/peers
scripts/servers

index 267baab..4e842f4 100755 (executable)
--- a/electrum
+++ b/electrum
@@ -36,9 +36,9 @@ except ImportError:
     sys.exit("Error: AES does not seem to be installed. Try 'sudo pip install slowaes'")
 
 try:
-    from lib import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server
+    from lib import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server
 except ImportError:
-    from electrum import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server
+    from electrum import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server
 
 from decimal import Decimal
 
@@ -185,8 +185,11 @@ if __name__ == '__main__':
             sys.exit("Error: Unknown GUI: " + pref_gui )
 
         gui = gui.ElectrumGui(wallet, config)
-        interface = WalletSynchronizer(wallet, config, True, gui.server_list_changed)
-        interface.start()
+        wallet.interface = Interface(config, True, gui.server_list_changed)
+        wallet.interface.start()
+
+        WalletSynchronizer(wallet, config).start()
+        WalletVerifier(wallet, config).start()
 
         try:
             found = config.wallet_file_exists
index 70fd32f..258a661 100644 (file)
@@ -1,4 +1,5 @@
-from wallet import Wallet, format_satoshis
-from interface import WalletSynchronizer, Interface, pick_random_server, DEFAULT_SERVERS
+from util import format_satoshis
+from wallet import Wallet, WalletSynchronizer, WalletVerifier
+from interface import Interface, pick_random_server, DEFAULT_SERVERS
 from simple_config import SimpleConfig
 import bitcoin
index ed13117..02eefcc 100644 (file)
@@ -800,7 +800,7 @@ class MiniDriver(QObject):
         self.wallet = wallet
         self.window = window
 
-        self.wallet.register_callback(self.update_callback)
+        self.wallet.interface.register_callback(self.update_callback)
 
         self.state = None
 
index f93ee3c..38b65b3 100644 (file)
@@ -207,7 +207,7 @@ class ElectrumWindow(QMainWindow):
         QMainWindow.__init__(self)
         self.wallet = wallet
         self.config = config
-        self.wallet.register_callback(self.update_callback)
+        self.wallet.interface.register_callback(self.update_callback)
 
         self.detailed_view = config.get('qt_detailed_view', False)
 
@@ -1577,7 +1577,7 @@ class ElectrumGui:
             wallet.init_mpk( wallet.seed )
             wallet.up_to_date_event.clear()
             wallet.up_to_date = False
-            wallet.interface.poke()
+            wallet.interface.poke('synchronizer')
             waiting_dialog(waiting)
             # run a dialog indicating the seed, ask the user to remember it
             ElectrumWindow.show_seed_dialog(wallet)
@@ -1589,7 +1589,7 @@ class ElectrumGui:
             wallet.init_mpk( wallet.seed )
             wallet.up_to_date_event.clear()
             wallet.up_to_date = False
-            wallet.interface.poke()
+            wallet.interface.poke('synchronizer')
             waiting_dialog(waiting)
             if wallet.is_found():
                 # history and addressbook
index c1f243a..729f037 100644 (file)
@@ -28,11 +28,11 @@ DEFAULT_TIMEOUT = 5
 DEFAULT_SERVERS = [ 
     'electrum.novit.ro:50001:t', 
     'electrum.pdmc.net:50001:t',
-    #'ecdsa.org:50002:s',
+    'ecdsa.org:50001:t',
     'electrum.bitcoins.sk:50001:t',
     'uncle-enzo.info:50001:t',
     'electrum.bytesized-hosting.com:50001:t',
-    'california.stratum.bitcoin.cz:50001:t',
+    'electrum.bitcoin.cz:50001:t',
     'electrum.bitfoo.org:50001:t'
     ]
 
@@ -42,24 +42,22 @@ proxy_modes = ['socks4', 'socks5', 'http']
 def pick_random_server():
     return random.choice( DEFAULT_SERVERS )
 
-def pick_random_interface(config):
-    servers = DEFAULT_SERVERS
-    while servers:
-        server = random.choice( servers )
-        servers.remove(server)
-        config.set_key('server', server, False)
-        i = Interface(config)
-        if i.is_connected:
-            return i
-    raise BaseException('no server available')
 
 
 
-class InterfaceAncestor(threading.Thread):
+class Interface(threading.Thread):
 
-    def __init__(self, host, port, proxy=None, use_ssl=True):
-        threading.Thread.__init__(self)
-        self.daemon = True
+    def register_callback(self, update_callback):
+        with self.lock:
+            self.update_callbacks.append(update_callback)
+
+    def trigger_callbacks(self):
+        with self.lock:
+            callbacks = self.update_callbacks[:]
+        [update() for update in callbacks]
+
+
+    def init_server(self, host, port, proxy=None, use_ssl=True):
         self.host = host
         self.port = port
         self.proxy = proxy
@@ -74,13 +72,9 @@ class InterfaceAncestor(threading.Thread):
 
         #json
         self.message_id = 0
-        self.responses = Queue.Queue()
         self.unanswered_requests = {}
 
 
-    def poke(self):
-        # push a fake response so that the getting thread exits its loop
-        self.responses.put(None)
 
     def queue_json_response(self, c):
 
@@ -95,12 +89,19 @@ class InterfaceAncestor(threading.Thread):
             return
 
         if msg_id is not None:
-            method, params = self.unanswered_requests.pop(msg_id)
+            with self.lock: 
+                method, params, channel = self.unanswered_requests.pop(msg_id)
             result = c.get('result')
         else:
-            # notification
+            # notification. we should find the channel(s)..
             method = c.get('method')
             params = c.get('params')
+            with self.lock:
+                for k,v in self.subscriptions.items():
+                    if (method, params) in v:
+                        channel = k
+                else:
+                    raise
 
             if method == 'blockchain.numblocks.subscribe':
                 result = params[0]
@@ -111,32 +112,29 @@ class InterfaceAncestor(threading.Thread):
                 result = params[1]
                 params = [addr]
 
-        self.responses.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
-
-
+        response_queue = self.responses[channel]
+        response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
 
-    def subscribe(self, addresses):
-        messages = []
-        for addr in addresses:
-            messages.append(('blockchain.address.subscribe', [addr]))
-        self.send(messages)
 
 
+    def get_response(self, channel='default', block=True, timeout=10000000000):
+        return self.responses[channel].get(block, timeout)
 
+    def register_channel(self, channel):
+        with self.lock:
+            self.responses[channel] = Queue.Queue()
 
+    def poke(self, channel):
+        self.responses[channel].put(None)
 
-class HttpStratumInterface(InterfaceAncestor):
-    """ non-persistent connection. synchronous calls"""
 
-    def __init__(self, host, port, proxy=None, use_ssl=True):
-        InterfaceAncestor.__init__(self, host, port, proxy, use_ssl)
+    def init_http(self, host, port, proxy=None, use_ssl=True):
+        self.init_server(host, port, proxy, use_ssl)
         self.session_id = None
         self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port )
 
-    def get_history(self, address):
-        self.send([('blockchain.address.get_history', [address] )])
 
-    def run(self):
+    def run_http(self):
         self.is_connected = True
         while self.is_connected:
             try:
@@ -152,13 +150,13 @@ class HttpStratumInterface(InterfaceAncestor):
                 break
             
         self.is_connected = False
-        self.poke()
 
                 
     def poll(self):
         self.send([])
 
-    def send(self, messages):
+
+    def send_http(self, messages, channel='default'):
         import urllib2, json, time, cookielib
         
         if self.proxy:
@@ -177,7 +175,7 @@ class HttpStratumInterface(InterfaceAncestor):
             method, params = m
             if type(params) != type([]): params = [params]
             data.append( { 'method':method, 'id':self.message_id, 'params':params } )
-            self.unanswered_requests[self.message_id] = method, params
+            self.unanswered_requests[self.message_id] = method, params, channel
             self.message_id += 1
 
         if data:
@@ -221,14 +219,9 @@ class HttpStratumInterface(InterfaceAncestor):
 
 
 
-class TcpStratumInterface(InterfaceAncestor):
-    """json-rpc over persistent TCP connection, asynchronous"""
+    def init_tcp(self, host, port, proxy=None, use_ssl=True):
+        self.init_server(host, port, proxy, use_ssl)
 
-    def __init__(self, host, port, proxy=None, use_ssl=True):
-        InterfaceAncestor.__init__(self, host, port, proxy, use_ssl)
-        self.init_socket()
-
-    def init_socket(self):
         import ssl
         global proxy_modes
         self.connection_msg = "%s:%d"%(self.host,self.port)
@@ -251,17 +244,18 @@ class TcpStratumInterface(InterfaceAncestor):
             s.settimeout(60)
             self.s = s
             self.is_connected = True
-            self.send([('server.version', [ELECTRUM_VERSION])])
         except:
             self.is_connected = False
             self.s = None
 
-    def run(self):
+
+    def run_tcp(self):
         try:
             out = ''
             while self.is_connected:
                 try: msg = self.s.recv(1024)
                 except socket.timeout:
+                    print "timeout"
                     # ping the server with server.version, as a real ping does not exist yet
                     self.send([('server.version', [ELECTRUM_VERSION])])
                     continue
@@ -283,17 +277,16 @@ class TcpStratumInterface(InterfaceAncestor):
             traceback.print_exc(file=sys.stdout)
 
         self.is_connected = False
-        print "Poking"
-        self.poke()
 
-    def send(self, messages):
+
+    def send_tcp(self, messages, channel='default'):
         """return the ids of the requests that we sent"""
         out = ''
         ids = []
         for m in messages:
             method, params = m 
             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
-            self.unanswered_requests[self.message_id] = method, params
+            self.unanswered_requests[self.message_id] = method, params, channel
             ids.append(self.message_id)
             # uncomment to debug
             # print "-->",request
@@ -304,18 +297,55 @@ class TcpStratumInterface(InterfaceAncestor):
             out = out[sent:]
         return ids
 
-    def get_history(self, addr):
-        self.send([('blockchain.address.get_history', [addr])])
 
 
-
-class Interface(TcpStratumInterface, HttpStratumInterface):
-    
-    def __init__(self, config = None):
+    def __init__(self, config=None, loop=False, servers_loaded_callback=None):
 
         if config is None:
             from simple_config import SimpleConfig
             config = SimpleConfig()
+
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.loop = loop
+        self.config = config
+        self.servers_loaded_callback = servers_loaded_callback
+
+        self.subscriptions = {}
+        self.responses = {}
+        self.responses['default'] = Queue.Queue()
+
+        self.update_callbacks = []
+        self.lock = threading.Lock()
+        self.init_interface()
+
+
+
+    def init_interface(self):
+        if self.config.get('server'):
+            self.init_with_server(self.config)
+        else:
+            print "Using random server..."
+            servers = DEFAULT_SERVERS
+            while servers:
+                server = random.choice( servers )
+                servers.remove(server)
+                self.config.set_key('server', server, False)
+                self.init_with_server(self.config)
+                if self.is_connected: break
+
+            if not servers:
+                raise BaseException('no server available')
+
+        if self.is_connected:
+            print "Connected to " + self.connection_msg
+            self.send([('server.version', [ELECTRUM_VERSION])])
+            #self.send([('server.banner',[])], 'synchronizer')
+        else:
+            print_error("Failed to connect " + self.connection_msg)
+
+
+    def init_with_server(self, config):
             
         s = config.get('server')
         host, port, protocol = s.split(':')
@@ -327,24 +357,41 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
 
         #print protocol, host, port
         if protocol in 'st':
-            TcpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='s'))
+            self.init_tcp(host, port, proxy, use_ssl=(protocol=='s'))
         elif protocol in 'gh':
-            HttpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='g'))
+            self.init_http(host, port, proxy, use_ssl=(protocol=='g'))
         else:
             raise BaseException('Unknown protocol: %s'%protocol)
 
 
-    def run(self):
-        if self.protocol  in 'st':
-            TcpStratumInterface.run(self)
-        else:
-            HttpStratumInterface.run(self)
+    def send(self, messages, channel='default'):
+
+        sub = []
+        for message in messages:
+            m, v = message
+            if m[-10:] == '.subscribe':
+                sub.append(message)
+
+        if sub:
+            with self.lock:
+                if self.subscriptions.get(channel) is None: 
+                    self.subscriptions[channel] = []
+                self.subscriptions[channel] += sub
 
-    def send(self, messages):
         if self.protocol in 'st':
-            return TcpStratumInterface.send(self, messages)
+            with self.lock:
+                out = self.send_tcp(messages, channel)
         else:
-            return HttpStratumInterface.send(self, messages)
+            # do not use lock, http is synchronous
+            out = self.send_http(messages, channel)
+
+        return out
+
+    def resend_subscriptions(self):
+        for channel, messages in self.subscriptions.items():
+            if messages:
+                self.send(messages, channel)
+
 
 
     def parse_proxy_options(self, s):
@@ -377,12 +424,30 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
             print "changing server:", server, proxy
             self.server = server
             self.proxy = proxy
+            if self.protocol in 'st':
+                self.s.shutdown(socket.SHUT_RDWR)
+                self.s.close()
             self.is_connected = False  # this exits the polling loop
-            self.poke()
 
 
-    def is_up_to_date(self):
-        return self.responses.empty() and not self.unanswered_requests
+    def is_empty(self, channel):
+        q = self.responses.get(channel)
+        if q: 
+            return q.empty()
+        else:
+            return True
+
+
+    def get_pending_requests(self, channel):
+        result = []
+        with self.lock:
+            for k, v in self.unanswered_requests.items():
+                a, b, c = v
+                if c == channel: result.append(k)
+        return result
+
+    def is_up_to_date(self, channel):
+        return self.is_empty(channel) and not self.get_pending_requests(channel)
 
 
     def synchronous_get(self, requests, timeout=100000000):
@@ -391,7 +456,7 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
         id2 = ids[:]
         res = {}
         while ids:
-            r = self.responses.get(True, timeout)
+            r = self.responses['default'].get(True, timeout)
             _id = r.get('id')
             if _id in ids:
                 ids.remove(_id)
@@ -403,130 +468,15 @@ class Interface(TcpStratumInterface, HttpStratumInterface):
 
 
 
+    def run(self):
+        while True:
+            self.run_tcp() if self.protocol in 'st' else self.run_http()
+            self.trigger_callbacks()
+            if not self.loop: break
 
-class WalletSynchronizer(threading.Thread):
-
-    def __init__(self, wallet, config, loop=False, servers_loaded_callback=None):
-        threading.Thread.__init__(self)
-        self.daemon = True
-        self.wallet = wallet
-        self.loop = loop
-        self.config = config
-        self.init_interface()
-        self.servers_loaded_callback = servers_loaded_callback
-
-    def init_interface(self):
-        if self.config.get('server'):
-            self.interface = Interface(self.config)
-        else:
-            print "Using random server..."
-            self.interface = pick_random_interface(self.config)
-
-        if self.interface.is_connected:
-            print "Connected to " + self.interface.connection_msg
-        else:
-            print_error("Failed to connect " + self.interface.connection_msg)
-
-        self.wallet.interface = self.interface
-
-    def handle_response(self, r):
-        if r is None:
-            return
-
-        method = r['method']
-        params = r['params']
-        result = r['result']
-
-        if method == 'server.banner':
-            self.wallet.banner = result
-            self.wallet.was_updated = True
-
-        elif method == 'server.peers.subscribe':
-            servers = []
-            for item in result:
-                s = []
-                host = item[1]
-                ports = []
-                version = None
-                if len(item) > 2:
-                    for v in item[2]:
-                        if re.match("[stgh]\d+", v):
-                            ports.append((v[0], v[1:]))
-                        if re.match("v(.?)+", v):
-                            version = v[1:]
-                if ports and version:
-                    servers.append((host, ports))
-            self.interface.servers = servers
-            # servers_loaded_callback is None for commands, but should
-            # NEVER be None when using the GUI.
-            if self.servers_loaded_callback is not None:
-                self.servers_loaded_callback()
-
-        elif method == 'blockchain.address.subscribe':
-            addr = params[0]
-            self.wallet.receive_status_callback(addr, result)
-                            
-        elif method == 'blockchain.address.get_history':
-            addr = params[0]
-            self.wallet.receive_history_callback(addr, result)
-            self.wallet.was_updated = True
-
-        elif method == 'blockchain.transaction.broadcast':
-            self.wallet.tx_result = result
-            self.wallet.tx_event.set()
-
-        elif method == 'blockchain.numblocks.subscribe':
-            self.wallet.blocks = result
-            self.wallet.was_updated = True
-
-        elif method == 'server.version':
-            pass
-
-        else:
-            print_error("Error: Unknown message:" + method + ", " + params + ", " + result)
-
-
-    def start_interface(self):
-        self.interface.start()
-        if self.interface.is_connected:
-            self.wallet.start_session(self.interface)
-
+            time.sleep(5)
+            self.init_interface()
+            self.resend_subscriptions()
 
 
-    def run(self):
-        import socket, time
-        self.start_interface()
-        while True:
-            while self.interface.is_connected:
-                new_addresses = self.wallet.synchronize()
-                if new_addresses:
-                    self.interface.subscribe(new_addresses)
-
-                if self.interface.is_up_to_date():
-                    if not self.wallet.up_to_date:
-                        self.wallet.up_to_date = True
-                        self.wallet.was_updated = True
-                        self.wallet.up_to_date_event.set()
-                else:
-                    if self.wallet.up_to_date:
-                        self.wallet.up_to_date = False
-                        self.wallet.was_updated = True
-
-                if self.wallet.was_updated:
-                    self.wallet.trigger_callbacks()
-                    self.wallet.was_updated = False
-
-                response = self.interface.responses.get()
-                self.handle_response(response)
-
-            self.wallet.trigger_callbacks()
-            if self.loop:
-                time.sleep(5)
-                # Server has been changed. Copy callback for new interface.
-                self.proxy = self.interface.proxy
-                self.init_interface()
-                self.start_interface()
-                continue
-            else:
-                break
 
index 2e5cc78..464735c 100644 (file)
@@ -28,6 +28,7 @@ import threading
 import random
 import aes
 import ecdsa
+import Queue
 
 from ecdsa.util import string_to_number, number_to_string
 from util import print_error, user_dir, format_satoshis
@@ -50,7 +51,6 @@ class Wallet:
 
         self.config = config
         self.electrum_version = ELECTRUM_VERSION
-        self.update_callbacks = []
 
         # saved fields
         self.seed_version          = config.get('seed_version', SEED_VERSION)
@@ -94,16 +94,6 @@ class Wallet:
             raise ValueError("This wallet seed is deprecated. Please run upgrade.py for a diagnostic.")
 
 
-    def register_callback(self, update_callback):
-        with self.lock:
-            self.update_callbacks.append(update_callback)
-
-    def trigger_callbacks(self):
-        with self.lock:
-            callbacks = self.update_callbacks[:]
-        [update() for update in callbacks]
-
-
     def import_key(self, keypair, password):
         address, key = keypair.split(':')
         if not self.is_valid(address):
@@ -480,7 +470,8 @@ class Wallet:
             return s
 
     def get_status(self, address):
-        h = self.history.get(address)
+        with self.lock:
+            h = self.history.get(address)
         if not h:
             status = None
         else:
@@ -490,11 +481,6 @@ class Wallet:
                 status = status + ':%d'% len(h)
         return status
 
-    def receive_status_callback(self, addr, status):
-        with self.lock:
-            if self.get_status(addr) != status:
-                #print "updating status for", addr, status
-                self.interface.get_history(addr)
 
     def receive_history_callback(self, addr, data): 
         #print "updating history for", addr
@@ -504,10 +490,26 @@ class Wallet:
             self.save()
 
     def get_tx_history(self):
-        lines = self.tx_history.values()
+        with self.lock:
+            lines = self.tx_history.values()
         lines = sorted(lines, key=operator.itemgetter("timestamp"))
         return lines
 
+    def get_tx_hashes(self):
+        with self.lock:
+            hashes = self.tx_history.keys()
+        return hashes
+
+    def get_transactions_at_height(self, height):
+        with self.lock:
+            values = self.tx_history.values()[:]
+
+        out = []
+        for tx in values:
+            if tx['height'] == height:
+                out.append(tx['tx_hash'])
+        return out
+
     def update_tx_history(self):
         self.tx_history= {}
         for addr in self.all_addresses():
@@ -751,12 +753,6 @@ class Wallet:
         self.up_to_date_event.wait(10000000000)
 
 
-    def start_session(self, interface):
-        self.interface = interface
-        self.interface.send([('server.banner',[]), ('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])])
-        self.interface.subscribe(self.all_addresses())
-
-
     def freeze(self,addr):
         if addr in self.all_addresses() and addr not in self.frozen_addresses:
             self.unprioritize(addr)
@@ -816,3 +812,223 @@ class Wallet:
         for k, v in s.items():
             self.config.set_key(k,v)
         self.config.save()
+
+
+
+
+
+
+class WalletSynchronizer(threading.Thread):
+
+
+    def __init__(self, wallet, config):
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.wallet = wallet
+        self.interface = self.wallet.interface
+        self.interface.register_channel('synchronizer')
+
+
+    def synchronize_wallet(self):
+        new_addresses = self.wallet.synchronize()
+        if new_addresses:
+            self.subscribe_to_addresses(new_addresses)
+            
+        if self.interface.is_up_to_date('synchronizer'):
+            if not self.wallet.up_to_date:
+                self.wallet.up_to_date = True
+                self.wallet.was_updated = True
+                self.wallet.up_to_date_event.set()
+        else:
+            if self.wallet.up_to_date:
+                self.wallet.up_to_date = False
+                self.wallet.was_updated = True
+
+        if self.wallet.was_updated:
+            self.interface.trigger_callbacks()
+            self.wallet.was_updated = False
+
+
+    def subscribe_to_addresses(self, addresses):
+        messages = []
+        for addr in addresses:
+            messages.append(('blockchain.address.subscribe', [addr]))
+        self.interface.send( messages, 'synchronizer')
+
+
+    def run(self):
+
+        # subscriptions
+        self.interface.send([('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])], 'synchronizer')
+        self.subscribe_to_addresses(self.wallet.all_addresses())
+
+        while True:
+            # 1. send new requests
+            self.synchronize_wallet()
+
+            # 2. get a response
+            r = self.interface.get_response('synchronizer')
+            if not r: continue
+
+            # 3. handle response
+            method = r['method']
+            params = r['params']
+            result = r['result']
+
+            if method == 'blockchain.address.subscribe':
+                addr = params[0]
+                if self.wallet.get_status(addr) != result:
+                    self.interface.send([('blockchain.address.get_history', [address] )])
+                            
+            elif method == 'blockchain.address.get_history':
+                addr = params[0]
+                self.wallet.receive_history_callback(addr, result)
+                self.wallet.was_updated = True
+
+            elif method == 'blockchain.transaction.broadcast':
+                self.wallet.tx_result = result
+                self.wallet.tx_event.set()
+
+            elif method == 'blockchain.numblocks.subscribe':
+                self.wallet.blocks = result
+                self.wallet.was_updated = True
+
+            elif method == 'server.banner':
+                self.wallet.banner = result
+                self.wallet.was_updated = True
+
+            elif method == 'server.peers.subscribe':
+                servers = []
+                for item in result:
+                    s = []
+                    host = item[1]
+                    ports = []
+                    version = None
+                    if len(item) > 2:
+                        for v in item[2]:
+                            if re.match("[stgh]\d+", v):
+                                ports.append((v[0], v[1:]))
+                            if re.match("v(.?)+", v):
+                                version = v[1:]
+                    if ports and version:
+                        servers.append((host, ports))
+                self.interface.servers = servers
+
+                # servers_loaded_callback is None for commands, but should
+                # NEVER be None when using the GUI.
+                #if self.servers_loaded_callback is not None:
+                #    self.servers_loaded_callback()
+
+            elif method == 'server.version':
+                pass
+
+            else:
+                print_error("Error: Unknown message:" + method + ", " + params + ", " + result)
+
+
+encode = lambda x: x[::-1].encode('hex')
+decode = lambda x: x.decode('hex')[::-1]
+from bitcoin import Hash, rev_hex, int_to_hex
+
+class WalletVerifier(threading.Thread):
+
+    def __init__(self, wallet, config):
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.wallet = wallet
+        self.interface = self.wallet.interface
+        self.interface.register_channel('verifier')
+        self.validated = []
+        self.merkle_roots = {}
+        self.headers = {}
+        self.lock = threading.Lock()
+
+    def run(self):
+        requested = []
+
+        while True:
+            txlist = self.wallet.get_tx_hashes()
+            for tx in txlist:
+                if tx not in requested:
+                    requested.append(tx)
+                    self.request_merkle(tx)
+                    break
+            try:
+                r = self.interface.get_response('verifier',timeout=1)
+            except Queue.Empty:
+                continue
+
+            # 3. handle response
+            method = r['method']
+            params = r['params']
+            result = r['result']
+
+            if method == 'blockchain.transaction.get_merkle':
+                tx_hash = params[0]
+                tx_height = result.get('block_height')
+                self.merkle_roots[tx_hash] = self.hash_merkle_root(result['merkle'], tx_hash)
+                # if we already have the header, check merkle root directly
+                header = self.headers.get(tx_height)
+                if header:
+                    self.validated.append(tx_hash)
+                    assert header.get('merkle_root') == self.merkle_roots[tx_hash]
+                self.request_headers(tx_height) 
+
+            elif method == 'blockchain.block.get_header':
+                self.validate_header(result)
+
+
+    def request_merkle(self, tx_hash):
+        self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash]) ], 'verifier')
+        
+
+    def request_headers(self, tx_height, delta=10):
+        headers_requests = []
+        for height in range(tx_height-delta,tx_height+delta): # we might can request blocks that do not exist yet
+            if height not in self.headers:
+                headers_requests.append( ('blockchain.block.get_header',[height]) )
+        self.interface.send(headers_requests,'verifier')
+
+
+    def validate_header(self, header):
+        """ if there is a previous or a next block in the list, check the hash"""
+        height = header.get('block_height')
+        with self.lock:
+            self.headers[height] = header # detect conflicts
+            prev_header = next_header = None
+            if height-1 in self.headers:
+                prev_header = self.headers[height-1]
+            if height+1 in self.headers:
+                next_header = self.headers[height+1]
+
+        if prev_header:
+            prev_hash = self.hash_header(prev_header)
+            assert prev_hash == header.get('prev_block_hash')
+        if next_header:
+            _hash = self.hash_header(header)
+            assert _hash == next_header.get('prev_block_hash')
+            
+        # check if there are transactions at that height
+        for tx_hash in self.wallet.get_transactions_at_height(height):
+            if tx_hash in self.validated: continue
+            # check if we already have the merkle root
+            merkle_root = self.merkle_roots.get(tx_hash)
+            if merkle_root:
+                self.validated.append(tx_hash)
+                assert header.get('merkle_root') == merkle_root
+
+    def hash_header(self, res):
+        header = int_to_hex(res.get('version'),4) \
+            + rev_hex(res.get('prev_block_hash')) \
+            + rev_hex(res.get('merkle_root')) \
+            + int_to_hex(int(res.get('timestamp')),4) \
+            + int_to_hex(int(res.get('bits')),4) \
+            + int_to_hex(int(res.get('nonce')),4)
+        return rev_hex(Hash(header.decode('hex')).encode('hex'))
+
+    def hash_merkle_root(self, merkle_s, target_hash):
+        h = decode(target_hash)
+        for item in merkle_s:
+            is_left = item[0] == 'L'
+            h = Hash( h + decode(item[1:]) ) if is_left else Hash( decode(item[1:]) + h )
+        return encode(h)
index 67db84a..16aa5e7 100755 (executable)
@@ -8,7 +8,7 @@ i.send([('blockchain.numblocks.subscribe',[])])
 
 while True:
     try:
-        r = i.responses.get(True, 100000000000)
+        r = i.get_response()
     except KeyboardInterrupt:
         break
     if r.get('method') == 'blockchain.numblocks.subscribe':
index b3656ef..38ef632 100755 (executable)
@@ -2,10 +2,10 @@
 
 from electrum import Interface
 
-i = Interface({'server':'electrum.novit.ro:50001:t'})
+i = Interface({'server':'ecdsa.org:50001:t'})
 i.start()
 i.send([('server.peers.subscribe',[])])
 
 while True:
-    r = i.responses.get(True, 100000000000)
+    r = i.get_response()
     print r.get('result')
index 69c0ade..a7f4df2 100755 (executable)
@@ -5,12 +5,12 @@ import time, Queue
 
 servers = DEFAULT_SERVERS
 interfaces = map ( lambda server: Interface({'server':server} ), servers )
-results = []
 
 for i in interfaces:
     if i.is_connected:
         i.start()
         i.send([('blockchain.numblocks.subscribe',[])])
+        i.status = "timed out"
     else:
         servers.remove(i.server)
         i.status = "unreachable"
@@ -18,29 +18,25 @@ for i in interfaces:
 for i in interfaces:
     while True:
         try:
-            r = i.responses.get(True,1)
+            r = i.get_response(timeout=1)
         except Queue.Empty:
             break
 
         if r.get('method') == 'blockchain.numblocks.subscribe':
-            results.append((i.host, r.get('result')))
-            i.status = "ok"
             servers.remove(i.server)
+            i.status = "ok"
+            i.blocks = r.get('result')
             break
 
-for s in servers:
-    i.status = "timed out"
 
 from collections import defaultdict
 d = defaultdict(int)
-for e in results:
-    d[e[1]] += 1
+for i in interfaces:
+    if i.status == 'ok':
+        d[i.blocks] += 1
 v = d.values()
 numblocks = d.keys()[v.index(max(v))]
 
 for i in interfaces:
-    print i.host, i.status
-
-for s,n in results:
-    print "%30s   %d   "%(s, n), "ok" if abs(n-numblocks)<2 else "lagging"
+    print "%30s   %s   "%(i.host, i.status) #,  "ok" if abs(n-numblocks)<2 else "lagging"