moved BaseProtocol to util.p2protocol
[p2pool.git] / p2pool / bitcoin / p2p.py
1 '''
2 Implementation of Bitcoin's p2p protocol
3 '''
4
5 import random
6 import time
7
8 from twisted.internet import defer, protocol, reactor, task
9
10 from . import data as bitcoin_data
11 from p2pool.util import deferral, p2protocol, pack, variable
12
13 class Protocol(p2protocol.Protocol):
14     def __init__(self, net):
15         p2protocol.Protocol.__init__(self, net.P2P_PREFIX, 1000000)
16     
17     def connectionMade(self):
18         self.send_version(
19             version=32200,
20             services=1,
21             time=int(time.time()),
22             addr_to=dict(
23                 services=1,
24                 address=self.transport.getPeer().host,
25                 port=self.transport.getPeer().port,
26             ),
27             addr_from=dict(
28                 services=1,
29                 address=self.transport.getHost().host,
30                 port=self.transport.getHost().port,
31             ),
32             nonce=random.randrange(2**64),
33             sub_version_num='',
34             start_height=0,
35         )
36     
37     message_version = pack.ComposedType([
38         ('version', pack.IntType(32)),
39         ('services', pack.IntType(64)),
40         ('time', pack.IntType(64)),
41         ('addr_to', bitcoin_data.address_type),
42         ('addr_from', bitcoin_data.address_type),
43         ('nonce', pack.IntType(64)),
44         ('sub_version_num', pack.VarStrType()),
45         ('start_height', pack.IntType(32)),
46     ])
47     def handle_version(self, version, services, time, addr_to, addr_from, nonce, sub_version_num, start_height):
48         self.send_verack()
49     
50     message_verack = pack.ComposedType([])
51     def handle_verack(self):
52         self.get_block = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='block', hash=hash)]))
53         self.get_block_header = deferral.ReplyMatcher(lambda hash: self.send_getheaders(version=1, have=[], last=hash))
54         self.get_tx = deferral.ReplyMatcher(lambda hash: self.send_getdata(requests=[dict(type='tx', hash=hash)]))
55         
56         if hasattr(self.factory, 'resetDelay'):
57             self.factory.resetDelay()
58         if hasattr(self.factory, 'gotConnection'):
59             self.factory.gotConnection(self)
60         
61         self.pinger = task.LoopingCall(self.send_ping)
62         self.pinger.start(30)
63     
64     message_inv = pack.ComposedType([
65         ('invs', pack.ListType(pack.ComposedType([
66             ('type', pack.EnumType(pack.IntType(32), {'tx': 1, 'block': 2})),
67             ('hash', pack.IntType(256)),
68         ]))),
69     ])
70     def handle_inv(self, invs):
71         for inv in invs:
72             if inv['type'] == 'tx':
73                 self.factory.new_tx.happened(inv['hash'])
74             elif inv['type'] == 'block':
75                 self.factory.new_block.happened(inv['hash'])
76             else:
77                 print 'Unknown inv type', item
78     
79     message_getdata = pack.ComposedType([
80         ('requests', pack.ListType(pack.ComposedType([
81             ('type', pack.EnumType(pack.IntType(32), {'tx': 1, 'block': 2})),
82             ('hash', pack.IntType(256)),
83         ]))),
84     ])
85     message_getblocks = pack.ComposedType([
86         ('version', pack.IntType(32)),
87         ('have', pack.ListType(pack.IntType(256))),
88         ('last', pack.PossiblyNoneType(0, pack.IntType(256))),
89     ])
90     message_getheaders = pack.ComposedType([
91         ('version', pack.IntType(32)),
92         ('have', pack.ListType(pack.IntType(256))),
93         ('last', pack.PossiblyNoneType(0, pack.IntType(256))),
94     ])
95     message_getaddr = pack.ComposedType([])
96     
97     message_addr = pack.ComposedType([
98         ('addrs', pack.ListType(pack.ComposedType([
99             ('timestamp', pack.IntType(32)),
100             ('address', bitcoin_data.address_type),
101         ]))),
102     ])
103     def handle_addr(self, addrs):
104         for addr in addrs:
105             pass
106     
107     message_tx = pack.ComposedType([
108         ('tx', bitcoin_data.tx_type),
109     ])
110     def handle_tx(self, tx):
111         self.get_tx.got_response(bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx)), tx)
112     
113     message_block = pack.ComposedType([
114         ('block', bitcoin_data.block_type),
115     ])
116     def handle_block(self, block):
117         block_hash = bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header']))
118         self.get_block.got_response(block_hash, block)
119         self.get_block_header.got_response(block_hash, block['header'])
120     
121     message_headers = pack.ComposedType([
122         ('headers', pack.ListType(bitcoin_data.block_type)),
123     ])
124     def handle_headers(self, headers):
125         for header in headers:
126             header = header['header']
127             self.get_block_header.got_response(bitcoin_data.hash256(bitcoin_data.block_header_type.pack(header)), header)
128         self.factory.new_headers.happened([header['header'] for header in headers])
129     
130     message_ping = pack.ComposedType([])
131     def handle_ping(self):
132         pass
133     
134     message_alert = pack.ComposedType([
135         ('message', pack.VarStrType()),
136         ('signature', pack.VarStrType()),
137     ])
138     def handle_alert(self, message, signature):
139         print 'ALERT:', (message, signature)
140     
141     def connectionLost(self, reason):
142         if hasattr(self.factory, 'gotConnection'):
143             self.factory.gotConnection(None)
144         if hasattr(self, 'pinger'):
145             self.pinger.stop()
146         print 'Bitcoin connection lost. Reason:', reason.getErrorMessage()
147
148 class ClientFactory(protocol.ReconnectingClientFactory):
149     protocol = Protocol
150     
151     maxDelay = 1
152     
153     def __init__(self, net):
154         self.net = net
155         self.conn = variable.Variable(None)
156         
157         self.new_block = variable.Event()
158         self.new_tx = variable.Event()
159         self.new_headers = variable.Event()
160     
161     def buildProtocol(self, addr):
162         p = self.protocol(self.net)
163         p.factory = self
164         return p
165     
166     def gotConnection(self, conn):
167         self.conn.set(conn)
168     
169     def getProtocol(self):
170         return self.conn.get_not_none()
171
172 if __name__ == '__main__':
173     from . import networks
174     factory = ClientFactory(networks.BitcoinMainnet)
175     reactor.connectTCP('127.0.0.1', 8333, factory)
176     
177     @repr
178     @apply
179     @defer.inlineCallbacks
180     def think():
181         try:
182             print (yield (yield factory.getProtocol()).get_block(0x000000000000003aaaf7638f9f9c0d0c60e8b0eb817dcdb55fd2b1964efc5175))
183         except defer.TimeoutError:
184             print "timeout"
185         reactor.stop()
186     
187     reactor.run()