1 from __future__ import division
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
10 reactor.callLater(t, d.callback, None)
13 def retry(message, delay, max_retries=None):
15 @retry('Error getting block:', 1)
16 @defer.inlineCallbacks
22 @defer.inlineCallbacks
23 def f(*args, **kwargs):
24 for i in itertools.count():
26 result = yield func(*args, **kwargs)
30 log.err(None, message)
33 defer.returnValue(result)
37 class ReplyMatcher(object):
39 Converts request/got response interface to deferred interface
42 def __init__(self, func, timeout=5):
44 self.timeout = timeout
47 def __call__(self, id):
48 if id not in self.map:
52 self.map[id].remove((df, timer))
55 df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
56 timer = reactor.callLater(self.timeout, timeout)
57 self.map.setdefault(id, set()).add((df, timer))
60 def got_response(self, id, resp):
61 if id not in self.map:
63 for df, timer in self.map.pop(id):
67 class GenericDeferrer(object):
69 Converts query with identifier/got response interface to deferred interface
72 def __init__(self, max_id, func, timeout=5):
75 self.timeout = timeout
78 def __call__(self, *args, **kwargs):
80 id = random.randrange(self.max_id)
81 if id not in self.map:
86 df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
87 timer = reactor.callLater(self.timeout, timeout)
88 self.func(id, *args, **kwargs)
89 self.map[id] = df, timer
92 def got_response(self, id, resp):
93 if id not in self.map:
95 df, timer = self.map.pop(id)
99 class NotNowError(Exception):
102 class DeferredCacher(object):
104 like memoize, but for functions that return Deferreds
111 @DeferredCacher.with_backing(bsddb.hashopen(...))
118 def with_backing(cls, backing):
119 return lambda func: cls(func, backing)
121 def __init__(self, func, backing=None):
126 self.backing = backing
129 @defer.inlineCallbacks
130 def __call__(self, key):
131 if key in self.waiting:
132 yield self.waiting[key]
134 if key in self.backing:
135 defer.returnValue(self.backing[key])
137 self.waiting[key] = defer.Deferred()
139 value = yield self.func(key)
141 self.waiting.pop(key).callback(None)
143 self.backing[key] = value
144 defer.returnValue(value)
147 def call_now(self, key, default=_nothing):
148 if key in self.waiting:
149 if default is not self._nothing:
151 raise NotNowError(key)
153 if key in self.backing:
154 return self.backing[key]
156 self.waiting[key] = defer.Deferred()
158 self.backing[key] = value
159 self.waiting.pop(key).callback(None)
161 self.waiting.pop(key).callback(None)
163 print 'Error when requesting noncached value:'
164 fail.printTraceback()
166 self.func(key).addCallback(cb).addErrback(eb)
167 if default is not self._nothing:
169 raise NotNowError(key)