''' Implementation of Bitcoin's p2p protocol ''' from __future__ import division import hashlib import random import struct import time import zlib from twisted.internet import defer, protocol, reactor, task from twisted.python import log from . import data as bitcoin_data from p2pool.util import variable, datachunker, deferral class BaseProtocol(protocol.Protocol): def connectionMade(self): self.dataReceived = datachunker.DataChunker(self.dataReceiver()) def dataReceiver(self): while True: start = '' while start != self._prefix: start = (start + (yield 1))[-len(self._prefix):] command = (yield 12).rstrip('\0') length, = struct.unpack(' self.max_net_payload_length: print 'length too long' continue if self.use_checksum: checksum = yield 4 else: checksum = None compressed_payload = yield length if self.compress: try: d = zlib.decompressobj() payload = d.decompress(compressed_payload, self.max_payload_length) if d.unconsumed_tail: print 'compressed payload expanded too much' continue assert not len(payload) > self.max_payload_length except: log.err(None, 'Failure decompressing message:') continue else: if len(compressed_payload) > self.max_payload_length: print 'compressed payload expanded too much' continue payload = compressed_payload if checksum is not None: if hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] != checksum: print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) print 'INVALID HASH' continue type_ = getattr(self, 'message_' + command, None) if type_ is None: print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) print 'NO TYPE FOR', repr(command) continue try: payload2 = type_.unpack(payload) except: print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) log.err(None, 'Error parsing message: (see RECV line)') continue handler = 
getattr(self, 'handle_' + command, None) if handler is None: print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) print 'NO HANDLER FOR', command continue #print 'RECV', command, repr(payload2)[:500] try: handler(**payload2) except: print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload) log.err(None, 'Error handling message: (see RECV line)') continue def sendPacket(self, command, payload2): if len(command) >= 12: raise ValueError('command too long') type_ = getattr(self, 'message_' + command, None) if type_ is None: raise ValueError('invalid command') #print 'SEND', command, repr(payload2)[:500] payload = type_.pack(payload2) if len(payload) > self.max_payload_length: raise ValueError('payload too long') if self.use_checksum: checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] else: checksum = '' compressed_payload = zlib.compress(payload) if self.compress else payload if len(compressed_payload) > self.max_net_payload_length: raise ValueError('compressed payload too long') data = self._prefix + struct.pack('<12sI', command, len(compressed_payload)) + checksum + compressed_payload self.transport.write(data) def __getattr__(self, attr): prefix = 'send_' if attr.startswith(prefix): command = attr[len(prefix):] return lambda **payload2: self.sendPacket(command, payload2) #return protocol.Protocol.__getattr__(self, attr) raise AttributeError(attr) class Protocol(BaseProtocol): def __init__(self, net): self._prefix = net.BITCOIN_P2P_PREFIX version = 0 max_payload_length = max_net_payload_length = 1000000 compress = False @property def use_checksum(self): return self.version >= 209 null_order = '\0'*60 def connectionMade(self): BaseProtocol.connectionMade(self) self.send_version( version=32200, services=1, time=int(time.time()), addr_to=dict( services=1, address=self.transport.getPeer().host, 
port=self.transport.getPeer().port, ), addr_from=dict( services=1, address=self.transport.getHost().host, port=self.transport.getHost().port, ), nonce=random.randrange(2**64), sub_version_num='', start_height=0, ) message_version = bitcoin_data.ComposedType([ ('version', bitcoin_data.StructType(' cur_height: break cur = self.tracker.get_nth_parent_hash(cur, step) cur_height -= step if len(have) > 10: step *= 2 if height: have.append(self.tracker.get_nth_parent_hash(highest_head, height - 1)) if not have: have.append(0) self.request(have, None) for tail in self.tracker.tails: if tail is None: continue self.request([], tail) for head in self.tracker.heads: if head == highest_head: continue self.request([head], None) def heard_headers(self, headers): for header in headers: self.tracker.add(HeaderWrapper(header)) self.think() if len(self.tracker.shares) > self.last_notified_size + 10: print 'Have %i block headers' % len(self.tracker.shares) self.last_notified_size = len(self.tracker.shares) def heard_block(self, block_hash): self.request([], block_hash) @defer.inlineCallbacks def request(self, have, last): if (tuple(have), last) in self.requested: return self.requested.add((tuple(have), last)) (yield self.factory.getProtocol()).send_getheaders(version=1, have=have, last=last) #@defer.inlineCallbacks #XXX should defer? 
# NOTE(review): the four functions below take ``self`` and use ``self.tracker``,
# ``self.factory`` and ``self.request``, and the script at the bottom calls
# ``get_min_height`` on a HeightTracker -- so they are presumably HeightTracker
# methods whose class header precedes this chunk. They are laid out at the
# column this chunk starts at; indent them under the class when re-flowing
# the file.
def getHeight(self, block_hash):
    '''Return the height of block_hash, raising if it cannot be determined yet.

    Looks the block up with self.tracker.get_height_and_last(). When the
    second value of that pair is not None (presumably the hash of the first
    header still missing from the chain -- confirm against the tracker), the
    missing header is requested with self.request([], last) and ValueError is
    raised instead of returning a height.

    block_hash: hash of the block to look up.
    Raises ValueError when `last` is not None.
    '''
    height, last = self.tracker.get_height_and_last(block_hash)
    if last is None:
        return height

    # The returned Deferred is deliberately not waited on: the request only
    # helps a later call succeed, this call fails either way.
    self.request([], last)
    raise ValueError(
        'height of block %r is not known yet; requested headers ending at %r'
        % (block_hash, last))


def get_min_height(self, block_hash):
    '''Return the height the tracker reports for block_hash, without raising.

    Same lookup as getHeight(), but when `last` is not None the headers are
    requested with self.request([], last) and the height is returned anyway.
    NOTE(review): the name suggests that value is a lower bound -- confirm.

    block_hash: hash of the block to look up.
    '''
    height, last = self.tracker.get_height_and_last(block_hash)
    if last is not None:
        self.request([], last)
    return height


def get_highest_height(self):
    '''Return whatever self.tracker.get_highest_height() reports.'''
    return self.tracker.get_highest_height()


def stop(self):
    '''Detach from the factory's events and stop the periodic task.

    Unwatches self._watch1 from factory.new_headers and self._watch2 from
    factory.new_block, then stops self._clear_task. Those attributes are set
    outside this chunk (presumably in __init__).
    '''
    self.factory.new_headers.unwatch(self._watch1)
    self.factory.new_block.unwatch(self._watch2)
    self._clear_task.stop()


if __name__ == '__main__':
    # Manual smoke test: connect to a node on localhost and print, about once
    # per pass of the loop, the height known for one hard-coded block hash.
    factory = ClientFactory(bitcoin_data.Mainnet)
    reactor.connectTCP('127.0.0.1', 8333, factory)
    h = HeightTracker(factory)

    @defer.inlineCallbacks
    def think():
        while True:
            yield deferral.sleep(1)
            # Parenthesised so the line parses as both the Python 2 print
            # statement and the Python 3 print function.
            print(h.get_min_height(0xa285c3cb2a90ac7194cca034512748289e2526d9d7ae6ee7523))

    # Start the polling loop explicitly (this replaces the deprecated
    # ``@repr @apply`` trick) and report a failure as soon as it happens;
    # ``log`` is twisted.python.log, imported at the top of the file.
    think().addErrback(log.err)

    reactor.run()