from __future__ import division
+import itertools
import random
+import sys
from twisted.internet import defer, reactor
from twisted.python import failure, log
reactor.callLater(t, d.callback, None)
return d
-def retry(message, delay):
+def run_repeatedly(f, *args, **kwargs):
+ current_dc = [None]
+ def step():
+ delay = f(*args, **kwargs)
+ current_dc[0] = reactor.callLater(delay, step)
+ step()
+ def stop():
+ current_dc[0].cancel()
+ return stop
+
+class RetrySilentlyException(Exception):
+ pass
+
+def retry(message='Error:', delay=3, max_retries=None, traceback=True):
'''
@retry('Error getting block:', 1)
@defer.inlineCallbacks
def retry2(func):
@defer.inlineCallbacks
def f(*args, **kwargs):
- while True:
+ for i in itertools.count():
try:
result = yield func(*args, **kwargs)
- except:
- log.err(None, message)
+ except Exception, e:
+ if i == max_retries:
+ raise
+ if not isinstance(e, RetrySilentlyException):
+ if traceback:
+ log.err(None, message)
+ else:
+ print >>sys.stderr, message, e
yield sleep(delay)
else:
defer.returnValue(result)
self.map = {}
def __call__(self, id):
- self.func(id)
- uniq = random.randrange(2**256)
+ if id not in self.map:
+ self.func(id)
df = defer.Deferred()
def timeout():
- df, timer = self.map[id].pop(uniq)
- df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
+ self.map[id].remove((df, timer))
if not self.map[id]:
del self.map[id]
- self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
+ df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
+ timer = reactor.callLater(self.timeout, timeout)
+ self.map.setdefault(id, set()).add((df, timer))
return df
def got_response(self, id, resp):
if id not in self.map:
return
- for df, timer in self.map.pop(id).itervalues():
- timer.cancel()
+ for df, timer in self.map.pop(id):
df.callback(resp)
+ timer.cancel()
class GenericDeferrer(object):
'''
self.backing[key] = value
defer.returnValue(value)
- def call_now(self, key):
- if key in self.waiting:
- raise NotNowError()
-
+ _nothing = object()
+ def call_now(self, key, default=_nothing):
if key in self.backing:
return self.backing[key]
- else:
+ if key not in self.waiting:
self.waiting[key] = defer.Deferred()
def cb(value):
self.backing[key] = value
self.waiting.pop(key).callback(None)
def eb(fail):
self.waiting.pop(key).callback(None)
+ if fail.check(RetrySilentlyException):
+ return
print
print 'Error when requesting noncached value:'
fail.printTraceback()
print
self.func(key).addCallback(cb).addErrback(eb)
- raise NotNowError()
+ if default is not self._nothing:
+ return default
+ raise NotNowError(key)