for df, timer in self.map.pop(id):
df.callback(resp)
timer.cancel()
+
+class GenericDeferrer(object):
+ '''
+ Converts query with identifier/got response interface to deferred interface
+ '''
+
+ def __init__(self, max_id, func, timeout=5):
+ self.max_id = max_id
+ self.func = func
+ self.timeout = timeout
+ self.map = {}
+
+ def __call__(self, *args, **kwargs):
+ while True:
+ id = random.randrange(self.max_id)
+ if id not in self.map:
+ break
+ df = defer.Deferred()
+ def timeout():
+ self.map.pop(id)
+ df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
+ timer = reactor.callLater(self.timeout, timeout)
+ self.func(id, *args, **kwargs)
+ self.map[id] = df, timer
+ return df
+
+ def got_response(self, id, resp):
+ if id not in self.map:
+ return
+ df, timer = self.map.pop(id)
+ timer.cancel()
+ df.callback(resp)
+
+class NotNowError(Exception):
+ pass
+
+class DeferredCacher(object):
+ '''
+ like memoize, but for functions that return Deferreds
+
+ @DeferredCacher
+ def f(x):
+ ...
+ return df
+
+ @DeferredCacher.with_backing(bsddb.hashopen(...))
+ def f(x):
+ ...
+ return df
+ '''
+
+ @classmethod
+ def with_backing(cls, backing):
+ return lambda func: cls(func, backing)
+
+ def __init__(self, func, backing=None):
+ if backing is None:
+ backing = {}
+
+ self.func = func
+ self.backing = backing
+ self.waiting = {}
+
+ @defer.inlineCallbacks
+ def __call__(self, key):
+ if key in self.waiting:
+ yield self.waiting[key]
+
+ if key in self.backing:
+ defer.returnValue(self.backing[key])
+ else:
+ self.waiting[key] = defer.Deferred()
+ try:
+ value = yield self.func(key)
+ finally:
+ self.waiting.pop(key).callback(None)
+
+ self.backing[key] = value
+ defer.returnValue(value)
+
+ _nothing = object()
+ def call_now(self, key, default=_nothing):
+ if key in self.waiting:
+ if default is not self._nothing:
+ return default
+ raise NotNowError(key)
+
+ if key in self.backing:
+ return self.backing[key]
+ else:
+ self.waiting[key] = defer.Deferred()
+ def cb(value):
+ self.backing[key] = value
+ self.waiting.pop(key).callback(None)
+ def eb(fail):
+ self.waiting.pop(key).callback(None)
+ print
+ print 'Error when requesting noncached value:'
+ fail.printTraceback()
+ print
+ self.func(key).addCallback(cb).addErrback(eb)
+ if default is not self._nothing:
+ return default
+ raise NotNowError(key)