From: Forrest Voight Date: Tue, 17 Jan 2012 20:48:39 +0000 (-0500) Subject: separated client and server handling into factories. incoming and outgoing connection... X-Git-Tag: 0.8.2~10 X-Git-Url: https://git.novaco.in/?a=commitdiff_plain;h=d23ec412042260952d46af21ad4efd021828911e;p=p2pool.git separated client and server handling into factories. incoming and outgoing connections are now separately limited --- diff --git a/p2pool/p2p.py b/p2pool/p2p.py index 0a67683..9e04941 100644 --- a/p2pool/p2p.py +++ b/p2pool/p2p.py @@ -30,7 +30,7 @@ class Protocol(bitcoin_p2p.BaseProtocol): def connectionMade(self): bitcoin_p2p.BaseProtocol.connectionMade(self) - self.node.conns.add(self) + self.factory.proto_made_connection(self) self.addr = self.transport.getPeer().host, self.transport.getPeer().port @@ -114,7 +114,7 @@ class Protocol(bitcoin_p2p.BaseProtocol): self.nonce = nonce self.connected2 = True - self.node.got_conn(self) + self.factory.proto_connected(self) self._think() self._think2() @@ -226,25 +226,57 @@ class Protocol(bitcoin_p2p.BaseProtocol): att(self.send_shares, shares=[share.as_share() for share in shares]) def connectionLost(self, reason): - self.node.conns.remove(self) if self.connected2: - self.node.lost_conn(self) + self.factory.proto_disconnected(self) self.connected2 = False + self.factory.proto_lost_connection(self) class ServerFactory(protocol.ServerFactory): - def __init__(self, node): + def __init__(self, node, max_conns): self.node = node + self.max_conns = max_conns + + self.conns = set() + self.running = False def buildProtocol(self, addr): - if len(self.node.conns) >= self.node.max_conns: + if len(self.conns) >= self.max_conns: return None p = Protocol(self.node) p.factory = self return p + + def proto_made_connection(self, proto): + self.conns.add(proto) + def proto_lost_connection(self, proto): + self.conns.remove(proto) + + def proto_connected(self, proto): + self.node.got_conn(proto) + def proto_disconnected(self, proto): + self.node.lost_conn(proto) + + def start(self): + assert not self.running + self.running = True + + self.listen_port = reactor.listenTCP(self.node.port, self) + def stop(self): + assert self.running + self.running = False + + self.listen_port.stopListening() class ClientFactory(protocol.ClientFactory): - def __init__(self, node): + def __init__(self, node, desired_conns, max_attempts, preferred_addrs): self.node = node + self.desired_conns = desired_conns + self.max_attempts = max_attempts + self.preferred_addrs = preferred_addrs + + self.attempts = {} + self.conns = set() + self.running = False def buildProtocol(self, addr): p = Protocol(self.node) @@ -252,62 +284,86 @@ class ClientFactory(protocol.ClientFactory): return p def startedConnecting(self, connector): - self.node.attempt_started(connector) + host, port = connector.getDestination().host, connector.getDestination().port + if (host, port) in self.attempts: + raise ValueError('already have attempt') + self.attempts[host, port] = connector def clientConnectionFailed(self, connector, reason): - self.node.attempt_failed(connector) + self.clientConnectionLost(connector, reason) def clientConnectionLost(self, connector, reason): - self.node.attempt_ended(connector) + host, port = connector.getDestination().host, connector.getDestination().port + if (host, port) not in self.attempts: + raise ValueError('''don't have attempt''') + if connector is not self.attempts[host, port]: + raise ValueError('wrong connector') + del self.attempts[host, port] + + def proto_made_connection(self, proto): + pass + def proto_lost_connection(self, proto): + pass + + def proto_connected(self, proto): + self.conns.add(proto) + self.node.got_conn(proto) + def proto_disconnected(self, proto): + self.conns.remove(proto) + self.node.lost_conn(proto) + + def start(self): + assert not self.running + self.running = True + self._think() + def stop(self): + assert self.running + self.running = False + + @defer.inlineCallbacks + def _think(self): + while self.running: + try: + if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.node.addr_store)): + if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.node.addr_store): + host, port = random.choice(list(self.preferred_addrs)) + else: + (host, port), = self.node.get_good_peers(1) + + if (host, port) not in self.attempts: + #print 'Trying to connect to', host, port + reactor.connectTCP(host, port, self, timeout=5) + except: + log.err() + + yield deferral.sleep(random.expovariate(1/1)) class Node(object): - def __init__(self, best_share_hash_func, port, net, addr_store={}, preferred_addrs=set(), desired_outgoing=10, max_outgoing_attempts=30, max_incomming=50, preferred_storage=1000): + def __init__(self, best_share_hash_func, port, net, addr_store={}, preferred_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000): self.best_share_hash_func = best_share_hash_func self.port = port self.net = net self.addr_store = dict(addr_store) - self.preferred_addrs = preferred_addrs - self.desired_peers = desired_peers - self.max_conns = max_conns - self.max_attempts = max_attempts self.preferred_storage = preferred_storage self.nonce = random.randrange(2**64) - self.conns = set() - self.attempts = {} self.peers = {} + self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts, preferred_addrs) + self.serverfactory = ServerFactory(self, max_incoming_conns) self.running = False def start(self): if self.running: raise ValueError('already running') - self.running = True + self.clientfactory.start() + self.serverfactory.start() - self.listen_port = reactor.listenTCP(self.port, ServerFactory(self)) + self.running = True - self._think() self._think2() @defer.inlineCallbacks - def _think(self): - while self.running: - try: - if len(self.peers) < self.desired_peers and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.addr_store)): - if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.addr_store): - host, port = random.choice(list(self.preferred_addrs)) - else: - (host, port), = self.get_good_peers(1) - - if (host, port) not in self.attempts: - #print 'Trying to connect to', host, port - reactor.connectTCP(host, port, ClientFactory(self), timeout=5) - except: - log.err() - - yield deferral.sleep(random.expovariate(1/1)) - - @defer.inlineCallbacks def _think2(self): while self.running: try: @@ -324,26 +380,8 @@ class Node(object): self.running = False - self.listen_port.stopListening() - - - def attempt_started(self, connector): - host, port = connector.getDestination().host, connector.getDestination().port - if (host, port) in self.attempts: - raise ValueError('already have attempt') - self.attempts[host, port] = connector - - def attempt_failed(self, connector): - self.attempt_ended(connector) - - def attempt_ended(self, connector): - host, port = connector.getDestination().host, connector.getDestination().port - if (host, port) not in self.attempts: - raise ValueError('''don't have attempt''') - if connector is not self.attempts[host, port]: - raise ValueError('wrong connector') - del self.attempts[host, port] - + self.clientfactory.stop() + self.serverfactory.stop() def got_conn(self, conn): if conn.nonce in self.peers: