1 from __future__ import division
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
# Ask the reactor to call d.callback(None) once a delay of `t` has elapsed.
# NOTE(review): the enclosing definition and the creation of `d` are on lines
# missing from this excerpt; presumably `d` is a Deferred and `t` is a
# parameter of the enclosing function -- confirm against the full file.
10 reactor.callLater(t, d.callback, None)
# Exception type tested by the wrapper inside retry(): the log.err call there
# sits behind `if not isinstance(e, RetrySilentlyException)`, so an exception
# of this type appears to be retried without being logged.
# NOTE(review): the class body is on a line missing from this excerpt.
13 class RetrySilentlyException(Exception):
# Decorator factory taking (message, delay, max_retries). The two decorator
# lines directly below the signature are presumably a usage example from a
# docstring whose quote lines are missing from this excerpt.
#
# message: handed to log.err as its second argument.
# delay, max_retries: only used on lines not shown here -- presumably the
#     pause between attempts and the attempt limit; TODO confirm.
#
# NOTE(review): the embedded numbering skips 17, 20-24, 28, 30-32 and 35-36,
# so the inner decorator header, whatever binds `e`, and the retry/sleep logic
# are not visible. Comments below cover only the visible lines.
16 def retry(message, delay, max_retries=None):
18 @retry('Error getting block:', 1)
19 @defer.inlineCallbacks
# Wrapper function; inlineCallbacks lets it wait on Deferreds with `yield`.
25 @defer.inlineCallbacks
26 def f(*args, **kwargs):
# i counts up from 0; the loop itself has no upper bound.
27 for i in itertools.count():
# Call the wrapped function with the caller's arguments and wait for its
# result. `func` is bound on a line not shown (presumably the inner
# decorator's parameter).
29 result = yield func(*args, **kwargs)
# `e` is bound on a line not shown -- presumably the exception caught from the
# call above. Only exceptions that are not RetrySilentlyException reach the
# log.err call.
33 if not isinstance(e, RetrySilentlyException):
34 log.err(None, message)
# Return `result` to the wrapper's caller through inlineCallbacks.
37 defer.returnValue(result)
# Adapter from a "send request / got_response" interface to Deferreds (the
# summary line under the class header is original docstring text whose quote
# lines are missing from this excerpt).
#
# State visible in this excerpt:
#   self.timeout -- delay passed to reactor.callLater for each request
#   self.map     -- id -> set of (Deferred, timer) pairs, as shown by the
#                   setdefault(id, set()).add((df, timer)) call; its
#                   initialisation is on a line not shown
#
# NOTE(review): the embedded numbering has gaps inside every method, so the
# comments below describe only the visible lines.
41 class ReplyMatcher(object):
43 Converts request/got response interface to deferred interface
# func: stored/used on lines not shown -- presumably the callable that sends
# the request for an id; TODO confirm.
# timeout: delay before a pending request is failed with TimeoutError.
46 def __init__(self, func, timeout=5):
48 self.timeout = timeout
# Register a waiter for `id`.
51 def __call__(self, id):
# True when nothing is pending for this id; the action taken is on a missing
# line (presumably the request is only sent in that case -- confirm).
52 if id not in self.map:
# The next two lines are presumably the body of a local `timeout` function
# (the name passed to callLater below); its header is not shown. They drop
# this waiter's pair from the pending set and fail its Deferred with
# TimeoutError.
56 self.map[id].remove((df, timer))
59 df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
# Arm the timer, then record the (Deferred, timer) pair under this id; several
# waiters can share one id because the value is a set.
60 timer = reactor.callLater(self.timeout, timeout)
61 self.map.setdefault(id, set()).add((df, timer))
# Handle a response `resp` for `id`.
64 def got_response(self, id, resp):
# Case where nobody is waiting for this id; handling is on a missing line.
65 if id not in self.map:
# Remove the whole pending set for this id and visit each waiter. The loop
# body is not shown (presumably fires df with resp and cancels the timer).
67 for df, timer in self.map.pop(id):
# Adapter from a "query tagged with an identifier / got_response" interface to
# Deferreds (the summary line under the class header is original docstring
# text whose quote lines are missing from this excerpt).
#
# State visible in this excerpt:
#   self.max_id  -- exclusive upper bound given to random.randrange when
#                   choosing an id
#   self.func    -- called as func(id, *args, **kwargs) to issue the query
#   self.timeout -- delay passed to reactor.callLater for each query
#   self.map     -- id -> (Deferred, timer), one pending query per id
#
# NOTE(review): the embedded numbering has gaps inside every method, so the
# comments below describe only the visible lines.
71 class GenericDeferrer(object):
73 Converts query with identifier/got response interface to deferred interface
76 def __init__(self, max_id, func, timeout=5):
79 self.timeout = timeout
# Issue a query with the caller's arguments.
82 def __call__(self, *args, **kwargs):
# Draw a random id in [0, max_id) and test whether it is unused. The
# surrounding lines are missing (presumably a loop that redraws until the id
# is free -- confirm).
84 id = random.randrange(self.max_id)
85 if id not in self.map:
# Presumably part of a local `timeout` function (the name passed to callLater
# below); its header is not shown. Fails the Deferred with TimeoutError.
90 df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
# Arm the timer, send the query tagged with the chosen id, then record the
# pending (Deferred, timer) pair under that id.
91 timer = reactor.callLater(self.timeout, timeout)
92 self.func(id, *args, **kwargs)
93 self.map[id] = df, timer
# Handle a response `resp` carrying identifier `id`.
96 def got_response(self, id, resp):
# Case where no query with this id is pending; handling is on a missing line.
97 if id not in self.map:
# Remove the pending entry. What is then done with df and timer is on lines
# not shown (presumably cancel the timer and fire df with resp).
99 df, timer = self.map.pop(id)
# Raised by DeferredCacher.call_now as NotNowError(key). In both visible raise
# sites it follows a check of whether a `default` was supplied, so presumably
# it signals "no value available right now and no default given" -- confirm.
# NOTE(review): the class body is on a line missing from this excerpt.
103 class NotNowError(Exception):
# Cache wrapper for a function that returns Deferreds (the two lines under the
# class header are original docstring text; its quote lines and most of its
# example are missing from this excerpt).
#
# State visible in this excerpt:
#   self.func    -- called as func(key); its result is waited on or gets
#                   callbacks attached, so it is expected to be a Deferred
#   self.backing -- mapping key -> cached value (supports `in`, [] get/set)
#   self.waiting -- mapping key -> Deferred, present while a fetch for that
#                   key is in flight and fired with None when it finishes
#
# NOTE(review): the embedded numbering has gaps inside every method, so the
# comments below describe only the visible lines. The `print '...'` statement
# near the end is Python 2 syntax.
106 class DeferredCacher(object):
108 like memoize, but for functions that return Deferreds
115 @DeferredCacher.with_backing(bsddb.hashopen(...))
# Returns a decorator that builds cls(func, backing). Presumably a
# classmethod, given the `cls` parameter; the decorator line is not shown.
122 def with_backing(cls, backing):
123 return lambda func: cls(func, backing)
# backing: mapping used as the cache store. How None is handled is on lines
# not shown (presumably replaced by a fresh dict -- confirm).
125 def __init__(self, func, backing=None):
130 self.backing = backing
# Deferred-returning lookup of `key`.
133 @defer.inlineCallbacks
134 def __call__(self, key):
# If a fetch for this key is already in flight, wait for it to finish first.
135 if key in self.waiting:
136 yield self.waiting[key]
# Cached: return the stored value.
138 if key in self.backing:
139 defer.returnValue(self.backing[key])
# Not cached: mark the key as in flight, fetch, then release any waiters.
# NOTE(review): lines between these are missing (presumably else/try/finally
# structure -- confirm).
141 self.waiting[key] = defer.Deferred()
143 value = yield self.func(key)
145 self.waiting.pop(key).callback(None)
# Store the fetched value and return it.
147 self.backing[key] = value
148 defer.returnValue(value)
# Synchronous lookup of `key`. `_nothing` is a sentinel meaning "no default
# supplied"; its definition is on a line not shown.
151 def call_now(self, key, default=_nothing):
# A fetch is already in flight. The branch taken when a default was supplied
# is on a missing line (presumably returns it); otherwise NotNowError.
152 if key in self.waiting:
153 if default is not self._nothing:
155 raise NotNowError(key)
# Cached: return the stored value immediately.
157 if key in self.backing:
158 return self.backing[key]
# Not cached: mark the key as in flight and start a background fetch.
160 self.waiting[key] = defer.Deferred()
# Presumably the body of a local `cb` (success callback; header not shown):
# store the value and release waiters.
162 self.backing[key] = value
163 self.waiting.pop(key).callback(None)
# Presumably the body of a local `eb` (failure callback; header not shown):
# release waiters, then print the traceback.
165 self.waiting.pop(key).callback(None)
167 print 'Error when requesting noncached value:'
168 fail.printTraceback()
# Start the fetch with the two handlers attached. The value is not available
# yet, so the caller gets the default (on a missing line, presumably) or
# NotNowError.
170 self.func(key).addCallback(cb).addErrback(eb)
171 if default is not self._nothing:
173 raise NotNowError(key)