optimized share weight adding, fixed runaway bitcoind request bug, made peer requests...
authorforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Mon, 18 Jul 2011 07:31:53 +0000 (07:31 +0000)
committerforrest <forrest@470744a7-cac9-478e-843e-5ec1b25c69e8>
Mon, 18 Jul 2011 07:31:53 +0000 (07:31 +0000)
git-svn-id: svn://forre.st/p2pool@1400 470744a7-cac9-478e-843e-5ec1b25c69e8

p2pool/bitcoin/data.py
p2pool/data.py
p2pool/main.py
p2pool/util/math.py
p2pool/util/skiplist.py

index e8559b3..2049e06 100644 (file)
@@ -6,7 +6,7 @@ import itertools
 import warnings
 
 from . import base58
-from p2pool.util import bases, math
+from p2pool.util import bases, math, skiplist
 
 class EarlyEnd(Exception):
     pass
@@ -415,6 +415,8 @@ class Tracker(object):
         
         self.id_generator = itertools.count()
         self.tails_by_id = {}
+        
+        self.get_nth_parent = skiplist.DistanceSkipList(self)
     
     def add(self, share):
         assert not isinstance(share, (int, long, type(None)))
@@ -646,17 +648,12 @@ class Tracker(object):
         for i in xrange(n):
             x = self.shares[item_hash].previous_hash
         return x
-    
-    def distance_up_to_branch(self, item_hash, max_dist=None):
-        while True:
-            if a:
-                pass
+
+class FakeShare(object):
+    def __init__(self, **kwargs):
+        self.__dict__.update(kwargs)
 
 if __name__ == '__main__':
-    class FakeShare(object):
-        def __init__(self, hash, previous_hash):
-            self.hash = hash
-            self.previous_hash = previous_hash
     
     t = Tracker()
     
index 54bef36..7d62fac 100644 (file)
@@ -6,9 +6,8 @@ import time
 
 from twisted.python import log
 
-from p2pool.util import math
 from p2pool.bitcoin import data as bitcoin_data
-from p2pool.util import memoize, expiring_dict
+from p2pool.util import memoize, expiring_dict, math, skiplist
 
 class CompressedList(bitcoin_data.Type):
     def __init__(self, inner):
@@ -249,15 +248,16 @@ def generate_transaction(tracker, previous_share_hash, new_script, subsidy, nonc
     
     attempts_to_block = bitcoin_data.target_to_average_attempts(block_target)
     max_weight = net.SPREAD * attempts_to_block
-    total_weight = 0
     
+    '''
     class fake_share(object):
         pass
     fake_share.new_script = new_script
     fake_share.target2 = target2
     
     dest_weights = {}
-    for i, share in enumerate(itertools.chain([fake_share], itertools.islice(tracker.get_chain_to_root(previous_share_hash), net.CHAIN_LENGTH))):
+    total_weight = 0
+    for share in itertools.chain([fake_share], itertools.islice(tracker.get_chain_to_root(previous_share_hash), net.CHAIN_LENGTH)):
         weight = bitcoin_data.target_to_average_attempts(share.target2)
         if weight > max_weight - total_weight:
             weight = max_weight - total_weight
@@ -267,6 +267,12 @@ def generate_transaction(tracker, previous_share_hash, new_script, subsidy, nonc
         
         if total_weight == max_weight:
             break
+    '''
+    
+    this_weight = min(bitcoin_data.target_to_average_attempts(target2), max_weight)
+    dest_weights = math.add_dicts([{new_script: this_weight}, tracker.get_cumulative_weights(previous_share_hash, min(tracker.get_height_and_last(previous_share_hash)[0], net.CHAIN_LENGTH), max(0, max_weight - this_weight))[0]])
+    total_weight = sum(dest_weights.itervalues())
+    #assert dest_weights == dest_weights2
     
     amounts = dict((script, subsidy*(199*weight)//(200*total_weight)) for (script, weight) in dest_weights.iteritems())
     amounts[net.SCRIPT] = amounts.get(net.SCRIPT, 0) + subsidy*1//200
@@ -301,7 +307,7 @@ class OkayTracker(bitcoin_data.Tracker):
         self.net = net
         self.verified = bitcoin_data.Tracker()
         
-        self.okay_cache = {} # hash -> height
+        self.get_cumulative_weights = skiplist.WeightsSkipList(self)
     
     def attempt_verify(self, share):
         height, last = self.get_height_and_last(share.hash)
index a813335..0aae47c 100644 (file)
@@ -102,11 +102,17 @@ def main(args):
         # information affecting work that should not trigger a long-polling update
         current_work2 = variable.Variable(None)
         
+        requested = set()
+        
         @defer.inlineCallbacks
         def set_real_work():
             work, height = yield getwork(bitcoind)
             best, desired = tracker.think(ht)
             for peer2, share_hash in desired:
+                if peer2 is None:
+                    continue
+                if (peer2.nonce, share_hash) in requested:
+                    continue
                 print 'Requesting parent share %x' % (share_hash,)
                 peer2.send_getshares(
                     hashes=[share_hash],
@@ -115,6 +121,7 @@ def main(args):
                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                     )),
                 )
+                requested.add((peer2.nonce, share_hash))
             current_work.set(dict(
                 version=work.version,
                 previous_block=work.previous_block,
@@ -396,7 +403,7 @@ def main(args):
         
         while True:
             yield deferral.sleep(1)
-            set_real_work()
+            yield set_real_work()
     except:
         print
         print 'Fatal error:'
index d291d1e..07a631a 100644 (file)
@@ -47,3 +47,10 @@ def geometric(p):
     if p == 1:
         return 1
     return int(math.log1p(-random.random()) / math.log1p(-p)) + 1
+
+def add_dicts(dicts):
+    res = {}
+    for d in dicts:
+        for k, v in d.iteritems():
+            res[k] = res.get(k, 0) + v
+    return dict((k, v) for k, v in res.iteritems() if v)
index 8729ecd..5737fca 100644 (file)
-class SkipList(object):
-    def query(self, start, *args, **kwargs):
+from p2pool.util import math
+
+class Base(object):
+    def finalize(self, sol):
+        return sol
+
+class SkipList(Base):
+    def __init__(self):
+        self.skips = {}
+    
+    def __call__(self, start, *args, **kwargs):
         updates = {}
         pos = start
+        sol = self.initial_solution(start, args)
+        if self.judge(sol, args) == 0:
+            return self.finalize(sol)
         while True:
             if pos not in self.skips:
-                self.skips[pos] = math.geometric(.5), [self.base(pos)]
+                self.skips[pos] = math.geometric(.5), [(self.previous(pos), self.get_delta(pos))]
             skip_length, skip = self.skips[pos]
             
+            # fill previous updates
             for i in xrange(skip_length):
                 if i in updates:
-                    n_then, that_hash = updates.pop(i)
+                    that_hash, delta = updates.pop(i)
                     x, y = self.skips[that_hash]
                     assert len(y) == i
-                    y.append((n_then - n, pos))
+                    y.append((pos, delta))
             
+            # put desired skip nodes in updates
             for i in xrange(len(skip), skip_length):
-                updates[i] = n, item_hash
+                updates[i] = pos, None
             
-            if skip_length + 1 in updates:
-                updates[skip_length + 1] = self.combine(updates[skip_length + 1], updates[skip_length])
+            #if skip_length + 1 in updates:
+            #    updates[skip_length + 1] = self.combine(updates[skip_length + 1], updates[skip_length])
             
-            for delta, jump in reversed(skip):
-                sol_if = self.combine(sol, delta)
-                decision = self.judge(sol_if)
+            for jump, delta in reversed(skip):
+                sol_if = self.apply_delta(sol, delta, args)
+                decision = self.judge(sol_if, args)
+                #print pos, sol, jump, delta, sol_if, decision
                 if decision == 0:
-                    return sol_if
+                    return self.finalize(sol_if)
                 elif decision < 0:
+                    sol = sol_if
                     break
             else:
                 raise AssertionError()
+            
+            sol = sol_if
+            pos = jump
+            
+            # XXX could be better by combining updates
+            for x in updates:
+                updates[x] = updates[x][0], self.combine_deltas(updates[x][1], delta) if updates[x][1] is not None else delta
+            
         
         return item_hash
 
+class DumbSkipList(Base):
+    def __call__(self, start, *args):
+        pos = start
+        sol = self.initial_solution(start, args)
+        while True:
+            decision = self.judge(sol, args)
+            if decision > 0:
+                raise AssertionError()
+            elif decision == 0:
+                return self.finalize(sol)
+            
+            delta = self.get_delta(pos)
+            sol = self.apply_delta(sol, delta, args)
+            
+            pos = self.previous(pos)
+
 class DistanceSkipList(SkipList):
-    def combine(self, a, b):
-        return a + b
+    def __init__(self, tracker):
+        SkipList.__init__(self)
+        self.tracker = tracker
+    
+    def previous(self, element):
+        return self.tracker.shares[element].previous_hash
+    
+    def get_delta(self, element):
+        return element, 1, self.tracker.shares[element].previous_hash
+    
+    def combine_deltas(self, (from_hash1, dist1, to_hash1), (from_hash2, dist2, to_hash2)):
+        if to_hash1 != from_hash2:
+            raise AssertionError()
+        return from_hash1, dist1 + dist2, to_hash2
     
-    def base(self, element):
-        return 1, self.tracker.shares[element].previous_hash
+    def initial_solution(self, start, (n,)):
+        return 0, start
+    
+    def apply_delta(self, (dist1, to_hash1), (from_hash2, dist2, to_hash2), (n,)):
+        if to_hash1 != from_hash2:
+            raise AssertionError()
+        return dist1 + dist2, to_hash2
+    
+    def judge(self, (dist, hash), (n,)):
+        if dist > n:
+            return 1
+        elif dist == n:
+            return 0
+        else:
+            return -1
+    
+    def finalize(self, (dist, hash)):
+        return hash
 
-class WeightsList(SkipList):
+if __name__ == '__main__':
+    import random
+    from p2pool.bitcoin import data
+    t = data.Tracker()
+    d = DistanceSkipList(t)
+    for i in xrange(2000):
+        t.add(data.FakeShare(hash=i, previous_hash=i - 1 if i > 0 else None))
+    for i in xrange(2000):
+        a = random.randrange(2000)
+        b = random.randrange(a + 1)
+        res = d(a, b)
+        assert res == a - b, (a, b, res)
+
+class WeightsSkipList(SkipList):
     # share_count, weights, total_weight
-    def combine(self, (ac, a, at), (bc, b, bt)):
-        return ac + bc, dict((k, a.get(k, 0) + b.get(k, 0)) for k in set(a.keys() + b.keys())), at + bt
     
-    def base(self, element):
+    def __init__(self, tracker):
+        SkipList.__init__(self)
+        self.tracker = tracker
+    
+    def previous(self, element):
+        return self.tracker.shares[element].previous_hash
+    
+    def get_delta(self, element):
+        from p2pool.bitcoin import data as bitcoin_data
+        if element is None:
+            return (2**256, {}, 0) # XXX
         share = self.tracker.shares[element]
-        att = target_to_average_attempts(share.target2)
-        return (1, {share.new_script: att}, att), self.tracker.shares[element].previous_hash
+        att = bitcoin_data.target_to_average_attempts(share.target2)
+        return 1, {share.new_script: att}, att
     
-    def judge(self, (share_count, weights, total_weight), max_shares, desired_weight):
-        if share_count > max_shares:
+    def combine_deltas(self, (share_count1, weights1, total_weight1), (share_count2, weights2, total_weight2)):
+        return share_count1 + share_count2, dict((k, weights1.get(k, 0) + weights2.get(k, 0)) for k in set(weights1.keys() + weights2.keys())), total_weight1 + total_weight2
+    
+    def initial_solution(self, start, (max_shares, desired_weight)):
+        return 0, {}, 0 
+    
+    def combine_deltas(self, (share_count1, weights1, total_weight1), (share_count2, weights2, total_weight2)):
+        return share_count1 + share_count2, dict((k, weights1.get(k, 0) + weights2.get(k, 0)) for k in set(weights1.keys() + weights2.keys())), total_weight1 + total_weight2
+    
+    
+    def apply_delta(self, (share_count1, weights1, total_weight1), (share_count2, weights2, total_weight2), (max_shares, desired_weight)):
+        if total_weight1 + total_weight2 > desired_weight and len(weights2) == 1:
+            script, = weights2.iterkeys()
+            new_weights = dict(weights1)
+            new_weights[script] = new_weights.get(script, 0) + desired_weight - total_weight1
+            return share_count1 + share_count2, new_weights, desired_weight
+        return share_count1 + share_count2, dict((k, weights1.get(k, 0) + weights2.get(k, 0)) for k in set(weights1.keys() + weights2.keys())), total_weight1 + total_weight2
+    
+    def judge(self, (share_count, weights, total_weight), (max_shares, desired_weight)):
+        if share_count > max_shares or total_weight > desired_weight:
             return 1
+        elif share_count == max_shares or total_weight == desired_weight:
+            return 0
+        else:
+            return -1
+    
+    def finalize(self, (share_count, weights, total_weight)):
+        return weights, total_weight
+
+if __name__ == '__main__':
+    import random
+    from p2pool.bitcoin import data
+    t = data.Tracker()
+    d = WeightsSkipList(t)
+    for i in xrange(2000):
+        t.add(data.FakeShare(hash=i, previous_hash=i - 1 if i > 0 else None, new_script=i, target2=random.randrange(2**249, 2**250)))
+    for i in xrange(2000):
+        #a = random.randrange(2000)
+        a = 1999
+        print d(a, a, 1000000)[1]