from __future__ import division
import itertools
+import random
import sys
from twisted.internet import defer, reactor
from twisted.python import failure, log
def sleep(t):
- d = defer.Deferred()
- reactor.callLater(t, d.callback, None)
+ d = defer.Deferred(canceller=lambda d_: dc.cancel())
+ dc = reactor.callLater(t, d.callback, None)
return d
+def run_repeatedly(f, *args, **kwargs):
+ current_dc = [None]
+ def step():
+ delay = f(*args, **kwargs)
+ current_dc[0] = reactor.callLater(delay, step)
+ step()
+ def stop():
+ current_dc[0].cancel()
+ return stop
+
class RetrySilentlyException(Exception):
pass
Converts query with identifier/got response interface to deferred interface
'''
- def __init__(self, max_id, func, timeout=5):
+ def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
self.max_id = max_id
self.func = func
self.timeout = timeout
+ self.on_timeout = on_timeout
self.map = {}
def __call__(self, *args, **kwargs):
id = random.randrange(self.max_id)
if id not in self.map:
break
- df = defer.Deferred()
+ def cancel(df):
+ df, timer = self.map.pop(id)
+ timer.cancel()
+ try:
+ df = defer.Deferred(cancel)
+ except TypeError:
+ df = defer.Deferred() # handle older versions of Twisted
def timeout():
self.map.pop(id)
df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
+ self.on_timeout()
timer = reactor.callLater(self.timeout, timeout)
- self.func(id, *args, **kwargs)
self.map[id] = df, timer
+ self.func(id, *args, **kwargs)
return df
def got_response(self, id, resp):
df, timer = self.map.pop(id)
timer.cancel()
df.callback(resp)
+
+ def respond_all(self, resp):
+ while self.map:
+ id, (df, timer) = self.map.popitem()
+ timer.cancel()
+ df.errback(resp)
class NotNowError(Exception):
pass
self.waiting.pop(key).callback(None)
def eb(fail):
self.waiting.pop(key).callback(None)
+ if fail.check(RetrySilentlyException):
+ return
print
print 'Error when requesting noncached value:'
fail.printTraceback()