-from twisted.internet import defer, task
+from twisted.internet import defer
from twisted.python import log
import p2pool
self._watch2 = self._factory.new_block.watch(self._request)
self._requested = set()
- self._clear_task = task.LoopingCall(self._requested.clear)
+ self._clear_task = deferral.RobustLoopingCall(self._requested.clear)
self._clear_task.start(60)
self._last_notified_size = 0
self.updated = variable.Event()
- self._think_task = task.LoopingCall(self._think)
+ self._think_task = deferral.RobustLoopingCall(self._think)
self._think_task.start(15)
- self._think2_task = task.LoopingCall(self._think2)
+ self._think2_task = deferral.RobustLoopingCall(self._think2)
self._think2_task.start(15)
def _think(self):
import sys
import time
-from twisted.internet import protocol, task
+from twisted.internet import protocol
import p2pool
from . import data as bitcoin_data
if hasattr(self.factory, 'gotConnection'):
self.factory.gotConnection(self)
- self.pinger = task.LoopingCall(self.send_ping)
+ self.pinger = deferral.RobustLoopingCall(self.send_ping)
self.pinger.start(30)
message_inv = pack.ComposedType([
if '--iocp' in sys.argv:
from twisted.internet import iocpreactor
iocpreactor.install()
-from twisted.internet import defer, reactor, protocol, task, tcp
+from twisted.internet import defer, reactor, protocol, tcp
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover
errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
bitcoind_warning_var.set(errors if errors != '' else None)
yield poll_warnings()
- task.LoopingCall(poll_warnings).start(20*60)
+ deferral.RobustLoopingCall(poll_warnings).start(20*60)
print ' ...success!'
print ' Current block hash: %x' % (temp_work['previous_block'],)
ss.add_share(share)
if share.hash in node.tracker.verified.items:
ss.add_verified_hash(share.hash)
- task.LoopingCall(save_shares).start(60)
+ deferral.RobustLoopingCall(save_shares).start(60)
print ' ...success!'
print
def save_addrs():
with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
f.write(json.dumps(node.p2p_node.addr_store.items()))
- task.LoopingCall(save_addrs).start(60)
+ deferral.RobustLoopingCall(save_addrs).start(60)
print ' ...success!'
print
sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
))
signal.siginterrupt(signal.SIGALRM, False)
- task.LoopingCall(signal.alarm, 30).start(1)
+ deferral.RobustLoopingCall(signal.alarm, 30).start(1)
if args.irc_announce:
from twisted.words.protocols import irc
logfile.reopen()
print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
signal.signal(signal.SIGUSR1, sigusr1)
- task.LoopingCall(logfile.reopen).start(5)
+ deferral.RobustLoopingCall(logfile.reopen).start(5)
class ErrorReporter(object):
def __init__(self):
import sys
import time
-from twisted.internet import defer, reactor, task
+from twisted.internet import defer, reactor
from twisted.python import log
from p2pool import data as p2pool_data, p2p
if tx_hash in self.known_txs_var.value:
new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
self.known_txs_var.set(new_known_txs)
- t = task.LoopingCall(forget_old_txs)
+ t = deferral.RobustLoopingCall(forget_old_txs)
t.start(10)
stop_signal.watch(t.stop)
- t = task.LoopingCall(self.clean_tracker)
+ t = deferral.RobustLoopingCall(self.clean_tracker)
t.start(5)
stop_signal.watch(t.stop)
if default is not self._nothing:
return default
raise NotNowError(key)
+
+def deferred_has_been_called(df):
+ still_running = True
+ res2 = []
+ def cb(res):
+ if still_running:
+ res2[:] = [res]
+ else:
+ return res
+ df.addBoth(cb)
+ still_running = False
+ if res2:
+ return True, res2[0]
+ return False, None
+def inlineCallbacks(f):
+ from functools import wraps
+ @wraps(f)
+ def _(*args, **kwargs):
+ gen = f(*args, **kwargs)
+ stop_running = [False]
+ def cancelled(df_):
+ assert df_ is df
+ stop_running[0] = True
+ if currently_waiting_on:
+ currently_waiting_on[0].cancel()
+ df = defer.Deferred(cancelled)
+ currently_waiting_on = []
+ def it(cur):
+ while True:
+ try:
+ if isinstance(cur, failure.Failure):
+ res = cur.throwExceptionIntoGenerator(gen) # external code is run here
+ else:
+ res = gen.send(cur) # external code is run here
+ if stop_running[0]:
+ return
+ except StopIteration:
+ df.callback(None)
+ except defer._DefGen_Return as e:
+ # XXX should make sure direct child threw
+ df.callback(e.value)
+ except:
+ df.errback()
+ else:
+ if isinstance(res, defer.Deferred):
+ called, res2 = deferred_has_been_called(res)
+ if called:
+ cur = res2
+ continue
+ else:
+ currently_waiting_on[:] = [res]
+ def gotResult(res2):
+ assert currently_waiting_on[0] is res
+ currently_waiting_on[:] = []
+ if stop_running[0]:
+ return
+ it(res2)
+ res.addBoth(gotResult) # external code is run between this and gotResult
+ else:
+ cur = res
+ continue
+ break
+ it(None)
+ return df
+ return _
+
+
+
+class RobustLoopingCall(object):
+ def __init__(self, func, *args, **kwargs):
+ self.func, self.args, self.kwargs = func, args, kwargs
+
+ self.running = False
+
+ def start(self, period):
+ assert not self.running
+ self.running = True
+ self._df = self._worker(period).addErrback(lambda fail: fail.trap(defer.CancelledError))
+
+ @inlineCallbacks
+ def _worker(self, period):
+ assert self.running
+ while self.running:
+ try:
+ self.func(*self.args, **self.kwargs)
+ except:
+ log.err()
+ yield sleep(period)
+
+ def stop(self):
+ assert self.running
+ self.running = False
+ self._df.cancel()
+ return self._df
import time
import weakref
-from twisted.internet import task
+from p2pool.util import deferral
class Node(object):
def __init__(self, contents, prev=None, next=None):
self.d = dict() # key -> node, value
self_ref = weakref.ref(self, lambda _: expire_loop.stop() if expire_loop.running else None)
- self._expire_loop = expire_loop = task.LoopingCall(lambda: self_ref().expire())
+ self._expire_loop = expire_loop = deferral.RobustLoopingCall(lambda: self_ref().expire())
expire_loop.start(1)
def stop(self):
import time
import traceback
-from twisted.internet import defer, task
+from twisted.internet import defer
from twisted.python import log
from twisted.web import resource, static
import p2pool
from bitcoin import data as bitcoin_data
from . import data as p2pool_data
-from util import deferred_resource, graph, math, memory, pack, variable
+from util import deferral, deferred_resource, graph, math, memory, pack, variable
def _atomic_read(filename):
try:
with open(os.path.join(datadir_path, 'stats'), 'wb') as f:
f.write(json.dumps(stat_log))
- x = task.LoopingCall(update_stat_log)
+ x = deferral.RobustLoopingCall(update_stat_log)
x.start(5*60)
stop_event.watch(x.stop)
new_root.putChild('log', WebInterface(lambda: stat_log))
'getwork_latency': graph.DataStreamDescription(dataview_descriptions),
'memory_usage': graph.DataStreamDescription(dataview_descriptions),
}, hd_obj)
- x = task.LoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj())))
+ x = deferral.RobustLoopingCall(lambda: _atomic_write(hd_path, json.dumps(hd.to_obj())))
x.start(100)
stop_event.watch(x.stop)
@wb.pseudoshare_received.watch
except:
if p2pool.DEBUG:
traceback.print_exc()
- x = task.LoopingCall(add_point)
+ x = deferral.RobustLoopingCall(add_point)
x.start(5)
stop_event.watch(x.stop)
@node.bitcoind_work.changed.watch