separate blockchain and network
authorThomasV <thomasv@gitorious>
Sun, 8 Sep 2013 15:23:01 +0000 (17:23 +0200)
committerThomasV <thomasv@gitorious>
Sun, 8 Sep 2013 15:23:01 +0000 (17:23 +0200)
electrum
gui/gui_classic.py
lib/__init__.py
lib/blockchain.py
lib/interface.py
lib/network.py [new file with mode: 0644]
lib/wallet.py
setup.py

index 669b7ee..d123fae 100755 (executable)
--- a/electrum
+++ b/electrum
@@ -128,18 +128,14 @@ if __name__ == '__main__':
             #sys.exit("Error: Unknown GUI: " + gui_name )
         
         # network interface
-        interface = Interface(config, True)
-        interface.start(wait = False)
-        interface.send([('server.peers.subscribe',[])])
+        network = Network(config)
+        network.start()
+        #interface.send([('server.peers.subscribe',[])])
 
-        blockchain = BlockchainVerifier(interface, config)
-        blockchain.start()
-
-        gui = gui.ElectrumGui(config, interface, blockchain)
+        gui = gui.ElectrumGui(config, network)
         gui.main(url)
         
-        interface.stop()
-        blockchain.stop()
+        network.stop()
 
         # we use daemon threads, their termination is enforced.
         # this sleep command gives them time to terminate cleanly. 
index 932da61..076f97d 100644 (file)
@@ -568,8 +568,6 @@ class ElectrumWindow(QMainWindow):
             self.config.set_key('io_dir', os.path.dirname(fileName), True)
         return fileName
 
-
-
     def close(self):
         QMainWindow.close(self)
         self.run_hook('close_main_window')
@@ -1367,7 +1365,7 @@ class ElectrumWindow(QMainWindow):
         console.history = self.config.get("console-history",[])
         console.history_index = len(console.history)
 
-        console.updateNamespace({'wallet' : self.wallet, 'interface' : self.wallet.interface, 'gui':self})
+        console.updateNamespace({'wallet' : self.wallet, 'network' : self.wallet.network, 'gui':self})
         console.updateNamespace({'util' : util, 'bitcoin':bitcoin})
 
         c = commands.Commands(self.wallet, self.wallet.interface, lambda: self.console.set_json(True))
@@ -2258,10 +2256,11 @@ class OpenFileEventFilter(QObject):
 
 class ElectrumGui:
 
-    def __init__(self, config, interface, blockchain, app=None):
-        self.interface = interface
+    def __init__(self, config, network, app=None):
+        self.network = network
+        #self.interface = interface
         self.config = config
-        self.blockchain = blockchain
+        #self.blockchain = network.blockchain
         self.windows = []
         self.efilter = OpenFileEventFilter(self.windows)
         if app is None:
@@ -2281,7 +2280,7 @@ class ElectrumGui:
         else:
             wallet = Wallet(storage)
 
-        wallet.start_threads(self.interface, self.blockchain)
+        wallet.start_threads(self.network)
 
         s = Timer()
         s.start()
index f2a27fc..07f8fd7 100644 (file)
@@ -3,7 +3,7 @@ from util import format_satoshis, print_msg, print_json, print_error, set_verbos
 from wallet import WalletSynchronizer, WalletStorage
 from wallet_factory import WalletFactory as Wallet
 from verifier import TxVerifier
-from blockchain import BlockchainVerifier
+from network import Network
 from interface import Interface, pick_random_server, DEFAULT_SERVERS
 from simple_config import SimpleConfig
 import bitcoin
index 310da9f..6f495ac 100644 (file)
@@ -22,10 +22,9 @@ from util import user_dir, appdata_dir, print_error
 from bitcoin import *
 
 
-class BlockchainVerifier(threading.Thread):
-    """ Simple Payment Verification """
+class Blockchain(threading.Thread):
 
-    def __init__(self, interface, config):
+    def __init__(self, config):
         threading.Thread.__init__(self)
         self.daemon = True
         self.config = config
@@ -34,112 +33,62 @@ class BlockchainVerifier(threading.Thread):
         self.local_height = 0
         self.running = False
         self.headers_url = 'http://headers.electrum.org/blockchain_headers'
-        self.interface = interface
-        interface.register_channel('verifier')
         self.set_local_height()
+        self.queue = Queue.Queue()
 
-
-
-    def start_interfaces(self):
-        import interface
-        servers = interface.DEFAULT_SERVERS
-        servers = interface.filter_protocol(servers,'s')
-        print_error("using %d servers"% len(servers))
-        self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers )
-
-        for i in self.interfaces:
-            i.start()
-            # subscribe to block headers
-            i.register_channel('verifier')
-            i.register_channel('get_header')
-            i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
-            # note: each interface should send its results directly to a queue, instead of channels
-            # pass the queue to the interface, so that several can share the same queue
-
-
-    def get_new_response(self):
-        # listen to interfaces, forward to verifier using the queue
-        while self.is_running():
-            for i in self.interfaces:
-                try:
-                    r = i.get_response('verifier',timeout=0)
-                except Queue.Empty:
-                    continue
-
-                result = r.get('result')
-                if result:
-                    return (i,result)
-
-            time.sleep(1)
-
-
-
-
+    
     def stop(self):
         with self.lock: self.running = False
-        #self.interface.poke('verifier')
+
 
     def is_running(self):
         with self.lock: return self.running
 
 
-    def request_header(self, i, h):
-        print_error("requesting header %d from %s"%(h, i.server))
-        i.send([ ('blockchain.block.get_header',[h])], 'get_header')
+    def run(self):
+        self.init_headers_file()
+        self.set_local_height()
+        print_error( "blocks:", self.local_height )
+
+        with self.lock:
+            self.running = True
+
+        while self.is_running():
 
-    def retrieve_header(self, i):
-        while True:
             try:
-                r = i.get_response('get_header',timeout=1)
+                i, result = self.queue.get()
             except Queue.Empty:
-                print_error('timeout')
                 continue
 
-            if r.get('error'):
-                print_error('Verifier received an error:', r)
-                continue
-
-            # 3. handle response
-            method = r['method']
-            params = r['params']
-            result = r['result']
+            header= result.get('result')
+            #print_error( i.server, header )
+            height = header.get('block_height')
 
-            if method == 'blockchain.block.get_header':
-                return result
-                
+            if height > self.local_height + 50:
+                self.get_chunks(i, header, height)
+                i.trigger_callback('updated')
 
-    def get_chain(self, interface, final_header):
+            if height > self.local_height:
+                # get missing parts from interface (until it connects to my chain)
+                chain = self.get_chain( i, header )
 
-        header = final_header
-        chain = [ final_header ]
-        requested_header = False
-        
-        while self.is_running():
+                # skip that server if the result is not consistent
+                if not chain: continue
+                
+                # verify the chain
+                if self.verify_chain( chain ):
+                    print_error("height:", height, i.server)
+                    for header in chain:
+                        self.save_header(header)
+                        self.height = height
+                else:
+                    print_error("error", i.server)
+                    # todo: dismiss that server
 
-            if requested_header:
-                header = self.retrieve_header(interface)
-                if not header: return
-                chain = [ header ] + chain
-                requested_header = False
+                i.trigger_callback('updated')
 
-            height = header.get('block_height')
-            previous_header = self.read_header(height -1)
-            if not previous_header:
-                self.request_header(interface, height - 1)
-                requested_header = True
-                continue
 
-            # verify that it connects to my chain
-            prev_hash = self.hash_header(previous_header)
-            if prev_hash != header.get('prev_block_hash'):
-                print_error("reorg")
-                self.request_header(interface, height - 1)
-                requested_header = True
-                continue
 
-            else:
-                # the chain is complete
-                return chain
                     
             
     def verify_chain(self, chain):
@@ -166,37 +115,6 @@ class BlockchainVerifier(threading.Thread):
         return True
 
 
-    def get_chunks(self, i, header, height):
-        requested_chunks = []
-        min_index = (self.local_height + 1)/2016
-        max_index = (height + 1)/2016
-        for n in range(min_index, max_index + 1):
-            print_error( "requesting chunk", n )
-            i.send([ ('blockchain.block.get_chunk',[n])], 'get_header')
-            requested_chunks.append(n)
-            break
-
-        while requested_chunks:
-            try:
-                r = i.get_response('get_header',timeout=1)
-            except Queue.Empty:
-                continue
-            if not r: continue
-
-            if r.get('error'):
-                print_error('Verifier received an error:', r)
-                continue
-
-            # 3. handle response
-            method = r['method']
-            params = r['params']
-            result = r['result']
-
-            if method == 'blockchain.block.get_chunk':
-                index = params[0]
-                self.verify_chunk(index, result)
-                requested_chunks.remove(index)
-
 
     def verify_chunk(self, index, hexdata):
         data = hexdata.decode('hex')
@@ -259,8 +177,6 @@ class BlockchainVerifier(threading.Thread):
         return True
         
 
-            
-
     def header_to_string(self, res):
         s = int_to_hex(res.get('version'),4) \
             + rev_hex(res.get('prev_block_hash')) \
@@ -383,65 +299,100 @@ class BlockchainVerifier(threading.Thread):
         return new_bits, new_target
 
 
+    def request_header(self, i, h):
+        print_error("requesting header %d from %s"%(h, i.server))
+        i.send([ ('blockchain.block.get_header',[h])], 'get_header')
 
+    def retrieve_header(self, i):
+        while True:
+            try:
+                r = i.get_response('get_header',timeout=1)
+            except Queue.Empty:
+                print_error('timeout')
+                continue
 
-    def run(self):
-        self.start_interfaces()
-        
-        self.init_headers_file()
-        self.set_local_height()
-        print_error( "blocks:", self.local_height )
+            if r.get('error'):
+                print_error('Verifier received an error:', r)
+                continue
 
-        with self.lock:
-            self.running = True
+            # 3. handle response
+            method = r['method']
+            params = r['params']
+            result = r['result']
+
+            if method == 'blockchain.block.get_header':
+                return result
+                
+
+
+    def get_chain(self, interface, final_header):
 
+        header = final_header
+        chain = [ final_header ]
+        requested_header = False
+        
         while self.is_running():
 
-            i, header = self.get_new_response()
-            
+            if requested_header:
+                header = self.retrieve_header(interface)
+                if not header: return
+                chain = [ header ] + chain
+                requested_header = False
+
             height = header.get('block_height')
+            previous_header = self.read_header(height -1)
+            if not previous_header:
+                self.request_header(interface, height - 1)
+                requested_header = True
+                continue
 
-            if height > self.local_height + 50:
-                self.get_chunks(i, header, height)
-                self.interface.trigger_callback('updated')
+            # verify that it connects to my chain
+            prev_hash = self.hash_header(previous_header)
+            if prev_hash != header.get('prev_block_hash'):
+                print_error("reorg")
+                self.request_header(interface, height - 1)
+                requested_header = True
+                continue
 
-            if height > self.local_height:
-                # get missing parts from interface (until it connects to my chain)
-                chain = self.get_chain( i, header )
+            else:
+                # the chain is complete
+                return chain
 
-                # skip that server if the result is not consistent
-                if not chain: continue
-                
-                # verify the chain
-                if self.verify_chain( chain ):
-                    print_error("height:", height, i.server)
-                    for header in chain:
-                        self.save_header(header)
-                        self.height = height
-                else:
-                    print_error("error", i.server)
-                    # todo: dismiss that server
 
-                self.interface.trigger_callback('updated')
-    
+    def get_chunks(self, i, header, height):
+        requested_chunks = []
+        min_index = (self.local_height + 1)/2016
+        max_index = (height + 1)/2016
+        for n in range(min_index, max_index + 1):
+            print_error( "requesting chunk", n )
+            i.send([ ('blockchain.block.get_chunk',[n])], 'get_header')
+            requested_chunks.append(n)
+            break
 
+        while requested_chunks:
+            try:
+                r = i.get_response('get_header',timeout=1)
+            except Queue.Empty:
+                continue
+            if not r: continue
 
+            if r.get('error'):
+                print_error('Verifier received an error:', r)
+                continue
 
-if __name__ == "__main__":
-    import interface, simple_config
-    
-    config = simple_config.SimpleConfig({'verbose':True})
+            # 3. handle response
+            method = r['method']
+            params = r['params']
+            result = r['result']
+
+            if method == 'blockchain.block.get_chunk':
+                index = params[0]
+                self.verify_chunk(index, result)
+                requested_chunks.remove(index)
 
-    i0 = interface.Interface()
-    i0.start()
 
-    bv = BlockchainVerifier(i0, config)
-    bv.start()
 
 
-    # listen to interfaces, forward to verifier using the queue
-    while 1:
-        time.sleep(1)
 
 
 
index 26eaebf..faf165c 100644 (file)
@@ -66,18 +66,21 @@ def pick_random_server():
 
 class Interface(threading.Thread):
 
+
     def register_callback(self, event, callback):
         with self.lock:
             if not self.callbacks.get(event):
                 self.callbacks[event] = []
             self.callbacks[event].append(callback)
 
+
     def trigger_callback(self, event):
         with self.lock:
             callbacks = self.callbacks.get(event,[])[:]
         if callbacks:
             [callback() for callback in callbacks]
 
+
     def init_server(self, host, port, proxy=None, use_ssl=True):
         self.host = host
         self.port = port
@@ -188,16 +191,19 @@ class Interface(threading.Thread):
                     return
                 
         response_queue = self.responses[channel]
-        response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id})
+        response_queue.put((self, {'method':method, 'params':params, 'result':result, 'id':msg_id}))
 
 
 
     def get_response(self, channel='default', block=True, timeout=10000000000):
-        return self.responses[channel].get(block, timeout)
+        i, r = self.responses[channel].get(block, timeout)
+        return r
 
-    def register_channel(self, channel):
+    def register_channel(self, channel, queue=None):
+        if queue is None:
+            queue = Queue.Queue()
         with self.lock:
-            self.responses[channel] = Queue.Queue()
+            self.responses[channel] = queue
 
     def poke(self, channel):
         self.responses[channel].put(None)
@@ -418,7 +424,7 @@ class Interface(threading.Thread):
 
 
 
-    def __init__(self, config=None, loop=False):
+    def __init__(self, config=None):
         self.server = random.choice(filter_protocol(DEFAULT_SERVERS, 's'))
         self.proxy = None
 
@@ -428,7 +434,6 @@ class Interface(threading.Thread):
 
         threading.Thread.__init__(self)
         self.daemon = True
-        self.loop = loop
         self.config = config
         self.connect_event = threading.Event()
 
@@ -457,32 +462,11 @@ class Interface(threading.Thread):
             if self.config.get('auto_cycle') is None:
                 self.config.set_key('auto_cycle', True, False)
 
-        if not self.is_connected and self.config.get('auto_cycle'):
-            print_msg("Using random server...")
-            servers = filter_protocol(DEFAULT_SERVERS, 's')
-            while servers:
-                server = random.choice( servers )
-                servers.remove(server)
-                print server
-                self.config.set_key('server', server, False)
-                self.init_with_server(self.config)
-                if self.is_connected: break
-
-            if not self.is_connected:
-                print 'no server available'
-                self.connect_event.set() # to finish start
-                self.server = 'ecdsa.org:50001:t'
-                self.proxy = None
-                return
+        if not self.is_connected: 
+            self.connect_event.set()
+            return
 
         self.connect_event.set()
-        if self.is_connected:
-            self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
-            self.send([('server.banner',[])])
-            self.trigger_callback('connected')
-        else:
-            self.trigger_callback('notconnected')
-            #print_error("Failed to connect " + self.connection_msg)
 
 
     def init_with_server(self, config):
@@ -532,12 +516,6 @@ class Interface(threading.Thread):
 
         return out
 
-    def resend_subscriptions(self):
-        for channel, messages in self.subscriptions.items():
-            if messages:
-                self.send(messages, channel)
-
-
 
     def parse_proxy_options(self, s):
         if type(s) == type({}): return s  # fixme: type should be fixed
@@ -625,26 +603,24 @@ class Interface(threading.Thread):
         return out
 
 
-    def start(self, wait=True):
+    def start(self, queue):
+        self.queue = queue
         threading.Thread.start(self)
-        if wait:
-            # wait until connection is established
-            self.connect_event.wait()
-            if not self.is_connected:
-                return False
-        return True
+
+
 
     def run(self):
-        while True:
-            self.init_interface()
-            if self.is_connected:
-                self.resend_subscriptions()
-                self.run_tcp() if self.protocol in 'st' else self.run_http()
+        self.init_interface()
+        if self.is_connected:
+            self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])])
+            self.change_status()
+            self.run_tcp() if self.protocol in 'st' else self.run_http()
+        self.change_status()
+        
+    def change_status(self):
+        self.queue.put(self)
 
-            self.trigger_callback('disconnected')
 
-            if not self.loop: break
-            time.sleep(5)
 
 
 
diff --git a/lib/network.py b/lib/network.py
new file mode 100644 (file)
index 0000000..9f29317
--- /dev/null
@@ -0,0 +1,121 @@
+import interface
+from blockchain import Blockchain
+import threading, time, Queue, os, sys, shutil
+from util import user_dir, appdata_dir, print_error
+from bitcoin import *
+
+
+class Network(threading.Thread):
+
+    def __init__(self, config):
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.config = config
+        self.lock = threading.Lock()
+        self.blockchain = Blockchain(config)
+        self.interfaces = {}
+        self.queue = Queue.Queue()
+        self.default_server = self.config.get('server')
+        self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s')
+
+
+
+    def start_interfaces(self):
+
+        for server in self.servers_list:
+            self.interfaces[server] = interface.Interface({'server':server})
+
+        for i in self.interfaces.values():
+            i.start(self.queue)
+
+        if self.default_server:
+            self.interface = interface.Interface({'server':self.default_server})
+            self.interface.start(self.queue)
+        else:
+            self.interface = self.interfaces[0]
+
+
+
+
+
+
+    def run(self):
+        self.blockchain.start()
+        self.start_interfaces()
+
+        with self.lock:
+            self.running = True
+
+        while self.is_running():
+            i = self.queue.get()
+
+            if i.is_connected:
+                i.register_channel('verifier', self.blockchain.queue)
+                i.register_channel('get_header')
+                i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
+                if i == self.interface:
+                    i.send([('server.banner',[])])
+                    i.send([('server.peers.subscribe',[])])
+            else:
+                self.interfaces.pop(i.server)
+                if i == self.interface:
+                    if self.default_server is None:
+                        print_msg("Using random server...")
+                        server = random.choice( self.servers_list )
+                        self.interface = interface.Interface({'server':self.default_server})
+                    else:
+                        #i.trigger_callback('disconnected')
+                        pass
+
+    def on_peers(self, resut):
+        pass
+
+    def on_banner(self, result):
+        pass
+
+    def stop(self):
+        with self.lock: self.running = False
+
+    def is_running(self):
+        with self.lock: return self.running
+
+
+    def resend_subscriptions(self):
+        for channel, messages in self.subscriptions.items():
+            if messages:
+                self.send(messages, channel)
+
+
+    def auto_cycle(self):
+        if not self.is_connected and self.config.get('auto_cycle'):
+            print_msg("Using random server...")
+            servers = filter_protocol(DEFAULT_SERVERS, 's')
+            while servers:
+                server = random.choice( servers )
+                servers.remove(server)
+                print server
+                self.config.set_key('server', server, False)
+                self.init_with_server(self.config)
+                if self.is_connected: break
+
+            if not self.is_connected:
+                print 'no server available'
+                self.connect_event.set() # to finish start
+                self.server = 'ecdsa.org:50001:t'
+                self.proxy = None
+                return
+
+
+
+
+if __name__ == "__main__":
+    import simple_config
+    config = simple_config.SimpleConfig({'verbose':True})
+    network = Network(config)
+    network.start()
+
+    while 1:
+        time.sleep(1)
+
+
+
index c830fe0..ec268a1 100644 (file)
@@ -1343,10 +1343,11 @@ class Wallet:
         return True
 
 
-    def start_threads(self, interface, blockchain):
+    def start_threads(self, network):
         from verifier import TxVerifier
-        self.interface = interface
-        self.verifier = TxVerifier(interface, blockchain, self.storage)
+        self.network = network
+        self.interface = network.interface
+        self.verifier = TxVerifier(self.interface, network.blockchain, self.storage)
         self.verifier.start()
         self.set_verifier(self.verifier)
         self.synchronizer = WalletSynchronizer(self)
@@ -1370,7 +1371,7 @@ class WalletSynchronizer(threading.Thread):
         wallet.synchronizer = self
         self.interface = self.wallet.interface
         self.interface.register_channel('synchronizer')
-        self.wallet.interface.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
+        #self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
         self.was_updated = True
         self.running = False
         self.lock = threading.Lock()
index 19c6f27..41ab87c 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -53,22 +53,23 @@ setup(name = "Electrum",
     package_dir = {'electrum': 'lib', 'electrum_gui': 'gui', 'electrum_plugins':'plugins'},
     scripts= ['electrum'],
     data_files = data_files,
-    py_modules = ['electrum.version',
-                  'electrum.wallet',
-                  'electrum.wallet_bitkey',
-                  'electrum.wallet_factory',
-                  'electrum.interface',
+    py_modules = ['electrum.account',
+                  'electrum.bitcoin',
                   'electrum.blockchain',
                   'electrum.commands',
+                  'electrum.interface',
                   'electrum.mnemonic',
+                  'electrum.msqr',
+                  'electrum.network',
                   'electrum.simple_config',
                   'electrum.socks',
-                  'electrum.msqr',
-                  'electrum.util',
-                  'electrum.account',
-                  'electrum.bitcoin',
                   'electrum.transaction',
+                  'electrum.util',
+                  'electrum.version',
                   'electrum.verifier',
+                  'electrum.wallet',
+                  'electrum.wallet_bitkey',
+                  'electrum.wallet_factory',
                   'electrum_gui.gui_gtk',
                   'electrum_gui.qt_console',
                   'electrum_gui.gui_classic',