reworking
[p2pool.git] / p2pool / util / deferral.py
1 from __future__ import division
2
3 import random
4 import traceback
5
6 from twisted.internet import defer, reactor
7 from twisted.python import failure
8
9 def sleep(t):
10     d = defer.Deferred()
11     reactor.callLater(t, d.callback, None)
12     return d
13
14 def retry(message, delay):
15     '''
16     @retry('Error getting block:', 1)
17     @defer.inlineCallbacks
18     def get_block(hash):
19         ...
20     '''
21     
22     def retry2(func):
23         @defer.inlineCallbacks
24         def f(*args, **kwargs):
25             while True:
26                 try:
27                     result = yield func(*args, **kwargs)
28                 except:
29                     print
30                     print message
31                     traceback.print_exc()
32                     print
33                     
34                     yield sleep(delay)
35                 else:
36                     defer.returnValue(result)
37         return f
38     return retry2
39
40 class ReplyMatcher(object):
41     '''
42     Converts request/got response interface to deferred interface
43     '''
44     
45     def __init__(self, func, timeout=5):
46         self.func = func
47         self.timeout = timeout
48         self.map = {}
49     
50     def __call__(self, id):
51         self.func(id)
52         uniq = random.randrange(2**256)
53         df = defer.Deferred()
54         def timeout():
55             df, timer = self.map[id].pop(uniq)
56             df.errback(failure.Failure(defer.TimeoutError()))
57             if not self.map[id]:
58                 del self.map[id]
59         self.map.setdefault(id, {})[uniq] = (df, reactor.callLater(self.timeout, timeout))
60         return df
61     
62     def got_response(self, id, resp):
63         if id not in self.map:
64             return
65         for df, timer in self.map.pop(id).itervalues():
66             timer.cancel()
67             df.callback(resp)
68
69 class GenericDeferrer(object):
70     '''
71     Converts query with identifier/got response interface to deferred interface
72     '''
73     
74     def __init__(self, max_id, func, timeout=5):
75         self.max_id = max_id
76         self.func = func
77         self.timeout = timeout
78         self.map = {}
79     
80     def __call__(self, *args, **kwargs):
81         while True:
82             id = random.randrange(self.max_id)
83             if id not in self.map:
84                 break
85         df = defer.Deferred()
86         def timeout():
87             self.map.pop(id)
88             df.errback(failure.Failure(defer.TimeoutError()))
89         timer = reactor.callLater(self.timeout, timeout)
90         self.func(id, *args, **kwargs)
91         self.map[id] = df, timer
92         return df
93     
94     def got_response(self, id, resp):
95         if id not in self.map:
96             return
97         df, timer = self.map.pop(id)
98         timer.cancel()
99         df.callback(resp)
100
101 class NotNowError(Exception):
102     pass
103
104 class DeferredCacher(object):
105     '''
106     like memoize, but for functions that return Deferreds
107     
108     @DeferredCacher
109     def f(x):
110         ...
111         return df
112     
113     @DeferredCacher.with_backing(bsddb.hashopen(...))
114     def f(x):
115         ...
116         return df
117     '''
118     
119     @classmethod
120     def with_backing(cls, backing):
121         return lambda func: cls(func, backing)
122     
123     def __init__(self, func, backing=None):
124         if backing is None:
125             backing = {}
126         
127         self.func = func
128         self.backing = backing
129         self.waiting = {}
130     
131     @defer.inlineCallbacks
132     def __call__(self, key):
133         if key in self.waiting:
134             yield self.waiting[key]
135         
136         if key in self.backing:
137             defer.returnValue(self.backing[key])
138         else:
139             self.waiting[key] = defer.Deferred()
140             try:
141                 value = yield self.func(key)
142             finally:
143                 self.waiting.pop(key).callback(None)
144         
145         self.backing[key] = value
146         defer.returnValue(value)
147     
148     def call_now(self, key):
149         if key in self.waiting:
150             raise NotNowError()
151         
152         if key in self.backing:
153             return self.backing[key]
154         else:
155             self.waiting[key] = defer.Deferred()
156             def cb(value):
157                 self.backing[key] = value
158                 self.waiting.pop(key).callback(None)
159             def eb(fail):
160                 self.waiting.pop(key).callback(None)
161                 fail.printTraceback()
162             self.func(key).addCallback(cb).addErrback(eb)
163             raise NotNowError()