aba9532e406eb5fb9da5edd59f7f4a81e7c05275
[p2pool.git] / p2pool / bitcoin / p2p.py
1 '''
2 Implementation of Bitcoin's p2p protocol
3 '''
4
5 from __future__ import division
6
7 import hashlib
8 import random
9 import struct
10 import time
11 import traceback
12
13 from twisted.internet import protocol, reactor
14
15 from . import data as bitcoin_data
16 from p2pool.util import variable, datachunker, deferral
17
18 class BaseProtocol(protocol.Protocol):
19     def connectionMade(self):
20         self.dataReceived = datachunker.DataChunker(self.dataReceiver())
21     
22     def dataReceiver(self):
23         while True:
24             start = ''
25             while start != self._prefix:
26                 start = (start + (yield 1))[-len(self._prefix):]
27             
28             command = (yield 12).rstrip('\0')
29             length, = struct.unpack('<I', (yield 4))
30             
31             if self.use_checksum:
32                 checksum = yield 4
33             else:
34                 checksum = None
35             
36             payload = yield length
37             
38             if checksum is not None:
39                 if hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] != checksum:
40                     print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
41                     print 'INVALID HASH'
42                     continue
43             
44             type_ = getattr(self, "message_" + command, None)
45             if type_ is None:
46                 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
47                 print 'NO TYPE FOR', repr(command)
48                 continue
49             
50             try:
51                 payload2 = type_.unpack(payload)
52             except:
53                 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
54                 traceback.print_exc()
55                 continue
56             
57             handler = getattr(self, 'handle_' + command, None)
58             if handler is None:
59                 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
60                 print 'NO HANDLER FOR', command
61                 continue
62             
63             #print 'RECV', command, payload2
64             
65             try:
66                 handler(**payload2)
67             except:
68                 print 'RECV', command, checksum.encode('hex') if checksum is not None else None, repr(payload.encode('hex')), len(payload)
69                 traceback.print_exc()
70                 continue
71     
72     def sendPacket(self, command, payload2={}):
73         type_ = getattr(self, "message_" + command, None)
74         if type_ is None:
75             raise ValueError('invalid command')
76         payload = type_.pack(payload2)
77         if len(command) >= 12:
78             raise ValueError('command too long')
79         if self.use_checksum:
80             checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
81         else:
82             checksum = ''
83         data = self._prefix + struct.pack('<12sI', command, len(payload)) + checksum + payload
84         self.transport.write(data)
85         #print 'SEND', command, payload2
86     
87     def __getattr__(self, attr):
88         prefix = 'send_'
89         if attr.startswith(prefix):
90             command = attr[len(prefix):]
91             return lambda **payload2: self.sendPacket(command, payload2)
92         #return protocol.Protocol.__getattr__(self, attr)
93         raise AttributeError(attr)
94
95 class Protocol(BaseProtocol):
96     def __init__(self, net):
97         self._prefix = net.BITCOIN_P2P_PREFIX
98     
99     version = 0
100     
101     @property
102     def use_checksum(self):
103         return self.version >= 209
104     
105     
106     null_order = '\0'*60
107     
108     def connectionMade(self):
109         BaseProtocol.connectionMade(self)
110         
111         self.send_version(
112             version=32200,
113             services=1,
114             time=int(time.time()),
115             addr_to=dict(
116                 services=1,
117                 address=self.transport.getPeer().host,
118                 port=self.transport.getPeer().port,
119             ),
120             addr_from=dict(
121                 services=1,
122                 address=self.transport.getHost().host,
123                 port=self.transport.getHost().port,
124             ),
125             nonce=random.randrange(2**64),
126             sub_version_num='',
127             start_height=0,
128         )
129     
130     message_version = bitcoin_data.ComposedType([
131         ('version', bitcoin_data.StructType('<I')),
132         ('services', bitcoin_data.StructType('<Q')),
133         ('time', bitcoin_data.StructType('<Q')),
134         ('addr_to', bitcoin_data.address_type),
135         ('addr_from', bitcoin_data.address_type),
136         ('nonce', bitcoin_data.StructType('<Q')),
137         ('sub_version_num', bitcoin_data.VarStrType()),
138         ('start_height', bitcoin_data.StructType('<I')),
139     ])
140     def handle_version(self, version, services, time, addr_to, addr_from, nonce, sub_version_num, start_height):
141         #print 'VERSION', locals()
142         self.version_after = version
143         self.send_verack()
144     
145     message_verack = bitcoin_data.ComposedType([])
146     def handle_verack(self):
147         self.version = self.version_after
148         
149         # connection ready
150         self.check_order = deferral.GenericDeferrer(2**256, lambda id, order: self.send_checkorder(id=id, order=order))
151         self.submit_order = deferral.GenericDeferrer(2**256, lambda id, order: self.send_submitorder(id=id, order=order))
152         self.get_block = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
153         self.get_block_header = deferral.ReplyMatcher(lambda hash: self.send_getheaders(version=1, have=[], last=hash))
154         self.get_tx = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='tx', hash=hash)]))
155         
156         if hasattr(self.factory, 'resetDelay'):
157             self.factory.resetDelay()
158         if hasattr(self.factory, 'gotConnection'):
159             self.factory.gotConnection(self)
160     
161     message_inv = bitcoin_data.ComposedType([
162         ('invs', bitcoin_data.ListType(bitcoin_data.ComposedType([
163             ('type', bitcoin_data.EnumType(bitcoin_data.StructType('<I'), {'tx': 1, 'block': 2})),
164             ('hash', bitcoin_data.HashType()),
165         ]))),
166     ])
167     def handle_inv(self, invs):
168         for inv in invs:
169             if inv['type'] == 'tx':
170                 self.factory.new_tx.happened(inv['hash'])
171             elif inv['type'] == 'block':
172                 self.factory.new_block.happened(inv['hash'])
173             else:
174                 print "Unknown inv type", item
175     
176     message_getdata = bitcoin_data.ComposedType([
177         ('requests', bitcoin_data.ListType(bitcoin_data.ComposedType([
178             ('type', bitcoin_data.EnumType(bitcoin_data.StructType('<I'), {'tx': 1, 'block': 2})),
179             ('hash', bitcoin_data.HashType()),
180         ]))),
181     ])
182     message_getblocks = bitcoin_data.ComposedType([
183         ('version', bitcoin_data.StructType('<I')),
184         ('have', bitcoin_data.ListType(bitcoin_data.HashType())),
185         ('last', bitcoin_data.HashType()),
186     ])
187     message_getheaders = bitcoin_data.ComposedType([
188         ('version', bitcoin_data.StructType('<I')),
189         ('have', bitcoin_data.ListType(bitcoin_data.HashType())),
190         ('last', bitcoin_data.HashType()),
191     ])
192     message_getaddr = bitcoin_data.ComposedType([])
193     message_checkorder = bitcoin_data.ComposedType([
194         ('id', bitcoin_data.HashType()),
195         ('order', bitcoin_data.FixedStrType(60)), # XXX
196     ])
197     message_submitorder = bitcoin_data.ComposedType([
198         ('id', bitcoin_data.HashType()),
199         ('order', bitcoin_data.FixedStrType(60)), # XXX
200     ])
201
202     message_addr = bitcoin_data.ComposedType([
203         ('addrs', bitcoin_data.ListType(bitcoin_data.ComposedType([
204             ('timestamp', bitcoin_data.StructType('<I')),
205             ('address', bitcoin_data.address_type),
206         ]))),
207     ])
208     def handle_addr(self, addrs):
209         for addr in addrs:
210             pass
211     
212     message_tx = bitcoin_data.ComposedType([
213         ('tx', bitcoin_data.tx_type),
214     ])
215     def handle_tx(self, tx):
216         self.get_block.got_response(bitcoin_data.tx_type.hash256(tx), tx)
217     
218     message_block = bitcoin_data.ComposedType([
219         ('block', bitcoin_data.block_type),
220     ])
221     def handle_block(self, block):
222         self.get_block.got_response(bitcoin_data.block_header_type.hash256(block['header']), block)
223     
224     message_headers = bitcoin_data.ComposedType([
225         ('headers', bitcoin_data.ListType(bitcoin_data.block_header_type)),
226     ])
227     def handle_headers(self, headers):
228         for header in headers:
229             self.get_block_header.got_response(bitcoin_data.block_hash(header), header)
230     
231     message_reply = bitcoin_data.ComposedType([
232         ('hash', bitcoin_data.HashType()),
233         ('reply',  bitcoin_data.EnumType(bitcoin_data.StructType('<I'), {'success': 0, 'failure': 1, 'denied': 2})),
234         ('script', bitcoin_data.VarStrType()),
235     ])
236     def handle_reply(self, hash, reply, script):
237         self.check_order.got_response(hash, dict(reply=reply, script=script))
238         self.submit_order.got_response(hash, dict(reply=reply, script=script))
239
240     message_ping = bitcoin_data.ComposedType([])
241     def handle_ping(self):
242         pass
243
244     message_alert = bitcoin_data.ComposedType([
245         ('message', bitcoin_data.VarStrType()),
246         ('signature', bitcoin_data.VarStrType()),
247     ])
248     def handle_alert(self, message, signature):
249         print "ALERT:", (message, signature)
250     
251     def connectionLost(self, reason):
252         if hasattr(self.factory, 'gotConnection'):
253             self.factory.gotConnection(None)
254
255 class ClientFactory(protocol.ReconnectingClientFactory):
256     protocol = Protocol
257     
258     maxDelay = 15
259     
260     def __init__(self, net):
261         self.net = net
262         self.conn = variable.Variable(None)
263         
264         self.new_block = variable.Event()
265         self.new_tx = variable.Event()
266     
267     def buildProtocol(self, addr):
268         p = self.protocol(self.net)
269         p.factory = self
270         return p
271     
272     def gotConnection(self, conn):
273         self.conn.set(conn)
274     
275     def getProtocol(self):
276         return self.conn.get_not_none()
277
278 if __name__ == '__main__':
279     factory = ClientFactory()
280     reactor.connectTCP('127.0.0.1', 8333, factory)
281
282     @repr
283     @apply
284     @defer.inlineCallbacks
285     def think():
286         (yield factory.getProtocol())
287     
288     reactor.run()