if default is not self._nothing:
return default
raise NotNowError(key)
+
+def deferred_has_been_called(df):
+ still_running = True
+ res2 = []
+ def cb(res):
+ if still_running:
+ res2[:] = [res]
+ else:
+ return res
+ df.addBoth(cb)
+ still_running = False
+ if res2:
+ return True, res2[0]
+ return False, None
+def inlineCallbacks(f):
+ from functools import wraps
+ @wraps(f)
+ def _(*args, **kwargs):
+ gen = f(*args, **kwargs)
+ stop_running = [False]
+ def cancelled(df_):
+ assert df_ is df
+ stop_running[0] = True
+ if currently_waiting_on:
+ currently_waiting_on[0].cancel()
+ df = defer.Deferred(cancelled)
+ currently_waiting_on = []
+ def it(cur):
+ while True:
+ try:
+ if isinstance(cur, failure.Failure):
+ res = cur.throwExceptionIntoGenerator(gen) # external code is run here
+ else:
+ res = gen.send(cur) # external code is run here
+ if stop_running[0]:
+ return
+ except StopIteration:
+ df.callback(None)
+ except defer._DefGen_Return as e:
+ # XXX should make sure direct child threw
+ df.callback(e.value)
+ except:
+ df.errback()
+ else:
+ if isinstance(res, defer.Deferred):
+ called, res2 = deferred_has_been_called(res)
+ if called:
+ cur = res2
+ continue
+ else:
+ currently_waiting_on[:] = [res]
+ def gotResult(res2):
+ assert currently_waiting_on[0] is res
+ currently_waiting_on[:] = []
+ if stop_running[0]:
+ return
+ it(res2)
+ res.addBoth(gotResult) # external code is run between this and gotResult
+ else:
+ cur = res
+ continue
+ break
+ it(None)
+ return df
+ return _
+
+
+
+class RobustLoopingCall(object):
+ def __init__(self, func, *args, **kwargs):
+ self.func, self.args, self.kwargs = func, args, kwargs
+
+ self.running = False
+
+ def start(self, period):
+ assert not self.running
+ self.running = True
+ self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError))
+
+ @inlineCallbacks
+ def _worker(self, period):
+ assert self.running
+ while self.running:
+ try:
+ self.func(*self.args, **self.kwargs)
+ except:
+ log.err()
+ yield sleep(period)
+
+ def stop(self):
+ assert self.running
+ self.running = False
+ self._df.cancel()
+ return self._df