guarantee ordering and display errors from variable.Event
[p2pool.git] / p2pool / util / variable.py
1 import itertools
2
3 from twisted.internet import defer, reactor
4 from twisted.python import failure, log
5
class Event(object):
    """A minimal observer registry.

    Callables can be registered either permanently (``watch``) or for a
    single firing (``watch_one_time``).  ``happened`` invokes them with the
    event payload, in the order in which they were registered.
    """

    def __init__(self):
        # Both registries map a numeric observer id to its callable.
        self.observers = {}
        self.one_time_observers = {}
        # One shared counter, so ids are unique across both registries and
        # sorting by id reproduces registration order.
        self.id_generator = itertools.count()

    def watch(self, func):
        """Register ``func`` to be called on every firing; return its id."""
        # The ``next()`` builtin works on Python 2.6+ and on Python 3,
        # unlike the Python-2-only ``.next()`` method.
        observer_id = next(self.id_generator)
        self.observers[observer_id] = func
        return observer_id

    def unwatch(self, id):
        """Remove a permanent observer.

        Raises ``KeyError`` if ``id`` is not currently registered.
        """
        self.observers.pop(id)

    def watch_one_time(self, func):
        """Register ``func`` to be called on the next firing only; return its id."""
        observer_id = next(self.id_generator)
        self.one_time_observers[observer_id] = func
        return observer_id

    def unwatch_one_time(self, id):
        """Remove a pending one-time observer.

        Raises ``KeyError`` if ``id`` is not currently registered.
        """
        self.one_time_observers.pop(id)

    def _notify(self, observers, event):
        """Call every observer in ``observers`` with ``event``, ordered by id.

        An exception raised by one observer is logged and does not prevent
        the remaining observers from running.
        """
        # ``sorted`` snapshots the registry, so observers may register or
        # unregister others while being dispatched.  Sorting on the id alone
        # avoids ever comparing the callables themselves.
        for _, func in sorted(observers.items(), key=lambda item: item[0]):
            try:
                func(event)
            except Exception:
                # Only ordinary errors are logged; SystemExit and
                # KeyboardInterrupt are allowed to propagate.
                log.err(None, "Error while processing Event callbacks:")

    def happened(self, event=None):
        """Fire the event, passing ``event`` to every registered observer.

        Permanent observers run first, then the one-time observers, which
        are discarded afterwards.
        """
        self._notify(self.observers, event)

        # Swap the registry out before dispatching, so a one-time observer
        # that re-registers itself waits for the *next* firing.
        pending = self.one_time_observers
        self.one_time_observers = {}
        self._notify(pending, event)

    def get_deferred(self, timeout=None):
        """Return a Deferred that fires with the payload of the next firing.

        If ``timeout`` (seconds) is given and the event has not happened by
        then, the Deferred errbacks with ``defer.TimeoutError`` instead.
        """
        df = defer.Deferred()
        callback_id = self.watch_one_time(df.callback)
        if timeout is not None:
            def do_timeout():
                # Deregister before firing the errback: the errback chain
                # may re-enter this Event, and the observers must already
                # be gone by then.  Tolerate ids that have disappeared.
                self.one_time_observers.pop(callback_id, None)
                self.one_time_observers.pop(cancel_id, None)
                # Skip a Deferred that was already fired or cancelled by
                # its owner; errback would raise AlreadyCalledError.
                if not df.called:
                    df.errback(failure.Failure(defer.TimeoutError()))
            delay = reactor.callLater(timeout, do_timeout)
            # Registered after the callback above, so on a normal firing the
            # Deferred is fired first and the timer is cancelled second.
            cancel_id = self.watch_one_time(lambda value: delay.cancel())
        return df
52
class Variable(object):
    """A value holder that fires its ``changed`` event when the value changes."""

    def __init__(self, value):
        # ``changed`` is fired by ``set`` with the new value as payload.
        self.changed = Event()
        self.value = value

    def set(self, value):
        """Store ``value`` and fire ``changed``, unless it equals the current value."""
        if not (value == self.value):
            self.value = value
            self.changed.happened(value)

    def get_not_none(self):
        """Return a Deferred for the held value.

        If the current value is not ``None`` the Deferred has already fired
        with it; otherwise it fires with the payload of the next ``changed``
        event.
        """
        current = self.value
        if current is None:
            waiter = defer.Deferred()
            self.changed.watch_one_time(waiter.callback)
            return waiter
        return defer.succeed(current)