1 from __future__ import division
7 from twisted.internet import defer, reactor
8 from twisted.python import failure, log
11 d = defer.Deferred(canceller=lambda d_: dc.cancel())
12 dc = reactor.callLater(t, d.callback, None)
15 def run_repeatedly(f, *args, **kwargs):
18 delay = f(*args, **kwargs)
19 current_dc[0] = reactor.callLater(delay, step)
22 current_dc[0].cancel()
25 class RetrySilentlyException(Exception):
28 def retry(message='Error:', delay=3, max_retries=None, traceback=True):
30 @retry('Error getting block:', 1)
31 @defer.inlineCallbacks
37 @defer.inlineCallbacks
38 def f(*args, **kwargs):
39 for i in itertools.count():
41 result = yield func(*args, **kwargs)
45 if not isinstance(e, RetrySilentlyException):
47 log.err(None, message)
49 print >>sys.stderr, message, e
52 defer.returnValue(result)
56 class ReplyMatcher(object):
58 Converts request/got response interface to deferred interface
61 def __init__(self, func, timeout=5):
63 self.timeout = timeout
66 def __call__(self, id):
67 if id not in self.map:
71 self.map[id].remove((df, timer))
74 df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
75 timer = reactor.callLater(self.timeout, timeout)
76 self.map.setdefault(id, set()).add((df, timer))
79 def got_response(self, id, resp):
80 if id not in self.map:
82 for df, timer in self.map.pop(id):
86 class GenericDeferrer(object):
88 Converts query with identifier/got response interface to deferred interface
91 def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
94 self.timeout = timeout
95 self.on_timeout = on_timeout
98 def __call__(self, *args, **kwargs):
100 id = random.randrange(self.max_id)
101 if id not in self.map:
104 df, timer = self.map.pop(id)
107 df = defer.Deferred(cancel)
109 df = defer.Deferred() # handle older versions of Twisted
112 df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
114 timer = reactor.callLater(self.timeout, timeout)
115 self.map[id] = df, timer
116 self.func(id, *args, **kwargs)
119 def got_response(self, id, resp):
120 if id not in self.map:
122 df, timer = self.map.pop(id)
126 def respond_all(self, resp):
128 id, (df, timer) = self.map.popitem()
132 class NotNowError(Exception):
135 class DeferredCacher(object):
137 like memoize, but for functions that return Deferreds
144 @DeferredCacher.with_backing(bsddb.hashopen(...))
151 def with_backing(cls, backing):
152 return lambda func: cls(func, backing)
154 def __init__(self, func, backing=None):
159 self.backing = backing
162 @defer.inlineCallbacks
163 def __call__(self, key):
164 if key in self.waiting:
165 yield self.waiting[key]
167 if key in self.backing:
168 defer.returnValue(self.backing[key])
170 self.waiting[key] = defer.Deferred()
172 value = yield self.func(key)
174 self.waiting.pop(key).callback(None)
176 self.backing[key] = value
177 defer.returnValue(value)
180 def call_now(self, key, default=_nothing):
181 if key in self.backing:
182 return self.backing[key]
183 if key not in self.waiting:
184 self.waiting[key] = defer.Deferred()
186 self.backing[key] = value
187 self.waiting.pop(key).callback(None)
189 self.waiting.pop(key).callback(None)
190 if fail.check(RetrySilentlyException):
193 print 'Error when requesting noncached value:'
194 fail.printTraceback()
196 self.func(key).addCallback(cb).addErrback(eb)
197 if default is not self._nothing:
199 raise NotNowError(key)
201 def deferred_has_been_called(df):
210 still_running = False
214 def inlineCallbacks(f):
215 from functools import wraps
217 def _(*args, **kwargs):
218 gen = f(*args, **kwargs)
219 stop_running = [False]
222 stop_running[0] = True
223 if currently_waiting_on:
224 currently_waiting_on[0].cancel()
225 df = defer.Deferred(cancelled)
226 currently_waiting_on = []
230 if isinstance(cur, failure.Failure):
231 res = cur.throwExceptionIntoGenerator(gen) # external code is run here
233 res = gen.send(cur) # external code is run here
236 except StopIteration:
238 except defer._DefGen_Return as e:
239 # XXX should make sure direct child threw
244 if isinstance(res, defer.Deferred):
245 called, res2 = deferred_has_been_called(res)
250 currently_waiting_on[:] = [res]
252 assert currently_waiting_on[0] is res
253 currently_waiting_on[:] = []
257 res.addBoth(gotResult) # external code is run between this and gotResult
268 class RobustLoopingCall(object):
269 def __init__(self, func, *args, **kwargs):
270 self.func, self.args, self.kwargs = func, args, kwargs
274 def start(self, period):
275 assert not self.running
277 self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError))
280 def _worker(self, period):
284 self.func(*self.args, **self.kwargs)