retry merged block submittal
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import itertools
4
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
7
def sleep(t):
    '''
    Return a Deferred that fires with None after t seconds.
    
    The wait is scheduled with reactor.callLater. Cancelling the returned
    Deferred also cancels the pending timer, so an abandoned sleep does not
    leave a delayed call sitting in the reactor until it expires.
    '''
    
    def cancel_timer(_):
        # Guard with active() so a timer that has already run or been
        # cancelled is never cancelled a second time.
        if delayed_call.active():
            delayed_call.cancel()
    
    d = defer.Deferred(cancel_timer)
    delayed_call = reactor.callLater(t, d.callback, None)
    return d
12
def retry(message, delay, max_retries=None):
    '''
    Decorator factory that re-invokes a function until it succeeds.
    
    @retry('Error getting block:', 1)
    @defer.inlineCallbacks
    def get_block(hash):
        ...
    
    message: text passed to log.err together with each failure that is
        going to be retried.
    delay: seconds to wait (using sleep) between a failure and the next
        attempt.
    max_retries: number of retries allowed after the first attempt. Once
        that many retries have been used, the next failure is re-raised to
        the caller instead of being logged. None (the default) retries
        forever.
    
    The decorated function returns a Deferred that fires with the result of
    the first successful call.
    '''
    
    def retry2(func):
        @defer.inlineCallbacks
        def f(*args, **kwargs):
            for i in itertools.count():
                try:
                    result = yield func(*args, **kwargs)
                except Exception:
                    # Not a bare except: GeneratorExit, KeyboardInterrupt and
                    # SystemExit must propagate. Catching GeneratorExit here
                    # and then yielding again makes Python raise
                    # "generator ignored GeneratorExit" when the generator is
                    # closed.
                    if i == max_retries:
                        raise
                    # log.err(None, ...) logs the exception being handled.
                    log.err(None, message)
                    yield sleep(delay)
                else:
                    # Kept outside the try block so the exception used by
                    # returnValue is never seen by the handler above.
                    defer.returnValue(result)
        return f
    return retry2
36
class ReplyMatcher(object):
    '''
    Adapts a "send request" / "got response" pair to a Deferred interface.
    
    Calling the instance with an id returns a Deferred. got_response(id, resp)
    fires every Deferred currently waiting on that id. A Deferred that is not
    answered within the timeout fails with defer.TimeoutError.
    '''
    
    def __init__(self, func, timeout=5):
        # func: callable invoked with an id to issue the request
        # timeout: seconds each waiter is given before it errbacks
        # map: id -> set of (Deferred, DelayedCall) pairs still waiting
        self.map = {}
        self.timeout = timeout
        self.func = func
    
    def __call__(self, id):
        # The request is only issued when nobody is already waiting on this
        # id; later callers share the outstanding request.
        already_pending = id in self.map
        if not already_pending:
            self.func(id)
        
        result = defer.Deferred()
        
        def on_timeout():
            # Drop this waiter, and the whole entry once it is the last one.
            self.map[id].remove((result, delayed_call))
            if len(self.map[id]) == 0:
                del self.map[id]
            error = defer.TimeoutError('in ReplyMatcher')
            result.errback(failure.Failure(error))
        
        delayed_call = reactor.callLater(self.timeout, on_timeout)
        
        try:
            waiters = self.map[id]
        except KeyError:
            waiters = self.map[id] = set()
        waiters.add((result, delayed_call))
        return result
    
    def got_response(self, id, resp):
        # Responses for ids nobody is waiting on are ignored.
        waiters = self.map.pop(id, None)
        if waiters is None:
            return
        for result, delayed_call in waiters:
            result.callback(resp)
            delayed_call.cancel()