fixed network issue, added memory profiler
[p2pool.git] / p2p.py
1 from __future__ import division
2
3 import random
4 import time
5 import traceback
6
7 from twisted.internet import defer, reactor, protocol, task
8
9 import bitcoin_p2p
10 import conv
11 import util
12 import p2pool
13
14 # mode
15 #     0: send hash first (high latency, low bandwidth)
16 #     1: send entire share (low latency, high bandwidth)
17
18 if 0:
19     import pygame
20     d = pygame.display.set_mode((512, 512))
21     task.LoopingCall(pygame.display.update).start(.1)
22     def draw_circle(id, color=(255,0,0)):
23         id = repr(id)
24         pygame.draw.circle(d, (255, 0, 0), (hash(id)%512, hash(id)//512%512), 4)
25     def draw_line(id, id2, color):
26         id = repr(id)
27         pygame.draw.line(d, color, (hash(id)%512, hash(id)//512%512), (hash(id2)%512, hash(id2)//512%512))
28 else:
29     draw_circle = draw_line = lambda *args, **kwargs: None
30
31 class Protocol(bitcoin_p2p.BaseProtocol):
32     version = 0
33     sub_version = ""
34     
35     def __init__(self, node):
36         self.node = node
37     
38     @property
39     def _prefix(self):
40         if self.node.testnet:
41             return 'f77cea5d16a2183f'.decode('hex')
42         else:
43             return '95ec1eda53c5e716'.decode('hex')
44     
45     use_checksum = True
46     
47     message_types = {
48         'version': bitcoin_p2p.ComposedType([
49             ('version', bitcoin_p2p.StructType('<I')),
50             ('services', bitcoin_p2p.StructType('<Q')),
51             ('addr_to', bitcoin_p2p.address),
52             ('addr_from', bitcoin_p2p.address),
53             ('nonce', bitcoin_p2p.StructType('<Q')),
54             ('sub_version', bitcoin_p2p.VarStrType()),
55             ('mode', bitcoin_p2p.StructType('<I')),
56             ('state', bitcoin_p2p.ComposedType([
57                 ('chain_id', p2pool.chain_id_type),
58                 ('highest', bitcoin_p2p.ComposedType([
59                     ('hash', bitcoin_p2p.HashType()),
60                     ('height', bitcoin_p2p.StructType('<Q')),
61                 ])),
62             ])),
63         ]),
64         
65         'update_mode': bitcoin_p2p.ComposedType([
66             ('mode', bitcoin_p2p.StructType('<I')),
67         ]),
68         
69         'ping': bitcoin_p2p.ComposedType([]),
70         
71         'addrme': bitcoin_p2p.ComposedType([
72             ('port', bitcoin_p2p.StructType('<H')),
73         ]),
74         'addrs': bitcoin_p2p.ComposedType([
75             ('addrs', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
76                 ('timestamp', bitcoin_p2p.StructType('<Q')),
77                 ('address', bitcoin_p2p.address),
78             ]))),
79         ]),
80         'getaddrs': bitcoin_p2p.ComposedType([
81             ('count', bitcoin_p2p.StructType('<I')),
82         ]),
83         
84         'gettobest': bitcoin_p2p.ComposedType([
85             ('chain_id', p2pool.chain_id_type),
86             ('have', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
87         ]),
88         'getshares': bitcoin_p2p.ComposedType([
89             ('chain_id', p2pool.chain_id_type),
90             ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
91         ]),
92         
93         'share0s': bitcoin_p2p.ComposedType([
94             ('chains', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
95                 ('chain_id', p2pool.chain_id_type),
96                 ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
97             ]))),
98         ]),
99         'share1s': bitcoin_p2p.ComposedType([
100             ('share1s', bitcoin_p2p.ListType(p2pool.share1)),
101         ]),
102         'share2s': bitcoin_p2p.ComposedType([
103             ('share2s', bitcoin_p2p.ListType(bitcoin_p2p.block)),
104         ]),
105     }
106     
107     other_version = None
108     node_var_watch = None
109     connected2 = False
110     
111     @property
112     def mode(self):
113         return min(self.node.mode_var.value, self.other_mode_var.value)
114     
115     def connectionMade(self):
116         bitcoin_p2p.BaseProtocol.connectionMade(self)
117         if isinstance(self.factory, ClientFactory):
118             draw_line(self.node.port, self.transport.getPeer().port, (128, 128, 128))
119         
120         chain = self.node.current_work.value['current_chain']
121         highest_share2 = chain.get_highest_share2()
122         self.send_version(
123             version=self.version,
124             services=0,
125             addr_to=dict(
126                 services=0,
127                 address='::ffff:' + self.transport.getPeer().host,
128                 port=self.transport.getPeer().port,
129             ),
130             addr_from=dict(
131                 services=0,
132                 address='::ffff:' + self.transport.getHost().host,
133                 port=self.transport.getHost().port,
134             ),
135             nonce=self.node.nonce,
136             sub_version=self.sub_version,
137             mode=self.node.mode_var.value,
138             state=dict(
139                 chain_id=p2pool.chain_id_type.unpack(chain.chain_id_data),
140                 highest=dict(
141                     hash=highest_share2.share.hash if highest_share2 is not None else 2**256-1,
142                     height=highest_share2.height if highest_share2 is not None else 0,
143                 ),
144             ),
145         )
146         
147         self.node_var_watch = self.node.mode_var.changed.watch(lambda new_mode: self.send_set_mode(mode=new_mode))
148         
149         reactor.callLater(10, self._connect_timeout)
150     
151     def _connect_timeout(self):
152         if not self.connected2 and self.transport.connected:
153             print "Handshake timed out, disconnecting"
154             self.transport.loseConnection()
155     
156     @defer.inlineCallbacks
157     def _think(self):
158         while self.connected2:
159             self.send_ping()
160             yield util.sleep(random.expovariate(1/100))
161     
162     @defer.inlineCallbacks
163     def _think2(self):
164         while self.connected2:
165             self.send_addrme(port=self.node.port)
166             print "sending addrme"
167             yield util.sleep(random.expovariate(1/100))
168     
169     def handle_version(self, version, services, addr_to, addr_from, nonce, sub_version, mode, state):
170         self.other_version = version
171         self.other_services = services
172         self.other_mode_var = util.Variable(mode)
173         
174         if nonce == self.node.nonce:
175             #print "Detected connection to self, disconnecting"
176             self.transport.loseConnection()
177             return
178         if nonce in self.node.peers:
179             print "Detected duplicate connection, disconnecting"
180             self.transport.loseConnection()
181             return
182         
183         self.nonce = nonce
184         self.connected2 = True
185         self.node.got_conn(self)
186         
187         self._think()
188         self._think2()
189         
190         if state['highest']['hash'] != 2**256 - 1:
191             self.handle_share0s(chains=[dict(
192                 chain_id=state['chain_id'],
193                 hashes=[state['highest']['hash']],
194             )])
195         
196         if isinstance(self.factory, ClientFactory):
197             draw_line(self.node.port, self.transport.getPeer().port, (0, 255, 0))
198     
199     def handle_set_mode(self, mode):
200         self.other_mode_var.set(mode)
201     
202     def handle_ping(self):
203         pass
204     
205     def handle_addrme(self, port):
206         host = self.transport.getPeer().host
207         print "addrme from", host, port
208         if host == '127.0.0.1':
209             if random.random() < .7 and self.node.peers:
210                 random.choice(self.node.peers.values()).send_addrme(port=port) # services...
211         else:
212             self.node.got_addr(('::ffff:' + self.transport.getPeer().host, port), self.other_services, int(time.time()))
213             if random.random() < .7 and self.node.peers:
214                 random.choice(self.node.peers.values()).send_addrs(addrs=[
215                     dict(
216                         address=dict(
217                             services=self.other_services,
218                             address='::ffff:' + host,
219                             port=port,
220                         ),
221                         timestamp=int(time.time()),
222                     ),
223                 ])
224     def handle_addrs(self, addrs):
225         for addr_record in addrs:
226             self.node.got_addr((addr_record['address']['address'], addr_record['address']['port']), addr_record['address']['services'], min(int(time.time()), addr_record['timestamp']))
227             if random.random() < .7 and self.node.peers:
228                 random.choice(self.node.peers.values()).send_addrs(addrs=[addr_record])
229     def handle_getaddrs(self, count):
230         self.send_addrs(addrs=[
231             dict(
232                 timestamp=self.node.addr_store[host, port][2],
233                 address=dict(
234                     services=self.node.addr_store[host, port][0],
235                     address=host,
236                     port=port,
237                 ),
238             ) for host, port in
239             random.sample(self.node.addr_store.keys(), min(count, len(self.node.addr_store)))
240         ])
241     
242     def handle_gettobest(self, chain_id, have):
243         self.node.handle_get_to_best(p2pool.chain_id_type.pack(chain_id), have, self)
244     
245     def handle_getshares(self, chain_id, hashes):
246         self.node.handle_get_shares(p2pool.chain_id_type.pack(chain_id), hashes, self)
247     
248     def handle_share0s(self, chains):
249         for chain in chains:
250             for hash_ in chain['hashes']:
251                 self.node.handle_share_hash(p2pool.chain_id_type.pack(chain['chain_id']), hash_, self)
252     def handle_share1s(self, share1s):
253         for share1 in share1s:
254             hash_ = bitcoin_p2p.block_hash(share1['header'])
255             if hash_ <= conv.bits_to_target(share1['header']['bits']):
256                 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
257                 self.transport.loseConnection()
258                 return
259             share = p2pool.Share(share1['header'], gentx=share1['gentx'])
260             self.node.handle_share(share, self)
261     def handle_share2s(self, share2s):
262         for share2 in share2s:
263             hash_ = bitcoin_p2p.block_hash(share2['header'])
264             if not hash_ <= conv.bits_to_target(share2['header']['bits']):
265                 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
266                 self.transport.loseConnection()
267                 return
268             share = p2pool.Share(share2['header'], txns=share2['txns'])
269             self.node.handle_share(share, self)
270     
271     def send_share(self, share, full=False):
272         if share.hash <= conv.bits_to_target(share.header['bits']):
273             self.send_share2s(share2s=[share.as_block()])
274         else:
275             if self.mode == 0 and not full:
276                 self.send_share0s(chains=[dict(
277                     chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
278                     hashes=[share.hash],
279                 )])
280             elif self.mode == 1 or full:
281                 self.send_share1s(share1s=[dict(
282                     header=share.header,
283                     gentx=share.gentx,
284                 )])
285             else:
286                 raise ValueError(self.mode)
287     
288     def connectionLost(self, reason):
289         if self.node_var_watch is not None:
290             self.node.mode_var.changed.unwatch(self.node_var_watch)
291         
292         if self.connected2:
293             self.node.lost_conn(self)
294         
295         if isinstance(self.factory, ClientFactory):
296             draw_line(self.node.port, self.transport.getPeer().port, (255, 0, 0))
297
298 class ServerFactory(protocol.ServerFactory):
299     def __init__(self, node):
300         self.node = node
301     
302     def buildProtocol(self, addr):
303         p = Protocol(self.node)
304         p.factory = self
305         return p
306
307 class ClientFactory(protocol.ClientFactory):
308     def __init__(self, node):
309         self.node = node
310     
311     def buildProtocol(self, addr):
312         p = Protocol(self.node)
313         p.factory = self
314         return p
315     
316     def startedConnecting(self, connector):
317         self.node.attempt_started(connector)
318     
319     def clientConnectionFailed(self, connector, reason):
320         self.node.attempt_failed(connector)
321     
322     def clientConnectionLost(self, connector, reason):
323         self.node.attempt_ended(connector)
324
325 addrdb_key = bitcoin_p2p.ComposedType([
326     ('address', bitcoin_p2p.IPV6AddressType()),
327     ('port', bitcoin_p2p.StructType('>H')),
328 ])
329 addrdb_value = bitcoin_p2p.ComposedType([
330     ('services', bitcoin_p2p.StructType('<Q')),
331     ('first_seen', bitcoin_p2p.StructType('<Q')),
332     ('last_seen', bitcoin_p2p.StructType('<Q')),
333 ])
334
335 class AddrStore(util.DictWrapper):
336     def encode_key(self, (address, port)):
337         return addrdb_key.pack(dict(address=address, port=port))
338     def decode_key(self, encoded_key):
339         k = addrdb_key.unpack(encoded_key)
340         return k['address'], k['port']
341     
342     def encode_value(self, (services, first_seen, last_seen)):
343         return addrdb_value.pack(dict(services=services, first_seen=first_seen, last_seen=last_seen))
344     def decode_value(self, encoded_value):
345         v = addrdb_value.unpack(encoded_value)
346         return v['services'], v['first_seen'], v['last_seen']
347
348 class Node(object):
349     def __init__(self, current_work, port, testnet, addr_store=None, preferred_addrs=[], mode=0, desired_peers=10, max_attempts=100):
350         if addr_store is None:
351             addr_store = {}
352         
353         self.port = port
354         self.testnet = testnet
355         self.addr_store = AddrStore(addr_store)
356         self.preferred_addrs = preferred_addrs
357         self.mode_var = util.Variable(mode)
358         self.desired_peers = desired_peers
359         self.max_attempts = max_attempts
360         self.current_work = current_work
361         
362         self.nonce = random.randrange(2**64)
363         self.attempts = {}
364         self.peers = {}
365         self.running = False
366         
367         draw_circle(self.port)
368     
369     def start(self):
370         if self.running:
371             raise ValueError("already running")
372         
373         self.running = True
374         
375         self.listen_port = reactor.listenTCP(self.port, ServerFactory(self))
376         
377         self._think()
378         self._think2()
379     
380     @defer.inlineCallbacks
381     def _think(self):
382         while self.running:
383             try:
384                 if len(self.peers) < self.desired_peers and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.addr_store)):
385                     if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.addr_store):
386                         host, port = random.choice(self.preferred_addrs)
387                     else:
388                         host2, port = random.choice(self.addr_store.keys())
389                         prefix = '::ffff:'
390                         if not host2.startswith(prefix):
391                             raise ValueError("invalid address")
392                         host = host2[len(prefix):]
393                     
394                     if (host, port) not in self.attempts:
395                         #print "Trying to connect to", host, port
396                         reactor.connectTCP(host, port, ClientFactory(self), timeout=10)
397             except:
398                 traceback.print_exc()
399             
400             yield util.sleep(random.expovariate(1/5))
401     
402     @defer.inlineCallbacks
403     def _think2(self):
404         while self.running:
405             try:
406                 if len(self.addr_store) < self.preferred_addrs and self.peers:
407                     random.choice(self.peers.values()).send_getaddrs(count=8)
408             except:
409                 traceback.print_exc()
410             
411             yield util.sleep(random.expovariate(1/20))
412     
413     def stop(self):
414         if not self.running:
415             raise ValueError("already stopped")
416         
417         self.running = False
418         
419         self.listen_port.stopListening()
420     
421     
422     def attempt_started(self, connector):
423         host, port = connector.getDestination().host, connector.getDestination().port
424         if (host, port) in self.attempts:
425             raise ValueError("already have attempt")
426         self.attempts[host, port] = connector
427     
428     def attempt_failed(self, connector):
429         self.attempt_ended(connector)
430     
431     def attempt_ended(self, connector):
432         host, port = connector.getDestination().host, connector.getDestination().port
433         if (host, port) not in self.attempts:
434             raise ValueError("don't have attempt")
435         if connector is not self.attempts[host, port]:
436             raise ValueError("wrong connector")
437         del self.attempts[host, port]
438     
439     
440     def got_conn(self, conn):
441         if conn.nonce in self.peers:
442             raise ValueError("already have peer")
443         self.peers[conn.nonce] = conn
444         
445         print "Connected to peer %s:%i" % (conn.transport.getPeer().host, conn.transport.getPeer().port)
446     
447     def lost_conn(self, conn):
448         if conn.nonce not in self.peers:
449             raise ValueError("don't have peer")
450         if conn is not self.peers[conn.nonce]:
451             raise ValueError("wrong conn")
452         del self.peers[conn.nonce]
453         
454         print "Lost peer %s:%i" % (conn.transport.getPeer().host, conn.transport.getPeer().port)
455     
456     
457     def got_addr(self, (host, port), services, timestamp):
458         if (host, port) in self.addr_store:
459             old_services, old_first_seen, old_last_seen = self.addr_store[host, port]
460             self.addr_store[host, port] = services, old_first_seen, max(old_last_seen, timestamp)
461         else:
462             self.addr_store[host, port] = services, timestamp, timestamp
463     
464     def handle_get_to_best(self, chain_id_data, have, peer):
465         pass
466     
467     def handle_get_shares(self, chain_id_data, hashes, peer):
468         pass
469     
470     def handle_share_hash(self, chain_id_data, hash, peer):
471         pass
472
473 if __name__ == '__main__':
474     p = random.randrange(2**15, 2**16)
475     for i in xrange(5):
476         p2 = random.randrange(2**15, 2**16)
477         print p, p2
478         n = Node(p2, True, {addrdb_key.pack(dict(address='::ffff:' + '127.0.0.1', port=p)): addrdb_value.pack(dict(services=0, first_seen=int(time.time())-10, last_seen=int(time.time())))})
479         n.start()
480         p = p2
481     
482     reactor.run()