broadcast shares in serial
[p2pool.git] / p2pool / util / variable.py
index cdc64d7..f11b94f 100644 (file)
@@ -1,13 +1,14 @@
 import itertools
 
 from twisted.internet import defer, reactor
-from twisted.python import failure
+from twisted.python import failure, log
 
 class Event(object):
     def __init__(self):
         self.observers = {}
-        self.one_time_observers = {}
         self.id_generator = itertools.count()
+        self._once = None
+        self.times = 0
     
     def watch(self, func):
         id = self.id_generator.next()
@@ -16,50 +17,61 @@ class Event(object):
     def unwatch(self, id):
         self.observers.pop(id)
     
-    def watch_one_time(self, func):
-        id = self.id_generator.next()
-        self.one_time_observers[id] = func
-        return id
-    def unwatch_one_time(self, id):
-        self.one_time_observers.pop(id)
+    @property
+    def once(self):
+        res = self._once
+        if res is None:
+            res = self._once = Event()
+        return res
     
-    def happened(self, event=None):
-        for func in self.observers.itervalues():
-            func(event)
+    def happened(self, *event):
+        self.times += 1
+        
+        once, self._once = self._once, None
         
-        one_time_observers = self.one_time_observers
-        self.one_time_observers = {}
-        for func in one_time_observers.itervalues():
-            func(event)
+        for id, func in sorted(self.observers.iteritems()):
+            try:
+                func(*event)
+            except:
+                log.err(None, "Error while processing Event callbacks:")
+        
+        if once is not None:
+            once.happened(*event)
     
     def get_deferred(self, timeout=None):
+        once = self.once
         df = defer.Deferred()
-        id1 = self.watch_one_time(df.callback)
+        id1 = once.watch(lambda *event: df.callback(event))
         if timeout is not None:
             def do_timeout():
-                df.errback(failure.Failure(defer.TimeoutError()))
-                self.unwatch_one_time(id1)
-                self.unwatch_one_time(x)
+                df.errback(failure.Failure(defer.TimeoutError('in Event.get_deferred')))
+                once.unwatch(id1)
+                once.unwatch(x)
             delay = reactor.callLater(timeout, do_timeout)
-            x = self.watch_one_time(lambda value: delay.cancel())
+            x = once.watch(lambda *event: delay.cancel())
         return df
 
 class Variable(object):
     def __init__(self, value):
         self.value = value
         self.changed = Event()
+        self.transitioned = Event()
     
     def set(self, value):
         if value == self.value:
             return
         
+        oldvalue = self.value
         self.value = value
         self.changed.happened(value)
+        self.transitioned.happened(oldvalue, value)
+    
+    @defer.inlineCallbacks
+    def get_when_satisfies(self, func):
+        while True:
+            if func(self.value):
+                defer.returnValue(self.value)
+            yield self.changed.once.get_deferred()
     
     def get_not_none(self):
-        if self.value is not None:
-            return defer.succeed(self.value)
-        else:
-            df = defer.Deferred()
-            self.changed.watch_one_time(df.callback)
-            return df
+        return self.get_when_satisfies(lambda val: val is not None)