improved deferral.ReplyMatcher
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 from twisted.internet import defer, reactor
4 from twisted.python import failure, log
5
def sleep(t):
    '''
    Return a Deferred that is fired with None by a reactor.callLater(t, ...)
    call, i.e. once t seconds have elapsed on the reactor.
    '''
    wakeup = defer.Deferred()
    # Let the reactor invoke wakeup.callback(None) when the delay is over.
    reactor.callLater(t, wakeup.callback, None)
    return wakeup
10
def retry(message, delay):
    '''
    Decorator factory: call the wrapped function again and again until one
    attempt succeeds.

    @retry('Error getting block:', 1)
    @defer.inlineCallbacks
    def get_block(hash):
        ...

    message is passed to log.err together with each failure, and delay is
    handed to sleep() before the next attempt. The decorated function is an
    inlineCallbacks function whose result is that of the first attempt that
    does not raise; there is no limit on the number of attempts.
    '''
    
    def retry2(func):
        @defer.inlineCallbacks
        def f(*args, **kwargs):
            while True:
                try:
                    result = yield func(*args, **kwargs)
                except Exception:
                    # Retry ordinary errors only. A bare except would also
                    # trap GeneratorExit (thrown in at the yield above when
                    # this suspended generator is closed) as well as
                    # KeyboardInterrupt/SystemExit, and would then yield
                    # again, which a generator must not do while closing.
                    log.err(None, message)
                    yield sleep(delay)
                else:
                    # Kept outside the try body so that the exception used
                    # by returnValue can never be swallowed by the handler.
                    defer.returnValue(result)
        return f
    return retry2
32
class ReplyMatcher(object):
    '''
    Converts request/got response interface to deferred interface

    Calling the instance with an id sends the request through func(id) and
    returns a Deferred. That Deferred fires with the resp given to
    got_response() for the same id, or fails with defer.TimeoutError when
    the reactor.callLater(timeout, ...) call scheduled for it runs first.
    Several outstanding requests for one id are all answered by the same
    response.
    '''
    
    def __init__(self, func, timeout=5):
        '''
        func is called with the id to send a request; timeout is the delay
        passed to reactor.callLater before a waiter is failed.
        '''
        self.func = func
        self.timeout = timeout
        # id -> set of (Deferred, delayed call) pairs still awaiting a reply
        self.map = {}
    
    def __call__(self, id):
        '''
        Send a request for id and return a Deferred for its response.

        Any exception raised by func(id) is propagated after the waiter
        registered for this call has been removed again.
        '''
        df = defer.Deferred()
        
        def forget():
            # The id may already be gone (popped by got_response), so look
            # it up defensively instead of indexing self.map directly.
            waiters = self.map.get(id)
            if waiters is not None:
                waiters.discard((df, timer))
                if not waiters:
                    del self.map[id]
        
        def timeout():
            forget()
            df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
        
        timer = reactor.callLater(self.timeout, timeout)
        # Register before sending: if func delivers the reply synchronously,
        # got_response() must already be able to find this waiter.
        self.map.setdefault(id, set()).add((df, timer))
        try:
            self.func(id)
        except BaseException:
            # The request was never sent; undo the registration so neither
            # a stale map entry nor a pending timer is left behind.
            forget()
            if timer.active():
                timer.cancel()
            raise
        return df
    
    def got_response(self, id, resp):
        '''
        Fire every Deferred waiting on id with resp.

        Responses for ids nobody is waiting on are ignored.
        '''
        for df, timer in self.map.pop(id, ()):
            # Cancel first so that the timer cannot stay armed if firing
            # the Deferred raises.
            timer.cancel()
            df.callback(resp)