X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=p2pool%2Futil%2Fdeferral.py;h=4d2ea472317c402cffe20dc6f71f67d7442db394;hb=0a3493d6873cfef4fb189d39e64dfbc6e162e2a7;hp=310480950b13142671fbc42365f67540697599e3;hpb=b2d2ec72ba65b3b40f041d74c73f1d6ebee27d87;p=p2pool.git diff --git a/p2pool/util/deferral.py b/p2pool/util/deferral.py index 3104809..4d2ea47 100644 --- a/p2pool/util/deferral.py +++ b/p2pool/util/deferral.py @@ -88,10 +88,11 @@ class GenericDeferrer(object): Converts query with identifier/got response interface to deferred interface ''' - def __init__(self, max_id, func, timeout=5): + def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None): self.max_id = max_id self.func = func self.timeout = timeout + self.on_timeout = on_timeout self.map = {} def __call__(self, *args, **kwargs): @@ -99,13 +100,20 @@ class GenericDeferrer(object): id = random.randrange(self.max_id) if id not in self.map: break - df = defer.Deferred() + def cancel(df): + df, timer = self.map.pop(id) + timer.cancel() + try: + df = defer.Deferred(cancel) + except TypeError: + df = defer.Deferred() # handle older versions of Twisted def timeout(): self.map.pop(id) df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer'))) + self.on_timeout() timer = reactor.callLater(self.timeout, timeout) - self.func(id, *args, **kwargs) self.map[id] = df, timer + self.func(id, *args, **kwargs) return df def got_response(self, id, resp): @@ -114,6 +122,12 @@ class GenericDeferrer(object): df, timer = self.map.pop(id) timer.cancel() df.callback(resp) + + def respond_all(self, resp): + while self.map: + id, (df, timer) = self.map.popitem() + timer.cancel() + df.errback(resp) class NotNowError(Exception): pass @@ -183,3 +197,97 @@ class DeferredCacher(object): if default is not self._nothing: return default raise NotNowError(key) + +def deferred_has_been_called(df): + still_running = True + res2 = [] + def cb(res): + if still_running: + res2[:] = [res] + else: + return res + df.addBoth(cb) + still_running = False + if res2: + return True, res2[0] + return False, None +def inlineCallbacks(f): + from functools import wraps + @wraps(f) + def _(*args, **kwargs): + gen = f(*args, **kwargs) + stop_running = [False] + def cancelled(df_): + assert df_ is df + stop_running[0] = True + if currently_waiting_on: + currently_waiting_on[0].cancel() + df = defer.Deferred(cancelled) + currently_waiting_on = [] + def it(cur): + while True: + try: + if isinstance(cur, failure.Failure): + res = cur.throwExceptionIntoGenerator(gen) # external code is run here + else: + res = gen.send(cur) # external code is run here + if stop_running[0]: + return + except StopIteration: + df.callback(None) + except defer._DefGen_Return as e: + # XXX should make sure direct child threw + df.callback(e.value) + except: + df.errback() + else: + if isinstance(res, defer.Deferred): + called, res2 = deferred_has_been_called(res) + if called: + cur = res2 + continue + else: + currently_waiting_on[:] = [res] + def gotResult(res2): + assert currently_waiting_on[0] is res + currently_waiting_on[:] = [] + if stop_running[0]: + return + it(res2) + res.addBoth(gotResult) # external code is run between this and gotResult + else: + cur = res + continue + break + it(None) + return df + return _ + + + +class RobustLoopingCall(object): + def __init__(self, func, *args, **kwargs): + self.func, self.args, self.kwargs = func, args, kwargs + + self.running = False + + def start(self, period): + assert not self.running + self.running = True + self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError)) + + @inlineCallbacks + def _worker(self, period): + assert self.running + while self.running: + try: + self.func(*self.args, **self.kwargs) + except: + log.err() + yield sleep(period) + + def stop(self): + assert self.running + self.running = False + self._df.cancel() + return self._df