Revert "remove deferral.sleep's canceller, as it wasn't used anywhere and caused...
[p2pool.git] / p2pool / util / deferral.py
index 6f75b76..aed3adc 100644 (file)
@@ -8,8 +8,8 @@ from twisted.internet import defer, reactor
 from twisted.python import failure, log
 
 def sleep(t):
-    d = defer.Deferred()
-    reactor.callLater(t, d.callback, None)
+    d = defer.Deferred(canceller=lambda d_: dc.cancel())
+    dc = reactor.callLater(t, d.callback, None)
     return d
 
 def run_repeatedly(f, *args, **kwargs):
@@ -88,10 +88,11 @@ class GenericDeferrer(object):
     Converts query with identifier/got response interface to deferred interface
     '''
     
-    def __init__(self, max_id, func, timeout=5):
+    def __init__(self, max_id, func, timeout=5, on_timeout=lambda: None):
         self.max_id = max_id
         self.func = func
         self.timeout = timeout
+        self.on_timeout = on_timeout
         self.map = {}
     
     def __call__(self, *args, **kwargs):
@@ -99,13 +100,20 @@ class GenericDeferrer(object):
             id = random.randrange(self.max_id)
             if id not in self.map:
                 break
-        df = defer.Deferred()
+        def cancel(df):
+            df, timer = self.map.pop(id)
+            timer.cancel()
+        try:
+            df = defer.Deferred(cancel)
+        except TypeError:
+            df = defer.Deferred() # handle older versions of Twisted
         def timeout():
             self.map.pop(id)
             df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
+            self.on_timeout()
         timer = reactor.callLater(self.timeout, timeout)
-        self.func(id, *args, **kwargs)
         self.map[id] = df, timer
+        self.func(id, *args, **kwargs)
         return df
     
     def got_response(self, id, resp):
@@ -114,6 +122,12 @@ class GenericDeferrer(object):
         df, timer = self.map.pop(id)
         timer.cancel()
         df.callback(resp)
+    
+    def respond_all(self, resp):
+        while self.map:
+            id, (df, timer) = self.map.popitem()
+            timer.cancel()
+            df.errback(resp)
 
 class NotNowError(Exception):
     pass