From 002d037ca16eca5b98df5d42254e0ad9b19a1e0d Mon Sep 17 00:00:00 2001 From: Forrest Voight Date: Sat, 23 Jun 2012 03:20:10 -0400 Subject: [PATCH] made p2p.Node clean up all delayedcalls. test.test_p2p now passes. --- p2pool/p2p.py | 125 +++++++++++++++++++++++------------------------ p2pool/test/test_p2p.py | 2 +- p2pool/util/deferral.py | 14 +++++- 3 files changed, 75 insertions(+), 66 deletions(-) diff --git a/p2pool/p2p.py b/p2pool/p2p.py index add928c..9899f5f 100644 --- a/p2pool/p2p.py +++ b/p2pool/p2p.py @@ -48,20 +48,12 @@ class Protocol(p2protocol.Protocol): best_share_hash=self.node.best_share_hash_func(), ) - reactor.callLater(10, self._connect_timeout) - self.timeout_delayed = reactor.callLater(100, self._timeout) - - old_dataReceived = self.dataReceived - def new_dataReceived(data): - if not self.timeout_delayed.called: - self.timeout_delayed.reset(100) - old_dataReceived(data) - self.dataReceived = new_dataReceived + self.timeout_delayed = reactor.callLater(10, self._connect_timeout) def _connect_timeout(self): - if not self.connected2 and self.transport.connected: - print 'Handshake timed out, disconnecting from %s:%i' % self.addr - self.transport.loseConnection() + self.timeout_delayed = None + print 'Handshake timed out, disconnecting from %s:%i' % self.addr + self.transport.loseConnection() def packetReceived(self, command, payload2): try: @@ -81,22 +73,9 @@ class Protocol(p2protocol.Protocol): self.node.bans[self.transport.getPeer().host] = time.time() + 60*60 def _timeout(self): - if self.transport.connected: - print 'Connection timed out, disconnecting from %s:%i' % self.addr - self.transport.loseConnection() - - @defer.inlineCallbacks - def _think(self): - while self.connected2: - self.send_ping() - yield deferral.sleep(random.expovariate(1/100)) - - @defer.inlineCallbacks - def _think2(self): - while self.connected2: - self.send_addrme(port=self.node.port) - #print 'sending addrme' - yield deferral.sleep(random.expovariate(1/(100*len(self.node.peers) + 1))) + self.timeout_delayed = None + print 'Connection timed out, disconnecting from %s:%i' % self.addr + self.transport.loseConnection() message_version = pack.ComposedType([ ('version', pack.IntType(32)), @@ -128,10 +107,26 @@ class Protocol(p2protocol.Protocol): self.nonce = nonce self.connected2 = True + + self.timeout_delayed.cancel() + self.timeout_delayed = reactor.callLater(100, self._timeout) + + old_dataReceived = self.dataReceived + def new_dataReceived(data): + if self.timeout_delayed is not None: + self.timeout_delayed.reset(100) + old_dataReceived(data) + self.dataReceived = new_dataReceived + self.factory.proto_connected(self) - self._think() - self._think2() + self._stop_thread = deferral.run_repeatedly(lambda: [ + self.send_ping(), + random.expovariate(1/100)][-1]) + + self._stop_thread2 = deferral.run_repeatedly(lambda: [ + self.send_addrme(port=self.node.port), + random.expovariate(1/(100*len(self.node.peers) + 1))][-1]) if best_share_hash is not None: self.node.handle_share_hashes([best_share_hash], self) @@ -246,8 +241,12 @@ class Protocol(p2protocol.Protocol): self.node.handle_bestblock(header, self) def connectionLost(self, reason): + if self.timeout_delayed is not None: + self.timeout_delayed.cancel() if self.connected2: self.factory.proto_disconnected(self, reason) + self._stop_thread() + self._stop_thread2() self.connected2 = False self.factory.proto_lost_connection(self, reason) if p2pool.DEBUG: @@ -303,7 +302,7 @@ class ServerFactory(protocol.ServerFactory): assert self.running self.running = False - self.listen_port.stopListening() + return self.listen_port.stopListening() class ClientFactory(protocol.ClientFactory): def __init__(self, node, desired_conns, max_attempts): @@ -351,29 +350,28 @@ class ClientFactory(protocol.ClientFactory): def start(self): assert not self.running self.running = True - self._think() + self._stop_thinking = deferral.run_repeatedly(self._think) def stop(self): assert self.running self.running = False + self._stop_thinking() - @defer.inlineCallbacks def _think(self): - while self.running: - try: - if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store: - (host, port), = self.node.get_good_peers(1) - - if self._host_to_ident(host) in self.attempts: - pass - elif host in self.node.bans and self.node.bans[host] > time.time(): - pass - else: - #print 'Trying to connect to', host, port - reactor.connectTCP(host, port, self, timeout=5) - except: - log.err() - - yield deferral.sleep(random.expovariate(1/1)) + try: + if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store: + (host, port), = self.node.get_good_peers(1) + + if self._host_to_ident(host) in self.attempts: + pass + elif host in self.node.bans and self.node.bans[host] > time.time(): + pass + else: + #print 'Trying to connect to', host, port + reactor.connectTCP(host, port, self, timeout=5) + except: + log.err() + + return random.expovariate(1/1) class SingleClientFactory(protocol.ReconnectingClientFactory): def __init__(self, node): @@ -421,29 +419,30 @@ class Node(object): self.running = True - self._think2() + self._stop_thinking = deferral.run_repeatedly(self._think) - @defer.inlineCallbacks - def _think2(self): - while self.running: - try: - if len(self.addr_store) < self.preferred_storage and self.peers: - random.choice(self.peers.values()).send_getaddrs(count=8) - except: - log.err() - - yield deferral.sleep(random.expovariate(1/20)) + def _think(self): + try: + if len(self.addr_store) < self.preferred_storage and self.peers: + random.choice(self.peers.values()).send_getaddrs(count=8) + except: + log.err() + + return random.expovariate(1/20) + @defer.inlineCallbacks def stop(self): if not self.running: raise ValueError('already stopped') self.running = False - self.clientfactory.stop() - self.serverfactory.stop() + self._stop_thinking() + yield self.clientfactory.stop() + yield self.serverfactory.stop() for singleclientconnector in self.singleclientconnectors: - singleclientconnector.factory.stopTrying() # XXX will this disconnect a current connection? + yield singleclientconnector.factory.stopTrying() + yield singleclientconnector.disconnect() del self.singleclientconnectors def got_conn(self, conn): diff --git a/p2pool/test/test_p2p.py b/p2pool/test/test_p2p.py index d8c18ff..35b91dd 100644 --- a/p2pool/test/test_p2p.py +++ b/p2pool/test/test_p2p.py @@ -33,4 +33,4 @@ class Test(unittest.TestCase): try: yield df finally: - n.stop() + yield n.stop() diff --git a/p2pool/util/deferral.py b/p2pool/util/deferral.py index 20f5ec8..a4759a0 100644 --- a/p2pool/util/deferral.py +++ b/p2pool/util/deferral.py @@ -7,10 +7,20 @@ from twisted.internet import defer, reactor from twisted.python import failure, log def sleep(t): - d = defer.Deferred() - reactor.callLater(t, d.callback, None) + d = defer.Deferred(canceller=lambda d_: dc.cancel()) + dc = reactor.callLater(t, d.callback, None) return d +def run_repeatedly(f, *args, **kwargs): + current_dc = [None] + def step(): + delay = f(*args, **kwargs) + current_dc[0] = reactor.callLater(delay, step) + step() + def stop(): + current_dc[0].cancel() + return stop + class RetrySilentlyException(Exception): pass -- 1.7.1