for df, timer in self.map.pop(id).itervalues():
timer.cancel()
df.callback(resp)
-
-class GenericDeferrer(object):
- '''
- Converts query with identifier/got response interface to deferred interface
- '''
-
- def __init__(self, max_id, func, timeout=5):
- self.max_id = max_id
- self.func = func
- self.timeout = timeout
- self.map = {}
-
- def __call__(self, *args, **kwargs):
- while True:
- id = random.randrange(self.max_id)
- if id not in self.map:
- break
- df = defer.Deferred()
- def timeout():
- self.map.pop(id)
- df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
- timer = reactor.callLater(self.timeout, timeout)
- self.func(id, *args, **kwargs)
- self.map[id] = df, timer
- return df
-
- def got_response(self, id, resp):
- if id not in self.map:
- return
- df, timer = self.map.pop(id)
- timer.cancel()
- df.callback(resp)
-
-class NotNowError(Exception):
- pass
-
-class DeferredCacher(object):
- '''
- like memoize, but for functions that return Deferreds
-
- @DeferredCacher
- def f(x):
- ...
- return df
-
- @DeferredCacher.with_backing(bsddb.hashopen(...))
- def f(x):
- ...
- return df
- '''
-
- @classmethod
- def with_backing(cls, backing):
- return lambda func: cls(func, backing)
-
- def __init__(self, func, backing=None):
- if backing is None:
- backing = {}
-
- self.func = func
- self.backing = backing
- self.waiting = {}
-
- @defer.inlineCallbacks
- def __call__(self, key):
- if key in self.waiting:
- yield self.waiting[key]
-
- if key in self.backing:
- defer.returnValue(self.backing[key])
- else:
- self.waiting[key] = defer.Deferred()
- try:
- value = yield self.func(key)
- finally:
- self.waiting.pop(key).callback(None)
-
- self.backing[key] = value
- defer.returnValue(value)
-
- _nothing = object()
- def call_now(self, key, default=_nothing):
- if key in self.waiting:
- if default is not self._nothing:
- return default
- raise NotNowError(key)
-
- if key in self.backing:
- return self.backing[key]
- else:
- self.waiting[key] = defer.Deferred()
- def cb(value):
- self.backing[key] = value
- self.waiting.pop(key).callback(None)
- def eb(fail):
- self.waiting.pop(key).callback(None)
- print
- print 'Error when requesting noncached value:'
- fail.printTraceback()
- print
- self.func(key).addCallback(cb).addErrback(eb)
- if default is not self._nothing:
- return default
- raise NotNowError(key)