broadcast shares in serial
[p2pool.git] / p2pool / util / variable.py
index 660fdb0..f11b94f 100644 (file)
@@ -8,6 +8,7 @@ class Event(object):
         self.observers = {}
         self.id_generator = itertools.count()
         self._once = None
+        self.times = 0
     
     def watch(self, func):
         id = self.id_generator.next()
@@ -24,15 +25,18 @@ class Event(object):
         return res
     
     def happened(self, *event):
+        self.times += 1
+        
+        once, self._once = self._once, None
+        
         for id, func in sorted(self.observers.iteritems()):
             try:
                 func(*event)
             except:
                 log.err(None, "Error while processing Event callbacks:")
         
-        if self._once is not None:
-            self._once.happened(*event)
-            self._once = None
+        if once is not None:
+            once.happened(*event)
     
     def get_deferred(self, timeout=None):
         once = self.once
@@ -51,18 +55,23 @@ class Variable(object):
     def __init__(self, value):
         self.value = value
         self.changed = Event()
+        self.transitioned = Event()
     
     def set(self, value):
         if value == self.value:
             return
         
+        oldvalue = self.value
         self.value = value
         self.changed.happened(value)
+        self.transitioned.happened(oldvalue, value)
+    
+    @defer.inlineCallbacks
+    def get_when_satisfies(self, func):
+        while True:
+            if func(self.value):
+                defer.returnValue(self.value)
+            yield self.changed.once.get_deferred()
     
     def get_not_none(self):
-        if self.value is not None:
-            return defer.succeed(self.value)
-        else:
-            df = defer.Deferred()
-            self.changed.once.watch(df.callback)
-            return df
+        return self.get_when_satisfies(lambda val: val is not None)