best_share_hash=self.node.best_share_hash_func(),
)
- reactor.callLater(10, self._connect_timeout)
- self.timeout_delayed = reactor.callLater(100, self._timeout)
-
- old_dataReceived = self.dataReceived
- def new_dataReceived(data):
- if not self.timeout_delayed.called:
- self.timeout_delayed.reset(100)
- old_dataReceived(data)
- self.dataReceived = new_dataReceived
+ self.timeout_delayed = reactor.callLater(10, self._connect_timeout)
def _connect_timeout(self):
- if not self.connected2 and self.transport.connected:
- print 'Handshake timed out, disconnecting from %s:%i' % self.addr
- self.transport.loseConnection()
+ self.timeout_delayed = None
+ print 'Handshake timed out, disconnecting from %s:%i' % self.addr
+ self.transport.loseConnection()
def packetReceived(self, command, payload2):
try:
self.node.bans[self.transport.getPeer().host] = time.time() + 60*60
def _timeout(self):
- if self.transport.connected:
- print 'Connection timed out, disconnecting from %s:%i' % self.addr
- self.transport.loseConnection()
-
- @defer.inlineCallbacks
- def _think(self):
- while self.connected2:
- self.send_ping()
- yield deferral.sleep(random.expovariate(1/100))
-
- @defer.inlineCallbacks
- def _think2(self):
- while self.connected2:
- self.send_addrme(port=self.node.port)
- #print 'sending addrme'
- yield deferral.sleep(random.expovariate(1/(100*len(self.node.peers) + 1)))
+ self.timeout_delayed = None
+ print 'Connection timed out, disconnecting from %s:%i' % self.addr
+ self.transport.loseConnection()
message_version = pack.ComposedType([
('version', pack.IntType(32)),
self.nonce = nonce
self.connected2 = True
+
+ self.timeout_delayed.cancel()
+ self.timeout_delayed = reactor.callLater(100, self._timeout)
+
+ old_dataReceived = self.dataReceived
+ def new_dataReceived(data):
+ if self.timeout_delayed is not None:
+ self.timeout_delayed.reset(100)
+ old_dataReceived(data)
+ self.dataReceived = new_dataReceived
+
self.factory.proto_connected(self)
- self._think()
- self._think2()
+ self._stop_thread = deferral.run_repeatedly(lambda: [
+ self.send_ping(),
+ random.expovariate(1/100)][-1])
+
+ self._stop_thread2 = deferral.run_repeatedly(lambda: [
+ self.send_addrme(port=self.node.port),
+ random.expovariate(1/(100*len(self.node.peers) + 1))][-1])
if best_share_hash is not None:
self.node.handle_share_hashes([best_share_hash], self)
self.node.handle_bestblock(header, self)
def connectionLost(self, reason):
+ if self.timeout_delayed is not None:
+ self.timeout_delayed.cancel()
if self.connected2:
self.factory.proto_disconnected(self, reason)
+ self._stop_thread()
+ self._stop_thread2()
self.connected2 = False
self.factory.proto_lost_connection(self, reason)
if p2pool.DEBUG:
assert self.running
self.running = False
- self.listen_port.stopListening()
+ return self.listen_port.stopListening()
class ClientFactory(protocol.ClientFactory):
def __init__(self, node, desired_conns, max_attempts):
def start(self):
assert not self.running
self.running = True
- self._think()
+ self._stop_thinking = deferral.run_repeatedly(self._think)
def stop(self):
assert self.running
self.running = False
+ self._stop_thinking()
- @defer.inlineCallbacks
def _think(self):
- while self.running:
- try:
- if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
- (host, port), = self.node.get_good_peers(1)
-
- if self._host_to_ident(host) in self.attempts:
- pass
- elif host in self.node.bans and self.node.bans[host] > time.time():
- pass
- else:
- #print 'Trying to connect to', host, port
- reactor.connectTCP(host, port, self, timeout=5)
- except:
- log.err()
-
- yield deferral.sleep(random.expovariate(1/1))
+ try:
+ if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and self.node.addr_store:
+ (host, port), = self.node.get_good_peers(1)
+
+ if self._host_to_ident(host) in self.attempts:
+ pass
+ elif host in self.node.bans and self.node.bans[host] > time.time():
+ pass
+ else:
+ #print 'Trying to connect to', host, port
+ reactor.connectTCP(host, port, self, timeout=5)
+ except:
+ log.err()
+
+ return random.expovariate(1/1)
class SingleClientFactory(protocol.ReconnectingClientFactory):
def __init__(self, node):
self.running = True
- self._think2()
+ self._stop_thinking = deferral.run_repeatedly(self._think)
- @defer.inlineCallbacks
- def _think2(self):
- while self.running:
- try:
- if len(self.addr_store) < self.preferred_storage and self.peers:
- random.choice(self.peers.values()).send_getaddrs(count=8)
- except:
- log.err()
-
- yield deferral.sleep(random.expovariate(1/20))
+ def _think(self):
+ try:
+ if len(self.addr_store) < self.preferred_storage and self.peers:
+ random.choice(self.peers.values()).send_getaddrs(count=8)
+ except:
+ log.err()
+
+ return random.expovariate(1/20)
+ @defer.inlineCallbacks
def stop(self):
if not self.running:
raise ValueError('already stopped')
self.running = False
- self.clientfactory.stop()
- self.serverfactory.stop()
+ self._stop_thinking()
+ yield self.clientfactory.stop()
+ yield self.serverfactory.stop()
for singleclientconnector in self.singleclientconnectors:
- singleclientconnector.factory.stopTrying() # XXX will this disconnect a current connection?
+ yield singleclientconnector.factory.stopTrying()
+ yield singleclientconnector.disconnect()
del self.singleclientconnectors
def got_conn(self, conn):