cleaned indentation, imports
[p2pool.git] / p2p.py
1 import random
2 import time
3 import traceback
4
5 from entangled.kademlia import node, encoding, protocol
6 from twisted.internet import defer
7
8 import util
9
10 class CustomBencode(encoding.Bencode):
11     def __init__(self, prefix=""):
12         self.prefix = prefix
13     
14     def encode(self, data):
15         return self.prefix + encoding.Bencode.encode(data)
16     
17     def decode(self, data):
18         if not data.startswith(self.prefix):
19             raise ValueError("invalid prefix")
20         return encoding.Bencode.decode(data[len(self.prefix):])
21
22 class Node(node.Node):
23     @property
24     def peers(self):
25         for bucket in self._routingTable._buckets:
26             for contact in bucket._contacts:
27                 yield contact
28     
29     def __init__(self, blockCallback, **kwargs):
30         node.Node.__init__(self, networkProtocol=protocol.KademliaProtocol(self, msgEncoder=CustomBencode("p2pool")), **kwargs)
31         self.blockCallback = blockCallback
32         self.clock_offset = 0
33     
34     # time
35     
36     def joinNetwork(self, *args, **kwargs):
37         node.Node.joinNetwork(self, *args, **kwargs)
38         
39         def go(res):
40             self.joined()
41             return res
42         self._joinDeferred.addBoth(go)
43     
44     def joined(self):
45         self.time_task()
46     
47     def get_my_time(self):
48         return time.time() - self.clock_offset
49     
50     @node.rpcmethod
51     def get_time(self):
52         return time.time()
53     
54     @defer.inlineCallbacks
55     def time_task(self):
56         while True:
57             t_send = time.time()
58             clock_deltas = {None: (t_send, t_send)}
59             for peer, request in [(peer, peer.get_time().addCallback(lambda res: (time.time(), res))) for peer in self.peers]:
60                 try:
61                     t_recv, response = yield request
62                     t = (t_send + t_recv)/2
63                     clock_deltas[(peer.id, peer.address, peer.port)] = (t, float(response))
64                 except:
65                     traceback.print_exc()
66                     continue
67             
68             self.clock_offset = util.median(mine - theirs for mine, theirs in clock_deltas.itervalues())
69             
70             yield util.sleep(random.expovariate(1/500.))
71     
72     # disable data storage
73     
74     @node.rpcmethod
75     def store(self, key, value, originalPublisherID=None, age=0, **kwargs):
76         return
77     
78     @node.rpcmethod
79     def findValue(self, key, value, originalPublisherID=None, age=0, **kwargs):
80         return
81     
82     def _republishData(self, *args):
83         return defer.succeed(None)
84     
85     # meat
86     
87     @node.rpcmethod
88     def block(self, block_data):
89         self.blockCallback(block_data)