skip list implementation
authorforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Thu, 14 Jul 2011 12:27:50 +0000 (12:27 +0000)
committerforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Thu, 14 Jul 2011 12:27:50 +0000 (12:27 +0000)
git-svn-id: svn://forre.st/p2pool@1385 470744a7-cac9-478e-843e-5ec1b25c69e8

p2pool/bitcoin/data.py
p2pool/bitcoin/p2p.py
p2pool/data.py
p2pool/main.py
p2pool/p2p.py
p2pool/util/deferral.py
p2pool/util/math.py
p2pool/worker_interface.py

index 3351b56..f9cef35 100644 (file)
@@ -401,6 +401,7 @@ class Tracker(object):
         self.heads = {} # head hash -> tail_hash
         self.tails = {} # tail hash -> set of head hashes
         self.heights = {} # share_hash -> height_to, other_share_hash
+        self.skips = {} # share_hash -> skip list
     
     def add(self, share):
         assert not isinstance(share, (int, long, type(None)))
@@ -493,6 +494,70 @@ class Tracker(object):
     
     def get_highest_height(self):
         return max(self.get_height_and_last(head)[0] for head in self.heads) if self.heads else 0
+    
+    def get_nth_parent_hash(self, item_hash, n):
+        if n < 0:
+            raise ValueError("n must be >= 0")
+        
+        updates = {}
+        while n:
+            if item_hash not in self.skips:
+                self.skips[item_hash] = math.geometric(.5), [(1, self.shares[item_hash].previous_hash)]
+            skip_length, skip = self.skips[item_hash]
+            
+            for i in xrange(skip_length):
+                if i in updates:
+                    n_then, that_hash = updates.pop(i)
+                    x, y = self.skips[that_hash]
+                    assert len(y) == i
+                    y.append((n_then - n, item_hash))
+            
+            for i in xrange(len(skip), skip_length):
+                updates[i] = n, item_hash
+            
+            for i, (dist, then_hash) in enumerate(reversed(skip)):
+                if dist <= n:
+                    break
+            else:
+                raise AssertionError()
+            
+            n -= dist
+            item_hash = then_hash
+        
+        return item_hash
+    
+    def get_nth_parent2(self, item_hash, n):
+        x = item_hash
+        for i in xrange(n):
+            x = self.shares[item_hash].previous_hash
+        return x
+
+if __name__ == '__main__':
+    class FakeShare(object):
+        def __init__(self, hash, previous_hash):
+            self.hash = hash
+            self.previous_hash = previous_hash
+    
+    t = Tracker()
+    
+    for i in xrange(10000):
+        t.add(FakeShare(i, i - 1 if i > 0 else None))
+    
+    #for share_hash in sorted(t.shares):
+    #    print share_hash, t.get_height_and_last(share_hash)
+    
+    print t.get_nth_parent_hash(9000, 5000)
+    print t.get_nth_parent_hash(9001, 412)
+    #print t.get_nth_parent_hash(90, 51)
+    
+    for share_hash in sorted(t.shares):
+        print str(share_hash).rjust(4),
+        x = t.skips.get(share_hash, None)
+        if x is not None:
+            print str(x[0]).rjust(4),
+            for a in x[1]:
+                print str(a).rjust(10),
+        print
 
 # network definitions
 
index dab765a..19064d5 100644 (file)
@@ -8,10 +8,10 @@ import hashlib
 import random
 import struct
 import time
-import traceback
 import zlib
 
 from twisted.internet import defer, protocol, reactor
+from twisted.python import log
 
 from . import data as bitcoin_data
 from p2pool.util import variable, datachunker, deferral
@@ -59,7 +59,7 @@ class BaseProtocol(protocol.Protocol):
                 payload2 = type_.unpack(payload)
             except:
                 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
-                traceback.print_exc()
+                log.err()
                 continue
             
             handler = getattr(self, 'handle_' + command, None)
@@ -74,7 +74,7 @@ class BaseProtocol(protocol.Protocol):
                 handler(**payload2)
             except:
                 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
-                traceback.print_exc()
+                log.err()
                 continue
     
     def sendPacket(self, command, payload2):
@@ -408,6 +408,6 @@ if __name__ == '__main__':
             try:
                 print h.getHeight(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523)
             except Exception, e:
-                traceback.print_exc()
+                log.err()
     
     reactor.run()
index d463521..593fb0f 100644 (file)
@@ -2,7 +2,8 @@ from __future__ import division
 
 import itertools
 import random
-import traceback
+
+from twisted.python import log
 
 from p2pool.util import math
 from p2pool.bitcoin import data as bitcoin_data
@@ -180,14 +181,14 @@ class Share(object):
             if self.header['timestamp'] <= math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(self.previous_share_hash), 11)), use_float=False):
                 raise ValueError('share from too far in the past!')
         
+        # XXX use adjusted time
         if self.header['timestamp'] > time.time() + 2*60*60:
             raise ValueError('share from too far in the future!')
         
         gentx = share_info_to_gentx(self.share_info, self.header['target'], tracker, net)
         
-        if self.merkle_branch is not None:
-            if check_merkle_branch(gentx, self.merkle_branch) != self.header['merkle_root']:
-                raise ValueError("gentx doesn't match header via merkle_branch")
+        if check_merkle_branch(gentx, self.merkle_branch) != self.header['merkle_root']:
+            raise ValueError("gentx doesn't match header via merkle_branch")
         
         if self.other_txs is not None:
             if bitcoin_data.merkle_hash([gentx] + self.other_txs) != self.header['merkle_root']:
@@ -291,29 +292,6 @@ def generate_transaction(tracker, previous_share_hash, new_script, subsidy, nonc
 
 
 
-
-if __name__ == '__main__':
-    class FakeShare(object):
-        def __init__(self, hash, previous_share_hash):
-            self.hash = hash
-            self.previous_share_hash = previous_share_hash
-    
-    t = Tracker()
-    
-    t.add_share(FakeShare(1, 2))
-    print t.heads, t.tails
-    t.add_share(FakeShare(4, 0))
-    print t.heads, t.tails
-    t.add_share(FakeShare(3, 4))
-    print t.heads, t.tails
-    t.add_share(FakeShare(5, 0))
-    print t.heads, t.tails
-    t.add_share(FakeShare(0, 1))
-    print t.heads, t.tails
-    
-    for share_hash in t.shares:
-        print share_hash, t.get_height_and_last(share_hash)
-
 class OkayTracker(bitcoin_data.Tracker):
     def __init__(self, net):
         bitcoin_data.Tracker.__init__(self)
@@ -349,6 +327,9 @@ class OkayTracker(bitcoin_data.Tracker):
         return validate(share, to_end_rev[::-1])
     """
     def attempt_verify(self, share):
+        height, last = self.get_height_and_last(share.hash)
+        if height < self.net.CHAIN_LENGTH and last is not None:
+            raise AssertionError()
         if share.hash in self.verified.shares:
             return True
         try:
@@ -356,31 +337,29 @@ class OkayTracker(bitcoin_data.Tracker):
         except:
             print
             print "Share check failed:"
-            traceback.print_exc()
+            log.err()
             print
             return False
         else:
             self.verified.add(share)
             return True
+    
     def think(self, ht):
         desired = set()
         
+        # O(len(self.heads))
+        #   make 'unverified heads' set?
         # for each overall head, attempt verification
         # if it fails, attempt on parent, and repeat
         # if no successful verification because of lack of parents, request parent
         for head in self.heads:
             head_height, last = self.get_height_and_last(head)
-            if head_height < self.net.CHAIN_LENGTH and last is not None:
-                # request more
-                print 1, hex(last)
-                desired.add((self.shares[random.choice(list(self.reverse_shares[last]))].peer, last))
-                continue
             
             for share in itertools.islice(self.get_chain_known(head), None if last is None else head_height - self.net.CHAIN_LENGTH):
                 if self.attempt_verify(share):
                     break
-        
-        last = object
+            else:
+                desired.add((self.shares[random.choice(list(self.reverse_shares[last]))].peer, last))
         
         # try to get at least CHAIN_LENGTH height for each verified head, requesting parents if needed
         for head in self.verified.heads:
@@ -406,7 +385,9 @@ class OkayTracker(bitcoin_data.Tracker):
             score2 = 0
             attempts = 0
             max_height = 0
-            for share in itertools.islice(self.verified.get_chain_known(share_hash), self.net.CHAIN_LENGTH):
+            # XXX should only look past a certain share, not at recent ones
+            share2_hash = self.verified.get_nth_parent(share_hash, self.net.CHAIN_LENGTH//2)
+            for share in itertools.islice(self.verified.get_chain_known(share2_hash), self.net.CHAIN_LENGTH):
                 max_height = max(max_height, ht.get_min_height(share.header['previous_block']))
                 attempts += bitcoin_data.target_to_average_attempts(share.target2)
                 this_score = attempts//(ht.get_highest_height() - max_height + 1)
index 1ff37e4..b416220 100644 (file)
@@ -10,10 +10,10 @@ import sqlite3
 import struct
 import subprocess
 import sys
-import traceback
 
 from twisted.internet import defer, reactor
 from twisted.web import server
+from twisted.python import log
 
 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
 from util import db, expiring_dict, jsonrpc, variable, deferral, math
@@ -92,7 +92,10 @@ def get_last_p2pool_block_hash(current_block_hash, get_block, net):
         try:
             block = yield get_block(block_hash)
         except:
-            traceback.print_exc()
+            print
+            print "Error getting block while searching block chain:"
+            log.err()
+            print
             continue
         coinbase_data = block['txs'][0]['tx_ins'][0]['script']
         try:
@@ -113,7 +116,7 @@ def get_last_p2pool_block_hash(current_block_hash, get_block, net):
                 print
                 print 'Error matching block:'
                 print 'block:', block
-                traceback.print_exc()
+                log.err()
                 print
         block_hash = block['header']['previous_block']
 
@@ -145,7 +148,7 @@ def get_payout_script(factory):
             except Exception:
                 print
                 print 'Error getting payout script:'
-                traceback.print_exc()
+                log.err()
                 print
             else:
                 defer.returnValue(my_script)
@@ -303,7 +306,10 @@ def main(args):
         try:
             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
         except:
-            traceback.print_exc()
+            print
+            print "Error resolving bootstrap node IP:"
+            log.err()
+            print
         
         p2p_node = p2p.Node(
             current_work=current_work,
@@ -392,7 +398,7 @@ def main(args):
             except:
                 print
                 print 'Error processing data received from worker:'
-                traceback.print_exc()
+                log.err()
                 print
                 return False
             else:
@@ -470,7 +476,10 @@ def main(args):
                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
             except:
-                traceback.print_exc()
+                print
+                print "Error handling tx:"
+                log.err()
+                print
         factory.new_tx.watch(new_tx)
         
         def new_block(block):
@@ -486,7 +495,7 @@ def main(args):
     except:
         print
         print 'Fatal error:'
-        traceback.print_exc()
+        log.err()
         print
         reactor.stop()
 
index 89f8879..b74b8fc 100644 (file)
@@ -2,9 +2,9 @@ from __future__ import division
 
 import random
 import time
-import traceback
 
 from twisted.internet import defer, protocol, reactor
+from twisted.python import log
 
 from p2pool.bitcoin import p2p as bitcoin_p2p
 from p2pool.bitcoin import data as bitcoin_data
@@ -334,7 +334,7 @@ class Node(object):
                         #print 'Trying to connect to', host, port
                         reactor.connectTCP(host, port, ClientFactory(self), timeout=10)
             except:
-                traceback.print_exc()
+                log.err()
             
             yield deferral.sleep(random.expovariate(1/5))
     
@@ -345,7 +345,7 @@ class Node(object):
                 if len(self.addr_store) < self.preferred_addrs and self.peers:
                     random.choice(self.peers.values()).send_getaddrs(count=8)
             except:
-                traceback.print_exc()
+                log.err()
             
             yield deferral.sleep(random.expovariate(1/20))
     
index a5c1c67..60a0add 100644 (file)
@@ -1,10 +1,9 @@
 from __future__ import division
 
 import random
-import traceback
 
 from twisted.internet import defer, reactor
-from twisted.python import failure
+from twisted.python import failure, log
 
 def sleep(t):
     d = defer.Deferred()
@@ -28,7 +27,7 @@ def retry(message, delay):
                 except:
                     print
                     print message
-                    traceback.print_exc()
+                    log.err()
                     print
                     
                     yield sleep(delay)
@@ -158,6 +157,9 @@ class DeferredCacher(object):
                 self.waiting.pop(key).callback(None)
             def eb(fail):
                 self.waiting.pop(key).callback(None)
+                print
+                print "Error when requesting noncached value:"
                 fail.printTraceback()
+                print
             self.func(key).addCallback(cb).addErrback(eb)
             raise NotNowError()
index 3137368..0442c36 100644 (file)
@@ -1,4 +1,7 @@
-from __future__ import division
+from __future__ import absolute_import, division
+
+import math
+import random
 
 def median(x, use_float=True):
     # there exist better algorithms...
@@ -37,3 +40,10 @@ def nth(i, n=0):
     for _ in xrange(n):
         i.next()
     return i.next()
+
+def geometric(p):
+    if p <= 0 or p > 1:
+        raise ValueError("p must be in the interval (0.0, 1.0]")
+    if p == 1:
+        return 1
+    return int(math.log1p(-random.random()) / math.log1p(-p)) + 1
index 9d9eb21..cf4fb67 100644 (file)
@@ -1,7 +1,6 @@
 from __future__ import division
 
 import json
-import traceback
 
 from twisted.internet import defer