bug
[p2pool.git] / p2p.py
1 from __future__ import division
2
3 import random
4 import time
5 import traceback
6
7 from twisted.internet import defer, reactor, protocol, task
8
9 import bitcoin_p2p
10 import conv
11 import util
12 import p2pool
13
14 # mode
15 #     0: send hash first (high latency, low bandwidth)
16 #     1: send entire share (low latency, high bandwidth)
17
18 if 0:
19     import pygame
20     d = pygame.display.set_mode((512, 512))
21     task.LoopingCall(pygame.display.update).start(.1)
22     def draw_circle(id, color=(255,0,0)):
23         id = repr(id)
24         pygame.draw.circle(d, (255, 0, 0), (hash(id)%512, hash(id)//512%512), 4)
25     def draw_line(id, id2, color):
26         id = repr(id)
27         pygame.draw.line(d, color, (hash(id)%512, hash(id)//512%512), (hash(id2)%512, hash(id2)//512%512))
28 else:
29     draw_circle = draw_line = lambda *args, **kwargs: None
30
31 class Protocol(bitcoin_p2p.BaseProtocol):
32     version = 0
33     sub_version = ""
34     
35     def __init__(self, node):
36         self.node = node
37     
38     @property
39     def _prefix(self):
40         if self.node.testnet:
41             return 'f77cea5d16a2183f'.decode('hex')
42         else:
43             return '95ec1eda53c5e716'.decode('hex')
44     
45     use_checksum = True
46     
47     message_types = {
48         'version': bitcoin_p2p.ComposedType([
49             ('version', bitcoin_p2p.StructType('<I')),
50             ('services', bitcoin_p2p.StructType('<Q')),
51             ('addr_to', bitcoin_p2p.address),
52             ('addr_from', bitcoin_p2p.address),
53             ('nonce', bitcoin_p2p.StructType('<Q')),
54             ('sub_version', bitcoin_p2p.VarStrType()),
55             ('mode', bitcoin_p2p.StructType('<I')),
56             ('state', bitcoin_p2p.ComposedType([
57                 ('chain_id', p2pool.chain_id_type),
58                 ('highest', bitcoin_p2p.ComposedType([
59                     ('hash', bitcoin_p2p.HashType()),
60                     ('height', bitcoin_p2p.StructType('<Q')),
61                 ])),
62             ])),
63         ]),
64         
65         'update_mode': bitcoin_p2p.ComposedType([
66             ('mode', bitcoin_p2p.StructType('<I')),
67         ]),
68         
69         'ping': bitcoin_p2p.ComposedType([]),
70         
71         'addrme': bitcoin_p2p.ComposedType([
72             ('port', bitcoin_p2p.StructType('<H')),
73         ]),
74         'addrs': bitcoin_p2p.ComposedType([
75             ('addrs', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
76                 ('timestamp', bitcoin_p2p.StructType('<Q')),
77                 ('address', bitcoin_p2p.address),
78             ]))),
79         ]),
80         'getaddrs': bitcoin_p2p.ComposedType([
81             ('count', bitcoin_p2p.StructType('<I')),
82         ]),
83         
84         'getsharesbychain': bitcoin_p2p.ComposedType([
85             ('chain_id', p2pool.chain_id_type),
86             ('have', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
87         ]),
88         'getshares': bitcoin_p2p.ComposedType([
89             ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
90         ]),
91         
92         'share0s': bitcoin_p2p.ComposedType([
93             ('chains', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
94                 ('chain_id', p2pool.chain_id_type),
95                 ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
96             ]))),
97         ]),
98         'share1s': bitcoin_p2p.ComposedType([
99             ('share1s', bitcoin_p2p.ListType(p2pool.share1)),
100         ]),
101         'share2s': bitcoin_p2p.ComposedType([
102             ('share2s', bitcoin_p2p.ListType(bitcoin_p2p.block)),
103         ]),
104     }
105     
106     other_version = None
107     node_var_watch = None
108     connected2 = False
109     
110     @property
111     def mode(self):
112         return min(self.node.mode_var.value, self.other_mode_var.value)
113     
114     def connectionMade(self):
115         bitcoin_p2p.BaseProtocol.connectionMade(self)
116         if isinstance(self.factory, ClientFactory):
117             draw_line(self.node.port, self.transport.getPeer().port, (128, 128, 128))
118         
119         chain = self.node.current_work.value['current_chain']
120         highest_share2 = chain.get_highest_share2()
121         self.send_version(
122             version=self.version,
123             services=0,
124             addr_to=dict(
125                 services=0,
126                 address='::ffff:' + self.transport.getPeer().host,
127                 port=self.transport.getPeer().port,
128             ),
129             addr_from=dict(
130                 services=0,
131                 address='::ffff:' + self.transport.getHost().host,
132                 port=self.transport.getHost().port,
133             ),
134             nonce=self.node.nonce,
135             sub_version=self.sub_version,
136             mode=self.node.mode_var.value,
137             state=dict(
138                 chain_id=p2pool.chain_id_type.unpack(chain.chain_id_data),
139                 highest=dict(
140                     hash=highest_share2.share.hash,
141                     height=highest_share2.height,
142                 ),
143             ),
144         )
145         
146         self.node_var_watch = self.node.mode_var.changed.watch(lambda new_mode: self.send_set_mode(mode=new_mode))
147         
148         self._think()
149         self._think2()
150         
151         reactor.callLater(10, self._connect_timeout)
152     
153     def _connect_timeout(self):
154         if not self.connected2 and self.transport.connected:
155             print "Handshake timed out, disconnecting"
156             self.transport.loseConnection()
157     
158     @defer.inlineCallbacks
159     def _think(self):
160         while self.connected2:
161             self.send_ping()
162             yield util.sleep(random.expovariate(1/100))
163     
164     @defer.inlineCallbacks
165     def _think2(self):
166         while self.connected2:
167             self.send_addrme(port=self.node.port)
168             yield util.sleep(random.expovariate(1/100))
169     
170     def handle_version(self, version, services, addr_to, addr_from, nonce, sub_version, mode, state):
171         self.other_version = version
172         self.other_services = services
173         self.other_mode_var = util.Variable(mode)
174         
175         if nonce == self.node.nonce:
176             #print "Detected connection to self, disconnecting"
177             self.transport.loseConnection()
178             return
179         
180         # XXX use state
181         
182         self.connected2 = True
183         self.node.got_conn(self, services)
184         if isinstance(self.factory, ClientFactory):
185             draw_line(self.node.port, self.transport.getPeer().port, (0, 255, 0))
186     
187     def handle_set_mode(self, mode):
188         self.other_mode_var.set(mode)
189     
190     def handle_ping(self):
191         pass
192     
193     def handle_addrme(self, port):
194         self.node.got_addr(('::ffff:' + self.transport.getPeer().host, port), self.other_services, int(time.time()))
195         if random.random() < .7 and self.node.peers:
196             random.choice(self.node.peers.values()).send_addrs(addrs=[dict(address=dict(services=self.other_services, address='::ffff:' + self.transport.getPeer().host, port=port), timestamp=int(time.time()))])
197     def handle_addrs(self, addrs):
198         for addr_record in addrs:
199             self.node.got_addr((addr_record['address']['address'], addr_record['address']['port']), addr_record['address']['services'], min(int(time.time()), addr_record['timestamp']))
200             if random.random() < .7 and self.node.peers:
201                 random.choice(self.node.peers.values()).send_addrs(addrs=[addr_record])
202     def handle_getaddrs(self, count):
203         self.send_addrs(addrs=[
204             dict(
205                 timestamp=self.node.addr_store[host, port][2],
206                 address=dict(
207                     services=self.node.addr_store[host, port][0],
208                     address=host,
209                     port=port,
210                 ),
211             ) for host, port in
212             random.sample(self.node.addr_store.keys(), min(count, len(self.node.addr_store)))
213         ])
214     
215     def handle_share0s(self, chains):
216         for chain in chains:
217             for hash_ in chain['hashes']:
218                 self.node.handle_share_hash(p2pool.chain_id_type.pack(chain['chain_id']), hash_, self)
219     def handle_share1s(self, share1s):
220         for share1 in share1s:
221             hash_ = bitcoin_p2p.block_hash(share1['header'])
222             if hash_ <= conv.bits_to_target(share1['header']['bits']):
223                 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
224                 self.transport.loseConnection()
225                 return
226             share = p2pool.Share(share1['header'], gentx=share1['gentx'])
227             self.node.handle_share(share)
228     def handle_share2s(self, share2s):
229         for share2 in share2s:
230             hash_ = bitcoin_p2p.block_hash(share2['header'])
231             if not hash_ <= conv.bits_to_target(share1['header']['bits']):
232                 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
233                 self.transport.loseConnection()
234                 return
235             share = p2pool.Share(share1['header'], txns=share1['txns'])
236             self.node.handle_share(share)
237     
238     def send_share(self, share):
239         if share.hash <= conv.bits_to_target(share.header['bits']):
240             self.send_share2s(share2s=[share.as_block()])
241         else:
242             if self.mode == 0:
243                 self.send_share0s(chains=[dict(
244                     chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
245                     hashes=[share.hash],
246                 )])
247             elif self.mode == 1:
248                 self.send_share1s(share1s=[dict(
249                     header=share.header,
250                     gentx=share.gentx,
251                 )])
252             else:
253                 raise ValueError(self.mode)
254     
255     def connectionLost(self, reason):
256         if self.node_var_watch is not None:
257             self.node.mode_var.changed.unwatch(self.node_var_watch)
258         
259         if self.connected2:
260             self.node.lost_conn(self)
261         
262         if isinstance(self.factory, ClientFactory):
263             draw_line(self.node.port, self.transport.getPeer().port, (255, 0, 0))
264
265 class ServerFactory(protocol.ServerFactory):
266     def __init__(self, node):
267         self.node = node
268     
269     def buildProtocol(self, addr):
270         p = Protocol(self.node)
271         p.factory = self
272         return p
273
274 class ClientFactory(protocol.ClientFactory):
275     def __init__(self, node):
276         self.node = node
277     
278     def buildProtocol(self, addr):
279         p = Protocol(self.node)
280         p.factory = self
281         return p
282     
283     def startedConnecting(self, connector):
284         self.node.attempt_started(connector)
285     
286     def clientConnectionFailed(self, connector, reason):
287         self.node.attempt_failed(connector)
288     
289     def clientConnectionLost(self, connector, reason):
290         self.node.attempt_ended(connector)
291
292 addrdb_key = bitcoin_p2p.ComposedType([
293     ('address', bitcoin_p2p.IPV6AddressType()),
294     ('port', bitcoin_p2p.StructType('>H')),
295 ])
296 addrdb_value = bitcoin_p2p.ComposedType([
297     ('services', bitcoin_p2p.StructType('<Q')),
298     ('first_seen', bitcoin_p2p.StructType('<Q')),
299     ('last_seen', bitcoin_p2p.StructType('<Q')),
300 ])
301
302 class AddrStore(util.DictWrapper):
303     def encode_key(self, (address, port)):
304         return addrdb_key.pack(dict(address=address, port=port))
305     def decode_key(self, encoded_key):
306         k = addrdb_key.unpack(encoded_key)
307         return k['address'], k['port']
308     
309     def encode_value(self, (services, first_seen, last_seen)):
310         return addrdb_value.pack(dict(services=services, first_seen=first_seen, last_seen=last_seen))
311     def decode_value(self, encoded_value):
312         v = addrdb_value.unpack(encoded_value)
313         return v['services'], v['first_seen'], v['last_seen']
314
315 class Node(object):
316     def __init__(self, current_work, port, testnet, addr_store=None, preferred_addrs=[], mode=0, desired_peers=10, max_attempts=100):
317         if addr_store is None:
318             addr_store = {}
319         
320         self.port = port
321         self.testnet = testnet
322         self.addr_store = AddrStore(addr_store)
323         self.preferred_addrs = preferred_addrs
324         self.mode_var = util.Variable(mode)
325         self.desired_peers = desired_peers
326         self.max_attempts = max_attempts
327         self.current_work = current_work
328         
329         self.nonce = random.randrange(2**64)
330         self.attempts = {}
331         self.peers = {}
332         self.running = False
333         
334         draw_circle(self.port)
335     
336     def start(self):
337         if self.running:
338             raise ValueError("already running")
339         
340         self.running = True
341         
342         self.listen_port = reactor.listenTCP(self.port, ServerFactory(self))
343         
344         self._think()
345         self._think2()
346     
347     @defer.inlineCallbacks
348     def _think(self):
349         while self.running:
350             try:
351                 if len(self.peers) < self.desired_peers and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.addr_store)):
352                     if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.addr_store):
353                         host, port = random.choice(self.preferred_addrs)
354                     else:
355                         host2, port = random.choice(self.addr_store.keys())
356                         prefix = '::ffff:'
357                         if not host2.startswith(prefix):
358                             raise ValueError("invalid address")
359                         host = host2[len(prefix):]
360                     
361                     if (host, port) not in self.attempts and (host, port) not in self.peers:
362                         reactor.connectTCP(host, port, ClientFactory(self), timeout=10)
363             except:
364                 traceback.print_exc()
365             
366             yield util.sleep(random.expovariate(1/5))
367     
368     @defer.inlineCallbacks
369     def _think2(self):
370         while self.running:
371             try:
372                 if len(self.addr_store) < self.preferred_addrs and self.peers:
373                     random.choice(self.peers.values()).send_getaddrs(count=8)
374             except:
375                 traceback.print_exc()
376             
377             yield util.sleep(random.expovariate(1/20))
378     
379     def stop(self):
380         if not self.running:
381             raise ValueError("already stopped")
382         
383         self.running = False
384         
385         self.listen_port.stopListening()
386     
387     
388     def attempt_started(self, connector):
389         host, port = connector.getDestination().host, connector.getDestination().port
390         if (host, port) in self.attempts:
391             raise ValueError("already have attempt")
392         self.attempts[host, port] = connector
393     
394     def attempt_failed(self, connector):
395         self.attempt_ended(connector)
396     
397     def attempt_ended(self, connector):
398         host, port = connector.getDestination().host, connector.getDestination().port
399         if (host, port) not in self.attempts:
400             raise ValueError("don't have attempt")
401         if connector is not self.attempts[host, port]:
402             raise ValueError("wrong connector")
403         del self.attempts[host, port]
404     
405     
406     def got_conn(self, conn, services):
407         host, port = conn.transport.getPeer().host, conn.transport.getPeer().port
408         if (host, port) in self.peers:
409             raise ValueError("already have peer")
410         self.peers[host, port] = conn
411         
412         print "Connected to peer %s:%i" % (host, port)
413     
414     def lost_conn(self, conn):
415         host, port = conn.transport.getPeer().host, conn.transport.getPeer().port
416         if (host, port) not in self.peers:
417             raise ValueError("don't have peer")
418         if conn is not self.peers[host, port]:
419             raise ValueError("wrong conn")
420         del self.peers[host, port]
421         
422         print "Lost peer %s:%i" % (host, port)
423     
424     
425     def got_addr(self, (host, port), services, timestamp):
426         if (host, port) in self.addr_store:
427             old_services, old_first_seen, old_last_seen = self.addr_store[host, port]
428             self.addr_store[host, port] = services, old_first_seen, max(old_last_seen, timestamp)
429         else:
430             self.addr_store[host, port] = services, timestamp, timestamp
431     
432     
433     def handle_share_hash(self, chain_id_data, hash, peer):
434         pass
435
436 if __name__ == '__main__':
437     p = random.randrange(2**15, 2**16)
438     for i in xrange(5):
439         p2 = random.randrange(2**15, 2**16)
440         print p, p2
441         n = Node(p2, True, {addrdb_key.pack(dict(address='::ffff:' + '127.0.0.1', port=p)): addrdb_value.pack(dict(services=0, first_seen=int(time.time())-10, last_seen=int(time.time())))})
442         n.start()
443         p = p2
444     
445     reactor.run()