1 from __future__ import division
7 from twisted.internet import defer, reactor, protocol, task
14 # 0: send hash first (high latency, low bandwidth)
15 # 1: send entire share (low latency, high bandwidth)
19 d = pygame.display.set_mode((512, 512))
20 task.LoopingCall(pygame.display.update).start(.1)
21 def draw_circle(id, color=(255,0,0)):
23 pygame.draw.circle(d, (255, 0, 0), (hash(id)%512, hash(id)//512%512), 4)
24 def draw_line(id, id2, color):
26 pygame.draw.line(d, color, (hash(id)%512, hash(id)//512%512), (hash(id2)%512, hash(id2)//512%512))
28 draw_circle = draw_line = lambda *args, **kwargs: None
30 class Protocol(bitcoin_p2p.BaseProtocol):
34 def __init__(self, node):
40 return 'f77cea5d16a2183f'.decode('hex')
42 return '95ec1eda53c5e716'.decode('hex')
47 'version': bitcoin_p2p.ComposedType([
48 ('version', bitcoin_p2p.StructType('<I')),
49 ('services', bitcoin_p2p.StructType('<Q')),
50 ('addr_to', bitcoin_p2p.address),
51 ('addr_from', bitcoin_p2p.address),
52 ('nonce', bitcoin_p2p.StructType('<Q')),
53 ('sub_version', bitcoin_p2p.VarStrType()),
54 ('mode', bitcoin_p2p.StructType('<I')),
55 ('state', bitcoin_p2p.ComposedType([
56 ('chain_id', bitcoin_p2p.ComposedType([
57 ('previous_p2pool_block', bitcoin_p2p.HashType()),
58 ('bits', bitcoin_p2p.StructType('<I')),
60 ('highest', bitcoin_p2p.ComposedType([
61 ('hash', bitcoin_p2p.HashType()),
62 ('height', bitcoin_p2p.StructType('<Q')),
67 'update_mode': bitcoin_p2p.ComposedType([
68 ('mode', bitcoin_p2p.StructType('<I')),
71 'ping': bitcoin_p2p.ComposedType([]),
73 'addrme': bitcoin_p2p.ComposedType([
74 ('port', bitcoin_p2p.StructType('<H')),
76 'addrs': bitcoin_p2p.ComposedType([
77 ('addrs', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
78 ('timestamp', bitcoin_p2p.StructType('<Q')),
79 ('address', bitcoin_p2p.address),
82 'getaddrs': bitcoin_p2p.ComposedType([
83 ('count', bitcoin_p2p.StructType('<I')),
86 'getsharesbychain': bitcoin_p2p.ComposedType([
87 ('chain_id', bitcoin_p2p.ComposedType([
88 ('previous_p2pool_block', bitcoin_p2p.HashType()),
89 ('bits', bitcoin_p2p.StructType('<I')),
91 ('have', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
93 'getshares': bitcoin_p2p.ComposedType([
94 ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
97 'share0s': bitcoin_p2p.ComposedType([
98 ('share0s', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
99 ('chain_id', bitcoin_p2p.ComposedType([
100 ('previous_p2pool_block', bitcoin_p2p.HashType()),
101 ('bits', bitcoin_p2p.StructType('<I')),
103 ('hashes', bitcoin_p2p.ListType(bitcoin_p2p.HashType())),
106 'share1s': bitcoin_p2p.ComposedType([
107 ('share1s', bitcoin_p2p.ListType(bitcoin_p2p.ComposedType([
108 ('header', bitcoin_p2p.block_header),
109 ('gentx', bitcoin_p2p.ComposedType([
110 ('tx', bitcoin_p2p.tx),
111 ('merkle_branch', bitcoin_p2p.merkle_branch),
115 'share2s': bitcoin_p2p.ComposedType([
116 ('share1s', bitcoin_p2p.ListType(bitcoin_p2p.block)),
121 node_var_watch = None
125 return min(self.node.mode_var.value, self.other_mode_var.value)
127 def connectionMade(self):
128 bitcoin_p2p.BaseProtocol.connectionMade(self)
129 if isinstance(self.factory, ClientFactory):
130 draw_line(self.node.port, self.transport.getPeer().port, (128, 128, 128))
132 chain = self.node.current_work['current_chain']
134 version=self.version,
138 address='::ffff:' + self.transport.getPeer().host,
139 port=self.transport.getPeer().port,
143 address='::ffff:' + self.transport.getHost().host,
144 port=self.transport.getHost().port,
146 nonce=self.node.nonce,
147 sub_version=self.sub_version,
148 mode=self.node.mode_var.value,
151 previous_p2pool_block=0,
161 self.node_var_watch = self.node.mode_var.changed.watch(lambda new_mode: self.send_set_mode(mode=new_mode))
163 self.connected = False
168 reactor.callLater(10, self._connect_timeout)
170 def _connect_timeout(self):
171 if not self.connected and self.transport.connected:
172 print "Handshake timed out, disconnecting"
173 self.transport.loseConnection()
175 @defer.inlineCallbacks
177 while self.transport.connected:
179 yield util.sleep(random.expovariate(1/100))
181 @defer.inlineCallbacks
183 while self.transport.connected:
184 self.send_addrme(port=self.node.port)
185 yield util.sleep(random.expovariate(1/100))
187 def handle_version(self, version, services, addr_to, addr_from, nonce, sub_version, mode, state):
188 self.other_version = version
189 self.other_services = services
190 self.other_mode_var = util.Variable(mode)
192 if nonce == self.node.nonce:
193 #print "Detected connection to self, disconnecting"
194 self.transport.loseConnection()
197 self.connected = True
198 self.node.got_conn(self, services)
199 if isinstance(self.factory, ClientFactory):
200 draw_line(self.node.port, self.transport.getPeer().port, (0, 255, 0))
def handle_set_mode(self, mode):
    """Record the mode value announced by the remote peer.

    The value is stored in ``self.other_mode_var``, which is created by
    ``handle_version``.
    """
    peer_mode_var = self.other_mode_var
    peer_mode_var.set(mode)
205 def handle_ping(self):
def handle_addrme(self, port):
    """Handle an 'addrme' message: learn the sender's address and gossip it.

    The sender's host is taken from the transport and stored in
    IPv4-mapped form (``'::ffff:' + host``) together with the announced
    ``port`` via ``self.node.got_addr``.  Then, with probability 0.7 and
    only if the node has at least one peer, the same address is forwarded
    to one randomly chosen peer in an 'addrs' message.

    port -- port number announced by the remote peer.
    """
    peer_host = '::ffff:' + self.transport.getPeer().host
    # Read the clock once so the stored and the relayed record agree.
    now = int(time.time())
    self.node.got_addr((peer_host, port), self.other_services, now)

    if random.random() < .7 and self.node.peers:
        # random.choice needs an indexable sequence; list() keeps this
        # working when values() returns a view instead of a list.
        relay_target = random.choice(list(self.node.peers.values()))
        relay_target.send_addrs(addrs=[dict(
            address=dict(services=self.other_services, address=peer_host, port=port),
            timestamp=now,
        )])
def handle_addrs(self, addrs):
    """Handle an 'addrs' message: store each address and gossip it onward.

    Every record is passed to ``self.node.got_addr``; its timestamp is
    capped at the current time so a peer cannot claim an address was seen
    in the future.  Each record is then, with probability 0.7 and only if
    the node has at least one peer, forwarded unchanged to one randomly
    chosen peer.

    addrs -- list of records shaped like
             ``{'timestamp': ..., 'address': {'services', 'address', 'port'}}``.
    """
    for addr_record in addrs:
        address = addr_record['address']
        last_seen = min(int(time.time()), addr_record['timestamp'])
        self.node.got_addr((address['address'], address['port']), address['services'], last_seen)

        if random.random() < .7 and self.node.peers:
            # random.choice needs an indexable sequence; list() keeps this
            # working when values() returns a view instead of a list.
            relay_target = random.choice(list(self.node.peers.values()))
            relay_target.send_addrs(addrs=[addr_record])
217 def handle_getaddrs(self, count):
218 self.send_addrs(addrs=[
220 timestamp=self.node.addr_store[host, port][2],
222 services=self.node.addr_store[host, port][0],
227 random.sample(self.node.addr_store.keys(), min(count, len(self.node.addr_store)))
230 def handle_share0s(self, share0s):
231 for share0 in share0s:
233 self.node.handle_share_hash
234 def handle_share1s(self, share1s):
235 for share1 in share1s:
236 hash_ = bitcoin_p2p.block_hash(share1['header'])
237 if hash_ <= conv.bits_to_target(share1['header']['bits']):
238 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
239 self.transport.loseConnection()
242 def handle_share2s(self, share2s):
243 for share2 in share2s:
244 hash_ = bitcoin_p2p.block_hash(share2['header'])
245 if not hash_ <= conv.bits_to_target(share1['header']['bits']):
246 print "Dropping peer %s:%i due to invalid share" % (self.transport.getPeer().host, self.transport.getPeer().port)
247 self.transport.loseConnection()
251 def send_share(self, share):
252 hash_ = bitcoin_p2p.block_hash(share['header'])
253 if hash_ <= conv.bits_to_target(share['header']['bits']):
254 if 'txns' not in share:
255 raise ValueError("partial block matching bits passed to send_share")
256 self.send_share2s(share2s=[share])
259 self.send_share0s(share0s=[hash_])
261 self.send_share1s(share1s=[dict(
262 header=share['header'],
265 merkle_branch=bitcoin_p2p.calculate_merkle_branch(share['txns'], 0),
269 raise ValueError(self.mode)
271 def connectionLost(self, reason):
272 if self.node_var_watch is not None:
273 self.node.mode_var.changed.unwatch(self.node_var_watch)
276 self.node.lost_conn(self)
278 if isinstance(self.factory, ClientFactory):
279 draw_line(self.node.port, self.transport.getPeer().port, (255, 0, 0))
281 class ServerFactory(protocol.ServerFactory):
282 def __init__(self, node):
285 def buildProtocol(self, addr):
286 p = Protocol(self.node)
290 class ClientFactory(protocol.ClientFactory):
291 def __init__(self, node):
294 def buildProtocol(self, addr):
295 p = Protocol(self.node)
def startedConnecting(self, connector):
    """Factory hook: tell the node that a connection attempt has begun."""
    node = self.node
    node.attempt_started(connector)
def clientConnectionFailed(self, connector, reason):
    """Factory hook: tell the node that a connection attempt failed.

    ``reason`` is accepted for the callback signature but is not used.
    """
    node = self.node
    node.attempt_failed(connector)
def clientConnectionLost(self, connector, reason):
    """Factory hook: tell the node that an established connection ended.

    ``reason`` is accepted for the callback signature but is not used.
    """
    node = self.node
    node.attempt_ended(connector)
308 addrdb_key = bitcoin_p2p.ComposedType([
309 ('address', bitcoin_p2p.IPV6AddressType()),
310 ('port', bitcoin_p2p.StructType('>H')),
312 addrdb_value = bitcoin_p2p.ComposedType([
313 ('services', bitcoin_p2p.StructType('<Q')),
314 ('first_seen', bitcoin_p2p.StructType('<Q')),
315 ('last_seen', bitcoin_p2p.StructType('<Q')),
class AddrStore(util.DictWrapper):
    """Mapping wrapper whose keys and values are packed peer-address records.

    Keys are ``(address, port)`` pairs packed with ``addrdb_key``; values
    are ``(services, first_seen, last_seen)`` triples packed with
    ``addrdb_value``.
    NOTE(review): presumably util.DictWrapper calls these four hooks when
    reading and writing the wrapped mapping -- confirm in util.
    """

    def encode_key(self, key):
        """Pack an ``(address, port)`` pair into its stored form."""
        # Unpack in the body: tuple parameters are not valid on Python 3.
        address, port = key
        return addrdb_key.pack(dict(address=address, port=port))

    def decode_key(self, encoded_key):
        """Unpack a stored key back into an ``(address, port)`` pair."""
        k = addrdb_key.unpack(encoded_key)
        return k['address'], k['port']

    def encode_value(self, value):
        """Pack a ``(services, first_seen, last_seen)`` triple into its stored form."""
        # Unpack in the body: tuple parameters are not valid on Python 3.
        services, first_seen, last_seen = value
        return addrdb_value.pack(dict(services=services, first_seen=first_seen, last_seen=last_seen))

    def decode_value(self, encoded_value):
        """Unpack a stored value back into ``(services, first_seen, last_seen)``."""
        v = addrdb_value.unpack(encoded_value)
        return v['services'], v['first_seen'], v['last_seen']
332 def __init__(self, port, testnet, addr_store=None, preferred_addrs=[], mode=0, desired_peers=10, max_attempts=100):
333 if addr_store is None:
337 self.testnet = testnet
338 self.addr_store = AddrStore(addr_store)
339 self.preferred_addrs = preferred_addrs
340 self.mode_var = util.Variable(mode)
341 self.desired_peers = desired_peers
342 self.max_attempts = max_attempts
344 self.current_work = dict(current_chain=None)
346 self.nonce = random.randrange(2**64)
351 draw_circle(self.port)
355 raise ValueError("already running")
359 self.listen_port = reactor.listenTCP(self.port, ServerFactory(self))
364 @defer.inlineCallbacks
368 if len(self.peers) < self.desired_peers and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.addr_store)):
369 if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.addr_store):
370 host, port = random.choice(self.preferred_addrs)
372 host2, port = random.choice(self.addr_store.keys())
374 if not host2.startswith(prefix):
375 raise ValueError("invalid address")
376 host = host2[len(prefix):]
378 if (host, port) not in self.attempts and (host, port) not in self.peers:
379 reactor.connectTCP(host, port, ClientFactory(self), timeout=10)
381 traceback.print_exc()
383 yield util.sleep(random.expovariate(1/5))
385 @defer.inlineCallbacks
389 if len(self.addr_store) < self.preferred_addrs and self.peers:
390 random.choice(self.peers.values()).send_getaddrs(count=8)
392 traceback.print_exc()
394 yield util.sleep(random.expovariate(1/20))
398 raise ValueError("already stopped")
402 self.listen_port.stopListening()
def attempt_started(self, connector):
    """Start tracking an outgoing connection attempt.

    The connector is stored in ``self.attempts`` under its destination
    ``(host, port)``.

    Raises ValueError if an attempt to that destination is already tracked.
    """
    destination = connector.getDestination()
    key = (destination.host, destination.port)
    if key in self.attempts:
        raise ValueError("already have attempt")
    self.attempts[key] = connector
def attempt_failed(self, connector):
    """Handle a failed connection attempt the same way as a finished one."""
    finish_attempt = self.attempt_ended
    finish_attempt(connector)
def attempt_ended(self, connector):
    """Stop tracking a connection attempt.

    Removes the entry for the connector's destination ``(host, port)``
    from ``self.attempts``.

    Raises ValueError if no attempt is tracked for that destination, or if
    the tracked attempt belongs to a different connector object.
    """
    destination = connector.getDestination()
    key = (destination.host, destination.port)
    if key not in self.attempts:
        raise ValueError("don't have attempt")
    if self.attempts[key] is not connector:
        raise ValueError("wrong connector")
    del self.attempts[key]
def got_conn(self, conn, services):
    """Register a newly connected peer.

    The connection is stored in ``self.peers`` under the remote
    ``(host, port)`` reported by its transport, and a message is printed.

    conn     -- protocol instance for the peer (must expose ``transport``).
    services -- accepted but not used by the code visible here.

    Raises ValueError if a peer with the same ``(host, port)`` is already
    registered.
    """
    remote = conn.transport.getPeer()
    host, port = remote.host, remote.port
    if (host, port) in self.peers:
        raise ValueError("already have peer")
    self.peers[host, port] = conn

    # Call form of print: same output on Python 2, valid syntax on Python 3.
    print("Connected to peer %s:%i" % (host, port))
def lost_conn(self, conn):
    """Unregister a peer whose connection has ended.

    Removes the entry for the connection's remote ``(host, port)`` from
    ``self.peers`` and prints a message.

    Raises ValueError if no peer is registered for that address, or if the
    registered peer is a different connection object.
    """
    remote = conn.transport.getPeer()
    host, port = remote.host, remote.port
    if (host, port) not in self.peers:
        raise ValueError("don't have peer")
    if self.peers[host, port] is not conn:
        raise ValueError("wrong conn")
    del self.peers[host, port]

    # Call form of print: same output on Python 2, valid syntax on Python 3.
    print("Lost peer %s:%i" % (host, port))
442 def got_addr(self, (host, port), services, timestamp):
443 if (host, port) in self.addr_store:
444 old_services, old_first_seen, old_last_seen = self.addr_store[host, port]
445 self.addr_store[host, port] = services, old_first_seen, max(old_last_seen, timestamp)
447 self.addr_store[host, port] = services, timestamp, timestamp
449 if __name__ == '__main__':
450 p = random.randrange(2**15, 2**16)
452 p2 = random.randrange(2**15, 2**16)
454 n = Node(p2, True, {addrdb_key.pack(dict(address='::ffff:' + '127.0.0.1', port=p)): addrdb_value.pack(dict(services=0, first_seen=int(time.time())-10, last_seen=int(time.time())))})