replace task.LoopingCall's with deferral.RobustLoopingCall that catches errors and...
[p2pool.git] / p2pool / util / deferral.py
index 90e9918..4d2ea47 100644 (file)
@@ -1,16 +1,31 @@
 from __future__ import division
 
+import itertools
 import random
+import sys
 
 from twisted.internet import defer, reactor
 from twisted.python import failure, log
 
 def sleep(t):
-    d = defer.Deferred()
-    reactor.callLater(t, d.callback, None)
+    d = defer.Deferred(canceller=lambda d_: dc.cancel())
+    dc = reactor.callLater(t, d.callback, None)
     return d
 
-def retry(message, delay):
+def run_repeatedly(f, *args, **kwargs):
+    current_dc = [None]
+    def step():
+        delay = f(*args, **kwargs)
+        current_dc[0] = reactor.callLater(delay, step)
+    step()
+    def stop():
+        current_dc[0].cancel()
+    return stop
+
+class RetrySilentlyException(Exception):
+    pass
+
+def retry(message='Error:', delay=3, max_retries=None, traceback=True):
     '''
     @retry('Error getting block:', 1)
     @defer.inlineCallbacks
@@ -21,15 +36,17 @@ def retry(message, delay):
     def retry2(func):
         @defer.inlineCallbacks
         def f(*args, **kwargs):
-            while True:
+            for i in itertools.count():
                 try:
                     result = yield func(*args, **kwargs)
-                except:
-                    print
-                    print message
-                    log.err()
-                    print
-                    
+                except Exception, e:
+                    if i == max_retries:
+                        raise
+                    if not isinstance(e, RetrySilentlyException):
+                        if traceback:
+                            log.err(None, message)
+                        else:
+                            print >>sys.stderr, message, e
                     yield sleep(delay)
                 else:
                     defer.returnValue(result)
@@ -47,33 +64,35 @@ class ReplyMatcher(object):
         self.map = {}
     
     def __call__(self, id):
-        self.func(id)
-        uniq = random.randrange(2**256)
+        if id not in self.map:
+            self.func(id)
         df = defer.Deferred()
         def timeout():
-            df, timer = self.map[id].pop(uniq)
-            df.errback(failure.Failure(defer.TimeoutError()))
+            self.map[id].remove((df, timer))
             if not self.map[id]:
                 del self.map[id]
-        self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
+            df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
+        timer = reactor.callLater(self.timeout, timeout)
+        self.map.setdefault(id, set()).add((df, timer))
         return df
     
     def got_response(self, id, resp):
         if id not in self.map:
             return
-        for df, timer in self.map.pop(id).itervalues():
-            timer.cancel()
+        for df, timer in self.map.pop(id):
             df.callback(resp)
+            timer.cancel()
 
 class GenericDeferrer(object):
     '''
     Converts query with identifier/got response interface to deferred interface
     '''
     
-    def __init__(self, max_id, func, timeout=5):
+    def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
         self.max_id = max_id
         self.func = func
         self.timeout = timeout
+        self.on_timeout = on_timeout
         self.map = {}
     
     def __call__(self, *args, **kwargs):
@@ -81,13 +100,20 @@ class GenericDeferrer(object):
             id = random.randrange(self.max_id)
             if id not in self.map:
                 break
-        df = defer.Deferred()
+        def cancel(df):
+            df, timer = self.map.pop(id)
+            timer.cancel()
+        try:
+            df = defer.Deferred(cancel)
+        except TypeError:
+            df = defer.Deferred() # handle older versions of Twisted
         def timeout():
             self.map.pop(id)
-            df.errback(failure.Failure(defer.TimeoutError()))
+            df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
+            self.on_timeout()
         timer = reactor.callLater(self.timeout, timeout)
-        self.func(id, *args, **kwargs)
         self.map[id] = df, timer
+        self.func(id, *args, **kwargs)
         return df
     
     def got_response(self, id, resp):
@@ -96,6 +122,12 @@ class GenericDeferrer(object):
         df, timer = self.map.pop(id)
         timer.cancel()
         df.callback(resp)
+    
+    def respond_all(self, resp):
+        while self.map:
+            id, (df, timer) = self.map.popitem()
+            timer.cancel()
+            df.errback(resp)
 
 class NotNowError(Exception):
     pass
@@ -144,22 +176,118 @@ class DeferredCacher(object):
         self.backing[key] = value
         defer.returnValue(value)
     
-    def call_now(self, key):
-        if key in self.waiting:
-            raise NotNowError()
-        
+    _nothing = object()
+    def call_now(self, key, default=_nothing):
         if key in self.backing:
             return self.backing[key]
-        else:
+        if key not in self.waiting:
             self.waiting[key] = defer.Deferred()
             def cb(value):
                 self.backing[key] = value
                 self.waiting.pop(key).callback(None)
             def eb(fail):
                 self.waiting.pop(key).callback(None)
+                if fail.check(RetrySilentlyException):
+                    return
                 print
                 print 'Error when requesting noncached value:'
                 fail.printTraceback()
                 print
             self.func(key).addCallback(cb).addErrback(eb)
-            raise NotNowError()
+        if default is not self._nothing:
+            return default
+        raise NotNowError(key)
+
+def deferred_has_been_called(df):
+    still_running = True
+    res2 = []
+    def cb(res):
+        if still_running:
+            res2[:] = [res]
+        else:
+            return res
+    df.addBoth(cb)
+    still_running = False
+    if res2:
+        return True, res2[0]
+    return False, None
+def inlineCallbacks(f):
+    from functools import wraps
+    @wraps(f)
+    def _(*args, **kwargs):
+        gen = f(*args, **kwargs)
+        stop_running = [False]
+        def cancelled(df_):
+            assert df_ is df
+            stop_running[0] = True
+            if currently_waiting_on:
+                currently_waiting_on[0].cancel()
+        df = defer.Deferred(cancelled)
+        currently_waiting_on = []
+        def it(cur):
+            while True:
+                try:
+                    if isinstance(cur, failure.Failure):
+                        res = cur.throwExceptionIntoGenerator(gen) # external code is run here
+                    else:
+                        res = gen.send(cur) # external code is run here
+                    if stop_running[0]:
+                        return
+                except StopIteration:
+                    df.callback(None)
+                except defer._DefGen_Return as e:
+                    # XXX should make sure direct child threw
+                    df.callback(e.value)
+                except:
+                    df.errback()
+                else:
+                    if isinstance(res, defer.Deferred):
+                        called, res2 = deferred_has_been_called(res)
+                        if called:
+                            cur = res2
+                            continue
+                        else:
+                            currently_waiting_on[:] = [res]
+                            def gotResult(res2):
+                                assert currently_waiting_on[0] is res
+                                currently_waiting_on[:] = []
+                                if stop_running[0]:
+                                    return
+                                it(res2)
+                            res.addBoth(gotResult) # external code is run between this and gotResult
+                    else:
+                        cur = res
+                        continue
+                break
+        it(None)
+        return df
+    return _
+
+
+
+class RobustLoopingCall(object):
+    def __init__(self, func, *args, **kwargs):
+        self.func, self.args, self.kwargs = func, args, kwargs
+        
+        self.running = False
+    
+    def start(self, period):
+        assert not self.running
+        self.running = True
+        self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError))
+    
+    @inlineCallbacks
+    def _worker(self, period):
+        assert self.running
+        while self.running:
+            try:
+                self.func(*self.args, **self.kwargs)
+            except:
+                log.err()
+            yield sleep(period)
+    
+    def stop(self):
+        assert self.running
+        self.running = False
+        self._df.cancel()
+        return self._df