X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=p2pool%2Fbitcoin%2Fp2p.py;h=83a3403f0f94695db3153d5e8419594e26f21aaf;hb=0a3493d6873cfef4fb189d39e64dfbc6e162e2a7;hp=d16047973ee3c54293c12e47d40466c9fe8ab2ee;hpb=8f9537e27fde1ad51078ac107f9572c28a768965;p=p2pool.git diff --git a/p2pool/bitcoin/p2p.py b/p2pool/bitcoin/p2p.py index d160479..83a3403 100644 --- a/p2pool/bitcoin/p2p.py +++ b/p2pool/bitcoin/p2p.py @@ -2,145 +2,21 @@ Implementation of Bitcoin's p2p protocol ''' -from __future__ import division - -import hashlib import random -import struct +import sys import time -import zlib -from twisted.internet import defer, protocol, reactor, task -from twisted.python import log +from twisted.internet import protocol +import p2pool from . import data as bitcoin_data -from p2pool.util import variable, datachunker, deferral - -class TooLong(Exception): - pass - -class BaseProtocol(protocol.Protocol): - def connectionMade(self): - self.dataReceived = datachunker.DataChunker(self.dataReceiver()) - - def dataReceiver(self): - while True: - start = '' - while start != self._prefix: - start = (start + (yield 1))[-len(self._prefix):] - - command = (yield 12).rstrip('\0') - length, = struct.unpack(' self.max_net_payload_length: - print 'length too long' - continue - - if self.use_checksum: - checksum = yield 4 - else: - checksum = None - - compressed_payload = yield length - - if self.compress: - try: - d = zlib.decompressobj() - payload = d.decompress(compressed_payload, self.max_payload_length) - if d.unconsumed_tail: - print 'compressed payload expanded too much' - continue - assert not len(payload) > self.max_payload_length - except: - log.err(None, 'Failure decompressing message:') - continue - else: - if len(compressed_payload) > self.max_payload_length: - print 'compressed payload expanded too much' - continue - payload = compressed_payload - - if checksum is not None: - if hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] != checksum: - print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) - print 'INVALID HASH' - continue - - type_ = getattr(self, 'message_' + command, None) - if type_ is None: - print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) - print 'NO TYPE FOR', repr(command) - continue - - try: - payload2 = type_.unpack(payload) - except: - print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) - log.err(None, 'Error parsing message: (see RECV line)') - continue - - handler = getattr(self, 'handle_' + command, None) - if handler is None: - print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) - print 'NO HANDLER FOR', command - continue - - #print 'RECV', command, repr(payload2)[:500] - - try: - handler(**payload2) - except: - print 'RECV', command, len(payload), checksum.encode('hex')[:1000] - log.err(None, 'Error handling message: (see RECV line)') - continue - - def sendPacket(self, command, payload2): - if len(command) >= 12: - raise ValueError('command too long') - type_ = getattr(self, 'message_' + command, None) - if type_ is None: - raise ValueError('invalid command') - #print 'SEND', command, repr(payload2)[:500] - payload = type_.pack(payload2) - if len(payload) > self.max_payload_length: - raise TooLong('payload too long') - if self.use_checksum: - checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] - else: - checksum = '' - compressed_payload = zlib.compress(payload) if self.compress else payload - if len(compressed_payload) > self.max_net_payload_length: - raise TooLong('compressed payload too long') - data = self._prefix + struct.pack('<12sI', command, len(compressed_payload)) + checksum + compressed_payload - self.transport.write(data) - - def __getattr__(self, attr): - prefix = 'send_' - if attr.startswith(prefix): - command = attr[len(prefix):] - return lambda **payload2: self.sendPacket(command, payload2) - #return protocol.Protocol.__getattr__(self, attr) - raise AttributeError(attr) +from p2pool.util import deferral, p2protocol, pack, variable -class Protocol(BaseProtocol): +class Protocol(p2protocol.Protocol): def __init__(self, net): - self._prefix = net.BITCOIN_P2P_PREFIX - - version = 0 - - max_payload_length = max_net_payload_length = 1000000 - - compress = False - @property - def use_checksum(self): - return self.version >= 209 - - - null_order = '\0'*60 + p2protocol.Protocol.__init__(self, net.P2P_PREFIX, 1000000, ignore_trailing_payload=True) def connectionMade(self): - BaseProtocol.connectionMade(self) - self.send_version( version=32200, services=1, @@ -156,87 +32,72 @@ class Protocol(BaseProtocol): port=self.transport.getHost().port, ), nonce=random.randrange(2**64), - sub_version_num='', + sub_version_num='/P2Pool:%s/' % (p2pool.__version__,), start_height=0, ) - message_version = bitcoin_data.ComposedType([ - ('version', bitcoin_data.StructType('>sys.stderr, 'Bitcoin connection lost. Reason:', reason.getErrorMessage() class ClientFactory(protocol.ReconnectingClientFactory): protocol = Protocol - maxDelay = 15 + maxDelay = 1 def __init__(self, net): self.net = net @@ -314,126 +170,3 @@ class ClientFactory(protocol.ReconnectingClientFactory): def getProtocol(self): return self.conn.get_not_none() - -class HeaderWrapper(object): - target = 0 - __slots__ = 'hash previous_hash'.split(' ') - - def __init__(self, header): - self.hash = bitcoin_data.block_header_type.hash256(header) - self.previous_hash = header['previous_block'] - -class HeightTracker(object): - '''Point this at a factory and let it take care of getting block heights''' - - def __init__(self, factory): - self.factory = factory - self.tracker = bitcoin_data.Tracker() - self.most_recent = None - - self._watch1 = self.factory.new_headers.watch(self.heard_headers) - self._watch2 = self.factory.new_block.watch(self.heard_block) - - self.requested = set() - self._clear_task = task.LoopingCall(self.requested.clear) - self._clear_task.start(60) - - self.last_notified_size = 0 - - self.updated = variable.Event() - - self.think() - - def think(self): - highest_head = max(self.tracker.heads, key=lambda h: self.tracker.get_height_and_last(h)[0]) if self.tracker.heads else None - height, last = self.tracker.get_height_and_last(highest_head) - cur = highest_head - cur_height = height - have = [] - step = 1 - while cur is not None: - have.append(cur) - if step > cur_height: - break - cur = self.tracker.get_nth_parent_hash(cur, step) - cur_height -= step - if len(have) > 10: - step *= 2 - if height: - have.append(self.tracker.get_nth_parent_hash(highest_head, height - 1)) - if not have: - have.append(0) - self.request(have, None) - - for tail in self.tracker.tails: - if tail is None: - continue - self.request([], tail) - for head in self.tracker.heads: - if head == highest_head: - continue - self.request([head], None) - - def heard_headers(self, headers): - changed = False - for header in headers: - hw = HeaderWrapper(header) - if hw.hash in self.tracker.shares: - continue - changed = True - self.tracker.add(hw) - if changed: - self.updated.happened() - self.think() - - if len(self.tracker.shares) > self.last_notified_size + 10: - print 'Have %i block headers' % len(self.tracker.shares) - self.last_notified_size = len(self.tracker.shares) - - def heard_block(self, block_hash): - self.request([], block_hash) - - @defer.inlineCallbacks - def request(self, have, last): - if (tuple(have), last) in self.requested: - return - self.requested.add((tuple(have), last)) - (yield self.factory.getProtocol()).send_getheaders(version=1, have=have, last=last) - - #@defer.inlineCallbacks - #XXX should defer? - def getHeight(self, block_hash): - height, last = self.tracker.get_height_and_last(block_hash) - if last is not None: - #self.request([], last) - raise ValueError() - return height - - def get_min_height(self, block_hash): - height, last = self.tracker.get_height_and_last(block_hash) - #if last is not None: - # self.request([], last) - return height - - def get_highest_height(self): - return self.tracker.get_highest_height() - - def stop(self): - self.factory.new_headers.unwatch(self._watch1) - self.factory.new_block.unwatch(self._watch2) - self._clear_task.stop() - -if __name__ == '__main__': - factory = ClientFactory(bitcoin_data.Mainnet) - reactor.connectTCP('127.0.0.1', 8333, factory) - h = HeightTracker(factory) - - @repr - @apply - @defer.inlineCallbacks - def think(): - while True: - yield deferral.sleep(1) - print h.get_min_height(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523) - - reactor.run()