separated client and server handling into factories. incoming and outgoing connection...
authorForrest Voight <forrest@forre.st>
Tue, 17 Jan 2012 20:48:39 +0000 (15:48 -0500)
committerForrest Voight <forrest@forre.st>
Tue, 17 Jan 2012 20:48:39 +0000 (15:48 -0500)
p2pool/p2p.py

index 0a67683..9e04941 100644 (file)
@@ -30,7 +30,7 @@ class Protocol(bitcoin_p2p.BaseProtocol):
     def connectionMade(self):
         bitcoin_p2p.BaseProtocol.connectionMade(self)
         
-        self.node.conns.add(self)
+        self.factory.proto_made_connection(self)
         
         self.addr = self.transport.getPeer().host, self.transport.getPeer().port
         
@@ -114,7 +114,7 @@ class Protocol(bitcoin_p2p.BaseProtocol):
         
         self.nonce = nonce
         self.connected2 = True
-        self.node.got_conn(self)
+        self.factory.proto_connected(self)
         
         self._think()
         self._think2()
@@ -226,25 +226,57 @@ class Protocol(bitcoin_p2p.BaseProtocol):
             att(self.send_shares, shares=[share.as_share() for share in shares])
     
     def connectionLost(self, reason):
-        self.node.conns.remove(self)
         if self.connected2:
-            self.node.lost_conn(self)
+            self.factory.proto_disconnected(self)
             self.connected2 = False
+        self.factory.proto_lost_connection(self)
 
 class ServerFactory(protocol.ServerFactory):
-    def __init__(self, node):
+    def __init__(self, node, max_conns):
         self.node = node
+        self.max_conns = max_conns
+        
+        self.conns = set()
+        self.running = False
     
     def buildProtocol(self, addr):
-        if len(self.node.conns) >= self.node.max_conns:
+        if len(self.conns) >= self.max_conns:
             return None
         p = Protocol(self.node)
         p.factory = self
         return p
+    
+    def proto_made_connection(self, proto):
+        self.conns.add(proto)
+    def proto_lost_connection(self, proto):
+        self.conns.remove(proto)
+    
+    def proto_connected(self, proto):
+        self.node.got_conn(proto)
+    def proto_disconnected(self, proto):
+        self.node.lost_conn(proto)
+    
+    def start(self):
+        assert not self.running
+        self.running = True
+        
+        self.listen_port = reactor.listenTCP(self.node.port, self)
+    def stop(self):
+        assert self.running
+        self.running = False
+        
+        self.listen_port.stopListening()
 
 class ClientFactory(protocol.ClientFactory):
-    def __init__(self, node):
+    def __init__(self, node, desired_conns, max_attempts, preferred_addrs):
         self.node = node
+        self.desired_conns = desired_conns
+        self.max_attempts = max_attempts
+        self.preferred_addrs = preferred_addrs
+        
+        self.attempts = {}
+        self.conns = set()
+        self.running = False
     
     def buildProtocol(self, addr):
         p = Protocol(self.node)
@@ -252,62 +284,86 @@ class ClientFactory(protocol.ClientFactory):
         return p
     
     def startedConnecting(self, connector):
-        self.node.attempt_started(connector)
+        host, port = connector.getDestination().host, connector.getDestination().port
+        if (host, port) in self.attempts:
+            raise ValueError('already have attempt')
+        self.attempts[host, port] = connector
     
     def clientConnectionFailed(self, connector, reason):
-        self.node.attempt_failed(connector)
+        self.clientConnectionLost(connector, reason)
     
     def clientConnectionLost(self, connector, reason):
-        self.node.attempt_ended(connector)
+        host, port = connector.getDestination().host, connector.getDestination().port
+        if (host, port) not in self.attempts:
+            raise ValueError('''don't have attempt''')
+        if connector is not self.attempts[host, port]:
+            raise ValueError('wrong connector')
+        del self.attempts[host, port]
+    
+    def proto_made_connection(self, proto):
+        pass
+    def proto_lost_connection(self, proto):
+        pass
+    
+    def proto_connected(self, proto):
+        self.conns.add(proto)
+        self.node.got_conn(proto)
+    def proto_disconnected(self, proto):
+        self.conns.remove(proto)
+        self.node.lost_conn(proto)
+    
+    def start(self):
+        assert not self.running
+        self.running = True
+        self._think()
+    def stop(self):
+        assert self.running
+        self.running = False
+    
+    @defer.inlineCallbacks
+    def _think(self):
+        while self.running:
+            try:
+                if len(self.conns) < self.desired_conns and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.node.addr_store)):
+                    if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.node.addr_store):
+                        host, port = random.choice(list(self.preferred_addrs))
+                    else:
+                        (host, port), = self.node.get_good_peers(1)
+                    
+                    if (host, port) not in self.attempts:
+                        #print 'Trying to connect to', host, port
+                        reactor.connectTCP(host, port, self, timeout=5)
+            except:
+                log.err()
+            
+            yield deferral.sleep(random.expovariate(1/1))
 
 class Node(object):
-    def __init__(self, best_share_hash_func, port, net, addr_store={}, preferred_addrs=set(), desired_outgoing=10, max_outgoing_attempts=30, max_incomming=50, preferred_storage=1000):
+    def __init__(self, best_share_hash_func, port, net, addr_store={}, preferred_addrs=set(), desired_outgoing_conns=10, max_outgoing_attempts=30, max_incoming_conns=50, preferred_storage=1000):
         self.best_share_hash_func = best_share_hash_func
         self.port = port
         self.net = net
         self.addr_store = dict(addr_store)
-        self.preferred_addrs = preferred_addrs
-        self.desired_peers = desired_peers
-        self.max_conns = max_conns
-        self.max_attempts = max_attempts
         self.preferred_storage = preferred_storage
         
         self.nonce = random.randrange(2**64)
-        self.conns = set()
-        self.attempts = {}
         self.peers = {}
+        self.clientfactory = ClientFactory(self, desired_outgoing_conns, max_outgoing_attempts, preferred_addrs)
+        self.serverfactory = ServerFactory(self, max_incoming_conns)
         self.running = False
     
     def start(self):
         if self.running:
             raise ValueError('already running')
         
-        self.running = True
+        self.clientfactory.start()
+        self.serverfactory.start()
         
-        self.listen_port = reactor.listenTCP(self.port, ServerFactory(self))
+        self.running = True
         
-        self._think()
         self._think2()
     
     @defer.inlineCallbacks
-    def _think(self):
-        while self.running:
-            try:
-                if len(self.peers) < self.desired_peers and len(self.attempts) < self.max_attempts and (len(self.preferred_addrs) or len(self.addr_store)):
-                    if (random.randrange(2) and len(self.preferred_addrs)) or not len(self.addr_store):
-                        host, port = random.choice(list(self.preferred_addrs))
-                    else:
-                        (host, port), = self.get_good_peers(1)
-                    
-                    if (host, port) not in self.attempts:
-                        #print 'Trying to connect to', host, port
-                        reactor.connectTCP(host, port, ClientFactory(self), timeout=5)
-            except:
-                log.err()
-            
-            yield deferral.sleep(random.expovariate(1/1))
-    
-    @defer.inlineCallbacks
     def _think2(self):
         while self.running:
             try:
@@ -324,26 +380,8 @@ class Node(object):
         
         self.running = False
         
-        self.listen_port.stopListening()
-    
-    
-    def attempt_started(self, connector):
-        host, port = connector.getDestination().host, connector.getDestination().port
-        if (host, port) in self.attempts:
-            raise ValueError('already have attempt')
-        self.attempts[host, port] = connector
-    
-    def attempt_failed(self, connector):
-        self.attempt_ended(connector)
-    
-    def attempt_ended(self, connector):
-        host, port = connector.getDestination().host, connector.getDestination().port
-        if (host, port) not in self.attempts:
-            raise ValueError('''don't have attempt''')
-        if connector is not self.attempts[host, port]:
-            raise ValueError('wrong connector')
-        del self.attempts[host, port]
-    
+        self.clientfactory.stop()
+        self.serverfactory.stop()
     
     def got_conn(self, conn):
         if conn.nonce in self.peers: