Revert "removed GenericDeferrer and DeferredCacher from util.deferral"
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import itertools
4
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
7
8 def sleep(t):
9     d = defer.Deferred()
10     reactor.callLater(t, d.callback, None)
11     return d
12
13 def retry(message, delay, max_retries=None):
14     '''
15     @retry('Error getting block:', 1)
16     @defer.inlineCallbacks
17     def get_block(hash):
18         ...
19     '''
20     
21     def retry2(func):
22         @defer.inlineCallbacks
23         def f(*args, **kwargs):
24             for i in itertools.count():
25                 try:
26                     result = yield func(*args, **kwargs)
27                 except:
28                     if i == max_retries:
29                         raise
30                     log.err(None, message)
31                     yield sleep(delay)
32                 else:
33                     defer.returnValue(result)
34         return f
35     return retry2
36
37 class ReplyMatcher(object):
38     '''
39     Converts request/got response interface to deferred interface
40     '''
41     
42     def __init__(self, func, timeout=5):
43         self.func = func
44         self.timeout = timeout
45         self.map = {}
46     
47     def __call__(self, id):
48         if id not in self.map:
49             self.func(id)
50         df = defer.Deferred()
51         def timeout():
52             self.map[id].remove((df, timer))
53             if not self.map[id]:
54                 del self.map[id]
55             df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
56         timer = reactor.callLater(self.timeout, timeout)
57         self.map.setdefault(id, set()).add((df, timer))
58         return df
59     
60     def got_response(self, id, resp):
61         if id not in self.map:
62             return
63         for df, timer in self.map.pop(id):
64             df.callback(resp)
65             timer.cancel()
66
67 class GenericDeferrer(object):
68     '''
69     Converts query with identifier/got response interface to deferred interface
70     '''
71     
72     def __init__(self, max_id, func, timeout=5):
73         self.max_id = max_id
74         self.func = func
75         self.timeout = timeout
76         self.map = {}
77     
78     def __call__(self, *args, **kwargs):
79         while True:
80             id = random.randrange(self.max_id)
81             if id not in self.map:
82                 break
83         df = defer.Deferred()
84         def timeout():
85             self.map.pop(id)
86             df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
87         timer = reactor.callLater(self.timeout, timeout)
88         self.func(id, *args, **kwargs)
89         self.map[id] = df, timer
90         return df
91     
92     def got_response(self, id, resp):
93         if id not in self.map:
94             return
95         df, timer = self.map.pop(id)
96         timer.cancel()
97         df.callback(resp)
98
99 class NotNowError(Exception):
100     pass
101
102 class DeferredCacher(object):
103     '''
104     like memoize, but for functions that return Deferreds
105     
106     @DeferredCacher
107     def f(x):
108         ...
109         return df
110     
111     @DeferredCacher.with_backing(bsddb.hashopen(...))
112     def f(x):
113         ...
114         return df
115     '''
116     
117     @classmethod
118     def with_backing(cls, backing):
119         return lambda func: cls(func, backing)
120     
121     def __init__(self, func, backing=None):
122         if backing is None:
123             backing = {}
124         
125         self.func = func
126         self.backing = backing
127         self.waiting = {}
128     
129     @defer.inlineCallbacks
130     def __call__(self, key):
131         if key in self.waiting:
132             yield self.waiting[key]
133         
134         if key in self.backing:
135             defer.returnValue(self.backing[key])
136         else:
137             self.waiting[key] = defer.Deferred()
138             try:
139                 value = yield self.func(key)
140             finally:
141                 self.waiting.pop(key).callback(None)
142         
143         self.backing[key] = value
144         defer.returnValue(value)
145     
146     _nothing = object()
147     def call_now(self, key, default=_nothing):
148         if key in self.waiting:
149             if default is not self._nothing:
150                 return default
151             raise NotNowError(key)
152         
153         if key in self.backing:
154             return self.backing[key]
155         else:
156             self.waiting[key] = defer.Deferred()
157             def cb(value):
158                 self.backing[key] = value
159                 self.waiting.pop(key).callback(None)
160             def eb(fail):
161                 self.waiting.pop(key).callback(None)
162                 print
163                 print 'Error when requesting noncached value:'
164                 fail.printTraceback()
165                 print
166             self.func(key).addCallback(cb).addErrback(eb)
167             if default is not self._nothing:
168                 return default
169             raise NotNowError(key)