1 from __future__ import division
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
10 reactor.callLater(t, d.callback, None)
13 def retry(message, delay):
15 @retry('Error getting block:', 1)
16 @defer.inlineCallbacks
22 @defer.inlineCallbacks
23 def f(*args, **kwargs):
26 result = yield func(*args, **kwargs)
28 log.err(None, message)
31 defer.returnValue(result)
35 class ReplyMatcher(object):
37 Converts request/got response interface to deferred interface
40 def __init__(self, func, timeout=5):
42 self.timeout = timeout
45 def __call__(self, id):
47 uniq = random.randrange(2**256)
50 df, timer = self.map[id].pop(uniq)
51 df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
54 self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
57 def got_response(self, id, resp):
58 if id not in self.map:
60 for df, timer in self.map.pop(id).itervalues():
64 class GenericDeferrer(object):
66 Converts query with identifier/got response interface to deferred interface
69 def __init__(self, max_id, func, timeout=5):
72 self.timeout = timeout
75 def __call__(self, *args, **kwargs):
77 id = random.randrange(self.max_id)
78 if id not in self.map:
83 df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
84 timer = reactor.callLater(self.timeout, timeout)
85 self.func(id, *args, **kwargs)
86 self.map[id] = df, timer
89 def got_response(self, id, resp):
90 if id not in self.map:
92 df, timer = self.map.pop(id)
96 class NotNowError(Exception):
99 class DeferredCacher(object):
101 like memoize, but for functions that return Deferreds
108 @DeferredCacher.with_backing(bsddb.hashopen(...))
115 def with_backing(cls, backing):
116 return lambda func: cls(func, backing)
118 def __init__(self, func, backing=None):
123 self.backing = backing
126 @defer.inlineCallbacks
127 def __call__(self, key):
128 if key in self.waiting:
129 yield self.waiting[key]
131 if key in self.backing:
132 defer.returnValue(self.backing[key])
134 self.waiting[key] = defer.Deferred()
136 value = yield self.func(key)
138 self.waiting.pop(key).callback(None)
140 self.backing[key] = value
141 defer.returnValue(value)
144 def call_now(self, key, default=_nothing):
145 if key in self.waiting:
146 if default is not self._nothing:
148 raise NotNowError(key)
150 if key in self.backing:
151 return self.backing[key]
153 self.waiting[key] = defer.Deferred()
155 self.backing[key] = value
156 self.waiting.pop(key).callback(None)
158 self.waiting.pop(key).callback(None)
160 print 'Error when requesting noncached value:'
161 fail.printTraceback()
163 self.func(key).addCallback(cb).addErrback(eb)
164 if default is not self._nothing:
166 raise NotNowError(key)