revised all error handlers
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import random
4
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
7
8 def sleep(t):
9     d = defer.Deferred()
10     reactor.callLater(t, d.callback, None)
11     return d
12
13 def retry(message, delay):
14     '''
15     @retry('Error getting block:', 1)
16     @defer.inlineCallbacks
17     def get_block(hash):
18         ...
19     '''
20     
21     def retry2(func):
22         @defer.inlineCallbacks
23         def f(*args, **kwargs):
24             while True:
25                 try:
26                     result = yield func(*args, **kwargs)
27                 except:
28                     log.err(None, message)
29                     yield sleep(delay)
30                 else:
31                     defer.returnValue(result)
32         return f
33     return retry2
34
35 class ReplyMatcher(object):
36     '''
37     Converts request/got response interface to deferred interface
38     '''
39     
40     def __init__(self, func, timeout=5):
41         self.func = func
42         self.timeout = timeout
43         self.map = {}
44     
45     def __call__(self, id):
46         self.func(id)
47         uniq = random.randrange(2**256)
48         df = defer.Deferred()
49         def timeout():
50             df, timer = self.map[id].pop(uniq)
51             df.errback(failure.Failure(defer.TimeoutError()))
52             if not self.map[id]:
53                 del self.map[id]
54         self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
55         return df
56     
57     def got_response(self, id, resp):
58         if id not in self.map:
59             return
60         for df, timer in self.map.pop(id).itervalues():
61             timer.cancel()
62             df.callback(resp)
63
64 class GenericDeferrer(object):
65     '''
66     Converts query with identifier/got response interface to deferred interface
67     '''
68     
69     def __init__(self, max_id, func, timeout=5):
70         self.max_id = max_id
71         self.func = func
72         self.timeout = timeout
73         self.map = {}
74     
75     def __call__(self, *args, **kwargs):
76         while True:
77             id = random.randrange(self.max_id)
78             if id not in self.map:
79                 break
80         df = defer.Deferred()
81         def timeout():
82             self.map.pop(id)
83             df.errback(failure.Failure(defer.TimeoutError()))
84         timer = reactor.callLater(self.timeout, timeout)
85         self.func(id, *args, **kwargs)
86         self.map[id] = df, timer
87         return df
88     
89     def got_response(self, id, resp):
90         if id not in self.map:
91             return
92         df, timer = self.map.pop(id)
93         timer.cancel()
94         df.callback(resp)
95
96 class NotNowError(Exception):
97     pass
98
99 class DeferredCacher(object):
100     '''
101     like memoize, but for functions that return Deferreds
102     
103     @DeferredCacher
104     def f(x):
105         ...
106         return df
107     
108     @DeferredCacher.with_backing(bsddb.hashopen(...))
109     def f(x):
110         ...
111         return df
112     '''
113     
114     @classmethod
115     def with_backing(cls, backing):
116         return lambda func: cls(func, backing)
117     
118     def __init__(self, func, backing=None):
119         if backing is None:
120             backing = {}
121         
122         self.func = func
123         self.backing = backing
124         self.waiting = {}
125     
126     @defer.inlineCallbacks
127     def __call__(self, key):
128         if key in self.waiting:
129             yield self.waiting[key]
130         
131         if key in self.backing:
132             defer.returnValue(self.backing[key])
133         else:
134             self.waiting[key] = defer.Deferred()
135             try:
136                 value = yield self.func(key)
137             finally:
138                 self.waiting.pop(key).callback(None)
139         
140         self.backing[key] = value
141         defer.returnValue(value)
142     
143     def call_now(self, key):
144         if key in self.waiting:
145             raise NotNowError()
146         
147         if key in self.backing:
148             return self.backing[key]
149         else:
150             self.waiting[key] = defer.Deferred()
151             def cb(value):
152                 self.backing[key] = value
153                 self.waiting.pop(key).callback(None)
154             def eb(fail):
155                 self.waiting.pop(key).callback(None)
156                 print
157                 print 'Error when requesting noncached value:'
158                 fail.printTraceback()
159                 print
160             self.func(key).addCallback(cb).addErrback(eb)
161             raise NotNowError()