give a hint about where TimeoutError messages are coming from
[p2pool.git] / p2pool / util / variable.py
1 import itertools
2
3 from twisted.internet import defer, reactor
4 from twisted.python import failure, log
5
6 class Event(object):
7     def __init__(self):
8         self.observers = {}
9         self.id_generator = itertools.count()
10         self._once = None
11     
12     def watch(self, func):
13         id = self.id_generator.next()
14         self.observers[id] = func
15         return id
16     def unwatch(self, id):
17         self.observers.pop(id)
18     
19     @property
20     def once(self):
21         res = self._once
22         if res is None:
23             res = self._once = Event()
24         return res
25     
26     def happened(self, *event):
27         for id, func in sorted(self.observers.iteritems()):
28             try:
29                 func(*event)
30             except:
31                 log.err(None, "Error while processing Event callbacks:")
32         
33         if self._once is not None:
34             self._once.happened(*event)
35             self._once = None
36     
37     def get_deferred(self, timeout=None):
38         once = self.once
39         df = defer.Deferred()
40         id1 = once.watch(lambda *event: df.callback(event))
41         if timeout is not None:
42             def do_timeout():
43                 df.errback(failure.Failure(defer.TimeoutError('in Event.get_deferred')))
44                 once.unwatch(id1)
45                 once.unwatch(x)
46             delay = reactor.callLater(timeout, do_timeout)
47             x = once.watch(lambda *event: delay.cancel())
48         return df
49
50 class Variable(object):
51     def __init__(self, value):
52         self.value = value
53         self.changed = Event()
54     
55     def set(self, value):
56         if value == self.value:
57             return
58         
59         self.value = value
60         self.changed.happened(value)
61     
62     def get_not_none(self):
63         if self.value is not None:
64             return defer.succeed(self.value)
65         else:
66             df = defer.Deferred()
67             self.changed.once.watch(df.callback)
68             return df