60a0add5a4db45c79c6a3055934d2236a5d01cb8
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import random
4
5 from twisted.internet import defer, reactor
6 from twisted.python import failure, log
7
8 def sleep(t):
9     d = defer.Deferred()
10     reactor.callLater(t, d.callback, None)
11     return d
12
13 def retry(message, delay):
14     '''
15     @retry('Error getting block:', 1)
16     @defer.inlineCallbacks
17     def get_block(hash):
18         ...
19     '''
20     
21     def retry2(func):
22         @defer.inlineCallbacks
23         def f(*args, **kwargs):
24             while True:
25                 try:
26                     result = yield func(*args, **kwargs)
27                 except:
28                     print
29                     print message
30                     log.err()
31                     print
32                     
33                     yield sleep(delay)
34                 else:
35                     defer.returnValue(result)
36         return f
37     return retry2
38
39 class ReplyMatcher(object):
40     '''
41     Converts request/got response interface to deferred interface
42     '''
43     
44     def __init__(self, func, timeout=5):
45         self.func = func
46         self.timeout = timeout
47         self.map = {}
48     
49     def __call__(self, id):
50         self.func(id)
51         uniq = random.randrange(2**256)
52         df = defer.Deferred()
53         def timeout():
54             df, timer = self.map[id].pop(uniq)
55             df.errback(failure.Failure(defer.TimeoutError()))
56             if not self.map[id]:
57                 del self.map[id]
58         self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
59         return df
60     
61     def got_response(self, id, resp):
62         if id not in self.map:
63             return
64         for df, timer in self.map.pop(id).itervalues():
65             timer.cancel()
66             df.callback(resp)
67
68 class GenericDeferrer(object):
69     '''
70     Converts query with identifier/got response interface to deferred interface
71     '''
72     
73     def __init__(self, max_id, func, timeout=5):
74         self.max_id = max_id
75         self.func = func
76         self.timeout = timeout
77         self.map = {}
78     
79     def __call__(self, *args, **kwargs):
80         while True:
81             id = random.randrange(self.max_id)
82             if id not in self.map:
83                 break
84         df = defer.Deferred()
85         def timeout():
86             self.map.pop(id)
87             df.errback(failure.Failure(defer.TimeoutError()))
88         timer = reactor.callLater(self.timeout, timeout)
89         self.func(id, *args, **kwargs)
90         self.map[id] = df, timer
91         return df
92     
93     def got_response(self, id, resp):
94         if id not in self.map:
95             return
96         df, timer = self.map.pop(id)
97         timer.cancel()
98         df.callback(resp)
99
100 class NotNowError(Exception):
101     pass
102
103 class DeferredCacher(object):
104     '''
105     like memoize, but for functions that return Deferreds
106     
107     @DeferredCacher
108     def f(x):
109         ...
110         return df
111     
112     @DeferredCacher.with_backing(bsddb.hashopen(...))
113     def f(x):
114         ...
115         return df
116     '''
117     
118     @classmethod
119     def with_backing(cls, backing):
120         return lambda func: cls(func, backing)
121     
122     def __init__(self, func, backing=None):
123         if backing is None:
124             backing = {}
125         
126         self.func = func
127         self.backing = backing
128         self.waiting = {}
129     
130     @defer.inlineCallbacks
131     def __call__(self, key):
132         if key in self.waiting:
133             yield self.waiting[key]
134         
135         if key in self.backing:
136             defer.returnValue(self.backing[key])
137         else:
138             self.waiting[key] = defer.Deferred()
139             try:
140                 value = yield self.func(key)
141             finally:
142                 self.waiting.pop(key).callback(None)
143         
144         self.backing[key] = value
145         defer.returnValue(value)
146     
147     def call_now(self, key):
148         if key in self.waiting:
149             raise NotNowError()
150         
151         if key in self.backing:
152             return self.backing[key]
153         else:
154             self.waiting[key] = defer.Deferred()
155             def cb(value):
156                 self.backing[key] = value
157                 self.waiting.pop(key).callback(None)
158             def eb(fail):
159                 self.waiting.pop(key).callback(None)
160                 print
161                 print "Error when requesting noncached value:"
162                 fail.printTraceback()
163                 print
164             self.func(key).addCallback(cb).addErrback(eb)
165             raise NotNowError()