select the longest blockchain from several servers
authorthomasv <thomasv@gitorious>
Mon, 2 Sep 2013 15:49:12 +0000 (17:49 +0200)
committerthomasv <thomasv@gitorious>
Mon, 2 Sep 2013 15:49:12 +0000 (17:49 +0200)
lib/blockchain.py
lib/interface.py
lib/verifier.py

index 702c881..e642574 100644 (file)
@@ -29,79 +29,71 @@ class BlockchainVerifier(threading.Thread):
         threading.Thread.__init__(self)
         self.daemon = True
         self.config = config
-        self.interface = interface
-        self.interface.register_channel('verifier')
         self.lock = threading.Lock()
-        self.pending_headers = [] # headers that have not been verified
         self.height = 0
         self.local_height = 0
         self.running = False
         self.headers_url = 'http://headers.electrum.org/blockchain_headers'
+        self.interface = interface
+        interface.register_channel('verifier')
+        self.set_local_height()
+
+
+
+    def start_interfaces(self):
+        import interface
+        servers = interface.DEFAULT_SERVERS
+        servers = interface.filter_protocol(servers,'s')
+        print_error("using %d servers"% len(servers))
+        self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers )
+
+        for i in self.interfaces:
+            i.start()
+            # subscribe to block headers
+            i.register_channel('verifier')
+            i.register_channel('get_header')
+            i.send([ ('blockchain.headers.subscribe',[])], 'verifier')
+            # note: each interface should send its results directly to a queue, instead of channels
+            # pass the queue to the interface, so that several can share the same queue
+
+
+    def get_new_response(self):
+        # listen to interfaces, forward to verifier using the queue
+        while 1:
+            for i in self.interfaces:
+                try:
+                    r = i.get_response('verifier',timeout=0)
+                except Queue.Empty:
+                    continue
+
+                result = r.get('result')
+                if result:
+                    return (i,result)
+
+            time.sleep(1)
+
+
 
 
     def stop(self):
         with self.lock: self.running = False
-        self.interface.poke('verifier')
+        #self.interface.poke('verifier')
 
     def is_running(self):
         with self.lock: return self.running
 
-    def run(self):
 
-        self.init_headers_file()
-        self.set_local_height()
-
-        with self.lock:
-            self.running = True
-        requested_chunks = []
-        requested_headers = []
-        all_chunks = False
-        
-        # subscribe to block headers
-        self.interface.send([ ('blockchain.headers.subscribe',[])], 'verifier')
-
-        while self.is_running():
-            # request missing chunks
-            if not all_chunks and self.height and not requested_chunks:
-
-                if self.local_height + 50 < self.height:
-                    min_index = (self.local_height + 1)/2016
-                    max_index = (self.height + 1)/2016
-                    for i in range(min_index, max_index + 1):
-                        print_error( "requesting chunk", i )
-                        self.interface.send([ ('blockchain.block.get_chunk',[i])], 'verifier')
-                        requested_chunks.append(i)
-                        break
-                else:
-                    all_chunks = True
-                    print_error("downloaded all chunks")
-
-
-            # process pending headers
-            if self.pending_headers and all_chunks:
-                done = []
-                for header in self.pending_headers:
-                    if self.verify_header(header):
-                        done.append(header)
-                    else:
-                        # request previous header
-                        i = header.get('block_height') - 1
-                        if i not in requested_headers:
-                            print_error("requesting header %d"%i)
-                            self.interface.send([ ('blockchain.block.get_header',[i])], 'verifier')
-                            requested_headers.append(i)
-                        # no point continuing
-                        break
-                if done:
-                    self.interface.trigger_callback('updated')
-                    for header in done: 
-                        self.pending_headers.remove(header)
+    def request_header(self, i, h):
+        print_error("requesting header %d from %s"%(h, i.server))
+        i.send([ ('blockchain.block.get_header',[h])], 'get_header')
 
+    def retrieve_header(self, i):
+        while True:
             try:
-                r = self.interface.get_response('verifier',timeout=1)
+                r = i.get_response('get_header',timeout=1)
             except Queue.Empty:
+                print_error('timeout')
                 continue
-            if not r: continue
 
             if r.get('error'):
                 print_error('Verifier received an error:', r)
@@ -112,23 +104,66 @@ class BlockchainVerifier(threading.Thread):
             params = r['params']
             result = r['result']
 
-            if method == 'blockchain.block.get_chunk':
-                index = params[0]
-                self.verify_chunk(index, result)
-                requested_chunks.remove(index)
+            if method == 'blockchain.block.get_header':
+                return result
+                
 
-            elif method in ['blockchain.headers.subscribe', 'blockchain.block.get_header']:
+    def get_chain(self, interface, final_header):
 
-                self.pending_headers.append(result)
-                if method == 'blockchain.block.get_header':
-                    requested_headers.remove(result.get('block_height'))
-                else:
-                    self.height = result.get('block_height')
-                    ## fixme # self.interface.poke('synchronizer')
-                
-                self.pending_headers.sort(key=lambda x: x.get('block_height'))
-                # print "pending headers", map(lambda x: x.get('block_height'), self.pending_headers)
+        header = final_header
+        chain = [ final_header ]
+        requested_header = False
+        
+        while self.is_running():
+
+            if requested_header:
+                header = self.retrieve_header(interface)
+                if not header: return
+                chain = [ header ] + chain
+                requested_header = False
+
+            height = header.get('block_height')
+            previous_header = self.read_header(height -1)
+            if not previous_header:
+                self.request_header(interface, height - 1)
+                requested_header = True
+                continue
+
+            # verify that it connects to my chain
+            prev_hash = self.hash_header(previous_header)
+            if prev_hash != header.get('prev_block_hash'):
+                print_error("reorg")
+                self.request_header(interface, height - 1)
+                requested_header = True
+                continue
+
+            else:
+                # the chain is complete
+                return chain
+                    
+            
+    def verify_chain(self, chain):
+
+        first_header = chain[0]
+        prev_header = self.read_header(first_header.get('block_height') -1)
+        
+        for header in chain:
 
+            height = header.get('block_height')
+
+            prev_hash = self.hash_header(prev_header)
+            bits, target = self.get_target(height/2016)
+            _hash = self.hash_header(header)
+            try:
+                assert prev_hash == header.get('prev_block_hash')
+                assert bits == header.get('bits')
+                assert eval('0x'+_hash) < target
+            except:
+                return False
+
+            prev_header = header
+
+        return True
 
 
 
@@ -184,17 +219,8 @@ class BlockchainVerifier(threading.Thread):
         except:
             # this can be caused by a reorg.
             print_error("verify header failed"+ repr(header))
-            # undo verifications
-            with self.lock:
-                items = self.verified_tx.items()[:]
-            for tx_hash, item in items:
-                tx_height, timestamp, pos = item
-                if tx_height >= height:
-                    print_error("redoing", tx_hash)
-                    with self.lock:
-                        self.verified_tx.pop(tx_hash)
-                        if tx_hash in self.merkle_roots:
-                            self.merkle_roots.pop(tx_hash)
+            verifier.undo_verifications()
+
             # return False to request previous header.
             return False
 
@@ -272,6 +298,7 @@ class BlockchainVerifier(threading.Thread):
             h = os.path.getsize(name)/80 - 1
             if self.local_height != h:
                 self.local_height = h
+                self.height = self.local_height
 
 
     def read_header(self, block_height):
@@ -327,3 +354,59 @@ class BlockchainVerifier(threading.Thread):
 
 
 
+
+    def run(self):
+        self.start_interfaces()
+        
+        self.init_headers_file()
+        self.set_local_height()
+        print_error( "blocks:", self.local_height )
+
+        with self.lock:
+            self.running = True
+
+        while self.is_running():
+
+            i, header = self.get_new_response()
+            
+            height = header.get('block_height')
+
+            if height > self.local_height:
+                # get missing parts from interface (until it connects to my chain)
+                chain = self.get_chain( i, header )
+
+                # skip that server if the result is not consistent
+                if not chain: continue
+                
+                # verify the chain
+                if self.verify_chain( chain ):
+                    print_error("height:", height, i.server)
+                    for header in chain:
+                        self.save_header(header)
+                        self.height = height
+                else:
+                    print_error("error", i.server)
+                    # todo: dismiss that server
+
+    
+
+
+
+if __name__ == "__main__":
+    import interface, simple_config
+    
+    config = simple_config.SimpleConfig({'verbose':True})
+
+    i0 = interface.Interface()
+    i0.start()
+
+    bv = BlockchainVerifier(i0, config)
+    bv.start()
+
+
+    # listen to interfaces, forward to verifier using the queue
+    while 1:
+        time.sleep(1)
+
+
+
index 48aa295..92b3ad5 100644 (file)
@@ -332,7 +332,8 @@ class Interface(threading.Thread):
         try:
             s.connect(( self.host.encode('ascii'), int(self.port)))
         except:
-            traceback.print_exc(file=sys.stdout)
+            #traceback.print_exc(file=sys.stdout)
+            print_error("failed to connect", host, port)
             self.is_connected = False
             self.s = None
             return
index 32a6842..4cacfc2 100644 (file)
@@ -155,4 +155,14 @@ class TxVerifier(threading.Thread):
 
 
 
-
+    def undo_verifications(self, height):
+        with self.lock:
+            items = self.verified_tx.items()[:]
+        for tx_hash, item in items:
+            tx_height, timestamp, pos = item
+            if tx_height >= height:
+                print_error("redoing", tx_hash)
+                with self.lock:
+                    self.verified_tx.pop(tx_hash)
+                    if tx_hash in self.merkle_roots:
+                        self.merkle_roots.pop(tx_hash)