replace task.LoopingCall's with deferral.RobustLoopingCall that catches errors and...
[p2pool.git] / p2pool / util / deferral.py
index aed3adc..4d2ea47 100644 (file)
@@ -197,3 +197,97 @@ class DeferredCacher(object):
         if default is not self._nothing:
             return default
         raise NotNowError(key)
+
+def deferred_has_been_called(df):
+    still_running = True
+    res2 = []
+    def cb(res):
+        if still_running:
+            res2[:] = [res]
+        else:
+            return res
+    df.addBoth(cb)
+    still_running = False
+    if res2:
+        return True, res2[0]
+    return False, None
+def inlineCallbacks(f):
+    from functools import wraps
+    @wraps(f)
+    def _(*args, **kwargs):
+        gen = f(*args, **kwargs)
+        stop_running = [False]
+        def cancelled(df_):
+            assert df_ is df
+            stop_running[0] = True
+            if currently_waiting_on:
+                currently_waiting_on[0].cancel()
+        df = defer.Deferred(cancelled)
+        currently_waiting_on = []
+        def it(cur):
+            while True:
+                try:
+                    if isinstance(cur, failure.Failure):
+                        res = cur.throwExceptionIntoGenerator(gen) # external code is run here
+                    else:
+                        res = gen.send(cur) # external code is run here
+                    if stop_running[0]:
+                        return
+                except StopIteration:
+                    df.callback(None)
+                except defer._DefGen_Return as e:
+                    # XXX should make sure direct child threw
+                    df.callback(e.value)
+                except:
+                    df.errback()
+                else:
+                    if isinstance(res, defer.Deferred):
+                        called, res2 = deferred_has_been_called(res)
+                        if called:
+                            cur = res2
+                            continue
+                        else:
+                            currently_waiting_on[:] = [res]
+                            def gotResult(res2):
+                                assert currently_waiting_on[0] is res
+                                currently_waiting_on[:] = []
+                                if stop_running[0]:
+                                    return
+                                it(res2)
+                            res.addBoth(gotResult) # external code is run between this and gotResult
+                    else:
+                        cur = res
+                        continue
+                break
+        it(None)
+        return df
+    return _
+
+
+
+class RobustLoopingCall(object):
+    def __init__(self, func, *args, **kwargs):
+        self.func, self.args, self.kwargs = func, args, kwargs
+        
+        self.running = False
+    
+    def start(self, period):
+        assert not self.running
+        self.running = True
+        self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError))
+    
+    @inlineCallbacks
+    def _worker(self, period):
+        assert self.running
+        while self.running:
+            try:
+                self.func(*self.args, **self.kwargs)
+            except:
+                log.err()
+            yield sleep(period)
+    
+    def stop(self):
+        assert self.running
+        self.running = False
+        self._df.cancel()
+        return self._df