replace task.LoopingCall's with deferral.RobustLoopingCall that catches errors and...
author Forrest Voight <forrest@forre.st>
Fri, 2 Aug 2013 17:57:43 +0000 (13:57 -0400)
committer Forrest Voight <forrest@forre.st>
Sun, 4 Aug 2013 01:17:32 +0000 (21:17 -0400)
p2pool/bitcoin/height_tracker.py
p2pool/bitcoin/p2p.py
p2pool/main.py
p2pool/node.py
p2pool/util/deferral.py
p2pool/util/expiring_dict.py
p2pool/web.py

index b54b8e6..e5119fb 100644 (file)
@@ -1,4 +1,4 @@
-from twisted.internet import defer, task
+from twisted.internet import defer
 from twisted.python import log
 
 import p2pool
@@ -29,16 +29,16 @@ class HeightTracker(object):
         self._watch2 = self._factory.new_block.watch(self._request)
         
         self._requested = set()
-        self._clear_task = task.LoopingCall(self._requested.clear)
+        self._clear_task = deferral.RobustLoopingCall(self._requested.clear)
         self._clear_task.start(60)
         
         self._last_notified_size = 0
         
         self.updated = variable.Event()
         
-        self._think_task = task.LoopingCall(self._think)
+        self._think_task = deferral.RobustLoopingCall(self._think)
         self._think_task.start(15)
-        self._think2_task = task.LoopingCall(self._think2)
+        self._think2_task = deferral.RobustLoopingCall(self._think2)
         self._think2_task.start(15)
     
     def _think(self):
index cdb98de..83a3403 100644 (file)
@@ -6,7 +6,7 @@ import random
 import sys
 import time
 
-from twisted.internet import protocol, task
+from twisted.internet import protocol
 
 import p2pool
 from . import data as bitcoin_data
@@ -59,7 +59,7 @@ class Protocol(p2protocol.Protocol):
         if hasattr(self.factory, 'gotConnection'):
             self.factory.gotConnection(self)
         
-        self.pinger = task.LoopingCall(self.send_ping)
+        self.pinger = deferral.RobustLoopingCall(self.send_ping)
         self.pinger.start(30)
     
     message_inv = pack.ComposedType([
index aae66ae..a9ae470 100644 (file)
@@ -14,7 +14,7 @@ import urlparse
 if '--iocp' in sys.argv:
     from twisted.internet import iocpreactor
     iocpreactor.install()
-from twisted.internet import defer, reactor, protocol, task, tcp
+from twisted.internet import defer, reactor, protocol, tcp
 from twisted.web import server
 from twisted.python import log
 from nattraverso import portmapper, ipdiscover
@@ -58,7 +58,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
             bitcoind_warning_var.set(errors if errors != '' else None)
         yield poll_warnings()
-        task.LoopingCall(poll_warnings).start(20*60)
+        deferral.RobustLoopingCall(poll_warnings).start(20*60)
         
         print '    ...success!'
         print '    Current block hash: %x' % (temp_work['previous_block'],)
@@ -136,7 +136,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 ss.add_share(share)
                 if share.hash in node.tracker.verified.items:
                     ss.add_verified_hash(share.hash)
-        task.LoopingCall(save_shares).start(60)
+        deferral.RobustLoopingCall(save_shares).start(60)
         
         print '    ...success!'
         print
@@ -186,7 +186,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
         def save_addrs():
             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                 f.write(json.dumps(node.p2p_node.addr_store.items()))
-        task.LoopingCall(save_addrs).start(60)
+        deferral.RobustLoopingCall(save_addrs).start(60)
         
         print '    ...success!'
         print
@@ -247,7 +247,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint):
                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
             ))
             signal.siginterrupt(signal.SIGALRM, False)
-            task.LoopingCall(signal.alarm, 30).start(1)
+            deferral.RobustLoopingCall(signal.alarm, 30).start(1)
         
         if args.irc_announce:
             from twisted.words.protocols import irc
@@ -543,7 +543,7 @@ def run():
             logfile.reopen()
             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
         signal.signal(signal.SIGUSR1, sigusr1)
-    task.LoopingCall(logfile.reopen).start(5)
+    deferral.RobustLoopingCall(logfile.reopen).start(5)
     
     class ErrorReporter(object):
         def __init__(self):
index d73a0f2..2b25fbe 100644 (file)
@@ -2,7 +2,7 @@ import random
 import sys
 import time
 
-from twisted.internet import defer, reactor, task
+from twisted.internet import defer, reactor
 from twisted.python import log
 
 from p2pool import data as p2pool_data, p2p
@@ -283,11 +283,11 @@ class Node(object):
                     if tx_hash in self.known_txs_var.value:
                         new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
             self.known_txs_var.set(new_known_txs)
-        t = task.LoopingCall(forget_old_txs)
+        t = deferral.RobustLoopingCall(forget_old_txs)
         t.start(10)
         stop_signal.watch(t.stop)
         
-        t = task.LoopingCall(self.clean_tracker)
+        t = deferral.RobustLoopingCall(self.clean_tracker)
         t.start(5)
         stop_signal.watch(t.stop)
     
index aed3adc..4d2ea47 100644 (file)
@@ -197,3 +197,97 @@ class DeferredCacher(object):
         if default is not self._nothing:
             return default
         raise NotNowError(key)
+
+# Report, without waiting, whether the Deferred `df` already has a result.
+# Returns (True, result) if it does, where result is whatever addBoth hands to
+# the callback (a plain value or a failure); otherwise returns (False, None).
+def deferred_has_been_called(df):
+    still_running = True
+    res2 = []
+    def cb(res):
+        if still_running:
+            # invoked before addBoth (below) has returned, i.e. df already had a result:
+            # capture it. This branch returns None, so the captured result is not
+            # passed on to callbacks added to df later.
+            res2[:] = [res]
+        else:
+            # invoked after this function has moved on: pass the result through unchanged
+            return res
+    df.addBoth(cb)
+    still_running = False # from here on cb is a pure pass-through
+    if res2:
+        return True, res2[0]
+    return False, None
+# Decorator: turns generator function `f` into a function returning a Deferred.
+# The generator is driven by feeding each yielded value back into it; yielded
+# Deferreds are waited on and their result (or failure) is sent/thrown back in.
+# The returned Deferred is created with a canceller that stops the driver and
+# cancels whatever Deferred the generator is currently waiting on.
+def inlineCallbacks(f):
+    from functools import wraps
+    @wraps(f)
+    def _(*args, **kwargs):
+        gen = f(*args, **kwargs)
+        # one-element list so the nested functions can mutate the flag
+        stop_running = [False]
+        def cancelled(df_):
+            # canceller for df: mark the driver as stopped and propagate the
+            # cancellation to the Deferred being waited on, if any
+            assert df_ is df
+            stop_running[0] = True
+            if currently_waiting_on:
+                currently_waiting_on[0].cancel()
+        df = defer.Deferred(cancelled)
+        # empty, or holds the single Deferred the generator is blocked on
+        currently_waiting_on = []
+        def it(cur):
+            # Step the generator starting with `cur`, looping for as long as results
+            # are available immediately; returns once df has been fired, the run
+            # has been cancelled, or a wait on a pending Deferred has been registered.
+            while True:
+                try:
+                    if isinstance(cur, failure.Failure):
+                        res = cur.throwExceptionIntoGenerator(gen) # external code is run here
+                    else:
+                        res = gen.send(cur) # external code is run here
+                    if stop_running[0]:
+                        # cancelled while the generator was running: stop without firing df here
+                        return
+                except StopIteration:
+                    # generator finished without an explicit return value
+                    df.callback(None)
+                except defer._DefGen_Return as e:
+                    # XXX should make sure direct child threw
+                    df.callback(e.value)
+                except:
+                    # any other exception raised by the generator becomes df's failure
+                    df.errback()
+                else:
+                    if isinstance(res, defer.Deferred):
+                        called, res2 = deferred_has_been_called(res)
+                        if called:
+                            # result already available: feed it straight back in
+                            # without re-entering it() recursively
+                            cur = res2
+                            continue
+                        else:
+                            currently_waiting_on[:] = [res]
+                            def gotResult(res2):
+                                assert currently_waiting_on[0] is res
+                                currently_waiting_on[:] = []
+                                if stop_running[0]:
+                                    return
+                                it(res2)
+                            res.addBoth(gotResult) # external code is run between this and gotResult
+                    else:
+                        # a non-Deferred value was yielded: send it back in as-is
+                        cur = res
+                        continue
+                # reached after df was fired above or after the wait was registered
+                break
+        # start the generator synchronously, before returning df to the caller
+        it(None)
+        return df
+    return _
+
+
+
+# Repeatedly calls func(*args, **kwargs), pausing via sleep(period) between calls.
+# An exception raised by a call is logged with log.err() and the loop carries on.
+# Usage, as elsewhere in this commit: RobustLoopingCall(func).start(period), then
+# .stop() to end it.
+class RobustLoopingCall(object):
+    def __init__(self, func, *args, **kwargs):
+        self.func, self.args, self.kwargs = func, args, kwargs
+        
+        self.running = False
+    
+    # Begin looping; must not already be running. The first call to func is made
+    # before start() returns, because the worker runs up to its first yield.
+    def start(self, period):
+        assert not self.running
+        self.running = True
+        # stop() cancels this Deferred; trap CancelledError so that is not treated as an error
+        self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError))
+    
+    @inlineCallbacks
+    def _worker(self, period):
+        assert self.running
+        while self.running:
+            try:
+                # NOTE(review): the return value is discarded, so if func returns a
+                # Deferred it is not waited on and its failures are not caught here
+                self.func(*self.args, **self.kwargs)
+            except:
+                log.err()
+            # sleep is not shown in this hunk; presumably it returns a Deferred
+            # that fires after `period` seconds -- TODO confirm
+            yield sleep(period)
+    
+    # End the loop; must currently be running. Cancels the worker's Deferred and
+    # returns it.
+    def stop(self):
+        assert self.running
+        self.running = False
+        self._df.cancel()
+        return self._df
index 92fb5b3..8a3c9ee 100644 (file)
@@ -3,7 +3,7 @@ from __future__ import division
 import time
 import weakref
 
-from twisted.internet import task
+from p2pool.util import deferral
 
 class Node(object):
     def __init__(self, contents, prev=None, next=None):
@@ -106,7 +106,7 @@ class ExpiringDict(object):
         self.d = dict() # key -> node, value
         
         self_ref = weakref.ref(self, lambda _: expire_loop.stop() if expire_loop.running else None)
-        self._expire_loop = expire_loop = task.LoopingCall(lambda: self_ref().expire())
+        self._expire_loop = expire_loop = deferral.RobustLoopingCall(lambda: self_ref().expire())
         expire_loop.start(1)
     
     def stop(self):
index 6b9f575..b7d80b4 100644 (file)
@@ -7,14 +7,14 @@ import sys
 import time
 import traceback
 
-from twisted.internet import defer, task
+from twisted.internet import defer
 from twisted.python import log
 from twisted.web import resource, static
 
 import p2pool
 from bitcoin import data as bitcoin_data
 from . import data as p2pool_data
-from util import deferred_resource, graph, math, memory, pack, variable
+from util import deferral, deferred_resource, graph, math, memory, pack, variable
 
 def _atomic_read(filename):
     try:
@@ -264,7 +264,7 @@ def get_web_root(wb, datadir_path, bitcoind_warning_var, stop_event=variable.Eve
         
         with open(os.path.join(datadir_path, 'stats'), 'wb') as f:
             f.write(json.dumps(stat_log))
-    x = task.LoopingCall(update_stat_log)
+    x = deferral.RobustLoopingCall(update_stat_log)
     x.start(5*60)
     stop_event.watch(x.stop)
     new_root.putChild('log', WebInterface(lambda: stat_log))
@@ -387,7 +387,7 @@ def get_web_root(wb, datadir_path, bitcoind_warning_var, stop_event=variable.Eve
         'getwork_latency': graph.DataStreamDescription(dataview_descriptions),
         'memory_usage': graph.DataStreamDescription(dataview_descriptions),
     }, hd_obj)
-    x = task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj())))
+    x = deferral.RobustLoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj())))
     x.start(100)
     stop_event.watch(x.stop)
     @wb.pseudoshare_received.watch
@@ -437,7 +437,7 @@ def get_web_root(wb, datadir_path, bitcoind_warning_var, stop_event=variable.Eve
         except:
             if p2pool.DEBUG:
                 traceback.print_exc()
-    x = task.LoopingCall(add_point)
+    x = deferral.RobustLoopingCall(add_point)
     x.start(5)
     stop_event.watch(x.stop)
     @node.bitcoind_work.changed.watch