1 from __future__ import division
6 from twisted.internet import defer, reactor
7 from twisted.python import failure, log
11 reactor.callLater(t, d.callback, None)
14 class RetrySilentlyException(Exception):
17 def retry(message='Error:', delay=3, max_retries=None, traceback=True):
19 @retry('Error getting block:', 1)
20 @defer.inlineCallbacks
26 @defer.inlineCallbacks
27 def f(*args, **kwargs):
28 for i in itertools.count():
30 result = yield func(*args, **kwargs)
34 if not isinstance(e, RetrySilentlyException):
36 log.err(None, message)
38 print >>sys.stderr, message, e
41 defer.returnValue(result)
45 class ReplyMatcher(object):
47 Converts request/got response interface to deferred interface
50 def __init__(self, func, timeout=5):
52 self.timeout = timeout
55 def __call__(self, id):
56 if id not in self.map:
60 self.map[id].remove((df, timer))
63 df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
64 timer = reactor.callLater(self.timeout, timeout)
65 self.map.setdefault(id, set()).add((df, timer))
68 def got_response(self, id, resp):
69 if id not in self.map:
71 for df, timer in self.map.pop(id):
75 class GenericDeferrer(object):
77 Converts query with identifier/got response interface to deferred interface
80 def __init__(self, max_id, func, timeout=5):
83 self.timeout = timeout
86 def __call__(self, *args, **kwargs):
88 id = random.randrange(self.max_id)
89 if id not in self.map:
94 df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
95 timer = reactor.callLater(self.timeout, timeout)
96 self.func(id, *args, **kwargs)
97 self.map[id] = df, timer
100 def got_response(self, id, resp):
101 if id not in self.map:
103 df, timer = self.map.pop(id)
107 class NotNowError(Exception):
110 class DeferredCacher(object):
112 like memoize, but for functions that return Deferreds
119 @DeferredCacher.with_backing(bsddb.hashopen(...))
126 def with_backing(cls, backing):
127 return lambda func: cls(func, backing)
129 def __init__(self, func, backing=None):
134 self.backing = backing
137 @defer.inlineCallbacks
138 def __call__(self, key):
139 if key in self.waiting:
140 yield self.waiting[key]
142 if key in self.backing:
143 defer.returnValue(self.backing[key])
145 self.waiting[key] = defer.Deferred()
147 value = yield self.func(key)
149 self.waiting.pop(key).callback(None)
151 self.backing[key] = value
152 defer.returnValue(value)
155 def call_now(self, key, default=_nothing):
156 if key in self.backing:
157 return self.backing[key]
158 if key not in self.waiting:
159 self.waiting[key] = defer.Deferred()
161 self.backing[key] = value
162 self.waiting.pop(key).callback(None)
164 self.waiting.pop(key).callback(None)
165 if fail.check(RetrySilentlyException):
168 print 'Error when requesting noncached value:'
169 fail.printTraceback()
171 self.func(key).addCallback(cb).addErrback(eb)
172 if default is not self._nothing:
174 raise NotNowError(key)