X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=p2pool%2Fbitcoin%2Fp2p.py;h=6048d816ba68b5514a45afc0f02df467575240ee;hb=f732111a6e08d7d0649c330d1c703535a8ea80b5;hp=dab765a56a9a524b8bed3bc428338117979576d0;hpb=20b363009089cfdfb7e9200e652384f2ed0b91ac;p=p2pool.git diff --git a/p2pool/bitcoin/p2p.py b/p2pool/bitcoin/p2p.py index dab765a..6048d81 100644 --- a/p2pool/bitcoin/p2p.py +++ b/p2pool/bitcoin/p2p.py @@ -2,125 +2,23 @@ Implementation of Bitcoin's p2p protocol ''' -from __future__ import division - -import hashlib import random -import struct +import sys import time -import traceback -import zlib -from twisted.internet import defer, protocol, reactor +from twisted.internet import protocol +import p2pool from . import data as bitcoin_data -from p2pool.util import variable, datachunker, deferral - -class BaseProtocol(protocol.Protocol): - def connectionMade(self): - self.dataReceived = datachunker.DataChunker(self.dataReceiver()) - - def dataReceiver(self): - while True: - start = '' - while start != self._prefix: - start = (start + (yield 1))[-len(self._prefix):] - - command = (yield 12).rstrip('\0') - length, = struct.unpack('= 12: - raise ValueError('command too long') - type_ = getattr(self, "message_" + command, None) - if type_ is None: - raise ValueError('invalid command') - #print 'SEND', command, repr(payload2)[:500] - payload = type_.pack(payload2) - if self.use_checksum: - checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] - else: - checksum = '' - if self.compress: - payload = zlib.compress(payload) - data = self._prefix + struct.pack('<12sI', command, len(payload)) + checksum + payload - self.transport.write(data) - - def __getattr__(self, attr): - prefix = 'send_' - if attr.startswith(prefix): - command = attr[len(prefix):] - return lambda **payload2: self.sendPacket(command, payload2) - #return protocol.Protocol.__getattr__(self, attr) - raise AttributeError(attr) +from p2pool.util import deferral, p2protocol, pack, variable -class Protocol(BaseProtocol): +class Protocol(p2protocol.Protocol): def __init__(self, net): - self._prefix = net.BITCOIN_P2P_PREFIX - - version = 0 - - compress = False - @property - def use_checksum(self): - return self.version >= 209 - - - null_order = '\0'*60 + p2protocol.Protocol.__init__(self, net.P2P_PREFIX, 1000000, ignore_trailing_payload=True) def connectionMade(self): - BaseProtocol.connectionMade(self) - self.send_version( - version=32200, + version=60011, services=1, time=int(time.time()), addr_to=dict( @@ -134,87 +32,72 @@ class Protocol(BaseProtocol): port=self.transport.getHost().port, ), nonce=random.randrange(2**64), - sub_version_num='', + sub_version_num='/P2Pool:%s/' % (p2pool.__version__,), start_height=0, ) - message_version = bitcoin_data.ComposedType([ - ('version', bitcoin_data.StructType('>sys.stderr, 'Bitcoin connection lost. Reason:', reason.getErrorMessage() class ClientFactory(protocol.ReconnectingClientFactory): protocol = Protocol - maxDelay = 15 + maxDelay = 1 def __init__(self, net): self.net = net @@ -292,122 +178,3 @@ class ClientFactory(protocol.ReconnectingClientFactory): def getProtocol(self): return self.conn.get_not_none() - -class HeaderWrapper(object): - def __init__(self, header): - self.hash = bitcoin_data.block_header_type.hash256(header) - self.previous_hash = header['previous_block'] - -class HeightTracker(object): - '''Point this at a factory and let it take care of getting block heights''' - # XXX think keeps object alive - - def __init__(self, factory): - self.factory = factory - self.tracker = bitcoin_data.Tracker() - self.most_recent = None - - self.factory.new_headers.watch(self.heard_headers) - - self.think() - - @defer.inlineCallbacks - def think(self): - last = None - yield self.factory.getProtocol() - while True: - highest_head = max(self.tracker.heads, key=lambda h: self.tracker.get_height_and_last(h)[0]) if self.tracker.heads else None - it = self.tracker.get_chain_known(highest_head) - have = [] - step = 1 - try: - cur = it.next() - except StopIteration: - cur = None - while True: - if cur is None: - break - have.append(cur.hash) - for i in xrange(step): # XXX inefficient - try: - cur = it.next() - except StopIteration: - break - else: - if len(have) > 10: - step *= 2 - continue - break - chain = list(self.tracker.get_chain_known(highest_head)) - if chain: - have.append(chain[-1].hash) - if not have: - have.append(0) - if have == last: - yield deferral.sleep(1) - last = None - continue - - last = have - good_tails = [x for x in self.tracker.tails if x is not None] - self.request(have, random.choice(good_tails) if good_tails else None) - for tail in self.tracker.tails: - if tail is None: - continue - self.request([], tail) - try: - yield self.factory.new_headers.get_deferred(timeout=5) - except defer.TimeoutError: - pass - - def heard_headers(self, headers): - header2s = map(HeaderWrapper, headers) - for header2 in header2s: - self.tracker.add(header2) - if header2s: - if self.tracker.get_height_and_last(header2s[-1].hash)[1] is None: - self.most_recent = header2s[-1].hash - if random.random() < .6: - self.request([header2s[-1].hash], None) - print len(self.tracker.shares) - - def request(self, have, last): - #print "REQ", ('[' + ', '.join(map(hex, have)) + ']', hex(last) if last is not None else None) - if self.factory.conn.value is not None: - self.factory.conn.value.send_getheaders(version=1, have=have, last=last) - - #@defer.inlineCallbacks - #XXX should defer - def getHeight(self, block_hash): - height, last = self.tracker.get_height_and_last(block_hash) - if last is not None: - self.request([], last) - raise ValueError() - return height - - def get_min_height(self, block_hash): - height, last = self.tracker.get_height_and_last(block_hash) - if last is not None: - self.request([], last) - return height - - def get_highest_height(self): - return self.tracker.get_highest_height() - -if __name__ == '__main__': - factory = ClientFactory(bitcoin_data.Mainnet) - reactor.connectTCP('127.0.0.1', 8333, factory) - h = HeightTracker(factory) - - @repr - @apply - @defer.inlineCallbacks - def think(): - while True: - yield deferral.sleep(1) - try: - print h.getHeight(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523) - except Exception, e: - traceback.print_exc() - - reactor.run()