split Tracker's subset_of handling into separate SubsetTracker class
[p2pool.git] / p2pool / util / variable.py
1 import itertools
2
3 from twisted.internet import defer, reactor
4 from twisted.python import failure, log
5
6 class Event(object):
7     def __init__(self):
8         self.observers = {}
9         self.id_generator = itertools.count()
10         self._once = None
11         self.times = 0
12     
13     def run_and_watch(self, func):
14         func()
15         return self.watch(func)
16     def watch(self, func):
17         id = self.id_generator.next()
18         self.observers[id] = func
19         return id
20     def unwatch(self, id):
21         self.observers.pop(id)
22     
23     @property
24     def once(self):
25         res = self._once
26         if res is None:
27             res = self._once = Event()
28         return res
29     
30     def happened(self, *event):
31         self.times += 1
32         
33         once, self._once = self._once, None
34         
35         for id, func in sorted(self.observers.iteritems()):
36             try:
37                 func(*event)
38             except:
39                 log.err(None, "Error while processing Event callbacks:")
40         
41         if once is not None:
42             once.happened(*event)
43     
44     def get_deferred(self, timeout=None):
45         once = self.once
46         df = defer.Deferred()
47         id1 = once.watch(lambda *event: df.callback(event))
48         if timeout is not None:
49             def do_timeout():
50                 df.errback(failure.Failure(defer.TimeoutError('in Event.get_deferred')))
51                 once.unwatch(id1)
52                 once.unwatch(x)
53             delay = reactor.callLater(timeout, do_timeout)
54             x = once.watch(lambda *event: delay.cancel())
55         return df
56
57 class Variable(object):
58     def __init__(self, value):
59         self.value = value
60         self.changed = Event()
61         self.transitioned = Event()
62     
63     def set(self, value):
64         if value == self.value:
65             return
66         
67         oldvalue = self.value
68         self.value = value
69         self.changed.happened(value)
70         self.transitioned.happened(oldvalue, value)
71     
72     @defer.inlineCallbacks
73     def get_when_satisfies(self, func):
74         while True:
75             if func(self.value):
76                 defer.returnValue(self.value)
77             yield self.changed.once.get_deferred()
78     
79     def get_not_none(self):
80         return self.get_when_satisfies(lambda val: val is not None)