2 Implementation of Bitcoin's p2p protocol
5 from __future__ import division
15 from twisted.internet import defer, protocol, reactor
20 class EarlyEnd(Exception):
23 class LateEnd(Exception):
27 # the same data can have only one unpacked representation, but multiple packed binary representations
29 def _unpack(self, data):
30 f = StringIO.StringIO(data)
34 if f.tell() != len(data):
35 raise LateEnd('underread ' + repr((self, data)))
39 def unpack(self, data):
40 obj = self._unpack(data)
41 assert self._unpack(self._pack(obj)) == obj
45 f = StringIO.StringIO()
54 data = self._pack(obj)
55 assert self._unpack(data) == obj
58 class VarIntType(Type):
63 first, = struct.unpack('<B', data)
64 if first == 0xff: desc = '<Q'
65 elif first == 0xfe: desc = '<I'
66 elif first == 0xfd: desc = '<H'
68 length = struct.calcsize(desc)
69 data = file.read(length)
70 if len(data) != length:
72 return struct.unpack(desc, data)[0]
74 def write(self, file, item):
76 file.write(struct.pack('<B', item))
78 file.write(struct.pack('<BH', 0xfd, item))
79 elif item <= 0xffffffff:
80 file.write(struct.pack('<BI', 0xfe, item))
81 elif item <= 0xffffffffffffffff:
82 file.write(struct.pack('<BQ', 0xff, item))
84 raise ValueError('int too large for varint')
86 class VarStrType(Type):
88 length = VarIntType().read(file)
89 res = file.read(length)
90 if len(res) != length:
91 raise EarlyEnd('var str not long enough %r' % ((length, len(res), res),))
94 def write(self, file, item):
95 VarIntType().write(file, len(item))
98 class FixedStrType(Type):
99 def __init__(self, length):
102 def read(self, file):
103 res = file.read(self.length)
104 if len(res) != self.length:
105 raise EarlyEnd('early EOF!')
108 def write(self, file, item):
109 if len(item) != self.length:
110 raise ValueError('incorrect length!')
113 class EnumType(Type):
114 def __init__(self, inner, values):
119 for k, v in values.iteritems():
121 raise ValueError('duplicate value in values')
124 def read(self, file):
125 return self.keys[self.inner.read(file)]
127 def write(self, file, item):
128 self.inner.write(file, self.values[item])
class HashType(Type):
    """256-bit hash, unpacked as a non-negative integer.

    On the wire the value occupies exactly 32 bytes, least-significant
    byte first.
    """

    def read(self, file):
        """Read 32 bytes from *file* and return them as an integer.

        Raises EarlyEnd if fewer than 32 bytes could be read.
        """
        size = 256//8
        data = file.read(size)
        if len(data) != size:
            raise EarlyEnd('incorrect length!')
        # The bytes are little-endian; reverse them so the hex digits are
        # most-significant first before parsing.
        return int(data[::-1].encode('hex'), 16)

    def write(self, file, item):
        """Write integer *item* to *file* as exactly 32 little-endian bytes.

        Raises ValueError if *item* is negative or needs more than 256 bits;
        without this check such values would either fail inside the hex
        decoding or silently emit more than 32 bytes.
        """
        if not 0 <= item < 2**256:
            raise ValueError('hash value out of range (%r)' % (item,))
        file.write(('%064x' % (item,)).decode('hex')[::-1])
class ShortHashType(Type):
    """160-bit hash, unpacked as a non-negative integer.

    On the wire the value occupies exactly 20 bytes, least-significant
    byte first.
    """

    def read(self, file):
        """Read 20 bytes from *file* and return them as an integer.

        Raises EarlyEnd if fewer than 20 bytes could be read.
        """
        size = 160//8
        data = file.read(size)
        if len(data) != size:
            raise EarlyEnd('incorrect length!')
        # The bytes are little-endian; reverse them so the hex digits are
        # most-significant first before parsing.
        return int(data[::-1].encode('hex'), 16)

    def write(self, file, item):
        """Write integer *item* to *file* as exactly 20 little-endian bytes.

        Raises ValueError if *item* is negative or needs more than 160 bits.
        """
        if not 0 <= item < 2**160:
            raise ValueError('hash value out of range (%r)' % (item,))
        # 20 bytes are 40 hex digits. Padding to only 20 digits produced
        # short (or odd-length, undecodable) output for values with leading
        # zero nibbles.
        file.write(('%040x' % (item,)).decode('hex')[::-1])
150 class ListType(Type):
151 def __init__(self, type):
154 def read(self, file):
155 length = VarIntType().read(file)
156 return [self.type.read(file) for i in xrange(length)]
158 def write(self, file, item):
159 VarIntType().write(file, len(item))
161 self.type.write(file, subitem)
163 class StructType(Type):
164 def __init__(self, desc):
166 self.length = struct.calcsize(self.desc)
168 def read(self, file):
169 data = file.read(self.length)
170 if len(data) != self.length:
172 res, = struct.unpack(self.desc, data)
175 def write(self, file, item):
176 data = struct.pack(self.desc, item)
177 if struct.unpack(self.desc, data)[0] != item:
178 # special test because struct doesn't error on some overflows
179 raise ValueError("item didn't survive pack cycle (%r)" % (item,))
182 class IPV6AddressType(Type):
183 def read(self, file):
187 if data[:12] != '00000000000000000000ffff'.decode('hex'):
188 raise ValueError("ipv6 addresses not supported yet")
189 return '::ffff:' + '.'.join(str(ord(x)) for x in data[12:])
191 def write(self, file, item):
193 if not item.startswith(prefix):
194 raise ValueError("ipv6 addresses not supported yet")
195 item = item[len(prefix):]
196 bits = map(int, item.split('.'))
198 raise ValueError("invalid address: %r" % (bits,))
199 data = '00000000000000000000ffff'.decode('hex') + ''.join(chr(x) for x in bits)
200 assert len(data) == 16, len(data)
203 class ComposedType(Type):
204 def __init__(self, fields):
207 def read(self, file):
209 for key, type_ in self.fields:
210 item[key] = type_.read(file)
213 def write(self, file, item):
214 for key, type_ in self.fields:
215 type_.write(file, item[key])
217 address_type = ComposedType([
218 ('services', StructType('<Q')),
219 ('address', IPV6AddressType()),
220 ('port', StructType('>H')),
223 tx_type = ComposedType([
224 ('version', StructType('<I')),
225 ('tx_ins', ListType(ComposedType([
226 ('previous_output', ComposedType([
227 ('hash', HashType()),
228 ('index', StructType('<I')),
230 ('script', VarStrType()),
231 ('sequence', StructType('<I')),
233 ('tx_outs', ListType(ComposedType([
234 ('value', StructType('<Q')),
235 ('script', VarStrType()),
237 ('lock_time', StructType('<I')),
240 block_header_type = ComposedType([
241 ('version', StructType('<I')),
242 ('previous_block', HashType()),
243 ('merkle_root', HashType()),
244 ('timestamp', StructType('<I')),
245 ('bits', StructType('<I')),
246 ('nonce', StructType('<I')),
249 block_type = ComposedType([
250 ('header', block_header_type),
251 ('txs', ListType(tx_type)),
255 return HashType().unpack(hashlib.sha256(hashlib.sha256(data).digest()).digest())
258 return ShortHashType().unpack(hashlib.new('ripemd160', hashlib.sha256(data).digest()).digest())
260 merkle_record_type = ComposedType([
261 ('left', HashType()),
262 ('right', HashType()),
265 def merkle_hash(tx_list):
266 hash_list = [doublesha(tx_type.pack(tx)) for tx in tx_list]
267 while len(hash_list) > 1:
268 hash_list = [doublesha(merkle_record_type.pack(dict(left=left, right=left if right is None else right)))
269 for left, right in zip(hash_list[::2], hash_list[1::2] + [None])]
273 return doublesha(tx_type.pack(tx))
def block_hash(header):
    """Return the doublesha hash of a packed block header.

    *header* is an unpacked header in the form block_header_type produces.
    """
    # The header type is named block_header_type; the previous reference to
    # `block_header` did not match any definition and raised NameError.
    return doublesha(block_header_type.pack(header))
278 class BaseProtocol(protocol.Protocol):
279 def connectionMade(self):
280 self.dataReceived = util.DataChunker(self.dataReceiver())
282 def dataReceiver(self):
285 while start != self._prefix:
286 start = (start + (yield 1))[-len(self._prefix):]
288 command = (yield 12).rstrip('\0')
289 length, = struct.unpack('<I', (yield 4))
291 if self.use_checksum:
296 payload = yield length
298 if checksum is not None:
299 if hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] != checksum:
300 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
304 type_ = self.message_types.get(command, None)
306 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
307 print 'NO TYPE FOR', repr(command)
311 payload2 = type_.unpack(payload)
313 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
314 traceback.print_exc()
317 handler = getattr(self, 'handle_' + command, None)
319 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
320 print 'NO HANDLER FOR', command
323 #print 'RECV', command, payload2
328 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
329 traceback.print_exc()
332 def sendPacket(self, command, payload2={}):
333 payload = self.message_types[command].pack(payload2)
334 if len(command) >= 12:
335 raise ValueError('command too long')
336 if self.use_checksum:
337 checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
340 data = self._prefix + struct.pack('<12sI', command, len(payload)) + checksum + payload
341 self.transport.write(data)
342 #print 'SEND', command, payload2
344 def __getattr__(self, attr):
346 if attr.startswith(prefix):
347 command = attr[len(prefix):]
348 return lambda **payload2: self.sendPacket(command, payload2)
349 #return protocol.Protocol.__getattr__(self, attr)
350 raise AttributeError(attr)
352 class Protocol(BaseProtocol):
353 def __init__(self, testnet=False):
355 self._prefix = 'fabfb5da'.decode('hex')
357 self._prefix = 'f9beb4d9'.decode('hex')
def use_checksum(self):
    """Tell whether packets carry a checksum: true once self.version is at least 209."""
    first_checksummed_version = 209
    return self.version >= first_checksummed_version
366 'version': ComposedType([
367 ('version', StructType('<I')),
368 ('services', StructType('<Q')),
369 ('time', StructType('<Q')),
370 ('addr_to', address_type),
371 ('addr_from', address_type),
372 ('nonce', StructType('<Q')),
373 ('sub_version_num', VarStrType()),
374 ('start_height', StructType('<I')),
376 'verack': ComposedType([]),
377 'addr': ComposedType([
378 ('addrs', ListType(ComposedType([
379 ('timestamp', StructType('<I')),
380 ('address', address_type),
383 'inv': ComposedType([
384 ('invs', ListType(ComposedType([
385 ('type', EnumType(StructType('<I'), {'tx': 1, 'block': 2})),
386 ('hash', HashType()),
389 'getdata': ComposedType([
390 ('requests', ListType(ComposedType([
391 ('type', EnumType(StructType('<I'), {'tx': 1, 'block': 2})),
392 ('hash', HashType()),
395 'getblocks': ComposedType([
396 ('version', StructType('<I')),
397 ('have', ListType(HashType())),
398 ('last', HashType()),
400 'getheaders': ComposedType([
401 ('version', StructType('<I')),
402 ('have', ListType(HashType())),
403 ('last', HashType()),
408 'block': ComposedType([
409 ('block', block_type),
411 'headers': ComposedType([
412 ('headers', ListType(block_header_type)),
414 'getaddr': ComposedType([]),
415 'checkorder': ComposedType([
417 ('order', FixedStrType(60)), # XXX
419 'submitorder': ComposedType([
421 ('order', FixedStrType(60)), # XXX
423 'reply': ComposedType([
424 ('hash', HashType()),
425 ('reply', EnumType(StructType('<I'), {'success': 0, 'failure': 1, 'denied': 2})),
426 ('script', VarStrType()),
428 'ping': ComposedType([]),
429 'alert': ComposedType([
430 ('message', VarStrType()),
431 ('signature', VarStrType()),
437 def connectionMade(self):
438 BaseProtocol.connectionMade(self)
443 time=int(time.time()),
446 address='::ffff:' + self.transport.getPeer().host,
447 port=self.transport.getPeer().port,
451 address='::ffff:' + self.transport.getHost().host,
452 port=self.transport.getHost().port,
454 nonce=random.randrange(2**64),
459 def handle_version(self, version, services, time, addr_to, addr_from, nonce, sub_version_num, start_height):
460 #print 'VERSION', locals()
461 self.version_after = version
464 def handle_verack(self):
465 self.version = self.version_after
468 self.check_order = util.GenericDeferrer(2**256, lambda id, order: self.send_checkorder(id=id, order=order))
469 self.submit_order = util.GenericDeferrer(2**256, lambda id, order: self.send_submitorder(id=id, order=order))
470 self.get_block = util.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
471 self.get_block_header = util.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
473 if hasattr(self.factory, 'resetDelay'):
474 self.factory.resetDelay()
475 if hasattr(self.factory, 'gotConnection'):
476 self.factory.gotConnection(self)
478 def handle_inv(self, invs):
480 #print 'INV', item['type'], hex(item['hash'])
481 self.send_getdata(requests=[inv])
483 def handle_addr(self, addrs):
485 pass#print 'ADDR', addr
def handle_reply(self, hash, reply, script):
    """Forward a 'reply' message to both order trackers.

    check_order is notified first, then submit_order. Each receives *hash*
    as the key and its own fresh dict holding *reply* and *script*.
    """
    for tracker_name in ('check_order', 'submit_order'):
        tracker = getattr(self, tracker_name)
        tracker.got_response(hash, dict(reply=reply, script=script))
def handle_tx(self, tx):
    """Announce a received transaction through the factory's new_tx event."""
    tx_event = self.factory.new_tx
    tx_event.happened(tx)
def handle_block(self, block):
    """Process a received block.

    The block is first offered to the get_block matcher, keyed by the hash
    of its header, and then announced through the factory's new_block event.
    """
    # NOTE(review): get_block_header issues the same getdata request as
    # get_block, but nothing visible here ever calls its got_response --
    # confirm whether it should also be answered from this handler.
    answer_request = self.get_block.got_response
    answer_request(block_hash(block['header']), block)

    block_event = self.factory.new_block
    block_event.happened(block)
499 def handle_ping(self):
def connectionLost(self, reason):
    """Tell the factory the connection is gone, if it tracks connections.

    Factories exposing gotConnection are called with None; any other
    factory is left alone. *reason* is not used.
    """
    factory = self.factory
    if not hasattr(factory, 'gotConnection'):
        return
    factory.gotConnection(None)
506 class ClientFactory(protocol.ReconnectingClientFactory):
511 def __init__(self, testnet=False):
512 self.testnet = testnet
513 self.conn = util.Variable(None)
515 self.new_block = util.Event()
516 self.new_tx = util.Event()
518 def buildProtocol(self, addr):
519 p = self.protocol(self.testnet)
523 def gotConnection(self, conn):
def getProtocol(self):
    """Return whatever get_not_none() yields for the connection variable.

    NOTE(review): presumably this waits for / produces the current non-None
    connection -- confirm against util.Variable.
    """
    connection_var = self.conn
    return connection_var.get_not_none()
530 if __name__ == '__main__':
531 factory = ClientFactory()
532 reactor.connectTCP('127.0.0.1', 8333, factory)