replace task.LoopingCall's with deferral.RobustLoopingCall that catches errors and...
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import itertools
4 import random
5 import sys
6
7 from twisted.internet import defer, reactor
8 from twisted.python import failure, log
9
10 def sleep(t):
11     d = defer.Deferred(canceller=lambda d_: dc.cancel())
12     dc = reactor.callLater(t, d.callback, None)
13     return d
14
15 def run_repeatedly(f, *args, **kwargs):
16     current_dc = [None]
17     def step():
18         delay = f(*args, **kwargs)
19         current_dc[0] = reactor.callLater(delay, step)
20     step()
21     def stop():
22         current_dc[0].cancel()
23     return stop
24
25 class RetrySilentlyException(Exception):
26     pass
27
28 def retry(message='Error:', delay=3, max_retries=None, traceback=True):
29     '''
30     @retry('Error getting block:', 1)
31     @defer.inlineCallbacks
32     def get_block(hash):
33         ...
34     '''
35     
36     def retry2(func):
37         @defer.inlineCallbacks
38         def f(*args, **kwargs):
39             for i in itertools.count():
40                 try:
41                     result = yield func(*args, **kwargs)
42                 except Exception, e:
43                     if i == max_retries:
44                         raise
45                     if not isinstance(e, RetrySilentlyException):
46                         if traceback:
47                             log.err(None, message)
48                         else:
49                             print >>sys.stderr, message, e
50                     yield sleep(delay)
51                 else:
52                     defer.returnValue(result)
53         return f
54     return retry2
55
56 class ReplyMatcher(object):
57     '''
58     Converts request/got response interface to deferred interface
59     '''
60     
61     def __init__(self, func, timeout=5):
62         self.func = func
63         self.timeout = timeout
64         self.map = {}
65     
66     def __call__(self, id):
67         if id not in self.map:
68             self.func(id)
69         df = defer.Deferred()
70         def timeout():
71             self.map[id].remove((df, timer))
72             if not self.map[id]:
73                 del self.map[id]
74             df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
75         timer = reactor.callLater(self.timeout, timeout)
76         self.map.setdefault(id, set()).add((df, timer))
77         return df
78     
79     def got_response(self, id, resp):
80         if id not in self.map:
81             return
82         for df, timer in self.map.pop(id):
83             df.callback(resp)
84             timer.cancel()
85
86 class GenericDeferrer(object):
87     '''
88     Converts query with identifier/got response interface to deferred interface
89     '''
90     
91     def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
92         self.max_id = max_id
93         self.func = func
94         self.timeout = timeout
95         self.on_timeout = on_timeout
96         self.map = {}
97     
98     def __call__(self, *args, **kwargs):
99         while True:
100             id = random.randrange(self.max_id)
101             if id not in self.map:
102                 break
103         def cancel(df):
104             df, timer = self.map.pop(id)
105             timer.cancel()
106         try:
107             df = defer.Deferred(cancel)
108         except TypeError:
109             df = defer.Deferred() # handle older versions of Twisted
110         def timeout():
111             self.map.pop(id)
112             df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
113             self.on_timeout()
114         timer = reactor.callLater(self.timeout, timeout)
115         self.map[id] = df, timer
116         self.func(id, *args, **kwargs)
117         return df
118     
119     def got_response(self, id, resp):
120         if id not in self.map:
121             return
122         df, timer = self.map.pop(id)
123         timer.cancel()
124         df.callback(resp)
125     
126     def respond_all(self, resp):
127         while self.map:
128             id, (df, timer) = self.map.popitem()
129             timer.cancel()
130             df.errback(resp)
131
132 class NotNowError(Exception):
133     pass
134
135 class DeferredCacher(object):
136     '''
137     like memoize, but for functions that return Deferreds
138     
139     @DeferredCacher
140     def f(x):
141         ...
142         return df
143     
144     @DeferredCacher.with_backing(bsddb.hashopen(...))
145     def f(x):
146         ...
147         return df
148     '''
149     
150     @classmethod
151     def with_backing(cls, backing):
152         return lambda func: cls(func, backing)
153     
154     def __init__(self, func, backing=None):
155         if backing is None:
156             backing = {}
157         
158         self.func = func
159         self.backing = backing
160         self.waiting = {}
161     
162     @defer.inlineCallbacks
163     def __call__(self, key):
164         if key in self.waiting:
165             yield self.waiting[key]
166         
167         if key in self.backing:
168             defer.returnValue(self.backing[key])
169         else:
170             self.waiting[key] = defer.Deferred()
171             try:
172                 value = yield self.func(key)
173             finally:
174                 self.waiting.pop(key).callback(None)
175         
176         self.backing[key] = value
177         defer.returnValue(value)
178     
179     _nothing = object()
180     def call_now(self, key, default=_nothing):
181         if key in self.backing:
182             return self.backing[key]
183         if key not in self.waiting:
184             self.waiting[key] = defer.Deferred()
185             def cb(value):
186                 self.backing[key] = value
187                 self.waiting.pop(key).callback(None)
188             def eb(fail):
189                 self.waiting.pop(key).callback(None)
190                 if fail.check(RetrySilentlyException):
191                     return
192                 print
193                 print 'Error when requesting noncached value:'
194                 fail.printTraceback()
195                 print
196             self.func(key).addCallback(cb).addErrback(eb)
197         if default is not self._nothing:
198             return default
199         raise NotNowError(key)
200
201 def deferred_has_been_called(df):
202     still_running = True
203     res2 = []
204     def cb(res):
205         if still_running:
206             res2[:] = [res]
207         else:
208             return res
209     df.addBoth(cb)
210     still_running = False
211     if res2:
212         return True, res2[0]
213     return False, None
214 def inlineCallbacks(f):
215     from functools import wraps
216     @wraps(f)
217     def _(*args, **kwargs):
218         gen = f(*args, **kwargs)
219         stop_running = [False]
220         def cancelled(df_):
221             assert df_ is df
222             stop_running[0] = True
223             if currently_waiting_on:
224                 currently_waiting_on[0].cancel()
225         df = defer.Deferred(cancelled)
226         currently_waiting_on = []
227         def it(cur):
228             while True:
229                 try:
230                     if isinstance(cur, failure.Failure):
231                         res = cur.throwExceptionIntoGenerator(gen) # external code is run here
232                     else:
233                         res = gen.send(cur) # external code is run here
234                     if stop_running[0]:
235                         return
236                 except StopIteration:
237                     df.callback(None)
238                 except defer._DefGen_Return as e:
239                     # XXX should make sure direct child threw
240                     df.callback(e.value)
241                 except:
242                     df.errback()
243                 else:
244                     if isinstance(res, defer.Deferred):
245                         called, res2 = deferred_has_been_called(res)
246                         if called:
247                             cur = res2
248                             continue
249                         else:
250                             currently_waiting_on[:] = [res]
251                             def gotResult(res2):
252                                 assert currently_waiting_on[0] is res
253                                 currently_waiting_on[:] = []
254                                 if stop_running[0]:
255                                     return
256                                 it(res2)
257                             res.addBoth(gotResult) # external code is run between this and gotResult
258                     else:
259                         cur = res
260                         continue
261                 break
262         it(None)
263         return df
264     return _
265
266
267
268 class RobustLoopingCall(object):
269     def __init__(self, func, *args, **kwargs):
270         self.func, self.args, self.kwargs = func, args, kwargs
271         
272         self.running = False
273     
274     def start(self, period):
275         assert not self.running
276         self.running = True
277         self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError))
278     
279     @inlineCallbacks
280     def _worker(self, period):
281         assert self.running
282         while self.running:
283             try:
284                 self.func(*self.args, **self.kwargs)
285             except:
286                 log.err()
287             yield sleep(period)
288     
289     def stop(self):
290         assert self.running
291         self.running = False
292         self._df.cancel()
293         return self._df