changing layout, lots of things. currently broken
[p2pool.git] / p2pool / util.py
1 from __future__ import division
2
3 import collections
4 import hashlib
5 import itertools
6 import random
7
8 from twisted.internet import defer, reactor
9 from twisted.python import failure
10 from twisted.web import resource, server
11
class DeferredResource(resource.Resource):
    '''
    Resource whose rendering may complete asynchronously.
    
    The stock Resource.render is run through defer.maybeDeferred, so its
    result may be a plain value or a Deferred; the response is written and
    finished once that result is available.
    '''
    
    def render(self, request):
        def on_success(body):
            # nothing to write to if the client went away while we waited
            if not request._disconnected:
                if body is not None:
                    request.write(body)
                request.finish()
        
        def on_failure(fail):
            if not request._disconnected:
                request.setResponseCode(500) # won't do anything if already written to
                request.write('---ERROR---')
                request.finish()
                fail.printTraceback()
        
        pending = defer.maybeDeferred(resource.Resource.render, self, request)
        pending.addCallbacks(on_success, on_failure)
        # tell the server that the response will be finished later
        return server.NOT_DONE_YET
31
class Event(object):
    '''
    Minimal observer registry.
    
    Persistent observers (watch) are called on every happened(); one-time
    observers (watch_one_time) are called on the next happened() only and
    are then forgotten.
    '''
    
    def __init__(self):
        self.observers = {}
        self.one_time_observers = {}
        # a single counter feeds both dicts, so ids are unique across them
        self.id_generator = itertools.count()
    
    def watch(self, func):
        'Register func for every event; returns an id usable with unwatch.'
        observer_id = next(self.id_generator)
        self.observers[observer_id] = func
        return observer_id
    def unwatch(self, id):
        'Remove a persistent observer; raises KeyError if id is unknown.'
        self.observers.pop(id)
    
    def watch_one_time(self, func):
        'Register func for the next event only; returns an id usable with unwatch_one_time.'
        observer_id = next(self.id_generator)
        self.one_time_observers[observer_id] = func
        return observer_id
    def unwatch_one_time(self, id):
        'Remove a pending one-time observer; raises KeyError if id is unknown.'
        self.one_time_observers.pop(id)
    
    def happened(self, event=None):
        '''
        Call every persistent observer, then every pending one-time
        observer, with event as the only argument.
        '''
        # Iterate over a snapshot: observers are allowed to call
        # watch/unwatch from inside their callback, which would otherwise
        # raise "dictionary changed size during iteration". An observer
        # removed during dispatch is still called for the current event.
        for func in list(self.observers.values()):
            func(event)
        
        # Detach the one-time observers before calling them so that any
        # one-time observer registered from a callback waits for the next
        # event instead of firing immediately.
        one_time_observers = self.one_time_observers
        self.one_time_observers = {}
        for func in list(one_time_observers.values()):
            func(event)
    
    def get_deferred(self):
        'Return a Deferred that fires with the next event.'
        df = defer.Deferred()
        self.watch_one_time(df.callback)
        return df
65
class Variable(object):
    '''
    Holds a value and announces changes through the `changed` Event.
    '''
    
    def __init__(self, value):
        self.value = value
        self.changed = Event()
    
    def set(self, value):
        'Store value and notify observers, unless it equals the current value.'
        unchanged = (value == self.value)
        if not unchanged:
            self.value = value
            self.changed.happened(value)
    
    def get_not_none(self):
        '''
        Return a Deferred for the value: already fired if the value is not
        None, otherwise fired by the next change notification.
        '''
        if self.value is None:
            return self.changed.get_deferred()
        return defer.succeed(self.value)
85
def sleep(t):
    'Return a Deferred that the reactor fires with None after a delay of t.'
    wakeup = defer.Deferred()
    reactor.callLater(t, wakeup.callback, None)
    return wakeup
90
def median(x):
    '''
    Return the median of the items in x: the middle item for an odd count,
    the mean of the two middle items for an even count.
    '''
    # sorting everything is plenty; no need for a selection algorithm
    ordered = sorted(x)
    count = len(ordered)
    lower = ordered[(count - 1)//2]
    upper = ordered[count//2]
    return (lower + upper)/2
97
class StringBuffer(object):
    'Buffer manager with great worst-case behavior'
    
    # buf     -- deque of the string chunks not yet fully consumed
    # buf_len -- total length of the chunks currently in buf
    # pos     -- read offset into the first chunk, buf[0]
    
    def __init__(self, data=''):
        self.buf = collections.deque([data])
        self.buf_len = len(data)
        self.pos = 0
    
    def __len__(self):
        # unread data = everything buffered minus the consumed head prefix
        return self.buf_len - self.pos
    
    def add(self, data):
        'Append a chunk of data to the end of the buffer.'
        self.buf_len += len(data)
        self.buf.append(data)
    
    def get(self, wants):
        '''
        Remove and return the next `wants` characters.
        
        Raises IndexError if fewer than `wants` characters are buffered.
        '''
        if self.buf_len - self.pos < wants:
            raise IndexError('not enough data')
        pieces = []
        remaining = wants
        while remaining:
            head = self.buf[0]
            piece = head[self.pos:self.pos+remaining]
            self.pos += len(piece)
            pieces.append(piece)
            remaining -= len(piece)
            # discard every chunk that has been read to its end
            while self.buf and self.pos >= len(self.buf[0]):
                spent = self.buf.popleft()
                self.buf_len -= len(spent)
                self.pos -= len(spent)
        return ''.join(pieces)
128
def _DataChunker(receiver):
    '''
    Generator driving `receiver`: data sent into this generator is buffered
    and handed to the receiver in pieces of exactly the size the receiver
    last yielded.
    '''
    # prime the receiver to learn how much data it wants first
    wants = next(receiver)
    buf = StringBuffer()
    
    while True:
        # pause for more input until the current request can be satisfied
        while len(buf) < wants:
            buf.add((yield))
        wants = receiver.send(buf.get(wants))
def DataChunker(receiver):
    '''
    Produces a function that accepts data that is input into a generator
    (receiver) in response to the receiver yielding the size of data to wait on
    '''
    chunker = _DataChunker(receiver)
    # advance to the first yield so that the generator accepts send()
    next(chunker)
    return chunker.send
146
class ReplyMatcher(object):
    '''
    Matches replies to requests that are identified by an id chosen by the
    caller. Several requests for the same id may be outstanding at once;
    one reply resolves all of them.
    '''
    
    def __init__(self, func, timeout=5):
        self.func = func        # called with the id to send the request
        self.timeout = timeout  # delay passed to reactor.callLater
        self.map = {}           # id -> {uniq: (deferred, timer)}
    
    def __call__(self, id):
        '''
        Send a request for id and return a Deferred for its reply.
        
        The Deferred fails with TimeoutError if no reply arrives in time.
        If sending fails with an ordinary exception, the traceback is
        printed and None is returned instead of a Deferred.
        '''
        try:
            self.func(id)
            # random key distinguishing this request from others for the same id
            uniq = random.randrange(2**256)
            df = defer.Deferred()
            def timeout():
                waiting_df, _ = self.map[id].pop(uniq)
                waiting_df.errback(failure.Failure(defer.TimeoutError()))
                if not self.map[id]:
                    del self.map[id]
            self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
            return df
        except Exception:
            # Only ordinary errors are logged and swallowed; KeyboardInterrupt
            # and SystemExit must keep propagating.
            import traceback
            traceback.print_exc()
    
    def got_response(self, id, resp):
        'Deliver resp to every request waiting on id; unknown ids are ignored.'
        if id not in self.map:
            return
        for df, timer in list(self.map.pop(id).values()):
            timer.cancel()
            df.callback(resp)
175
class GenericDeferrer(object):
    '''
    Matches replies to requests using a randomly chosen id below max_id
    that is passed to func as its first argument.
    '''
    
    def __init__(self, max_id, func, timeout=5):
        self.max_id = max_id    # ids are drawn from range(max_id)
        self.func = func        # called as func(id, *args, **kwargs) to send
        self.timeout = timeout  # delay passed to reactor.callLater
        self.map = {}           # id -> (deferred, timer)
    
    def __call__(self, *args, **kwargs):
        '''
        Send a request and return a Deferred for its reply.
        
        The Deferred fails with TimeoutError if no reply arrives in time.
        Exceptions raised by func propagate to the caller.
        '''
        # pick an id that is not in use (loops until one is found)
        while True:
            request_id = random.randrange(self.max_id)
            if request_id not in self.map:
                break
        df = defer.Deferred()
        def on_timeout():
            self.map.pop(request_id)
            df.errback(failure.Failure(defer.TimeoutError()))
        timer = reactor.callLater(self.timeout, on_timeout)
        # Register before sending so that a reply delivered while func is
        # still running is matched instead of silently dropped.
        self.map[request_id] = df, timer
        sent = False
        try:
            self.func(request_id, *args, **kwargs)
            sent = True
        finally:
            if not sent:
                # func raised: undo the registration, otherwise the timer
                # would fire later for a request that was never sent.
                self.map.pop(request_id, None)
                if timer.active():
                    timer.cancel()
        return df
    
    def got_response(self, id, resp):
        'Deliver resp to the request waiting on id; unknown ids are ignored.'
        if id not in self.map:
            return
        df, timer = self.map.pop(id)
        timer.cancel()
        df.callback(resp)
203
class NotNowError(Exception):
    'Raised by DeferredCacher.call_now when the value is not available immediately.'
206
class DeferredCacher(object):
    '''
    Caches the results of a Deferred-returning function, keyed by its
    single argument.
    
    func    -- called as func(key); must return a Deferred
    backing -- mapping used as the cache (a fresh dict if None)
    '''
    
    def __init__(self, func, backing=None):
        if backing is None:
            backing = {}
        
        self.func = func
        self.backing = backing
        self.waiting = {}  # key -> Deferred fired when the running fetch ends
    
    @defer.inlineCallbacks
    def __call__(self, key):
        '''
        Return a Deferred for the value of key, fetching it with func if it
        is not cached. Failures of func are passed on and not cached.
        '''
        # Loop instead of checking once: if the fetch we waited for failed,
        # another waiter woken before us may already have started a new
        # fetch, and starting a second one would overwrite its entry in
        # self.waiting.
        while key in self.waiting:
            yield self.waiting[key]
        
        if key in self.backing:
            defer.returnValue(self.backing[key])
        
        self.waiting[key] = defer.Deferred()
        try:
            value = yield self.func(key)
            # store the value before waking the waiters so that they find it
            # in the cache instead of each fetching it again
            self.backing[key] = value
        finally:
            self.waiting.pop(key).callback(None)
        
        defer.returnValue(value)
    
    def call_now(self, key):
        '''
        Return the cached value of key, or raise NotNowError.
        
        When the value is not cached, a fetch is started (unless one is
        already running) so that a later call can succeed. A failed fetch
        has its traceback printed.
        '''
        if key in self.waiting:
            raise NotNowError()
        
        if key in self.backing:
            return self.backing[key]
        
        pending = self.waiting[key] = defer.Deferred()
        def cb(value):
            self.backing[key] = value
            self.waiting.pop(key).callback(None)
        def eb(fail):
            self.waiting.pop(key).callback(None)
            fail.printTraceback()
        try:
            fetch = self.func(key)
        except Exception:
            # func failed before producing a Deferred: drop the marker, or
            # every later request for this key would wait forever
            del self.waiting[key]
            pending.callback(None)
            raise
        fetch.addCallback(cb).addErrback(eb)
        
        # func may have returned an already-fired Deferred, in which case cb
        # has run and the value is available right away
        if key in self.backing:
            return self.backing[key]
        raise NotNowError()
249
def pubkey_to_address(pubkey, testnet):
    '''
    Convert a 65-byte public key into its base58 address string.
    
    The address payload is a version byte (111 if testnet, else 0) followed
    by ripemd160(sha256(pubkey)) and the first 4 bytes of the double sha256
    of those two parts.
    
    Raises ValueError if pubkey is not 65 bytes long.
    '''
    if len(pubkey) != 65:
        raise ValueError('invalid pubkey')
    if testnet:
        version = 111
    else:
        version = 0
    sha = hashlib.sha256(pubkey).digest()
    key_hash = chr(version) + hashlib.new('ripemd160', sha).digest()
    inner_digest = hashlib.sha256(key_hash).digest()
    checksum = hashlib.sha256(inner_digest).digest()[:4]
    return base58_encode(key_hash + checksum)
257
def base58_encode(data):
    '''
    Encode a byte string in base58.
    
    Every leading zero byte of data is represented by a leading '1'; the
    rest is data read as a big-endian number and written in base 58.
    '''
    leading_zeros = len(data) - len(data.lstrip(chr(0)))
    # min_width=0: when data consists of zero bytes only, the number is 0 and
    # must contribute no digits at all, since the zero bytes are already
    # covered by the '1' prefix (the default min_width=1 would add padding).
    digits = natural_to_string(string_to_natural(data), '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz', min_width=0)
    return '1'*leading_zeros + digits
260
def natural_to_string(n, alphabet=None, min_width=1):
    '''
    Convert a non-negative integer to a string.
    
    Without an alphabet the result is the big-endian byte string of n,
    left-padded with zero bytes to min_width. With an alphabet the result
    is n written in base len(alphabet) using alphabet's characters as
    digits, left-padded with the zero digit alphabet[0] to min_width.
    
    Raises ValueError if n is negative.
    '''
    if n < 0:
        # a negative n would never terminate the digit loop below
        raise ValueError('n must be non-negative')
    if alphabet is None:
        import binascii
        s = '%x' % (n,)
        if len(s) % 2:
            # hex decoding needs an even number of digits
            s = '0' + s
        return binascii.unhexlify(s).rjust(min_width, b'\x00')
    res = []
    while n:
        n, x = divmod(n, len(alphabet))
        res.append(alphabet[x])
    res.reverse()
    # pad with the alphabet's own zero digit so that the result consists of
    # alphabet characters only (0 becomes alphabet[0] at the default width)
    return ''.join(res).rjust(min_width, alphabet[0])
273
def string_to_natural(s, alphabet=None):
    '''
    Convert a string to a non-negative integer.
    
    Without an alphabet, s is read as a big-endian byte string. With an
    alphabet, s is read as a number in base len(alphabet) whose digits are
    the characters of alphabet.
    
    In alphabet mode, raises ValueError if s is empty, has a superfluous
    leading zero digit, or contains a character not in alphabet.
    '''
    if alphabet is None:
        return int(s.encode('hex'), 16)
    zero_digit = alphabet[0]
    has_leading_zero = s != zero_digit and s.startswith(zero_digit)
    if not s or has_leading_zero:
        raise ValueError()
    base = len(alphabet)
    total = 0
    # accumulate most significant digit first
    for char in s:
        total = total * base + alphabet.index(char)
    return total
281
282
class DictWrapper(object):
    '''
    Dict-like view of another mapping (inner) that converts keys and values
    on the way in and out.
    
    Subclasses override encode_key/decode_key and encode_value/decode_value;
    the defaults pass everything through unchanged. The wrapper needs only
    len(), `in`, iteration and item get/set/delete from inner, so inner may
    be a dict or another DictWrapper.
    '''
    
    def encode_key(self, key):
        return key
    def decode_key(self, encoded_key):
        return encoded_key
    def encode_value(self, value):
        return value
    def decode_value(self, encoded_value):
        return encoded_value
    
    def __init__(self, inner):
        self.inner = inner
    
    def __len__(self):
        return len(self.inner)
    
    def __contains__(self, key):
        return self.encode_key(key) in self.inner
    
    def __getitem__(self, key):
        return self.decode_value(self.inner[self.encode_key(key)])
    def __setitem__(self, key, value):
        self.inner[self.encode_key(key)] = self.encode_value(value)
    def __delitem__(self, key):
        del self.inner[self.encode_key(key)]
    
    def __iter__(self):
        for encoded_key in self.inner:
            yield self.decode_key(encoded_key)
    def iterkeys(self):
        return iter(self)
    def keys(self):
        return list(self.iterkeys())
    
    def itervalues(self):
        # go through inner's iteration and item access rather than its
        # itervalues(), which not every inner mapping provides
        for encoded_key in self.inner:
            yield self.decode_value(self.inner[encoded_key])
    # original (misspelled) name, kept so that existing callers keep working
    itervalue = itervalues
    def values(self):
        return list(self.itervalues())
    
    def iteritems(self):
        for encoded_key in self.inner:
            yield self.decode_key(encoded_key), self.decode_value(self.inner[encoded_key])
    def items(self):
        return list(self.iteritems())
328
def update_dict(d, **replace):
    '''
    Return a copy of d updated from the keyword arguments; d itself is left
    untouched. A keyword whose value is None deletes that key from the copy
    (KeyError if it is absent); any other value is stored under the key.
    '''
    result = d.copy()
    for key in replace:
        new_value = replace[key]
        if new_value is None:
            del result[key]
        else:
            result[key] = new_value
    return result