cleanup, compressed p2p, bugs, finished HeightTracker
authorforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Fri, 15 Jul 2011 09:49:04 +0000 (09:49 +0000)
committerforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Fri, 15 Jul 2011 09:49:04 +0000 (09:49 +0000)
git-svn-id: svn://forre.st/p2pool@1391 470744a7-cac9-478e-843e-5ec1b25c69e8

p2pool/bitcoin/base58.py
p2pool/bitcoin/data.py
p2pool/bitcoin/p2p.py
p2pool/data.py
p2pool/main.py
p2pool/p2p.py
p2pool/util/bases.py

index c798e39..5c719b6 100644 (file)
@@ -6,5 +6,5 @@ def base58_encode(data):
     return base58_alphabet[0]*(len(data) - len(data.lstrip(chr(0)))) + bases.natural_to_string(bases.string_to_natural(data), base58_alphabet)
 
 def base58_decode(data):
-    return chr(0)*(len(data) - len(data.lstrip(base58_alphabet[0]))) + bases.natural_to_string(bases.string_to_natural(data, base58_alphabet))
+    return chr(0)*(len(data) - len(data.lstrip(base58_alphabet[0]))) + bases.natural_to_string(bases.string_to_natural(data.lstrip(base58_alphabet[0]), base58_alphabet))
 
index e93d612..9b152ae 100644 (file)
@@ -250,15 +250,15 @@ class ChecksummedType(Type):
         obj, file = self.inner.read(file)
         data = self.inner.pack(obj)
         
-        if file.read(4) != hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]:
+        checksum, file = read(file, 4)
+        if checksum != hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]:
             raise ValueError('invalid checksum')
         
         return obj, file
     
     def write(self, file, item):
         data = self.inner.pack(item)
-        file = file, data
-        return file, hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]
+        return (file, data), hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]
 
 class FloatingIntegerType(Type):
     # redundancy doesn't matter here because bitcoin checks binary bits against its own computed bits
@@ -391,6 +391,14 @@ def address_to_pubkey_hash(address, net):
         raise ValueError('address not for this net!')
     return x['pubkey_hash']
 
+# transactions
+
+def pubkey_to_script2(pubkey):
+    return ('\x41' + pubkey_type.pack(pubkey)) + '\xac'
+
+def pubkey_hash_to_script2(pubkey_hash):
+    return '\x76\xa9' + ('\x14' + ShortHashType().pack(pubkey_hash)) + '\x88\xac'
+
 # linked list tracker
 
 class Tracker(object):
index aaf46d6..9f2524b 100644 (file)
@@ -10,7 +10,7 @@ import struct
 import time
 import zlib
 
-from twisted.internet import defer, protocol, reactor
+from twisted.internet import defer, protocol, reactor, task
 from twisted.python import log
 
 from . import data as bitcoin_data
@@ -29,19 +29,34 @@ class BaseProtocol(protocol.Protocol):
             command = (yield 12).rstrip('\0')
             length, = struct.unpack('<I', (yield 4))
             
+            if length > self.max_net_payload_length:
+                print "length too long"
+                continue
+            
             if self.use_checksum:
                 checksum = yield 4
             else:
                 checksum = None
             
-            payload = yield length
+            compressed_payload = yield length
             
             if self.compress:
                 try:
-                    payload = zlib.decompress(payload)
+                    d = zlib.decompressobj()
+                    payload = d.decompress(compressed_payload, self.max_payload_length)
+                    if d.unconsumed_tail:
+                        print "compressed payload expanded too much"
+                        continue
+                    assert not len(payload) > self.max_payload_length
                 except:
                     print 'FAILURE DECOMPRESSING'
+                    log.err()
                     continue
+            else:
+                if len(compressed_payload) > self.max_payload_length:
+                    print "compressed payload expanded too much"
+                    continue
+                payload = compressed_payload
             
             if checksum is not None:
                 if hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] != checksum:
@@ -85,13 +100,16 @@ class BaseProtocol(protocol.Protocol):
             raise ValueError('invalid command')
         #print 'SEND', command, repr(payload2)[:500]
         payload = type_.pack(payload2)
+        if len(payload) > self.max_payload_length:
+            raise ValueError('payload too long')
         if self.use_checksum:
             checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
         else:
             checksum = ''
-        if self.compress:
-            payload = zlib.compress(payload)
-        data = self._prefix + struct.pack('<12sI', command, len(payload)) + checksum + payload
+        compressed_payload = zlib.compress(payload) if self.compress else payload
+        if len(compressed_payload) > self.max_net_payload_length:
+            raise ValueError('compressed payload too long')
+        data = self._prefix + struct.pack('<12sI', command, len(compressed_payload)) + checksum + compressed_payload
         self.transport.write(data)
     
     def __getattr__(self, attr):
@@ -108,6 +126,8 @@ class Protocol(BaseProtocol):
     
     version = 0
     
+    max_payload_length = max_net_payload_length = 1000000
+    
     compress = False
     @property
     def use_checksum(self):
@@ -300,84 +320,66 @@ class HeaderWrapper(object):
 
 class HeightTracker(object):
     '''Point this at a factory and let it take care of getting block heights'''
-    # XXX think keeps object alive
     
     def __init__(self, factory):
         self.factory = factory
         self.tracker = bitcoin_data.Tracker()
         self.most_recent = None
         
-        self.factory.new_headers.watch(self.heard_headers)
+        self._watch1 = self.factory.new_headers.watch(self.heard_headers)
+        self._watch2 = self.factory.new_block.watch(self.heard_block)
+        
+        self.requested = set()
+        self._clear_task = task.LoopingCall(self.requested.clear)
+        self._clear_task.start(60)
         
         self.think()
     
-    @defer.inlineCallbacks
     def think(self):
-        last = None
-        yield self.factory.getProtocol()
-        while True:
-            highest_head = max(self.tracker.heads, key=lambda h: self.tracker.get_height_and_last(h)[0]) if self.tracker.heads else None
-            it = self.tracker.get_chain_known(highest_head)
-            have = []
-            step = 1
-            try:
-                cur = it.next()
-            except StopIteration:
-                cur = None
-            while True:
-                if cur is None:
-                    break
-                have.append(cur.hash)
-                for i in xrange(step): # XXX inefficient
-                    try:
-                        cur = it.next()
-                    except StopIteration:
-                        break
-                else:
-                    if len(have) > 10:
-                        step *= 2
-                    continue
+        highest_head = max(self.tracker.heads, key=lambda h: self.tracker.get_height_and_last(h)[0]) if self.tracker.heads else None
+        height, last = self.tracker.get_height_and_last(highest_head)
+        cur = highest_head
+        cur_height = height
+        have = []
+        step = 1
+        while cur is not None:
+            have.append(cur)
+            if step > cur_height:
                 break
-            chain = list(self.tracker.get_chain_known(highest_head))
-            if chain:
-                have.append(chain[-1].hash)
-            if not have:
-                have.append(0)
-            if have == last:
-                yield deferral.sleep(1)
-                last = None
+            cur = self.tracker.get_nth_parent_hash(cur, step)
+            cur_height -= step
+            if len(have) > 10:
+                step *= 2
+        if height:
+            have.append(self.tracker.get_nth_parent_hash(highest_head, height - 1))
+        if not have:
+            have.append(0)
+        self.request(have, None)
+        
+        for tail in self.tracker.tails:
+            if tail is None:
                 continue
-            
-            last = have
-            good_tails = [x for x in self.tracker.tails if x is not None]
-            self.request(have, random.choice(good_tails) if good_tails else None)
-            for tail in self.tracker.tails:
-                if tail is None:
-                    continue
-                self.request([], tail)
-            try:
-                yield self.factory.new_headers.get_deferred(timeout=5)
-            except defer.TimeoutError:
-                pass
+            self.request([], tail)
+        for head in self.tracker.heads:
+            self.request([head], None)
     
     def heard_headers(self, headers):
-        header2s = map(HeaderWrapper, headers)
         for header2 in header2s:
-            self.tracker.add(header2)
-        if header2s:
-            if self.tracker.get_height_and_last(header2s[-1].hash)[1] is None:
-                self.most_recent = header2s[-1].hash
-                if random.random() < .6:
-                    self.request([header2s[-1].hash], None)
-        print len(self.tracker.shares)
+            self.tracker.add(HeaderWrapper(header2))
+        self.think()
     
+    def heard_block(self, block_hash):
+        self.request([], block_hash)
+    
+    @defer.inlineCallbacks
     def request(self, have, last):
-        #print 'REQ', ('[' + ', '.join(map(hex, have)) + ']', hex(last) if last is not None else None)
-        if self.factory.conn.value is not None:
-            self.factory.conn.value.send_getheaders(version=1, have=have, last=last)
+        if (tuple(have), last) in self.requested:
+            return
+        self.requested.add((tuple(have), last))
+        (yield self.factory.getProtocol()).send_getheaders(version=1, have=have, last=last)
     
     #@defer.inlineCallbacks
-    #XXX should defer
+    #XXX should defer?
     def getHeight(self, block_hash):
         height, last = self.tracker.get_height_and_last(block_hash)
         if last is not None:
@@ -393,6 +395,11 @@ class HeightTracker(object):
     
     def get_highest_height(self):
         return self.tracker.get_highest_height()
+    
+    def stop(self):
+        self.factory.new_headers.unwatch(self._watch1)
+        self.factory.new_block.unwatch(self._watch2)
+        self._clear_task.stop()
 
 if __name__ == '__main__':
     factory = ClientFactory(bitcoin_data.Mainnet)
@@ -405,9 +412,6 @@ if __name__ == '__main__':
     def think():
         while True:
             yield deferral.sleep(1)
-            try:
-                print h.getHeight(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523)
-            except Exception, e:
-                log.err()
+            print h.get_min_height(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523)
     
     reactor.run()
index 507db67..22dab1b 100644 (file)
@@ -7,6 +7,7 @@ from twisted.python import log
 
 from p2pool.util import math
 from p2pool.bitcoin import data as bitcoin_data
+from p2pool.util import memoize, expiring_dict
 
 class CompressedList(bitcoin_data.Type):
     def __init__(self, inner):
@@ -350,12 +351,9 @@ class OkayTracker(bitcoin_data.Tracker):
             head_height, last_hash = self.verified.get_height_and_last(head)
             last_height, last_last_hash = self.get_height_and_last(last_hash)
             # XXX review boundary conditions
-            want = self.net.CHAIN_LENGTH - head_height
+            want = max(self.net.CHAIN_LENGTH - head_height, 0)
             can = max(last_height - 1 - self.net.CHAIN_LENGTH, 0) if last_last_hash is not None else last_height
-            if want > can:
-                get = can
-            else:
-                get = want
+            get = min(want, can)
             #print 'Z', head_height, last_hash is None, last_height, last_last_hash is None, want, can, get
             for share in itertools.islice(self.get_chain_known(last_hash), get):
                 if not self.attempt_verify(share):
@@ -364,26 +362,29 @@ class OkayTracker(bitcoin_data.Tracker):
                 desired.add((self.verified.shares[random.choice(list(self.verified.reverse_shares[last_hash]))].peer, last_last_hash))
         
         # decide best verified head
-        def score(share_hash):
-            head_height, last = self.verified.get_height_and_last(share_hash)
-            score2 = 0
-            attempts = 0
-            max_height = 0
-            # XXX should only look past a certain share, not at recent ones
-            share2_hash = self.verified.get_nth_parent_hash(share_hash, self.net.CHAIN_LENGTH//2) if last is not None else share_hash
-            for share in itertools.islice(self.verified.get_chain_known(share2_hash), self.net.CHAIN_LENGTH):
-                max_height = max(max_height, ht.get_min_height(share.header['previous_block']))
-                attempts += bitcoin_data.target_to_average_attempts(share.target2)
-                this_score = attempts//(ht.get_highest_height() - max_height + 1)
-                #this_score = -(ht.get_highest_height() - max_height + 1)//attempts
-                if this_score > score2:
-                    score2 = this_score
-            res = (min(head_height, self.net.CHAIN_LENGTH), score2)
-            print res
-            return res
-        best = max(self.verified.heads, key=score) if self.verified.heads else None
+        best = max(self.verified.heads, key=lambda h: self.score(h, ht)) if self.verified.heads else None
         
         return best, desired
+    
+    @memoize.memoize_with_backing(expiring_dict.ExpiringDict(5, get_touches=False))
+    def score(self, share_hash, ht):
+        head_height, last = self.verified.get_height_and_last(share_hash)
+        score2 = 0
+        attempts = 0
+        max_height = 0
+        # XXX should only look past a certain share, not at recent ones
+        share2_hash = self.verified.get_nth_parent_hash(share_hash, min(self.net.CHAIN_LENGTH//2, head_height//2)) if last is not None else share_hash
+        # XXX this must go in the opposite direction for max_height to make sense
+        for share in reversed(list(itertools.islice(self.verified.get_chain_known(share2_hash), self.net.CHAIN_LENGTH))):
+            max_height = max(max_height, ht.get_min_height(share.header['previous_block']))
+            attempts += bitcoin_data.target_to_average_attempts(share.target2)
+            this_score = attempts//(ht.get_highest_height() - max_height + 1)
+            #this_score = -(ht.get_highest_height() - max_height + 1)//attempts
+            if this_score > score2:
+                score2 = this_score
+        res = (min(head_height, self.net.CHAIN_LENGTH), score2)
+        print res
+        return res
 
 
 class Mainnet(bitcoin_data.Mainnet):
index e6e098d..6811cc2 100644 (file)
@@ -25,101 +25,6 @@ try:
 except:
     __version__ = 'unknown'
 
-class Chain(object):
-    def __init__(self, chain_id_data):
-        assert False
-        self.chain_id_data = chain_id_data
-        self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
-        
-        self.share2s = {} # hash -> share2
-        self.highest = variable.Variable(None) # hash
-        
-        self.requesting = set()
-        self.request_map = {}
-    
-    def accept(self, share, net):
-        if share.chain_id_data != self.chain_id_data:
-            raise ValueError('share does not belong to this chain')
-        
-        if share.hash in self.share2s:
-            return 'dup'
-        
-        if share.previous_share_hash is None:
-            previous_height, previous_share2 = -1, None
-        elif share.previous_share_hash not in self.share2s:
-            return 'orphan'
-        else:
-            previous_share2 = self.share2s[share.previous_share_hash]
-            previous_height = previous_share2.height
-        
-        height = previous_height + 1
-        
-        share2 = share.check(self, height, previous_share2, net) # raises exceptions
-        
-        if share2.share is not share:
-            raise ValueError()
-        
-        self.share2s[share.hash] = share2
-        
-        if self.highest.value is None or height > self.share2s[self.highest.value].height:
-            self.highest.set(share.hash)
-        
-        return 'good'
-    
-    def get_highest_share2(self):
-        return self.share2s[self.highest.value] if self.highest.value is not None else None
-    
-    def get_down(self, share_hash):
-        blocks = []
-        
-        while True:
-            blocks.append(share_hash)
-            if share_hash not in self.share2s:
-                break
-            share2 = self.share2s[share_hash]
-            if share2.share.previous_share_hash is None:
-                break
-            share_hash = share2.share.previous_share_hash
-        
-        return blocks
-
-@defer.inlineCallbacks
-def get_last_p2pool_block_hash(current_block_hash, get_block, net):
-    block_hash = current_block_hash
-    while True:
-        if block_hash == net.ROOT_BLOCK:
-            defer.returnValue(block_hash)
-        try:
-            block = yield get_block(block_hash)
-        except:
-            print
-            print 'Error getting block while searching block chain:'
-            log.err()
-            print
-            continue
-        coinbase_data = block['txs'][0]['tx_ins'][0]['script']
-        try:
-            coinbase = p2pool.coinbase_type.unpack(coinbase_data)
-        except bitcoin.data.EarlyEnd:
-            pass
-        else:
-            try:
-                if coinbase['identifier'] == net.IDENTIFIER:
-                    payouts = {}
-                    for tx_out in block['txs'][0]['tx_outs']:
-                        payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
-                    subsidy = sum(payouts.itervalues())
-                    if coinbase['subsidy'] == subsidy:
-                        if payouts.get(net.SCRIPT, 0) >= subsidy//64:
-                            defer.returnValue(block_hash)
-            except Exception:
-                print
-                print 'Error matching block:'
-                print 'block:', block
-                log.err()
-                print
-        block_hash = block['header']['previous_block']
-
 @deferral.retry('Error getting work from bitcoind:', 1)
 @defer.inlineCallbacks
 def getwork(bitcoind):
@@ -133,26 +38,21 @@ def getwork(bitcoind):
         height_df.addErrback(lambda fail: None)
     defer.returnValue((getwork, height))
 
+@deferral.retry('Error getting payout script from bitcoind:', 1)
 @defer.inlineCallbacks
 def get_payout_script(factory):
-    while True:
-        try:
-            res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
-            if res['reply'] != 'success':
-                print
-                print 'Error getting payout script:'
-                print res
-                print
-                continue
-            my_script = res['script']
-        except Exception:
-            print
-            print 'Error getting payout script:'
-            log.err()
-            print
-        else:
-            defer.returnValue(my_script)
-        yield deferral.sleep(1)
+    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
+    if res['reply'] == 'success':
+        my_script = res['script']
+    elif res['reply'] == 'denied':
+        my_script = None
+    else:
+        raise ValueError('Unexpected reply: %r' % (res,))
+
+@deferral.retry('Error creating payout script:', 10)
+@defer.inlineCallbacks
+def get_payout_script2(bitcoind, net):
+    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(bitcoin.data.address_to_pubkey_hash((yield bitcoind.rpc_getnewaddress()), net)))
 
 @defer.inlineCallbacks
 def main(args):
@@ -174,6 +74,9 @@ def main(args):
         factory = bitcoin.p2p.ClientFactory(args.net)
         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
         my_script = yield get_payout_script(factory)
+        if my_script is None:
+            print 'IP transaction denied ... falling back to sending to address. Enable IP transactions on your bitcoind!'
+            my_script = yield get_payout_script2(bitcoind, args.net)
         print '    ...success!'
         print '    Payout script:', my_script.encode('hex')
         print
@@ -205,7 +108,7 @@ def main(args):
             best, desired = tracker.think(ht)
             for peer2, share_hash in desired:
                 print 'Requesting parent share %x' % (share_hash,)
-                peer2.send_getshares(hashes=[share_hash], parents=2000)
+                peer2.send_getshares(hashes=[share_hash], parents=2000, stops=list(set(tracker.heads) | set())
             current_work.set(dict(
                 version=work.version,
                 previous_block=work.previous_block,
@@ -269,7 +172,7 @@ def main(args):
                 print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
             else:
                 print 'Got share hash, requesting! Hash: %x' % (share_hash,)
-                peer.send_getshares(hashes=[share_hash], parents=0)
+                peer.send_getshares(hashes=[share_hash], parents=0, stops=[])
         
         def p2p_get_to_best(chain_id_data, have, peer):
             # XXX
@@ -381,25 +284,25 @@ def main(args):
             return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
         
         def got_response(data):
-            # match up with transactions
-            header = bitcoin.getwork.decode_data(data)
-            transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
-            if transactions is None:
-                print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
-                return False
-            block = dict(header=header, txs=transactions)
-            hash_ = bitcoin.data.block_header_type.hash256(block['header'])
-            if hash_ <= block['header']['target']:
-                print
-                print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
-                print
-                if factory.conn.value is not None:
-                    factory.conn.value.send_block(block=block)
-                else:
-                    print 'No bitcoind connection! Erp!'
-            share = p2pool.Share.from_block(block)
-            print 'GOT SHARE! %x' % (share.hash,)
             try:
+                # match up with transactions
+                header = bitcoin.getwork.decode_data(data)
+                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
+                if transactions is None:
+                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
+                    return False
+                block = dict(header=header, txs=transactions)
+                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
+                if hash_ <= block['header']['target']:
+                    print
+                    print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
+                    print
+                    if factory.conn.value is not None:
+                        factory.conn.value.send_block(block=block)
+                    else:
+                        print 'No bitcoind connection! Erp!'
+                share = p2pool.Share.from_block(block)
+                print 'GOT SHARE! %x' % (share.hash,)
                 p2p_share(share)
             except:
                 print
@@ -509,7 +412,7 @@ def run():
     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
     parser.add_argument('--version', action='version', version=__version__)
     parser.add_argument('--testnet',
-        help='use the testnet; make sure you change the ports too',
+        help='use the testnet',
         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
     
     p2pool_group = parser.add_argument_group('p2pool interface')
index 877b4de..e3d8a3d 100644 (file)
@@ -24,6 +24,8 @@ class Protocol(bitcoin_p2p.BaseProtocol):
         
         self._prefix = self.node.net.PREFIX
     
+    max_payload_length = 1000000
+    max_net_payload_length = 2000000
     use_checksum = True
     compress = True
     
index 2d043b2..66fc0e5 100644 (file)
@@ -1,25 +1,28 @@
-def natural_to_string(n, alphabet=None, min_width=0):
+def natural_to_string(n, alphabet=None):
     if alphabet is None:
         s = '%x' % (n,)
         if len(s) % 2:
             s = '0' + s
-        return s.decode('hex').lstrip('\x00').rjust(min_width, '\x00')
-    assert len(set(alphabet)) == len(alphabet)
-    res = []
-    while n:
-        n, x = divmod(n, len(alphabet))
-        res.append(alphabet[x])
-    res.reverse()
-    return ''.join(res).rjust(min_width, '\x00')
+        return s.decode('hex').lstrip('\x00')
+    else:
+        assert len(set(alphabet)) == len(alphabet)
+        res = []
+        while n:
+            n, x = divmod(n, len(alphabet))
+            res.append(alphabet[x])
+        res.reverse()
+        return ''.join(res).rjust(min_width, '\x00')
 
 def string_to_natural(s, alphabet=None):
     if alphabet is None:
-        s = s.encode('hex')
-        return int('0'+s, 16)
-    assert len(set(alphabet)) == len(alphabet)
-    if not s or (s != alphabet[0] and s.startswith(alphabet[0])):
-        raise ValueError()
-    return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s)))
+        #if s.startswith('\x00'):
+        #    raise ValueError()
+        return int('0' + s.encode('hex'), 16)
+    else:
+        assert len(set(alphabet)) == len(alphabet)
+        #if s.startswith(alphabet[0]):
+        #    raise ValueError()
+        return sum(alphabet.index(char) * len(alphabet)**i for i, char in enumerate(reversed(s)))
 
 import random