send kwargs
[p2pool.git] / p2p.py
1 from __future__ import division
2
3 import random
4 import time
5 import traceback
6
7 from twisted.internet import defer, reactor, protocol, task
8
9 import bitcoin_p2p
10 import conv
11 import util
12
13 # mode
14 #     0: send hash first (high latency, low bandwidth)
15 #     1: send entire share (low latency, high bandwidth)
16
17 if 0:
18     import pygame
19     d = pygame.display.set_mode((512, 512))
20     task.LoopingCall(pygame.display.update).start(.1)
21     def draw_circle(id, color=(255,0,0)):
22         id = repr(id)
23         pygame.draw.circle(d, (255, 0, 0), (hash(id)%512, hash(id)//512%512), 4)
24     def draw_line(id, id2, color):
25         id = repr(id)
26         pygame.draw.line(d, color, (hash(id)%512, hash(id)//512%512), (hash(id2)%512, hash(id2)//512%512))
27 else:
28     draw_circle = draw_line = lambda *args, **kwargs: None
29
30 class Protocol(bitcoin_p2p.BaseProtocol):
31     version = 0
32     sub_version = ""
33     
34     def __init__(self, node):
35         self.node = node
36     
37     @property
38     def _prefix(self):
39         if self.node.testnet:
40             return 'f77cea5d16a2183f'.decode('hex')
41         else:
42             return '95ec1eda53c5e716'.decode('hex')
43     
44     use_checksum = True
45     
46     message_types = {
47         'version': bitcoin_p2p.ComposedType([
48             ('version', bitcoin_p2p.StructType('<I')),
49             ('services', bitcoin_p2p.StructType('<Q')),
50             ('addr_to', bitcoin_p2p.address),
51             ('addr_from', bitcoin_p2p.address),
52             ('nonce', bitcoin_p2p.StructType('<Q')),
53             ('sub_version', bitcoin_p2p.VarStrType()),
54             ('mode', bitcoin_p2p.StructType('<I')),
55             ('state', bitcoin_p2p.ComposedType([
56                 ('chain_id', bitcoin_p2p.ComposedType([
57                     ('previous_p2pool_block', bitcoin_p2p.HashType()),
58                     ('bits', bitcoin_p2p.StructType('<I')),
59                 ])),
60                 ('highest', bitcoin_p2p.ComposedType([
61                     ('hash', bitcoin_p2p.HashType()),
62                     ('height', bitcoin_p2p.StructType('<Q')),
63                 ])),
64             ])),
65         ]),
66         
67         'update_mode': bitcoin_p2p.ComposedType([
68             ('mode', bitcoin_p2p.StructType('<I')),
69         ]),
70         
71         'ping': bitcoin_p2p.ComposedType([]),
72         
73         'addrme': bitcoin_p2p.ComposedType([
74             ('port', bitcoin_p2p.StructType('<H')),
75         ]),
76         'addrs': bitcoin_p2p.ComposedType([
77             ('addrs', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
78                 ('timestamp', bitcoin_p2p.StructType('<Q')),
79                 ('address', bitcoin_p2p.address),
80             ]))),
81         ]),
82         'getaddrs': bitcoin_p2p.ComposedType([
83             ('count', bitcoin_p2p.StructType('<I')),
84         ]),
85         
86         'getsharesbychain': bitcoin_p2p.ComposedType([
87             ('chain_id', bitcoin_p2p.ComposedType([
88                 ('previous_p2pool_block', bitcoin_p2p.HashType()),
89                 ('bits', bitcoin_p2p.StructType('<I')),
90             ])),
91             ('have', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
92         ]),
93         'getshares': bitcoin_p2p.ComposedType([
94             ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
95         ]),
96         
97         'share0s': bitcoin_p2p.ComposedType([
98             ('share0s', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
99                 ('chain_id', bitcoin_p2p.ComposedType([
100                     ('previous_p2pool_block', bitcoin_p2p.HashType()),
101                     ('bits', bitcoin_p2p.StructType('<I')),
102                 ])),
103                 ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
104             ]))),
105         ]), 
106         'share1s': bitcoin_p2p.ComposedType([
107             ('share1s', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
108                 ('header', bitcoin_p2p.block_header),
109                 ('gentx', bitcoin_p2p.ComposedType([
110                     ('tx', bitcoin_p2p.tx),
111                     ('merkle_branch', bitcoin_p2p.merkle_branch),
112                 ])),
113             ]))),
114         ]),
115         'share2s': bitcoin_p2p.ComposedType([
116             ('share1s', bitcoin_p2p.ListType(bitcoin_p2p.block)),
117         ]),
118     }
119     
120     other_version = None
121     node_var_watch = None
122     
123     @property
124     def mode(self):
125         return min(self.node.mode_var.value, self.other_mode_var.value)
126     
127     def connectionMade(self):
128         bitcoin_p2p.BaseProtocol.connectionMade(self)
129         if isinstance(self.factory, ClientFactory):
130             draw_line(self.node.port, self.transport.getPeer().port, (128, 128, 128))
131         
132         chain = self.node.current_work['current_chain']
133         self.send_version(
134             version=self.version,
135             services=0,
136             addr_to=dict(
137                 services=0,
138                 address='::ffff:' + self.transport.getPeer().host,
139                 port=self.transport.getPeer().port,
140             ),
141             addr_from=dict(
142                 services=0,
143                 address='::ffff:' + self.transport.getHost().host,
144                 port=self.transport.getHost().port,
145             ),
146             nonce=self.node.nonce,
147             sub_version=self.sub_version,
148             mode=self.node.mode_var.value,
149             state=dict(
150                 chain_id=dict(
151                     previous_p2pool_block=0,
152                     bits=0,
153                 ),
154                 highest=dict(
155                     hash=0,
156                     height=0,
157                 ),
158             ),
159         )
160         
161         self.node_var_watch = self.node.mode_var.changed.watch(lambda new_mode: self.send_set_mode(mode=new_mode))
162         
163         self.connected = False
164         
165         self._think()
166         self._think2()
167         
168         reactor.callLater(10, self._connect_timeout)
169     
170     def _connect_timeout(self):
171         if not self.connected and self.transport.connected:
172             print "Handshake timed out, disconnecting"
173             self.transport.loseConnection()
174     
175     @defer.inlineCallbacks
176     def _think(self):
177         while self.transport.connected:
178             self.send_ping()
179             yield util.sleep(random.expovariate(1/100))
180     
181     @defer.inlineCallbacks
182     def _think2(self):
183         while self.transport.connected:
184             self.send_addrme(port=self.node.port)
185             yield util.sleep(random.expovariate(1/100))
186     
187     def handle_version(self, version, services, addr_to, addr_from, nonce, sub_version, mode, state):
188         self.other_version = version
189         self.other_services = services
190         self.other_mode_var = util.Variable(mode)
191         
192         if nonce == self.node.nonce:
193             #print "Detected connection to self, disconnecting"
194             self.transport.loseConnection()
195             return
196         
197         self.connected = True
198         self.node.got_conn(self, services)
199         if isinstance(self.factory, ClientFactory):
200             draw_line(self.node.port, self.transport.getPeer().port, (0, 255, 0))
201     
202     def handle_set_mode(self, mode):
203         self.other_mode_var.set(mode)
204     
205     def handle_ping(self):
206         pass
207     
208     def handle_addrme(self, port):
209         self.node.got_addr(('::ffff:' + self.transport.getPeer().host, port), self.other_services, int(time.time()))
210         if random.random() < .7 and self.node.peers:
211             random.choice(self.node.peers.values()).send_addrs(addrs=[dict(address=dict(services=self.other_services, address='::ffff:' + self.transport.getPeer().host, port=port), timestamp=int(time.time()))])
212     def handle_addrs(self, addrs):
213         for addr_record in addrs:
214             self.node.got_addr((addr_record['address']['address'], addr_record['address']['port']), addr_record['address']['services'], min(int(time.time()), addr_record['timestamp']))
215             if random.random() < .7 and self.node.peers:
216                 random.choice(self.node.peers.values()).send_addrs(addrs=[addr_record])
217     def handle_getaddrs(self, count):
218         self.send_addrs(addrs=[
219             dict(
220                 timestamp=self.node.addr_store[host, port][2],
221                 address=dict(
222                     services=self.node.addr_store[host, port][0],
223                     address=host,
224                     port=port,
225                 ),
226             ) for host, port in 
227             random.sample(self.node.addr_store.keys(), min(count, len(self.node.addr_store)))
228         ])
229     
230     def handle_share0s(self, share0s):
231         for share0 in share0s:
232             print share0
233             self.node.handle_share_hash
234     def handle_share1s(self, share1s):
235         for share1 in share1s:
236             hash_ = bitcoin_p2p.block_hash(share1['header'])
237             if hash_ <= conv.bits_to_target(share1['header']['bits']):
238                 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
239                 self.transport.loseConnection()
240                 return
241             share1()
242     def handle_share2s(self, share2s):
243         for share2 in share2s:
244             hash_ = bitcoin_p2p.block_hash(share2['header'])
245             if not hash_ <= conv.bits_to_target(share1['header']['bits']):
246                 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
247                 self.transport.loseConnection()
248                 return
249             share1()
250     
251     def send_share(self, share):
252         hash_ = bitcoin_p2p.block_hash(share['header'])
253         if hash_ <= conv.bits_to_target(share['header']['bits']):
254             if 'txns' not in share:
255                 raise ValueError("partial block matching bits passed to send_share")
256             self.send_share2s(share2s=[share])
257         else:
258             if self.mode == 0:
259                 self.send_share0s(share0s=[hash_])
260             elif self.mode == 1:
261                 self.send_share1s(share1s=[dict(
262                     header=share['header'],
263                     gentx=dict(
264                         tx=share['txns'][0],
265                         merkle_branch=bitcoin_p2p.calculate_merkle_branch(share['txns'], 0),
266                     ),
267                 )])
268             else:
269                 raise ValueError(self.mode)
270     
271     def connectionLost(self, reason):
272         if self.node_var_watch is not None:
273             self.node.mode_var.changed.unwatch(self.node_var_watch)
274         
275         if self.connected:
276             self.node.lost_conn(self)
277         
278         if isinstance(self.factory, ClientFactory):
279             draw_line(self.node.port, self.transport.getPeer().port, (255, 0, 0))
280
281 class ServerFactory(protocol.ServerFactory):
282     def __init__(self, node):
283         self.node = node
284     
285     def buildProtocol(self, addr):
286         p = Protocol(self.node)
287         p.factory = self
288         return p
289
290 class ClientFactory(protocol.ClientFactory):
291     def __init__(self, node):
292         self.node = node
293     
294     def buildProtocol(self, addr):
295         p = Protocol(self.node)
296         p.factory = self
297         return p
298     
299     def startedConnecting(self, connector):
300         self.node.attempt_started(connector)
301     
302     def clientConnectionFailed(self, connector, reason):
303         self.node.attempt_failed(connector)
304     
305     def clientConnectionLost(self, connector, reason):
306         self.node.attempt_ended(connector)
307
308 addrdb_key = bitcoin_p2p.ComposedType([
309     ('address', bitcoin_p2p.IPV6AddressType()),
310     ('port', bitcoin_p2p.StructType('>H')),
311 ])
312 addrdb_value = bitcoin_p2p.ComposedType([
313     ('services', bitcoin_p2p.StructType('<Q')),
314     ('first_seen', bitcoin_p2p.StructType('<Q')),
315     ('last_seen', bitcoin_p2p.StructType('<Q')),
316 ])
317
318 class AddrStore(util.DictWrapper):
319     def encode_key(self, (address, port)):
320         return addrdb_key.pack(dict(address=address, port=port))
321     def decode_key(self, encoded_key):
322         k = addrdb_key.unpack(encoded_key)
323         return k['address'], k['port']
324     
325     def encode_value(self, (services, first_seen, last_seen)):
326         return addrdb_value.pack(dict(services=services, first_seen=first_seen, last_seen=last_seen))
327     def decode_value(self, encoded_value):
328         v = addrdb_value.unpack(encoded_value)
329         return v['services'], v['first_seen'], v['last_seen']
330
331 class Node(object):
332     def __init__(self, port, testnet, addr_store=None, preferred_addrs=[], mode=0, desired_peers=10, max_attempts=100):
333         if addr_store is None:
334             addr_store = {}
335         
336         self.port = port
337         self.testnet = testnet
338         self.addr_store = AddrStore(addr_store)
339         self.preferred_addrs = preferred_addrs
340         self.mode_var = util.Variable(mode)
341         self.desired_peers = desired_peers
342         self.max_attempts = max_attempts
343         
344         self.current_work = dict(current_chain=None)
345         
346         self.nonce = random.randrange(2**64)
347         self.attempts = {}
348         self.peers = {}
349         self.running = False
350         
351         draw_circle(self.port)
352     
353     def start(self):
354         if self.running:
355             raise ValueError("already running")
356         
357         self.running = True
358         
359         self.listen_port = reactor.listenTCP(self.port, ServerFactory(self))
360         
361         self._think()
362         self._think2()
363     
364     @defer.inlineCallbacks
365     def _think(self):
366         while self.running:
367             try:
368                 if len(self.peers) < self.desired_peers and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.addr_store)):
369                     if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.addr_store):
370                         host, port = random.choice(self.preferred_addrs)
371                     else:
372                         host2, port = random.choice(self.addr_store.keys())
373                         prefix = '::ffff:'
374                         if not host2.startswith(prefix):
375                             raise ValueError("invalid address")
376                         host = host2[len(prefix):]
377                     
378                     if (host, port) not in self.attempts and (host, port) not in self.peers:
379                         reactor.connectTCP(host, port, ClientFactory(self), timeout=10)
380             except:
381                 traceback.print_exc()
382             
383             yield util.sleep(random.expovariate(1/5))
384     
385     @defer.inlineCallbacks
386     def _think2(self):
387         while self.running:
388             try:
389                 if len(self.addr_store) < self.preferred_addrs and self.peers:
390                     random.choice(self.peers.values()).send_getaddrs(count=8)
391             except:
392                 traceback.print_exc()
393             
394             yield util.sleep(random.expovariate(1/20))
395     
396     def stop(self):
397         if not self.running:
398             raise ValueError("already stopped")
399         
400         self.running = False
401         
402         self.listen_port.stopListening()
403     
404     
405     def attempt_started(self, connector):
406         host, port = connector.getDestination().host, connector.getDestination().port
407         if (host, port) in self.attempts:
408             raise ValueError("already have attempt")
409         self.attempts[host, port] = connector
410     
411     def attempt_failed(self, connector):
412         self.attempt_ended(connector)
413     
414     def attempt_ended(self, connector):
415         host, port = connector.getDestination().host, connector.getDestination().port
416         if (host, port) not in self.attempts:
417             raise ValueError("don't have attempt")
418         if connector is not self.attempts[host, port]:
419             raise ValueError("wrong connector")
420         del self.attempts[host, port]
421     
422     
423     def got_conn(self, conn, services):
424         host, port = conn.transport.getPeer().host, conn.transport.getPeer().port
425         if (host, port) in self.peers:
426             raise ValueError("already have peer")
427         self.peers[host, port] = conn
428         
429         print "Connected to peer %s:%i" % (host, port)
430     
431     def lost_conn(self, conn):
432         host, port = conn.transport.getPeer().host, conn.transport.getPeer().port
433         if (host, port) not in self.peers:
434             raise ValueError("don't have peer")
435         if conn is not self.peers[host, port]:
436             raise ValueError("wrong conn")
437         del self.peers[host, port]
438         
439         print "Lost peer %s:%i" % (host, port)
440     
441     
442     def got_addr(self, (host, port), services, timestamp):
443         if (host, port) in self.addr_store:
444             old_services, old_first_seen, old_last_seen = self.addr_store[host, port]
445             self.addr_store[host, port] = services, old_first_seen, max(old_last_seen, timestamp)
446         else:
447             self.addr_store[host, port] = services, timestamp, timestamp
448
449 if __name__ == '__main__':
450     p = random.randrange(2**15, 2**16)
451     for i in xrange(5):
452         p2 = random.randrange(2**15, 2**16)
453         print p, p2
454         n = Node(p2, True, {addrdb_key.pack(dict(address='::ffff:' + '127.0.0.1', port=p)): addrdb_value.pack(dict(services=0, first_seen=int(time.time())-10, last_seen=int(time.time())))})
455         n.start()
456         p = p2
457     
458     reactor.run()