cleaned up p2p/rpc port listener retryers
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import itertools
4 import sys
5
6 from twisted.internet import defer, reactor
7 from twisted.python import failure, log
8
def sleep(t):
    '''
    Returns a Deferred that the reactor fires with None after t seconds.
    
    Meant to be yielded from inside a defer.inlineCallbacks generator.
    '''
    wakeup = defer.Deferred()
    reactor.callLater(t, wakeup.callback, None)
    return wakeup
13
class RetrySilentlyException(Exception):
    '''
    Raise from a function wrapped by retry() to have the call retried
    without the failure being logged or printed.
    '''
16
def retry(message='Error:', delay=3, max_retries=None, traceback=True):
    '''
    Decorator factory that retries a Deferred-returning function on failure.
    
    @retry('Error getting block:', 1)
    @defer.inlineCallbacks
    def get_block(hash):
        ...
    
    The decorated function returns a Deferred. Whenever the wrapped call
    fails with an Exception, it is called again with the same arguments
    after delay seconds. The first successful result becomes the result
    of the returned Deferred.
    
    message: prefix used when reporting a failure that will be retried
    delay: seconds to wait between attempts
    max_retries: number of retries allowed; once that many have been made
        the next failure is re-raised to the caller. None retries forever.
    traceback: if true, failures are reported through log.err (with
        traceback); otherwise message and the exception go to stderr
    
    Failures that are RetrySilentlyException instances are retried without
    being reported.
    '''
    
    def retry2(func):
        @defer.inlineCallbacks
        def f(*args, **kwargs):
            failures = 0
            while True:
                try:
                    outcome = yield func(*args, **kwargs)
                except Exception as error:
                    if failures == max_retries:
                        # out of retries - let the caller see the original exception
                        raise
                    noisy = not isinstance(error, RetrySilentlyException)
                    if noisy and traceback:
                        # passing None makes log.err report the exception being handled
                        log.err(None, message)
                    elif noisy:
                        print >>sys.stderr, message, error
                    yield sleep(delay)
                    failures += 1
                else:
                    defer.returnValue(outcome)
        return f
    return retry2
44
class ReplyMatcher(object):
    '''
    Converts request/got response interface to deferred interface
    
    Calling the instance with an id returns a Deferred for the reply to
    that id. func(id) is invoked to send the request only when no request
    for that id is already pending; later callers for the same id share
    the outstanding request. Each Deferred fires with the response given
    to got_response(), or fails with defer.TimeoutError if none arrives
    within timeout seconds of that call.
    '''
    
    def __init__(self, func, timeout=5):
        self.func = func
        self.timeout = timeout
        # id -> set of (Deferred, timeout DelayedCall) pairs still waiting for a reply
        self.map = {}
    
    def __call__(self, id):
        already_requested = id in self.map
        if not already_requested:
            self.func(id)
        
        reply = defer.Deferred()
        
        def on_timeout():
            # forget only this caller; others waiting on the same id keep their own timers
            pending = self.map[id]
            pending.remove((reply, timer))
            if not pending:
                del self.map[id]
            reply.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
        
        timer = reactor.callLater(self.timeout, on_timeout)
        self.map.setdefault(id, set()).add((reply, timer))
        return reply
    
    def got_response(self, id, resp):
        # responses nobody is waiting for are silently dropped
        for reply, timer in self.map.pop(id, ()):
            reply.callback(resp)
            timer.cancel()
74
class GenericDeferrer(object):
    '''
    Converts query with identifier/got response interface to deferred interface
    
    Calling the instance picks a random id in range(max_id) that is not
    currently in use, sends the query with func(id, *args, **kwargs) and
    returns a Deferred. The Deferred fires with the response passed to
    got_response() for that id, or fails with defer.TimeoutError if no
    response arrives within timeout seconds.
    '''
    
    def __init__(self, max_id, func, timeout=5):
        self.max_id = max_id
        self.func = func
        self.timeout = timeout
        # id -> (Deferred, timeout DelayedCall) for every query still in flight
        self.map = {}
    
    def __call__(self, *args, **kwargs):
        # imported locally: the module's import block does not provide random
        import random
        
        # NOTE: keeps drawing until a free id turns up, so this never returns
        # if all max_id ids are in flight at once
        while True:
            id = random.randrange(self.max_id)
            if id not in self.map:
                break
        df = defer.Deferred()
        def timeout():
            self.map.pop(id)
            df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
        timer = reactor.callLater(self.timeout, timeout)
        # register before sending so that a response delivered while func is
        # still running is matched instead of being dropped
        self.map[id] = df, timer
        try:
            self.func(id, *args, **kwargs)
        except BaseException:
            # the query failed to go out: release the id and the timer so the
            # timeout doesn't fire later for a Deferred nobody holds
            if self.map.pop(id, None) is not None:
                timer.cancel()
            raise
        return df
    
    def got_response(self, id, resp):
        # unknown ids (never sent, already answered or timed out) are ignored
        if id not in self.map:
            return
        df, timer = self.map.pop(id)
        timer.cancel()
        df.callback(resp)
106
class NotNowError(Exception):
    '''
    Raised by DeferredCacher.call_now when the value for the key isn't
    cached yet and the caller supplied no default.
    '''
109
class DeferredCacher(object):
    '''
    like memoize, but for functions that return Deferreds
    
    @DeferredCacher
    def f(x):
        ...
        return df
    
    @DeferredCacher.with_backing(bsddb.hashopen(...))
    def f(x):
        ...
        return df
    
    Calling the instance with a key returns a Deferred that fires with the
    cached value, calling func(key) only when the value is not in backing
    yet. While a request for a key is in flight, other callers for that key
    wait for it instead of issuing their own. Failed requests are not
    cached.
    
    call_now(key) never waits: it returns the cached value if there is one,
    otherwise starts a request in the background and returns default (or
    raises NotNowError when no default was given).
    '''
    
    @classmethod
    def with_backing(cls, backing):
        '''
        Returns a decorator that caches into backing, a dict-like object,
        instead of a fresh dict.
        '''
        return lambda func: cls(func, backing)
    
    def __init__(self, func, backing=None):
        if backing is None:
            backing = {}
        
        self.func = func
        self.backing = backing
        # key -> Deferred fired with None once the in-flight request for key ends
        self.waiting = {}
    
    @defer.inlineCallbacks
    def __call__(self, key):
        # Loop rather than test once: after a failed request every waiter is
        # woken, the first one starts a new request, and the others have to
        # wait on that one instead of overwriting its entry in self.waiting.
        while key in self.waiting:
            yield self.waiting[key]
        
        if key in self.backing:
            defer.returnValue(self.backing[key])
        
        self.waiting[key] = defer.Deferred()
        try:
            value = yield self.func(key)
            # Store before the finally clause wakes the waiters: they resume
            # synchronously and look the value up in backing right away.
            self.backing[key] = value
        finally:
            self.waiting.pop(key).callback(None)
        
        defer.returnValue(value)
    
    _nothing = object()
    def call_now(self, key, default=_nothing):
        '''
        Returns the cached value for key without waiting.
        
        If the value isn't cached, a request for it is started (unless one
        is already in flight) and default is returned; NotNowError(key) is
        raised if no default was given. Errors from the background request
        are printed and otherwise ignored.
        '''
        if key in self.backing:
            return self.backing[key]
        if key not in self.waiting:
            self.waiting[key] = defer.Deferred()
            def cb(value):
                self.backing[key] = value
                self.waiting.pop(key).callback(None)
            def eb(fail):
                self.waiting.pop(key).callback(None)
                sys.stdout.write('\n')
                sys.stdout.write('Error when requesting noncached value:\n')
                fail.printTraceback()
                sys.stdout.write('\n')
            try:
                request = self.func(key)
            except BaseException:
                # func failed before returning a Deferred: unregister the
                # request so later callers for this key don't wait forever
                self.waiting.pop(key).callback(None)
                raise
            request.addCallback(cb).addErrback(eb)
        if default is not self._nothing:
            return default
        raise NotNowError(key)