From: Forrest Voight Date: Fri, 2 Aug 2013 17:57:43 +0000 (-0400) Subject: replace task.LoopingCall's with deferral.RobustLoopingCall that catches errors and... X-Git-Tag: 13.3~4 X-Git-Url: https://git.novaco.in/?p=p2pool.git;a=commitdiff_plain;h=0a3493d6873cfef4fb189d39e64dfbc6e162e2a7 replace task.LoopingCall's with deferral.RobustLoopingCall that catches errors and continues --- diff --git a/p2pool/bitcoin/height_tracker.py b/p2pool/bitcoin/height_tracker.py index b54b8e6..e5119fb 100644 --- a/p2pool/bitcoin/height_tracker.py +++ b/p2pool/bitcoin/height_tracker.py @@ -1,4 +1,4 @@ -from twisted.internet import defer, task +from twisted.internet import defer from twisted.python import log import p2pool @@ -29,16 +29,16 @@ class HeightTracker(object): self._watch2 = self._factory.new_block.watch(self._request) self._requested = set() - self._clear_task = task.LoopingCall(self._requested.clear) + self._clear_task = deferral.RobustLoopingCall(self._requested.clear) self._clear_task.start(60) self._last_notified_size = 0 self.updated = variable.Event() - self._think_task = task.LoopingCall(self._think) + self._think_task = deferral.RobustLoopingCall(self._think) self._think_task.start(15) - self._think2_task = task.LoopingCall(self._think2) + self._think2_task = deferral.RobustLoopingCall(self._think2) self._think2_task.start(15) def _think(self): diff --git a/p2pool/bitcoin/p2p.py b/p2pool/bitcoin/p2p.py index cdb98de..83a3403 100644 --- a/p2pool/bitcoin/p2p.py +++ b/p2pool/bitcoin/p2p.py @@ -6,7 +6,7 @@ import random import sys import time -from twisted.internet import protocol, task +from twisted.internet import protocol import p2pool from . import data as bitcoin_data @@ -59,7 +59,7 @@ class Protocol(p2protocol.Protocol): if hasattr(self.factory, 'gotConnection'): self.factory.gotConnection(self) - self.pinger = task.LoopingCall(self.send_ping) + self.pinger = deferral.RobustLoopingCall(self.send_ping) self.pinger.start(30) message_inv = pack.ComposedType([ diff --git a/p2pool/main.py b/p2pool/main.py index aae66ae..a9ae470 100644 --- a/p2pool/main.py +++ b/p2pool/main.py @@ -14,7 +14,7 @@ import urlparse if '--iocp' in sys.argv: from twisted.internet import iocpreactor iocpreactor.install() -from twisted.internet import defer, reactor, protocol, task, tcp +from twisted.internet import defer, reactor, protocol, tcp from twisted.web import server from twisted.python import log from nattraverso import portmapper, ipdiscover @@ -58,7 +58,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors'] bitcoind_warning_var.set(errors if errors != '' else None) yield poll_warnings() - task.LoopingCall(poll_warnings).start(20*60) + deferral.RobustLoopingCall(poll_warnings).start(20*60) print ' ...success!' print ' Current block hash: %x' % (temp_work['previous_block'],) @@ -136,7 +136,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): ss.add_share(share) if share.hash in node.tracker.verified.items: ss.add_verified_hash(share.hash) - task.LoopingCall(save_shares).start(60) + deferral.RobustLoopingCall(save_shares).start(60) print ' ...success!' print @@ -186,7 +186,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): def save_addrs(): with open(os.path.join(datadir_path, 'addrs'), 'wb') as f: f.write(json.dumps(node.p2p_node.addr_store.items())) - task.LoopingCall(save_addrs).start(60) + deferral.RobustLoopingCall(save_addrs).start(60) print ' ...success!' print @@ -247,7 +247,7 @@ def main(args, net, datadir_path, merged_urls, worker_endpoint): sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack()) )) signal.siginterrupt(signal.SIGALRM, False) - task.LoopingCall(signal.alarm, 30).start(1) + deferral.RobustLoopingCall(signal.alarm, 30).start(1) if args.irc_announce: from twisted.words.protocols import irc @@ -543,7 +543,7 @@ def run(): logfile.reopen() print '...and reopened %r after catching SIGUSR1.' % (args.logfile,) signal.signal(signal.SIGUSR1, sigusr1) - task.LoopingCall(logfile.reopen).start(5) + deferral.RobustLoopingCall(logfile.reopen).start(5) class ErrorReporter(object): def __init__(self): diff --git a/p2pool/node.py b/p2pool/node.py index d73a0f2..2b25fbe 100644 --- a/p2pool/node.py +++ b/p2pool/node.py @@ -2,7 +2,7 @@ import random import sys import time -from twisted.internet import defer, reactor, task +from twisted.internet import defer, reactor from twisted.python import log from p2pool import data as p2pool_data, p2p @@ -283,11 +283,11 @@ class Node(object): if tx_hash in self.known_txs_var.value: new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash] self.known_txs_var.set(new_known_txs) - t = task.LoopingCall(forget_old_txs) + t = deferral.RobustLoopingCall(forget_old_txs) t.start(10) stop_signal.watch(t.stop) - t = task.LoopingCall(self.clean_tracker) + t = deferral.RobustLoopingCall(self.clean_tracker) t.start(5) stop_signal.watch(t.stop) diff --git a/p2pool/util/deferral.py b/p2pool/util/deferral.py index aed3adc..4d2ea47 100644 --- a/p2pool/util/deferral.py +++ b/p2pool/util/deferral.py @@ -197,3 +197,97 @@ class DeferredCacher(object): if default is not self._nothing: return default raise NotNowError(key) + +def deferred_has_been_called(df): + still_running = True + res2 = [] + def cb(res): + if still_running: + res2[:] = [res] + else: + return res + df.addBoth(cb) + still_running = False + if res2: + return True, res2[0] + return False, None +def inlineCallbacks(f): + from functools import wraps + @wraps(f) + def _(*args, **kwargs): + gen = f(*args, **kwargs) + stop_running = [False] + def cancelled(df_): + assert df_ is df + stop_running[0] = True + if currently_waiting_on: + currently_waiting_on[0].cancel() + df = defer.Deferred(cancelled) + currently_waiting_on = [] + def it(cur): + while True: + try: + if isinstance(cur, failure.Failure): + res = cur.throwExceptionIntoGenerator(gen) # external code is run here + else: + res = gen.send(cur) # external code is run here + if stop_running[0]: + return + except StopIteration: + df.callback(None) + except defer._DefGen_Return as e: + # XXX should make sure direct child threw + df.callback(e.value) + except: + df.errback() + else: + if isinstance(res, defer.Deferred): + called, res2 = deferred_has_been_called(res) + if called: + cur = res2 + continue + else: + currently_waiting_on[:] = [res] + def gotResult(res2): + assert currently_waiting_on[0] is res + currently_waiting_on[:] = [] + if stop_running[0]: + return + it(res2) + res.addBoth(gotResult) # external code is run between this and gotResult + else: + cur = res + continue + break + it(None) + return df + return _ + + + +class RobustLoopingCall(object): + def __init__(self, func, *args, **kwargs): + self.func, self.args, self.kwargs = func, args, kwargs + + self.running = False + + def start(self, period): + assert not self.running + self.running = True + self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError)) + + @inlineCallbacks + def _worker(self, period): + assert self.running + while self.running: + try: + self.func(*self.args, **self.kwargs) + except: + log.err() + yield sleep(period) + + def stop(self): + assert self.running + self.running = False + self._df.cancel() + return self._df diff --git a/p2pool/util/expiring_dict.py b/p2pool/util/expiring_dict.py index 92fb5b3..8a3c9ee 100644 --- a/p2pool/util/expiring_dict.py +++ b/p2pool/util/expiring_dict.py @@ -3,7 +3,7 @@ from __future__ import division import time import weakref -from twisted.internet import task +from p2pool.util import deferral class Node(object): def __init__(self, contents, prev=None, next=None): @@ -106,7 +106,7 @@ class ExpiringDict(object): self.d = dict() # key -> node, value self_ref = weakref.ref(self, lambda _: expire_loop.stop() if expire_loop.running else None) - self._expire_loop = expire_loop = task.LoopingCall(lambda: self_ref().expire()) + self._expire_loop = expire_loop = deferral.RobustLoopingCall(lambda: self_ref().expire()) expire_loop.start(1) def stop(self): diff --git a/p2pool/web.py b/p2pool/web.py index 6b9f575..b7d80b4 100644 --- a/p2pool/web.py +++ b/p2pool/web.py @@ -7,14 +7,14 @@ import sys import time import traceback -from twisted.internet import defer, task +from twisted.internet import defer from twisted.python import log from twisted.web import resource, static import p2pool from bitcoin import data as bitcoin_data from . import data as p2pool_data -from util import deferred_resource, graph, math, memory, pack, variable +from util import deferral, deferred_resource, graph, math, memory, pack, variable def _atomic_read(filename): try: @@ -264,7 +264,7 @@ def get_web_root(wb, datadir_path, bitcoind_warning_var, stop_event=variable.Eve with open(os.path.join(datadir_path, 'stats'), 'wb') as f: f.write(json.dumps(stat_log)) - x = task.LoopingCall(update_stat_log) + x = deferral.RobustLoopingCall(update_stat_log) x.start(5*60) stop_event.watch(x.stop) new_root.putChild('log', WebInterface(lambda: stat_log)) @@ -387,7 +387,7 @@ def get_web_root(wb, datadir_path, bitcoind_warning_var, stop_event=variable.Eve 'getwork_latency': graph.DataStreamDescription(dataview_descriptions), 'memory_usage': graph.DataStreamDescription(dataview_descriptions), }, hd_obj) - x = task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))) + x = deferral.RobustLoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj()))) x.start(100) stop_event.watch(x.stop) @wb.pseudoshare_received.watch @@ -437,7 +437,7 @@ def get_web_root(wb, datadir_path, bitcoind_warning_var, stop_event=variable.Eve except: if p2pool.DEBUG: traceback.print_exc() - x = task.LoopingCall(add_point) + x = deferral.RobustLoopingCall(add_point) x.start(5) stop_event.watch(x.stop) @node.bitcoind_work.changed.watch