71fe633fb2d3044d950159575bcfc2a1b591d5dd
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import itertools
4 import random
5 import sys
6
7 from twisted.internet import defer, reactor
8 from twisted.python import failure, log
9
def sleep(t):
    '''
    Returns a Deferred that fires with None after t seconds
    
    Cancelling the returned Deferred also cancels the pending reactor call, so
    an abandoned sleep does not leave a timer scheduled in the reactor. The
    Deferred then fails with CancelledError as usual.
    '''
    
    def cancel(_):
        # only reachable while the Deferred has not fired, i.e. while the
        # delayed call is still pending
        delayed_call.cancel()
    d = defer.Deferred(cancel)
    delayed_call = reactor.callLater(t, d.callback, None)
    return d
14
def run_repeatedly(f, *args, **kwargs):
    '''
    Calls f(*args, **kwargs) immediately and then again and again, each time
    waiting the number of seconds that the previous call returned
    
    Returns a function that cancels the currently scheduled call.
    '''
    
    # mutable cell shared by the two closures below
    pending = {'call': None}
    
    def tick():
        wait = f(*args, **kwargs)
        pending['call'] = reactor.callLater(wait, tick)
    
    def stop():
        pending['call'].cancel()
    
    # the first invocation happens synchronously, before stop is handed out
    tick()
    return stop
24
class RetrySilentlyException(Exception):
    '''
    Raised to request another attempt without the failure being reported
    
    retry() skips its logging for this exception and DeferredCacher.call_now
    does not print it.
    '''
27
def retry(message='Error:', delay=3, max_retries=None, traceback=True):
    '''
    Decorator factory that re-runs a Deferred-returning function until it
    succeeds
    
    After a failed attempt the error is reported (with a traceback through
    twisted's log if traceback is true, otherwise as one line on stderr),
    then the call is repeated after delay seconds. RetrySilentlyException is
    retried without being reported. Once max_retries retries have been used up
    the exception is re-raised; None means retry forever.
    
    @retry('Error getting block:', 1)
    @defer.inlineCallbacks
    def get_block(hash):
        ...
    '''
    
    def retry2(func):
        @defer.inlineCallbacks
        def f(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    result = yield func(*args, **kwargs)
                except Exception as e:
                    # attempt counts the failures seen so far
                    if attempt == max_retries:
                        raise
                    if isinstance(e, RetrySilentlyException):
                        pass
                    elif traceback:
                        log.err(None, message)
                    else:
                        print >>sys.stderr, message, e
                    yield sleep(delay)
                else:
                    defer.returnValue(result)
                attempt += 1
        return f
    return retry2
55
class ReplyMatcher(object):
    '''
    Converts request/got response interface to deferred interface
    
    Calling the matcher with an id returns a Deferred that fires with the
    response later handed to got_response for that id, or fails with
    defer.TimeoutError if none arrives within timeout seconds. func(id) is
    only called when no request for that id is already outstanding; further
    callers share the response of the outstanding request.
    '''
    
    def __init__(self, func, timeout=5):
        self.func = func # called as func(id) to send the request
        self.timeout = timeout # seconds each caller waits for a response
        self.map = {} # id -> set of (Deferred, DelayedCall) waiting on it
    
    def _forget(self, id, entry):
        # Removes one waiter from the map; returns whether it was still there
        waiters = self.map.get(id)
        if waiters is None or entry not in waiters:
            return False
        waiters.remove(entry)
        if not waiters:
            del self.map[id]
        return True
    
    def __call__(self, id):
        request_needed = id not in self.map
        df = defer.Deferred()
        def timeout():
            self._forget(id, entry)
            df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
        timer = reactor.callLater(self.timeout, timeout)
        entry = (df, timer)
        # register before sending so that a response delivered synchronously
        # from inside func still finds this waiter
        self.map.setdefault(id, set()).add(entry)
        if request_needed:
            sent = False
            try:
                self.func(id)
                sent = True
            finally:
                # the request never went out: undo the registration so no
                # timer or map entry is left behind, then let the error
                # propagate to the caller
                if not sent and self._forget(id, entry):
                    timer.cancel()
        return df
    
    def got_response(self, id, resp):
        '''
        Fires every Deferred waiting on id with resp; responses for ids that
        nobody is waiting on are ignored
        '''
        for df, timer in self.map.pop(id, ()):
            # cancel first so the timer can never be left running if firing
            # the Deferred raises
            timer.cancel()
            df.callback(resp)
85
class GenericDeferrer(object):
    '''
    Converts query with identifier/got response interface to deferred interface
    
    Each call picks an unused random id below max_id, records a Deferred and a
    timeout timer under it, and sends the query with func(id, *args, **kwargs).
    The Deferred fires when got_response is called with that id, or fails with
    defer.TimeoutError after timeout seconds, in which case on_timeout() is
    called as well.
    '''
    
    def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
        self.map = {} # id -> (Deferred, DelayedCall) of outstanding queries
        self.max_id, self.func = max_id, func
        self.timeout, self.on_timeout = timeout, on_timeout
    
    def __call__(self, *args, **kwargs):
        # draw ids until one is found that no outstanding query is using
        id = random.randrange(self.max_id)
        while id in self.map:
            id = random.randrange(self.max_id)
        
        def cancel(df):
            # canceller: drop the bookkeeping and stop the timeout timer
            self.map.pop(id)[1].cancel()
        
        def timeout():
            del self.map[id]
            df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
            self.on_timeout()
        
        df = defer.Deferred(cancel)
        timer = reactor.callLater(self.timeout, timeout)
        # registered before the query is sent
        self.map[id] = df, timer
        self.func(id, *args, **kwargs)
        return df
    
    def got_response(self, id, resp):
        '''
        Fires the Deferred of query id with resp; unknown ids are ignored
        '''
        try:
            df, timer = self.map.pop(id)
        except KeyError:
            return
        timer.cancel()
        df.callback(resp)
    
    def respond_all(self, resp):
        '''
        Errbacks every outstanding query with resp and empties the map
        '''
        while self.map:
            _, (df, timer) = self.map.popitem()
            timer.cancel()
            df.errback(resp)
128
class NotNowError(Exception):
    '''
    Raised by DeferredCacher.call_now when the value for a key is not cached
    and no default was given
    '''
131
class DeferredCacher(object):
    '''
    like memoize, but for functions that return Deferreds
    
    @DeferredCacher
    def f(x):
        ...
        return df
    
    @DeferredCacher.with_backing(bsddb.hashopen(...))
    def f(x):
        ...
        return df
    
    Results are stored in backing (a dict unless another mapping is given).
    While a fetch for a key is in progress, waiting[key] holds a Deferred that
    is fired with None once that fetch has finished, successfully or not.
    Failures are never cached.
    '''
    
    @classmethod
    def with_backing(cls, backing):
        '''
        Returns a decorator that caches into the given mapping
        '''
        return lambda func: cls(func, backing)
    
    def __init__(self, func, backing=None):
        if backing is None:
            backing = {}
        
        self.func = func
        self.backing = backing
        self.waiting = {}
    
    @defer.inlineCallbacks
    def __call__(self, key):
        '''
        Returns a Deferred firing with the value for key, fetching it with
        func(key) unless it is cached. A failing fetch propagates its error.
        '''
        # Waiters are resumed synchronously, one after another, when the
        # in-flight fetch ends. If that fetch failed, the first waiter starts
        # a new one, so every waiter has to look again before fetching itself.
        while key in self.waiting:
            yield self.waiting[key]
        
        if key in self.backing:
            defer.returnValue(self.backing[key])
        
        self.waiting[key] = defer.Deferred()
        try:
            value = yield self.func(key)
            # store before waking the waiters, otherwise they would miss the
            # cache and request the same key again
            self.backing[key] = value
        finally:
            self.waiting.pop(key).callback(None)
        
        defer.returnValue(value)
    
    _nothing = object()
    def call_now(self, key, default=_nothing):
        '''
        Returns the cached value for key without waiting
        
        If the value is not cached a fetch is started in the background
        (unless one is already running) and default is returned, or
        NotNowError(key) is raised when no default was given. Errors of the
        background fetch are printed, except RetrySilentlyException.
        '''
        if key in self.backing:
            return self.backing[key]
        if key not in self.waiting:
            self.waiting[key] = defer.Deferred()
            def cb(value):
                self.backing[key] = value
                self.waiting.pop(key).callback(None)
            def eb(fail):
                self.waiting.pop(key).callback(None)
                if fail.check(RetrySilentlyException):
                    return
                sys.stdout.write('\nError when requesting noncached value:\n')
                fail.printTraceback()
                sys.stdout.write('\n')
            # maybeDeferred turns an exception raised directly by func into a
            # failure, so eb always clears waiting[key]; otherwise the key
            # would look "in progress" forever
            defer.maybeDeferred(self.func, key).addCallback(cb).addErrback(eb)
            # func may have completed synchronously
            if key in self.backing:
                return self.backing[key]
        if default is not self._nothing:
            return default
        raise NotNowError(key)