f11b94f96de835ff38be5f545fc73efffcf7b070
[p2pool.git] / p2pool / util / variable.py
1 import itertools
2
3 from twisted.internet import defer, reactor
4 from twisted.python import failure, log
5
6 class Event(object):
7     def __init__(self):
8         self.observers = {}
9         self.id_generator = itertools.count()
10         self._once = None
11         self.times = 0
12     
13     def watch(self, func):
14         id = self.id_generator.next()
15         self.observers[id] = func
16         return id
17     def unwatch(self, id):
18         self.observers.pop(id)
19     
20     @property
21     def once(self):
22         res = self._once
23         if res is None:
24             res = self._once = Event()
25         return res
26     
27     def happened(self, *event):
28         self.times += 1
29         
30         once, self._once = self._once, None
31         
32         for id, func in sorted(self.observers.iteritems()):
33             try:
34                 func(*event)
35             except:
36                 log.err(None, "Error while processing Event callbacks:")
37         
38         if once is not None:
39             once.happened(*event)
40     
41     def get_deferred(self, timeout=None):
42         once = self.once
43         df = defer.Deferred()
44         id1 = once.watch(lambda *event: df.callback(event))
45         if timeout is not None:
46             def do_timeout():
47                 df.errback(failure.Failure(defer.TimeoutError('in Event.get_deferred')))
48                 once.unwatch(id1)
49                 once.unwatch(x)
50             delay = reactor.callLater(timeout, do_timeout)
51             x = once.watch(lambda *event: delay.cancel())
52         return df
53
54 class Variable(object):
55     def __init__(self, value):
56         self.value = value
57         self.changed = Event()
58         self.transitioned = Event()
59     
60     def set(self, value):
61         if value == self.value:
62             return
63         
64         oldvalue = self.value
65         self.value = value
66         self.changed.happened(value)
67         self.transitioned.happened(oldvalue, value)
68     
69     @defer.inlineCallbacks
70     def get_when_satisfies(self, func):
71         while True:
72             if func(self.value):
73                 defer.returnValue(self.value)
74             yield self.changed.once.get_deferred()
75     
76     def get_not_none(self):
77         return self.get_when_satisfies(lambda val: val is not None)