work
authorforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Tue, 12 Jul 2011 11:57:26 +0000 (11:57 +0000)
committerforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Tue, 12 Jul 2011 11:57:26 +0000 (11:57 +0000)
git-svn-id: svn://forre.st/p2pool@1379 470744a7-cac9-478e-843e-5ec1b25c69e8

p2pool/bitcoin/data.py
p2pool/bitcoin/p2p.py
p2pool/data.py
p2pool/main.py
p2pool/util/variable.py

index 930d27a..c06163c 100644 (file)
@@ -391,6 +391,100 @@ def address_to_pubkey_hash(address, net):
         raise ValueError('address not for this net!')
     return x['pubkey_hash']
 
+# linked list tracker
+
+class Tracker(object):
+    def __init__(self):
+        self.shares = {} # hash -> share
+        self.reverse_shares = {} # previous_hash -> set of share_hashes
+        
+        self.heads = {} # head hash -> tail_hash
+        self.tails = {} # tail hash -> set of head hashes
+        self.heights = {} # share_hash -> height_to, other_share_hash
+    
+    def add(self, share):
+        if share.hash in self.shares:
+            return # XXX raise exception?
+        
+        self.shares[share.hash] = share
+        self.reverse_shares.setdefault(share.previous_hash, set()).add(share.hash)
+        
+        if share.hash in self.tails:
+            heads = self.tails.pop(share.hash)
+        else:
+            heads = set([share.hash])
+        
+        if share.previous_hash in self.heads:
+            tail = self.heads.pop(share.previous_hash)
+        else:
+            tail = share.previous_hash
+        
+        self.tails.setdefault(tail, set()).update(heads)
+        if share.previous_hash in self.tails[tail]:
+            self.tails[tail].remove(share.previous_hash)
+        
+        for head in heads:
+            self.heads[head] = tail
+    
+    def get_height_and_last(self, share_hash):
+        orig = share_hash
+        height = 0
+        updates = []
+        while True:
+            if share_hash is None or share_hash not in self.shares:
+                break
+            updates.append((share_hash, height))
+            if share_hash in self.heights:
+                height_inc, share_hash = self.heights[share_hash]
+            else:
+                height_inc, share_hash = 1, self.shares[share_hash].previous_hash
+            height += height_inc
+        for update_hash, height_then in updates:
+            self.heights[update_hash] = height - height_then, share_hash
+        assert (height, share_hash) == self.get_height_and_last2(orig), ((height, share_hash), self.get_height_and_last2(orig))
+        return height, share_hash
+    
+    def get_height_and_last2(self, share_hash):
+        height = 0
+        while True:
+            if share_hash not in self.shares:
+                break
+            share_hash = self.shares[share_hash].previous_hash
+            height += 1
+        return height, share_hash
+    
+    def get_chain_known(self, start_hash):
+        '''
+        Chain starting with item of hash I{start_hash} of items that this Tracker contains
+        '''
+        item_hash_to_get = start_hash
+        while True:
+            if item_hash_to_get not in self.shares:
+                break
+            share = self.shares[item_hash_to_get]
+            assert not isinstance(share, long)
+            yield share
+            item_hash_to_get = share.previous_hash
+    
+    def get_chain_to_root(self, start_hash, root=None):
+        '''
+        Chain of hashes starting with share_hash of shares to the root (doesn't include root)
+        Raises an error if one is missing
+        '''
+        share_hash_to_get = start_hash
+        while share_hash_to_get != root:
+            share = self.shares[share_hash_to_get]
+            yield share
+            share_hash_to_get = share.previous_hash
+    
+    def get_best_hash(self):
+        '''
+        Returns hash of item with the most items in its chain
+        '''
+        if not self.heads:
+            return None
+        return max(self.heads, key=self.get_height_and_last)
+
 # network definitions
 
 class Mainnet(object):
index dd4ed40..d48fdf7 100644 (file)
@@ -10,7 +10,7 @@ import struct
 import time
 import traceback
 
-from twisted.internet import protocol, reactor
+from twisted.internet import defer, protocol, reactor
 
 from . import data as bitcoin_data
 from p2pool.util import variable, datachunker, deferral
@@ -230,7 +230,9 @@ class Protocol(BaseProtocol):
         ('block', bitcoin_data.block_type),
     ])
     def handle_block(self, block):
-        self.get_block.got_response(bitcoin_data.block_header_type.hash256(block['header']), block)
+        block_hash = bitcoin_data.block_header_type.hash256(block['header'])
+        self.get_block.got_response(block_hash, block)
+        self.get_block_header.got_response(block_hash, block['header'])
     
     message_headers = bitcoin_data.ComposedType([
         ('headers', bitcoin_data.ListType(bitcoin_data.block_type)),
@@ -238,8 +240,8 @@ class Protocol(BaseProtocol):
     def handle_headers(self, headers):
         for header in headers:
             header = header['header']
-            print header
             self.get_block_header.got_response(bitcoin_data.block_header_type.hash256(header), header)
+        self.factory.new_headers.happened([header['header'] for header in headers])
     
     message_reply = bitcoin_data.ComposedType([
         ('hash', bitcoin_data.HashType()),
@@ -276,6 +278,7 @@ class ClientFactory(protocol.ReconnectingClientFactory):
         
         self.new_block = variable.Event()
         self.new_tx = variable.Event()
+        self.new_headers = variable.Event()
     
     def buildProtocol(self, addr):
         p = self.protocol(self.net)
@@ -288,14 +291,112 @@ class ClientFactory(protocol.ReconnectingClientFactory):
     def getProtocol(self):
         return self.conn.get_not_none()
 
+class HeaderWrapper(object):
+    def __init__(self, header):
+        self.hash = bitcoin_data.block_header_type.hash256(header)
+        self.previous_hash = header['previous_block']
+
+class HeightTracker(object):
+    '''Point this at a factory and let it take care of getting block heights'''
+    # XXX think keeps object alive
+    
+    def __init__(self, factory):
+        self.factory = factory
+        self.tracker = bitcoin_data.Tracker()
+        self.most_recent = None
+        
+        self.factory.new_headers.watch(self.heard_headers)
+        
+        self.think()
+    
+    @defer.inlineCallbacks
+    def think(self):
+        last = None
+        yield self.factory.getProtocol()
+        while True:
+            highest_head = max(self.tracker.heads, key=lambda h: self.tracker.get_height_and_last(h)[0]) if self.tracker.heads else None
+            it = self.tracker.get_chain_known(highest_head)
+            have = []
+            step = 1
+            try:
+                cur = it.next()
+            except StopIteration:
+                cur = None
+            while True:
+                if cur is None:
+                    break
+                have.append(cur.hash)
+                for i in xrange(step): # XXX inefficient
+                    try:
+                        cur = it.next()
+                    except StopIteration:
+                        break
+                else:
+                    if len(have) > 10:
+                        step *= 2
+                    continue
+                break
+            chain = list(self.tracker.get_chain_known(highest_head))
+            if chain:
+                have.append(chain[-1].hash)
+            if not have:
+                have.append(0)
+            if have == last:
+                yield deferral.sleep(1)
+                last = None
+                continue
+            
+            last = have
+            good_tails = [x for x in self.tracker.tails if x is not None]
+            self.request(have, random.choice(good_tails) if good_tails else None)
+            for tail in self.tracker.tails:
+                if tail is None:
+                    continue
+                self.request([], tail)
+            try:
+                yield self.factory.new_headers.get_deferred(timeout=1)
+            except defer.TimeoutError:
+                pass
+    
+    def heard_headers(self, headers):
+        header2s = map(HeaderWrapper, headers)
+        for header2 in header2s:
+            self.tracker.add(header2)
+        if header2s:
+            if self.tracker.get_height_and_last(header2s[-1].hash)[1] is None:
+                self.most_recent = header2s[-1].hash
+                if random.random() < .6:
+                    self.request([header2s[-1].hash], None)
+        print len(self.tracker.shares)
+    
+    def request(self, have, last):
+        #print "REQ", ('[' + ', '.join(map(hex, have)) + ']', hex(last) if last is not None else None)
+        if self.factory.conn.value is not None:
+            self.factory.conn.value.send_getheaders(version=1, have=have, last=last)
+    
+    #@defer.inlineCallbacks
+    #XXX should defer
+    def getHeight(self, block_hash):
+        height, last = self.tracker.get_height_and_last(block_hash)
+        if last is not None:
+            self.request([], last)
+            #raise ValueError(last)
+        return height, last
+
 if __name__ == '__main__':
-    factory = ClientFactory()
+    factory = ClientFactory(bitcoin_data.Mainnet)
     reactor.connectTCP('127.0.0.1', 8333, factory)
-
+    h = HeightTracker(factory)
+    
     @repr
     @apply
     @defer.inlineCallbacks
     def think():
-        (yield factory.getProtocol())
+        while True:
+            yield deferral.sleep(1)
+            try:
+                print h.getHeight(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523)
+            except Exception, e:
+                traceback.print_exc()
     
     reactor.run()
index b3f2f86..55a3a66 100644 (file)
@@ -11,17 +11,17 @@ class CompressedList(bitcoin_data.Type):
         self.inner = inner
     
     def read(self, file):
-        values = bitcoin_data.ListType(self.inner).read(file)
+        values, file = bitcoin_data.ListType(self.inner).read(file)
         if values != sorted(set(values)):
             raise ValueError("invalid values")
-        references = bitcoin_data.ListType(bitcoin_data.VarIntType()).read(file)
-        return [values[reference] for reference in references]
+        references, file = bitcoin_data.ListType(bitcoin_data.VarIntType()).read(file)
+        return [values[reference] for reference in references], file
     
     def write(self, file, item):
         values = sorted(set(item))
         values_map = dict((value, i) for i, value in enumerate(values))
-        bitcoin_data.ListType(self.inner).write(file, values)
-        bitcoin_data.ListType(bitcoin_data.VarIntType()).write(file, [values_map[subitem] for subitem in item])
+        file = bitcoin_data.ListType(self.inner).write(file, values)
+        return bitcoin_data.ListType(bitcoin_data.VarIntType()).write(file, [values_map[subitem] for subitem in item])
 
 
 merkle_branch_type = bitcoin_data.ListType(bitcoin_data.ComposedType([
@@ -140,7 +140,7 @@ class Share(object):
         self.new_script = self.share_info['new_script']
         self.subsidy = self.share_info['subsidy']
         
-        self.previous_share_hash = self.share_data['previous_share_hash']
+        self.previous_hash = self.previous_share_hash = self.share_data['previous_share_hash']
         self.previous_shares_hash = self.share_data['previous_shares_hash']
         self.target2 = self.share_data['target2']
         
@@ -270,94 +270,7 @@ def generate_transaction(tracker, previous_share_hash, new_script, subsidy, nonc
     )
 
 
-class Tracker(object):
-    def __init__(self):        
-        self.shares = {} # hash -> share
-        self.reverse_shares = {} # previous_share_hash -> set of share_hashes
-        
-        self.heads = {} # head hash -> tail_hash
-        self.tails = {} # tail hash -> set of head hashes
-        self.heights = {} # share_hash -> height_to, other_share_hash
-    
-    def add_share(self, share):
-        if share.hash in self.shares:
-            return # XXX raise exception?
-        
-        self.shares[share.hash] = share
-        self.reverse_shares.setdefault(share.previous_share_hash, set()).add(share.hash)
-        
-        if share.hash in self.tails:
-            heads = self.tails.pop(share.hash)
-        else:
-            heads = set([share.hash])
-        
-        if share.previous_share_hash in self.heads:
-            tail = self.heads.pop(share.previous_share_hash)
-        else:
-            tail = share.previous_share_hash
-        
-        self.tails.setdefault(tail, set()).update(heads)
-        if share.previous_share_hash in self.tails[tail]:
-            self.tails[tail].remove(share.previous_share_hash)
-        
-        for head in heads:
-            self.heads[head] = tail
-    
-    def get_height_and_last(self, share_hash):
-        orig = share_hash
-        height = 0
-        updates = []
-        while True:
-            if share_hash is None or share_hash not in self.shares:
-                break
-            updates.append((share_hash, height))
-            if share_hash in self.heights:
-                height_inc, share_hash = self.heights[share_hash]
-            else:
-                height_inc, share_hash = 1, self.shares[share_hash].previous_share_hash
-            height += height_inc
-        for update_hash, height_then in updates:
-            self.heights[update_hash] = height - height_then, share_hash
-        assert (height, share_hash) == self.get_height_and_last2(orig), ((height, share_hash), self.get_height_and_last2(orig))
-        return height, share_hash
-    
-    def get_height_and_last2(self, share_hash):
-        height = 0
-        while True:
-            if share_hash not in self.shares:
-                break
-            share_hash = self.shares[share_hash].previous_share_hash
-            height += 1
-        return height, share_hash
-    
-    def get_chain_known(self, share_hash):
-        while True:
-            if share_hash not in self.shares:
-                break
-            yield share_hash
-            share_hash = self.shares[share_hash].previous_share_hash
-    
-    def get_chain_to_root(self, start):
-        share_hash_to_get = start
-        while share_hash_to_get is not None:
-            share = self.shares[share_hash_to_get]
-            yield share
-            share_hash_to_get = share.previous_share_hash
-    
-    
-    def get_best_share_hash(self):
-        return None
-        return max(self.heads, key=self.score_chain)
-    '''
-    def score_chain(self, start):
-        length = len(self.get_chain(start))
-        
-        score = 0
-        for share in itertools.islice(self.get_chain(start), self.net.CHAIN_LENGTH):
-            score += a
-        
-        return (min(length, 1000), score)
-    '''
+
 
 if __name__ == '__main__':
     class FakeShare(object):
@@ -381,11 +294,11 @@ if __name__ == '__main__':
     for share_hash in t.shares:
         print share_hash, t.get_height_and_last(share_hash)
 
-class OkayTracker(Tracker):
+class OkayTracker(bitcoin_data.Tracker):
     def __init__(self, net):
-        Tracker.__init__(self)
+        bitcoin_data.Tracker.__init__(self)
         self.net = net
-        self.verified = Tracker()
+        self.verified = bitcoin_data.Tracker()
     """
         self.okay_cache = {} # hash -> height
     
@@ -423,12 +336,14 @@ class OkayTracker(Tracker):
         # if no successful verification because of lack of parents, request parent
         for head in self.heads:
             head_height, last = self.get_height_and_last(head)
-            if head_height < a and last is not None:
+            if head_height < self.net.CHAIN_LENGTH and last is not None:
                 # request more
+                pass
             
             for share in itertools.islice(self.get_chain_known(head), None if last is None else head_height - self.net.CHAIN_LENGTH): # XXX change length for None
-                in share in self.verified.shares:
+                if share.hash in self.verified.shares:
                     break
+                print share
                 try:
                     share.check(self, self.net)
                 except:
@@ -437,20 +352,22 @@ class OkayTracker(Tracker):
                     traceback.print_exc()
                     print
                 else:
-                    self.verified.add_share(share_hash)
+                    self.verified.add(share)
                     break
         
         # try to get at least CHAIN_LENGTH height for each verified head, requesting parents if needed
         for head in self.verified.heads:
             head_height, last = self.verified.get_height_and_last(head)
-            a
+            if head_height < self.net.CHAIN_LENGTH and last is not None:
+                desired.add(last)
         
         # decide best verified head
         def score(share_hash):
             share = self.verified.shares[share_hash]
             head_height, last = self.verified.get_height_and_last(share)
-            return (min(head_height, net.CHAIN_LENGTH), RECENTNESS)
-        best = max(self.verified.heads, key=score)
+            RECENTNESS = 0
+            return (min(head_height, self.net.CHAIN_LENGTH), RECENTNESS)
+        best = max(self.verified.heads, key=score) if self.verified.heads else None
         
         return best, desired
 
index 50db2ff..a898d20 100644 (file)
@@ -198,6 +198,8 @@ def main(args):
         @defer.inlineCallbacks
         def set_real_work():
             work, height = yield getwork(bitcoind)
+            best, desired = tracker.think()
+            # XXX desired?
             current_work.set(dict(
                 version=work.version,
                 previous_block=work.previous_block,
@@ -205,7 +207,7 @@ def main(args):
                 
                 height=height + 1,
                 
-                best_share_hash=tracker.get_best_share_hash(),
+                best_share_hash=best,
             ))
             current_work2.set(dict(
                 timestamp=work.timestamp,
@@ -236,7 +238,7 @@ def main(args):
             
             print "Received share %x" % (share.hash,)
             
-            tracker.add_share(share)
+            tracker.add(share)
             best, desired = tracker.think()
             for peer2, share_hash in desired:
                 peer2.get_shares([share_hash])
@@ -340,9 +342,7 @@ def main(args):
         # send share when the chain changes to their chain
         def work_changed(new_work):
             #print 'Work changed:', new_work
-            for share_hash in tracker.get_chain_known(new_work['best_share_hash']):
-                if share_hash is None: continue
-                share = tracker.shares[share_hash]
+            for share in tracker.get_chain_known(new_work['best_share_hash']):
                 if share.shared:
                     break
                 share_share(share)
index dfa1c3e..cdc64d7 100644 (file)
@@ -1,6 +1,7 @@
 import itertools
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python import failure
 
 class Event(object):
     def __init__(self):
@@ -31,9 +32,16 @@ class Event(object):
         for func in one_time_observers.itervalues():
             func(event)
     
-    def get_deferred(self):
+    def get_deferred(self, timeout=None):
         df = defer.Deferred()
-        self.watch_one_time(df.callback)
+        id1 = self.watch_one_time(df.callback)
+        if timeout is not None:
+            def do_timeout():
+                df.errback(failure.Failure(defer.TimeoutError()))
+                self.unwatch_one_time(id1)
+                self.unwatch_one_time(x)
+            delay = reactor.callLater(timeout, do_timeout)
+            x = self.watch_one_time(lambda value: delay.cancel())
         return df
 
 class Variable(object):