From 21071bb3c74a4f5feeffec438f311467ae4747f9 Mon Sep 17 00:00:00 2001 From: forrest Date: Tue, 12 Jul 2011 11:57:26 +0000 Subject: [PATCH] work git-svn-id: svn://forre.st/p2pool@1379 470744a7-cac9-478e-843e-5ec1b25c69e8 --- p2pool/bitcoin/data.py | 94 ++++++++++++++++++++++++++++++++++++ p2pool/bitcoin/p2p.py | 113 ++++++++++++++++++++++++++++++++++++++++-- p2pool/data.py | 123 ++++++++--------------------------------------- p2pool/main.py | 10 ++-- p2pool/util/variable.py | 14 ++++- 5 files changed, 237 insertions(+), 117 deletions(-) diff --git a/p2pool/bitcoin/data.py b/p2pool/bitcoin/data.py index 930d27a..c06163c 100644 --- a/p2pool/bitcoin/data.py +++ b/p2pool/bitcoin/data.py @@ -391,6 +391,100 @@ def address_to_pubkey_hash(address, net): raise ValueError('address not for this net!') return x['pubkey_hash'] +# linked list tracker + +class Tracker(object): + def __init__(self): + self.shares = {} # hash -> share + self.reverse_shares = {} # previous_hash -> set of share_hashes + + self.heads = {} # head hash -> tail_hash + self.tails = {} # tail hash -> set of head hashes + self.heights = {} # share_hash -> height_to, other_share_hash + + def add(self, share): + if share.hash in self.shares: + return # XXX raise exception? + + self.shares[share.hash] = share + self.reverse_shares.setdefault(share.previous_hash, set()).add(share.hash) + + if share.hash in self.tails: + heads = self.tails.pop(share.hash) + else: + heads = set([share.hash]) + + if share.previous_hash in self.heads: + tail = self.heads.pop(share.previous_hash) + else: + tail = share.previous_hash + + self.tails.setdefault(tail, set()).update(heads) + if share.previous_hash in self.tails[tail]: + self.tails[tail].remove(share.previous_hash) + + for head in heads: + self.heads[head] = tail + + def get_height_and_last(self, share_hash): + orig = share_hash + height = 0 + updates = [] + while True: + if share_hash is None or share_hash not in self.shares: + break + updates.append((share_hash, height)) + if share_hash in self.heights: + height_inc, share_hash = self.heights[share_hash] + else: + height_inc, share_hash = 1, self.shares[share_hash].previous_hash + height += height_inc + for update_hash, height_then in updates: + self.heights[update_hash] = height - height_then, share_hash + assert (height, share_hash) == self.get_height_and_last2(orig), ((height, share_hash), self.get_height_and_last2(orig)) + return height, share_hash + + def get_height_and_last2(self, share_hash): + height = 0 + while True: + if share_hash not in self.shares: + break + share_hash = self.shares[share_hash].previous_hash + height += 1 + return height, share_hash + + def get_chain_known(self, start_hash): + ''' + Chain starting with item of hash I{start_hash} of items that this Tracker contains + ''' + item_hash_to_get = start_hash + while True: + if item_hash_to_get not in self.shares: + break + share = self.shares[item_hash_to_get] + assert not isinstance(share, long) + yield share + item_hash_to_get = share.previous_hash + + def get_chain_to_root(self, start_hash, root=None): + ''' + Chain of hashes starting with share_hash of shares to the root (doesn't include root) + Raises an error if one is missing + ''' + share_hash_to_get = start_hash + while share_hash_to_get != root: + share = self.shares[share_hash_to_get] + yield share + share_hash_to_get = share.previous_hash + + def get_best_hash(self): + ''' + Returns hash of item with the most items in its chain + ''' + if not self.heads: + return None + return max(self.heads, key=self.get_height_and_last) + # network definitions class Mainnet(object): diff --git a/p2pool/bitcoin/p2p.py b/p2pool/bitcoin/p2p.py index dd4ed40..d48fdf7 100644 --- a/p2pool/bitcoin/p2p.py +++ b/p2pool/bitcoin/p2p.py @@ -10,7 +10,7 @@ import struct import time import traceback -from twisted.internet import protocol, reactor +from twisted.internet import defer, protocol, reactor from . import data as bitcoin_data from p2pool.util import variable, datachunker, deferral @@ -230,7 +230,9 @@ class Protocol(BaseProtocol): ('block', bitcoin_data.block_type), ]) def handle_block(self, block): - self.get_block.got_response(bitcoin_data.block_header_type.hash256(block['header']), block) + block_hash = bitcoin_data.block_header_type.hash256(block['header']) + self.get_block.got_response(block_hash, block) + self.get_block_header.got_response(block_hash, block['header']) message_headers = bitcoin_data.ComposedType([ ('headers', bitcoin_data.ListType(bitcoin_data.block_type)), @@ -238,8 +240,8 @@ class Protocol(BaseProtocol): def handle_headers(self, headers): for header in headers: header = header['header'] - print header self.get_block_header.got_response(bitcoin_data.block_header_type.hash256(header), header) + self.factory.new_headers.happened([header['header'] for header in headers]) message_reply = bitcoin_data.ComposedType([ ('hash', bitcoin_data.HashType()), @@ -276,6 +278,7 @@ class ClientFactory(protocol.ReconnectingClientFactory): self.new_block = variable.Event() self.new_tx = variable.Event() + self.new_headers = variable.Event() def buildProtocol(self, addr): p = self.protocol(self.net) @@ -288,14 +291,112 @@ class ClientFactory(protocol.ReconnectingClientFactory): def getProtocol(self): return self.conn.get_not_none() +class HeaderWrapper(object): + def __init__(self, header): + self.hash = bitcoin_data.block_header_type.hash256(header) + self.previous_hash = header['previous_block'] + +class HeightTracker(object): + '''Point this at a factory and let it take care of getting block heights''' + # XXX think keeps object alive + + def __init__(self, factory): + self.factory = factory + self.tracker = bitcoin_data.Tracker() + self.most_recent = None + + self.factory.new_headers.watch(self.heard_headers) + + self.think() + + @defer.inlineCallbacks + def think(self): + last = None + yield self.factory.getProtocol() + while True: + highest_head = max(self.tracker.heads, key=lambda h: self.tracker.get_height_and_last(h)[0]) if self.tracker.heads else None + it = self.tracker.get_chain_known(highest_head) + have = [] + step = 1 + try: + cur = it.next() + except StopIteration: + cur = None + while True: + if cur is None: + break + have.append(cur.hash) + for i in xrange(step): # XXX inefficient + try: + cur = it.next() + except StopIteration: + break + else: + if len(have) > 10: + step *= 2 + continue + break + chain = list(self.tracker.get_chain_known(highest_head)) + if chain: + have.append(chain[-1].hash) + if not have: + have.append(0) + if have == last: + yield deferral.sleep(1) + last = None + continue + + last = have + good_tails = [x for x in self.tracker.tails if x is not None] + self.request(have, random.choice(good_tails) if good_tails else None) + for tail in self.tracker.tails: + if tail is None: + continue + self.request([], tail) + try: + yield self.factory.new_headers.get_deferred(timeout=1) + except defer.TimeoutError: + pass + + def heard_headers(self, headers): + header2s = map(HeaderWrapper, headers) + for header2 in header2s: + self.tracker.add(header2) + if header2s: + if self.tracker.get_height_and_last(header2s[-1].hash)[1] is None: + self.most_recent = header2s[-1].hash + if random.random() < .6: + self.request([header2s[-1].hash], None) + print len(self.tracker.shares) + + def request(self, have, last): + #print "REQ", ('[' + ', '.join(map(hex, have)) + ']', hex(last) if last is not None else None) + if self.factory.conn.value is not None: + self.factory.conn.value.send_getheaders(version=1, have=have, last=last) + + #@defer.inlineCallbacks + #XXX should defer + def getHeight(self, block_hash): + height, last = self.tracker.get_height_and_last(block_hash) + if last is not None: + self.request([], last) + #raise ValueError(last) + return height, last + if __name__ == '__main__': - factory = ClientFactory() + factory = ClientFactory(bitcoin_data.Mainnet) reactor.connectTCP('127.0.0.1', 8333, factory) - + h = HeightTracker(factory) + @repr @apply @defer.inlineCallbacks def think(): - (yield factory.getProtocol()) + while True: + yield deferral.sleep(1) + try: + print h.getHeight(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523) + except Exception, e: + traceback.print_exc() reactor.run() diff --git a/p2pool/data.py b/p2pool/data.py index b3f2f86..55a3a66 100644 --- a/p2pool/data.py +++ b/p2pool/data.py @@ -11,17 +11,17 @@ class CompressedList(bitcoin_data.Type): self.inner = inner def read(self, file): - values = bitcoin_data.ListType(self.inner).read(file) + values, file = bitcoin_data.ListType(self.inner).read(file) if values != sorted(set(values)): raise ValueError("invalid values") - references = bitcoin_data.ListType(bitcoin_data.VarIntType()).read(file) - return [values[reference] for reference in references] + references, file = bitcoin_data.ListType(bitcoin_data.VarIntType()).read(file) + return [values[reference] for reference in references], file def write(self, file, item): values = sorted(set(item)) values_map = dict((value, i) for i, value in enumerate(values)) - bitcoin_data.ListType(self.inner).write(file, values) - bitcoin_data.ListType(bitcoin_data.VarIntType()).write(file, [values_map[subitem] for subitem in item]) + file = bitcoin_data.ListType(self.inner).write(file, values) + return bitcoin_data.ListType(bitcoin_data.VarIntType()).write(file, [values_map[subitem] for subitem in item]) merkle_branch_type = bitcoin_data.ListType(bitcoin_data.ComposedType([ @@ -140,7 +140,7 @@ class Share(object): self.new_script = self.share_info['new_script'] self.subsidy = self.share_info['subsidy'] - self.previous_share_hash = self.share_data['previous_share_hash'] + self.previous_hash = self.previous_share_hash = self.share_data['previous_share_hash'] self.previous_shares_hash = self.share_data['previous_shares_hash'] self.target2 = self.share_data['target2'] @@ -270,94 +270,7 @@ def generate_transaction(tracker, previous_share_hash, new_script, subsidy, nonc ) -class Tracker(object): - def __init__(self): - self.shares = {} # hash -> share - self.reverse_shares = {} # previous_share_hash -> set of share_hashes - - self.heads = {} # head hash -> tail_hash - self.tails = {} # tail hash -> set of head hashes - self.heights = {} # share_hash -> height_to, other_share_hash - - def add_share(self, share): - if share.hash in self.shares: - return # XXX raise exception? - - self.shares[share.hash] = share - self.reverse_shares.setdefault(share.previous_share_hash, set()).add(share.hash) - - if share.hash in self.tails: - heads = self.tails.pop(share.hash) - else: - heads = set([share.hash]) - - if share.previous_share_hash in self.heads: - tail = self.heads.pop(share.previous_share_hash) - else: - tail = share.previous_share_hash - - self.tails.setdefault(tail, set()).update(heads) - if share.previous_share_hash in self.tails[tail]: - self.tails[tail].remove(share.previous_share_hash) - - for head in heads: - self.heads[head] = tail - - def get_height_and_last(self, share_hash): - orig = share_hash - height = 0 - updates = [] - while True: - if share_hash is None or share_hash not in self.shares: - break - updates.append((share_hash, height)) - if share_hash in self.heights: - height_inc, share_hash = self.heights[share_hash] - else: - height_inc, share_hash = 1, self.shares[share_hash].previous_share_hash - height += height_inc - for update_hash, height_then in updates: - self.heights[update_hash] = height - height_then, share_hash - assert (height, share_hash) == self.get_height_and_last2(orig), ((height, share_hash), self.get_height_and_last2(orig)) - return height, share_hash - - def get_height_and_last2(self, share_hash): - height = 0 - while True: - if share_hash not in self.shares: - break - share_hash = self.shares[share_hash].previous_share_hash - height += 1 - return height, share_hash - - def get_chain_known(self, share_hash): - while True: - if share_hash not in self.shares: - break - yield share_hash - share_hash = self.shares[share_hash].previous_share_hash - - def get_chain_to_root(self, start): - share_hash_to_get = start - while share_hash_to_get is not None: - share = self.shares[share_hash_to_get] - yield share - share_hash_to_get = share.previous_share_hash - - - def get_best_share_hash(self): - return None - return max(self.heads, key=self.score_chain) - ''' - def score_chain(self, start): - length = len(self.get_chain(start)) - - score = 0 - for share in itertools.islice(self.get_chain(start), self.net.CHAIN_LENGTH): - score += a - - return (min(length, 1000), score) - ''' + if __name__ == '__main__': class FakeShare(object): @@ -381,11 +294,11 @@ if __name__ == '__main__': for share_hash in t.shares: print share_hash, t.get_height_and_last(share_hash) -class OkayTracker(Tracker): +class OkayTracker(bitcoin_data.Tracker): def __init__(self, net): - Tracker.__init__(self) + bitcoin_data.Tracker.__init__(self) self.net = net - self.verified = Tracker() + self.verified = bitcoin_data.Tracker() """ self.okay_cache = {} # hash -> height @@ -423,12 +336,14 @@ class OkayTracker(Tracker): # if no successful verification because of lack of parents, request parent for head in self.heads: head_height, last = self.get_height_and_last(head) - if head_height < a and last is not None: + if head_height < self.net.CHAIN_LENGTH and last is not None: # request more + pass for share in itertools.islice(self.get_chain_known(head), None if last is None else head_height - self.net.CHAIN_LENGTH): # XXX change length for None - in share in self.verified.shares: + if share.hash in self.verified.shares: break + print share try: share.check(self, self.net) except: @@ -437,20 +352,22 @@ class OkayTracker(Tracker): traceback.print_exc() print else: - self.verified.add_share(share_hash) + self.verified.add(share) break # try to get at least CHAIN_LENGTH height for each verified head, requesting parents if needed for head in self.verified.heads: head_height, last = self.verified.get_height_and_last(head) - a + if head_height < self.net.CHAIN_LENGTH and last is not None: + desired.add(last) # decide best verified head def score(share_hash): share = self.verified.shares[share_hash] head_height, last = self.verified.get_height_and_last(share) - return (min(head_height, net.CHAIN_LENGTH), RECENTNESS) - best = max(self.verified.heads, key=score) + RECENTNESS = 0 + return (min(head_height, self.net.CHAIN_LENGTH), RECENTNESS) + best = max(self.verified.heads, key=score) if self.verified.heads else None return best, desired diff --git a/p2pool/main.py b/p2pool/main.py index 50db2ff..a898d20 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -198,6 +198,8 @@ def main(args): @defer.inlineCallbacks def set_real_work(): work, height = yield getwork(bitcoind) + best, desired = tracker.think() + # XXX desired? current_work.set(dict( version=work.version, previous_block=work.previous_block, @@ -205,7 +207,7 @@ def main(args): height=height + 1, - best_share_hash=tracker.get_best_share_hash(), + best_share_hash=best, )) current_work2.set(dict( timestamp=work.timestamp, @@ -236,7 +238,7 @@ def main(args): print "Received share %x" % (share.hash,) - tracker.add_share(share) + tracker.add(share) best, desired = tracker.think() for peer2, share_hash in desired: peer2.get_shares([share_hash]) @@ -340,9 +342,7 @@ def main(args): # send share when the chain changes to their chain def work_changed(new_work): #print 'Work changed:', new_work - for share_hash in tracker.get_chain_known(new_work['best_share_hash']): - if share_hash is None: continue - share = tracker.shares[share_hash] + for share in tracker.get_chain_known(new_work['best_share_hash']): if share.shared: break share_share(share) diff --git a/p2pool/util/variable.py b/p2pool/util/variable.py index dfa1c3e..cdc64d7 100644 --- a/p2pool/util/variable.py +++ b/p2pool/util/variable.py @@ -1,6 +1,7 @@ import itertools -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.python import failure class Event(object): def __init__(self): @@ -31,9 +32,16 @@ class Event(object): for func in one_time_observers.itervalues(): func(event) - def get_deferred(self): + def get_deferred(self, timeout=None): df = defer.Deferred() - self.watch_one_time(df.callback) + id1 = self.watch_one_time(df.callback) + if timeout is not None: + def do_timeout(): + df.errback(failure.Failure(defer.TimeoutError())) + self.unwatch_one_time(id1) + self.unwatch_one_time(x) + delay = reactor.callLater(timeout, do_timeout) + x = self.watch_one_time(lambda value: delay.cancel()) return df class Variable(object): -- 1.7.1