simplify interface: use callbacks
authorThomasV <thomasv@gitorious>
Thu, 12 Sep 2013 06:41:27 +0000 (08:41 +0200)
committerThomasV <thomasv@gitorious>
Thu, 12 Sep 2013 06:41:27 +0000 (08:41 +0200)
gui/gui_classic/main_window.py
gui/gui_classic/network_dialog.py
lib/__init__.py
lib/blockchain.py
lib/interface.py
lib/network.py
lib/verifier.py
lib/wallet.py

index 65d05e5..920d9c0 100644 (file)
@@ -206,7 +206,7 @@ class ElectrumWindow(QMainWindow):
         QShortcut(QKeySequence("Ctrl+PgDown"), self, lambda: tabs.setCurrentIndex( (tabs.currentIndex() + 1 )%tabs.count() ))
         
         self.connect(self, QtCore.SIGNAL('update_status'), self.update_status)
-        self.connect(self, QtCore.SIGNAL('banner_signal'), lambda: self.console.showMessage(self.wallet.interface.banner) )
+        self.connect(self, QtCore.SIGNAL('banner_signal'), lambda: self.console.showMessage(self.network.banner) )
         self.connect(self, QtCore.SIGNAL('transaction_signal'), lambda: self.notify_transactions() )
 
         self.history_list.setFocus(True)
@@ -240,7 +240,7 @@ class ElectrumWindow(QMainWindow):
         self.setWindowTitle( title )
         self.update_wallet()
         # set initial message
-        self.console.showMessage(self.wallet.interface.banner)
+        self.console.showMessage(self.network.banner)
         # Once GUI has been initialized check if we want to announce something since the callback has been called before the GUI was initialized
         self.notify_transactions()
 
index cade6c8..37c57d3 100644 (file)
@@ -23,7 +23,7 @@ import os.path, json, ast, traceback
 
 from PyQt4.QtGui import *
 from PyQt4.QtCore import *
-from electrum.interface import DEFAULT_SERVERS, DEFAULT_PORTS
+from electrum import DEFAULT_SERVERS, DEFAULT_PORTS
 
 from qt_util import *
 
@@ -62,7 +62,7 @@ class NetworkDialog(QDialog):
             status = _("Please choose a server.") + "\n" + _("Select 'Cancel' if you are offline.")
             server = interface.server
 
-        self.servers = interface.get_servers()
+        self.servers = network.get_servers()
 
 
         vbox = QVBoxLayout()
index a56b646..5182f42 100644 (file)
@@ -3,8 +3,8 @@ from util import format_satoshis, print_msg, print_json, print_error, set_verbos
 from wallet import WalletSynchronizer, WalletStorage
 from wallet_factory import WalletFactory as Wallet
 from verifier import TxVerifier
-from network import Network
-from interface import Interface, pick_random_server, DEFAULT_SERVERS
+from network import Network, DEFAULT_SERVERS, DEFAULT_PORTS
+from interface import Interface
 from simple_config import SimpleConfig
 import bitcoin
 import account
index 0aac442..94cc559 100644 (file)
@@ -308,18 +308,23 @@ class Blockchain(threading.Thread):
         return new_bits, new_target
 
 
-    def request_header(self, i, h):
+    def request_header(self, i, h, queue):
         print_error("requesting header %d from %s"%(h, i.server))
-        i.send([ ('blockchain.block.get_header',[h])], 'get_header')
+        i.send([ ('blockchain.block.get_header',[h])], lambda i,r: queue.put((i,r)))
 
-    def retrieve_header(self, i):
+    def retrieve_header(self, i, queue):
         while True:
             try:
-                r = i.get_response('get_header',timeout=1)
+                ir = queue.get(timeout=1)
             except Queue.Empty:
                 print_error('timeout')
                 continue
 
+            if not ir: 
+                continue
+
+            i, r = ir
+
             if r.get('error'):
                 print_error('Verifier received an error:', r)
                 continue
@@ -339,11 +344,12 @@ class Blockchain(threading.Thread):
         header = final_header
         chain = [ final_header ]
         requested_header = False
-        
+        queue = Queue.Queue()
+
         while self.is_running():
 
             if requested_header:
-                header = self.retrieve_header(interface)
+                header = self.retrieve_header(interface, queue)
                 if not header: return
                 chain = [ header ] + chain
                 requested_header = False
@@ -351,7 +357,7 @@ class Blockchain(threading.Thread):
             height = header.get('block_height')
             previous_header = self.read_header(height -1)
             if not previous_header:
-                self.request_header(interface, height - 1)
+                self.request_header(interface, height - 1, queue)
                 requested_header = True
                 continue
 
@@ -359,7 +365,7 @@ class Blockchain(threading.Thread):
             prev_hash = self.hash_header(previous_header)
             if prev_hash != header.get('prev_block_hash'):
                 print_error("reorg")
-                self.request_header(interface, height - 1)
+                self.request_header(interface, height - 1, queue)
                 requested_header = True
                 continue
 
@@ -370,17 +376,18 @@ class Blockchain(threading.Thread):
 
     def get_chunks(self, i, header, height):
         requested_chunks = []
+        queue = Queue.Queue()
         min_index = (self.local_height + 1)/2016
         max_index = (height + 1)/2016
         for n in range(min_index, max_index + 1):
             print_error( "requesting chunk", n )
-            i.send([ ('blockchain.block.get_chunk',[n])], 'get_header')
+            i.send([ ('blockchain.block.get_chunk',[n])], lambda i,r:queue.put(r))
             requested_chunks.append(n)
             break
 
         while requested_chunks:
             try:
-                r = i.get_response('get_header',timeout=1)
+                r = queue.get(timeout=1)
             except Queue.Empty:
                 continue
             if not r: continue
@@ -390,14 +397,12 @@ class Blockchain(threading.Thread):
                 continue
 
             # 3. handle response
-            method = r['method']
             params = r['params']
             result = r['result']
 
-            if method == 'blockchain.block.get_chunk':
-                index = params[0]
-                self.verify_chunk(index, result)
-                requested_chunks.remove(index)
+            index = params[0]
+            self.verify_chunk(index, result)
+            requested_chunks.remove(index)
 
 
 
index de60e5c..0dd3674 100644 (file)
@@ -25,36 +25,6 @@ from util import print_error, print_msg
 
 
 DEFAULT_TIMEOUT = 5
-DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'}
-
-DEFAULT_SERVERS = {
-    'the9ull.homelinux.org': {'h': '8082', 't': '50001'},
-    'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'electrum.dynaloop.net': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'electrum.koh.ms': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'electrum.mooo.com': {'h': '8081', 't': '50001'},
-    'electrum.bitcoins.sk': {'h': '8081', 's': '50002', 't': '50001', 'g': '8'},
-    'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'},
-    'electrum.drollette.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'},
-    'electrum.yacoin.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
-    'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}
-}
-
-
-
-def filter_protocol(servers, p):
-    l = []
-    for k, protocols in servers.items():
-        if p in protocols:
-            l.append( ':'.join([k, protocols[p], p]) )
-    return l
-    
-
-
 proxy_modes = ['socks4', 'socks5', 'http']
 
 
@@ -62,12 +32,9 @@ def pick_random_server():
     return random.choice( filter_protocol(DEFAULT_SERVERS,'s') )
 
 
-
-
 class Interface(threading.Thread):
 
 
-
     def init_server(self, host, port, proxy=None, use_ssl=True):
         self.host = host
         self.port = port
@@ -78,43 +45,9 @@ class Interface(threading.Thread):
         #json
         self.message_id = 0
         self.unanswered_requests = {}
-        #banner
-        self.banner = ''
         self.pending_transactions_for_notifications= []
 
 
-    def parse_servers(self, result):
-        """ parse servers list into dict format"""
-
-        servers = {}
-        for item in result:
-            host = item[1]
-            out = {}
-            version = None
-            pruning_level = '-'
-            if len(item) > 2:
-                for v in item[2]:
-                    if re.match("[stgh]\d*", v):
-                        protocol, port = v[0], v[1:]
-                        if port == '': port = DEFAULT_PORTS[protocol]
-                        out[protocol] = port
-                    elif re.match("v(.?)+", v):
-                        version = v[1:]
-                    elif re.match("p\d*", v):
-                        pruning_level = v[1:]
-                    if pruning_level == '': pruning_level = '0'
-            try: 
-                is_recent = float(version)>=float(PROTOCOL_VERSION)
-            except:
-                is_recent = False
-
-            if out and is_recent:
-                out['pruning'] = pruning_level
-                servers[host] = out
-
-        return servers
-
-
     def queue_json_response(self, c):
 
         # uncomment to debug
@@ -127,30 +60,18 @@ class Interface(threading.Thread):
             print_error("received error:", c)
             if msg_id is not None:
                 with self.lock: 
-                    method, params, channel = self.unanswered_requests.pop(msg_id)
-                response_queue = self.responses[channel]
-                response_queue.put((self,{'method':method, 'params':params, 'error':error, 'id':msg_id}))
+                    method, params, callback = self.unanswered_requests.pop(msg_id)
+                callback(self,{'method':method, 'params':params, 'error':error, 'id':msg_id})
 
             return
 
         if msg_id is not None:
             with self.lock: 
-                method, params, channel = self.unanswered_requests.pop(msg_id)
+                method, params, callback = self.unanswered_requests.pop(msg_id)
             result = c.get('result')
 
-            if method == 'server.version':
-                self.server_version = result
-
-            elif method == 'server.banner':
-                self.banner = result
-                self.network.trigger_callback('banner')
-
-            elif method == 'server.peers.subscribe':
-                self.servers = self.parse_servers(result)
-                self.network.trigger_callback('peers')
-
         else:
-            # notification: find the channel(s)
+            # notification
             method = c.get('method')
             params = c.get('params')
 
@@ -170,31 +91,19 @@ class Interface(threading.Thread):
             with self.lock:
                 for k,v in self.subscriptions.items():
                     if (method, params) in v:
-                        channel = k
+                        callback = k
                         break
                 else:
                     print_error( "received unexpected notification", method, params)
                     print_error( self.subscriptions )
                     return
-                
-        response_queue = self.responses[channel]
-        response_queue.put((self, {'method':method, 'params':params, 'result':result, 'id':msg_id}))
-
 
 
-    def get_response(self, channel='default', block=True, timeout=10000000000):
-        ir = self.responses[channel].get(block, timeout)
-        if ir:
-            return ir[1]
+        callback(self, {'method':method, 'params':params, 'result':result, 'id':msg_id})
 
-    def register_channel(self, channel, queue=None):
-        if queue is None:
-            queue = Queue.Queue()
-        with self.lock:
-            self.responses[channel] = queue
 
-    def poke(self, channel):
-        self.responses[channel].put(None)
+    def on_version(self, i, result):
+        self.server_version = result
 
 
     def init_http(self, host, port, proxy=None, use_ssl=True):
@@ -237,7 +146,7 @@ class Interface(threading.Thread):
         self.send([])
 
 
-    def send_http(self, messages, channel='default'):
+    def send_http(self, messages, callback):
         import urllib2, json, time, cookielib
         print_error( "send_http", messages )
         
@@ -257,7 +166,7 @@ class Interface(threading.Thread):
             method, params = m
             if type(params) != type([]): params = [params]
             data.append( { 'method':method, 'id':self.message_id, 'params':params } )
-            self.unanswered_requests[self.message_id] = method, params, channel
+            self.unanswered_requests[self.message_id] = method, params, callback
             self.message_id += 1
 
         if data:
@@ -359,7 +268,7 @@ class Interface(threading.Thread):
 
                 if timeout:
                     # ping the server with server.version, as a real ping does not exist yet
-                    self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
+                    self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version)
                     continue
 
                 out += msg
@@ -381,14 +290,14 @@ class Interface(threading.Thread):
         self.is_connected = False
 
 
-    def send_tcp(self, messages, channel='default'):
+    def send_tcp(self, messages, callback):
         """return the ids of the requests that we sent"""
         out = ''
         ids = []
         for m in messages:
             method, params = m 
             request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
-            self.unanswered_requests[self.message_id] = method, params, channel
+            self.unanswered_requests[self.message_id] = method, params, callback
             ids.append(self.message_id)
             # uncomment to debug
             # print "-->", request
@@ -413,7 +322,7 @@ class Interface(threading.Thread):
 
 
     def __init__(self, config=None):
-        self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's'))
+        #self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's'))
         self.proxy = None
 
         if config is None:
@@ -426,9 +335,6 @@ class Interface(threading.Thread):
         self.connect_event = threading.Event()
 
         self.subscriptions = {}
-        self.responses = {}
-        self.responses['default'] = Queue.Queue()
-
         self.lock = threading.Lock()
 
         self.servers = {} # actual list from IRC
@@ -475,7 +381,7 @@ class Interface(threading.Thread):
             raise BaseException('Unknown protocol: %s'%protocol)
 
 
-    def send(self, messages, channel='default'):
+    def send(self, messages, callback):
 
         sub = []
         for message in messages:
@@ -485,21 +391,21 @@ class Interface(threading.Thread):
 
         if sub:
             with self.lock:
-                if self.subscriptions.get(channel) is None: 
-                    self.subscriptions[channel] = []
+                if self.subscriptions.get(callback) is None: 
+                    self.subscriptions[callback] = []
                 for message in sub:
-                    if message not in self.subscriptions[channel]:
-                        self.subscriptions[channel].append(message)
+                    if message not in self.subscriptions[callback]:
+                        self.subscriptions[callback].append(message)
 
         if not self.is_connected: 
             return
 
         if self.protocol in 'st':
             with self.lock:
-                out = self.send_tcp(messages, channel)
+                out = self.send_tcp(messages, callback)
         else:
             # do not use lock, http is synchronous
-            out = self.send_http(messages, channel)
+            out = self.send_http(messages, callback)
 
         return out
 
@@ -525,6 +431,7 @@ class Interface(threading.Thread):
 
 
     def set_server(self, server, proxy=None):
+        "todo: remove this"
         # raise an error if the format isnt correct
         a,b,c = server.split(':')
         b = int(b)
@@ -540,46 +447,25 @@ class Interface(threading.Thread):
             self.is_connected = False  # this exits the polling loop
             self.trigger_callback('disconnecting') # for actively disconnecting
 
+
     def stop(self):
         if self.is_connected and self.protocol in 'st' and self.s:
             self.s.shutdown(socket.SHUT_RDWR)
             self.s.close()
 
 
-    def get_servers(self):
-        if not self.servers:
-            return DEFAULT_SERVERS
-        else:
-            return self.servers
-
-
-    def is_empty(self, channel):
-        q = self.responses.get(channel)
-        if q: 
-            return q.empty()
-        else:
-            return True
-
-
-    def get_pending_requests(self, channel):
-        result = []
-        with self.lock:
-            for k, v in self.unanswered_requests.items():
-                a, b, c = v
-                if c == channel: result.append(k)
-        return result
-
-    def is_up_to_date(self, channel):
-        return self.is_empty(channel) and not self.get_pending_requests(channel)
+    def is_up_to_date(self):
+        return self.unanswered_requests == {}
 
 
     def synchronous_get(self, requests, timeout=100000000):
         # todo: use generators, unanswered_requests should be a list of arrays...
-        ids = self.send(requests)
+        q = Queue.Queue()
+        ids = self.send(requests, lambda i,r: queue.put(r))
         id2 = ids[:]
         res = {}
         while ids:
-            r = self.responses['default'].get(True, timeout)
+            r = queue.get(True, timeout)
             _id = r.get('id')
             if _id in ids:
                 ids.remove(_id)
@@ -595,20 +481,16 @@ class Interface(threading.Thread):
         threading.Thread.start(self)
 
 
-
     def run(self):
         self.init_interface()
         if self.is_connected:
-            self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
+            self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version)
             self.change_status()
             self.run_tcp() if self.protocol in 'st' else self.run_http()
         self.change_status()
         
+
     def change_status(self):
         #print "change status", self.server, self.is_connected
         self.queue.put(self)
 
-
-
-
-
index 768df8d..c4d8cb0 100644 (file)
@@ -4,6 +4,35 @@ from bitcoin import *
 import interface
 from blockchain import Blockchain
 
+DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'}
+
+DEFAULT_SERVERS = {
+    'the9ull.homelinux.org': {'h': '8082', 't': '50001'},
+    'electrum.coinwallet.me': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.dynaloop.net': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.koh.ms': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.novit.ro': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.stepkrav.pw': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'ecdsa.org': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.mooo.com': {'h': '8081', 't': '50001'},
+    'electrum.bitcoins.sk': {'h': '8081', 's': '50002', 't': '50001', 'g': '8'},
+    'electrum.no-ip.org': {'h': '80', 's': '50002', 't': '50001', 'g': '443'},
+    'electrum.drollette.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'btc.it-zone.org': {'h': '80', 's': '110', 't': '50001', 'g': '443'},
+    'electrum.yacoin.com': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'},
+    'electrum.be': {'h': '8081', 's': '50002', 't': '50001', 'g': '8082'}
+}
+
+
+
+def filter_protocol(servers, p):
+    l = []
+    for k, protocols in servers.items():
+        if p in protocols:
+            l.append( ':'.join([k, protocols[p], p]) )
+    return l
+    
+
 
 class Network(threading.Thread):
 
@@ -16,8 +45,10 @@ class Network(threading.Thread):
         self.interfaces = {}
         self.queue = Queue.Queue()
         self.default_server = self.config.get('server')
-        self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s')
+        self.servers_list = filter_protocol(DEFAULT_SERVERS,'s')
         self.callbacks = {}
+        #banner
+        self.banner = ''
 
 
     def register_callback(self, event, callback):
@@ -45,11 +76,17 @@ class Network(threading.Thread):
         return server
 
 
+    def get_servers(self):
+        if not self.servers:
+            return DEFAULT_SERVERS
+        else:
+            return self.servers
+
+
     def start_interface(self, server):
         if server in self.interfaces.keys():
             return
         i = interface.Interface({'server':server})
-        i.network = self # fixme
         self.interfaces[server] = i
         i.start(self.queue)
 
@@ -97,13 +134,10 @@ class Network(threading.Thread):
             i = self.queue.get()
 
             if i.is_connected:
-                i.register_channel('verifier', self.blockchain.queue)
-                i.register_channel('get_header')
-                i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
-
+                i.send([ ('blockchain.headers.subscribe',[])], self.on_header)
                 if i == self.interface:
-                    i.send([('server.banner',[])])
-                    i.send([('server.peers.subscribe',[])])
+                    i.send([('server.banner',[])], self.on_banner)
+                    i.send([('server.peers.subscribe',[])], self.on_peers)
             else:
                 self.servers_list.remove(i.server)
                 self.interfaces.pop(i.server)
@@ -116,13 +150,16 @@ class Network(threading.Thread):
                     else:
                         self.trigger_callback('disconnected')
                 
+    def on_header(self, i, result):
+        self.blockchain.queue.put((i,result))
 
-    def on_peers(self, result):
-        # populate servers list here
-        pass
+    def on_peers(self, i, r):
+        self.servers = self.parse_servers(r.get('result'))
+        self.trigger_callback('peers')
 
-    def on_banner(self, result):
-        pass
+    def on_banner(self, i, r):
+        self.banner = r.get('result')
+        self.trigger_callback('banner')
 
     def stop(self):
         with self.lock: self.running = False
@@ -131,6 +168,38 @@ class Network(threading.Thread):
         with self.lock: return self.running
 
 
+    def parse_servers(self, result):
+        """ parse servers list into dict format"""
+        from version import PROTOCOL_VERSION
+        servers = {}
+        for item in result:
+            host = item[1]
+            out = {}
+            version = None
+            pruning_level = '-'
+            if len(item) > 2:
+                for v in item[2]:
+                    if re.match("[stgh]\d*", v):
+                        protocol, port = v[0], v[1:]
+                        if port == '': port = DEFAULT_PORTS[protocol]
+                        out[protocol] = port
+                    elif re.match("v(.?)+", v):
+                        version = v[1:]
+                    elif re.match("p\d*", v):
+                        pruning_level = v[1:]
+                    if pruning_level == '': pruning_level = '0'
+            try: 
+                is_recent = float(version)>=float(PROTOCOL_VERSION)
+            except:
+                is_recent = False
+
+            if out and is_recent:
+                out['pruning'] = pruning_level
+                servers[host] = out
+
+        return servers
+
+
     def resend_subscriptions(self, subscriptions):
         for channel, messages in subscriptions.items():
             if messages:
@@ -141,7 +210,7 @@ class Network(threading.Thread):
 
 if __name__ == "__main__":
     import simple_config
-    config = simple_config.SimpleConfig({'verbose':True})
+    config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'})
     network = Network(config)
     network.start()
 
index b2a9c9b..e4b8346 100644 (file)
@@ -35,11 +35,12 @@ class TxVerifier(threading.Thread):
         self.blockchain = network.blockchain
         self.interface = network.interface
         self.transactions    = {}                                 # requested verifications (with height sent by the requestor)
-        self.interface.register_channel('txverifier')
+        #self.interface.register_channel('txverifier')
         self.verified_tx     = storage.get('verified_tx3',{})      # height, timestamp of verified transactions
         self.merkle_roots    = storage.get('merkle_roots',{})      # hashed by me
         self.lock = threading.Lock()
         self.running = False
+        self.queue = Queue.Queue()
 
 
     def get_confirmations(self, tx):
@@ -107,13 +108,14 @@ class TxVerifier(threading.Thread):
                 if tx_hash not in self.verified_tx:
                     if self.merkle_roots.get(tx_hash) is None and tx_hash not in requested_merkle:
                         print_error('requesting merkle', tx_hash)
-                        self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], 'txverifier')
+                        self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r))
                         requested_merkle.append(tx_hash)
 
             try:
-                r = self.interface.get_response('txverifier',timeout=1)
+                r = self.queue.get(timeout=1)
             except Queue.Empty:
                 continue
+
             if not r: continue
 
             if r.get('error'):
index f1ca254..88e632b 100644 (file)
@@ -217,13 +217,17 @@ class Wallet:
     def set_up_to_date(self,b):
         with self.lock: self.up_to_date = b
 
+
     def is_up_to_date(self):
         with self.lock: return self.up_to_date
 
+
     def update(self):
         self.up_to_date = False
-        self.interface.poke('synchronizer')
-        while not self.is_up_to_date(): time.sleep(0.1)
+        #self.interface.poke('synchronizer')
+        while not self.is_up_to_date(): 
+            time.sleep(0.1)
+
 
     def import_key(self, sec, password):
         # check password
@@ -652,7 +656,7 @@ class Wallet:
         if value >= self.gap_limit:
             self.gap_limit = value
             self.storage.put('gap_limit', self.gap_limit, True)
-            self.interface.poke('synchronizer')
+            #self.interface.poke('synchronizer')
             return True
 
         elif value >= self.min_acceptable_gap():
@@ -1184,9 +1188,13 @@ class Wallet:
     def send_tx(self, tx):
         # asynchronous
         self.tx_event.clear()
-        self.interface.send([('blockchain.transaction.broadcast', [str(tx)])], 'synchronizer')
+        self.interface.send([('blockchain.transaction.broadcast', [str(tx)])], self.on_broadcast)
         return tx.hash()
 
+    def on_broadcast(self, i, result):
+        self.tx_result = result
+        self.tx_event.set()
+
     def receive_tx(self,tx_hash):
         out = self.tx_result 
         if out != tx_hash:
@@ -1378,15 +1386,14 @@ class WalletSynchronizer(threading.Thread):
         self.wallet = wallet
         wallet.synchronizer = self
         self.interface = self.wallet.interface
-        self.interface.register_channel('synchronizer')
         #self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
         self.was_updated = True
         self.running = False
         self.lock = threading.Lock()
+        self.queue = Queue.Queue()
 
     def stop(self):
         with self.lock: self.running = False
-        self.interface.poke('synchronizer')
 
     def is_running(self):
         with self.lock: return self.running
@@ -1396,7 +1403,7 @@ class WalletSynchronizer(threading.Thread):
         messages = []
         for addr in addresses:
             messages.append(('blockchain.address.subscribe', [addr]))
-        self.interface.send( messages, 'synchronizer')
+        self.interface.send( messages, lambda i,r: self.queue.put(r))
 
 
     def run(self):
@@ -1436,26 +1443,26 @@ class WalletSynchronizer(threading.Thread):
             # request missing transactions
             for tx_hash, tx_height in missing_tx:
                 if (tx_hash, tx_height) not in requested_tx:
-                    self.interface.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], 'synchronizer')
+                    self.interface.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], lambda i,r: self.queue.put(r))
                     requested_tx.append( (tx_hash, tx_height) )
             missing_tx = []
 
             # detect if situation has changed
-            if not self.interface.is_up_to_date('synchronizer'):
-                if self.wallet.is_up_to_date():
-                    self.wallet.set_up_to_date(False)
-                    self.was_updated = True
-            else:
+            if self.interface.is_up_to_date() and self.queue.empty():
                 if not self.wallet.is_up_to_date():
                     self.wallet.set_up_to_date(True)
                     self.was_updated = True
+            else:
+                if self.wallet.is_up_to_date():
+                    self.wallet.set_up_to_date(False)
+                    self.was_updated = True
 
             if self.was_updated:
-                self.interface.network.trigger_callback('updated')
+                self.wallet.network.trigger_callback('updated')
                 self.was_updated = False
 
             # 2. get a response
-            r = self.interface.get_response('synchronizer')
+            r = self.queue.get(block=True, timeout=10000000000)
 
             # poke sends None. (needed during stop)
             if not r: continue
@@ -1473,7 +1480,7 @@ class WalletSynchronizer(threading.Thread):
                 addr = params[0]
                 if self.wallet.get_status(self.wallet.get_history(addr)) != result:
                     if requested_histories.get(addr) is None:
-                        self.interface.send([('blockchain.address.get_history', [addr])], 'synchronizer')
+                        self.interface.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r))
                         requested_histories[addr] = result
 
             elif method == 'blockchain.address.get_history':
@@ -1519,15 +1526,11 @@ class WalletSynchronizer(threading.Thread):
                 requested_tx.remove( (tx_hash, tx_height) )
                 print_error("received tx:", tx_hash, len(tx.raw))
 
-            elif method == 'blockchain.transaction.broadcast':
-                self.wallet.tx_result = result
-                self.wallet.tx_event.set()
-
             else:
                 print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) )
 
             if self.was_updated and not requested_tx:
-                self.interface.network.trigger_callback('updated')
-                self.interface.network.trigger_callback("new_transaction") # Updated gets called too many times from other places as well; if we use that signal we get the notification three times
+                self.wallet.network.trigger_callback('updated')
+                self.wallet.network.trigger_callback("new_transaction") # Updated gets called too many times from other places as well; if we use that signal we get the notification three times
 
                 self.was_updated = False