cleaned up util.deferral.DeferredCacher and bitcoin.p2p.HeightTracker
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import itertools
4
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
7
8 def sleep(t):
9     d = defer.Deferred()
10     reactor.callLater(t, d.callback, None)
11     return d
12
13 class RetrySilentlyException(Exception):
14     pass
15
16 def retry(message='Error:', delay=3, max_retries=None):
17     '''
18     @retry('Error getting block:', 1)
19     @defer.inlineCallbacks
20     def get_block(hash):
21         ...
22     '''
23     
24     def retry2(func):
25         @defer.inlineCallbacks
26         def f(*args, **kwargs):
27             for i in itertools.count():
28                 try:
29                     result = yield func(*args, **kwargs)
30                 except Exception, e:
31                     if i == max_retries:
32                         raise
33                     if not isinstance(e, RetrySilentlyException):
34                         log.err(None, message)
35                     yield sleep(delay)
36                 else:
37                     defer.returnValue(result)
38         return f
39     return retry2
40
41 class ReplyMatcher(object):
42     '''
43     Converts request/got response interface to deferred interface
44     '''
45     
46     def __init__(self, func, timeout=5):
47         self.func = func
48         self.timeout = timeout
49         self.map = {}
50     
51     def __call__(self, id):
52         if id not in self.map:
53             self.func(id)
54         df = defer.Deferred()
55         def timeout():
56             self.map[id].remove((df, timer))
57             if not self.map[id]:
58                 del self.map[id]
59             df.errback(failure.Failure(defer.TimeoutError('in ReplyMatcher')))
60         timer = reactor.callLater(self.timeout, timeout)
61         self.map.setdefault(id, set()).add((df, timer))
62         return df
63     
64     def got_response(self, id, resp):
65         if id not in self.map:
66             return
67         for df, timer in self.map.pop(id):
68             df.callback(resp)
69             timer.cancel()
70
71 class GenericDeferrer(object):
72     '''
73     Converts query with identifier/got response interface to deferred interface
74     '''
75     
76     def __init__(self, max_id, func, timeout=5):
77         self.max_id = max_id
78         self.func = func
79         self.timeout = timeout
80         self.map = {}
81     
82     def __call__(self, *args, **kwargs):
83         while True:
84             id = random.randrange(self.max_id)
85             if id not in self.map:
86                 break
87         df = defer.Deferred()
88         def timeout():
89             self.map.pop(id)
90             df.errback(failure.Failure(defer.TimeoutError('in GenericDeferrer')))
91         timer = reactor.callLater(self.timeout, timeout)
92         self.func(id, *args, **kwargs)
93         self.map[id] = df, timer
94         return df
95     
96     def got_response(self, id, resp):
97         if id not in self.map:
98             return
99         df, timer = self.map.pop(id)
100         timer.cancel()
101         df.callback(resp)
102
103 class NotNowError(Exception):
104     pass
105
106 class DeferredCacher(object):
107     '''
108     like memoize, but for functions that return Deferreds
109     
110     @DeferredCacher
111     def f(x):
112         ...
113         return df
114     
115     @DeferredCacher.with_backing(bsddb.hashopen(...))
116     def f(x):
117         ...
118         return df
119     '''
120     
121     @classmethod
122     def with_backing(cls, backing):
123         return lambda func: cls(func, backing)
124     
125     def __init__(self, func, backing=None):
126         if backing is None:
127             backing = {}
128         
129         self.func = func
130         self.backing = backing
131         self.waiting = {}
132     
133     @defer.inlineCallbacks
134     def __call__(self, key):
135         if key in self.waiting:
136             yield self.waiting[key]
137         
138         if key in self.backing:
139             defer.returnValue(self.backing[key])
140         else:
141             self.waiting[key] = defer.Deferred()
142             try:
143                 value = yield self.func(key)
144             finally:
145                 self.waiting.pop(key).callback(None)
146         
147         self.backing[key] = value
148         defer.returnValue(value)
149     
150     _nothing = object()
151     def call_now(self, key, default=_nothing):
152         if key in self.backing:
153             return self.backing[key]
154         if key not in self.waiting:
155             self.waiting[key] = defer.Deferred()
156             def cb(value):
157                 self.backing[key] = value
158                 self.waiting.pop(key).callback(None)
159             def eb(fail):
160                 self.waiting.pop(key).callback(None)
161                 print
162                 print 'Error when requesting noncached value:'
163                 fail.printTraceback()
164                 print
165             self.func(key).addCallback(cb).addErrback(eb)
166         if default is not self._nothing:
167             return default
168         raise NotNowError(key)