X-Git-Url: https://git.novaco.in/?a=blobdiff_plain;f=nattraverso%2Fpynupnp%2Fupnp.py;fp=nattraverso%2Fpynupnp%2Fupnp.py;h=ade388d914a2d9b859568fad0e9530f945f2951a;hb=2b3f73c66a245a488271927bdb6b362371e43563;hp=0000000000000000000000000000000000000000;hpb=48bda1810ff8577333936ba8affd80cf8684109f;p=p2pool.git diff --git a/nattraverso/pynupnp/upnp.py b/nattraverso/pynupnp/upnp.py new file mode 100644 index 0000000..ade388d --- /dev/null +++ b/nattraverso/pynupnp/upnp.py @@ -0,0 +1,544 @@ +""" +This module is the heart of the upnp support. Device discover, ip discovery +and port mappings are implemented here. + +@author: Raphael Slinckx +@author: Anthony Baxter +@copyright: Copyright 2005 +@license: LGPL +@contact: U{raphael@slinckx.net} +@version: 0.1.0 +""" +__revision__ = "$id" + +import socket, random, urlparse, logging + +from twisted.internet import reactor, defer +from twisted.web import client +from twisted.internet.protocol import DatagramProtocol +from twisted.internet.error import CannotListenError + +from nattraverso.pynupnp.soap import SoapProxy +from nattraverso.pynupnp.upnpxml import UPnPXml +from nattraverso import ipdiscover, portmapper + +class UPnPError(Exception): + """ + A generic UPnP error, with a descriptive message as content. + """ + pass + +def search_upnp_device (): + """ + Check the network for an UPnP device. Returns a deferred + with the L{UPnPDevice} instance as result, if found. + + @return: A deferred called with the L{UPnPDevice} instance + @rtype: L{twisted.internet.defer.Deferred} + """ + try: + return UPnPProtocol().search_device() + except Exception, msg: + return defer.fail(UPnPError(msg)) + +class UPnPMapper(portmapper.NATMapper): + """ + This is the UPnP port mapper implementing the + L{NATMapper} interface. + + @see: L{NATMapper} + """ + + def __init__(self, upnp): + """ + Creates the mapper, with the given L{UPnPDevice} instance. + + @param upnp: L{UPnPDevice} instance + """ + self._mapped = {} + self._upnp = upnp + + def map(self, port): + """ + See interface + """ + self._check_valid_port(port) + + #Port is already mapped + if port in self._mapped: + return defer.succeed(self._mapped[port]) + + #Trigger a new mapping creation, first fetch local ip. + result = ipdiscover.get_local_ip() + self._mapped[port] = result + return result.addCallback(self._map_got_local_ip, port) + + def info(self, port): + """ + See interface + """ + # If the mapping exists, everything's ok + if port in self._mapped: + return self._mapped[port] + else: + raise ValueError('Port %r is not currently mapped'%(port)) + + def unmap(self, port): + """ + See interface + """ + if port in self._mapped: + existing = self._mapped[port] + + #Pending mapping, queue an unmap,return existing deferred + if type(existing) is not tuple: + existing.addCallback(lambda x: self.unmap(port)) + return existing + + #Remove our local mapping + del self._mapped[port] + + #Ask the UPnP to remove the mapping + extaddr, extport = existing + return self._upnp.remove_port_mapping(extport, port.getHost().type) + else: + raise ValueError('Port %r is not currently mapped'%(port)) + + def get_port_mappings(self): + """ + See interface + """ + return self._upnp.get_port_mappings() + + def _map_got_local_ip(self, ip_result, port): + """ + We got the local ip address, retreive the existing port mappings + in the device. + + @param ip_result: result of L{ipdiscover.get_local_ip} + @param port: a L{twisted.internet.interfaces.IListeningPort} we + want to map + """ + local, ip = ip_result + return self._upnp.get_port_mappings().addCallback( + self._map_got_port_mappings, ip, port) + + def _map_got_port_mappings(self, mappings, ip, port): + """ + We got all the existing mappings in the device, find an unused one + and assign it for the requested port. + + @param ip: The local ip of this host "x.x.x.x" + @param port: a L{twisted.internet.interfaces.IListeningPort} we + want to map + @param mappings: result of L{UPnPDevice.get_port_mappings} + """ + + #Get the requested mapping's info + ptype = port.getHost().type + intport = port.getHost().port + + for extport in [random.randrange(1025, 65536) for val in range(20)]: + # Check if there is an existing mapping, if it does not exist, bingo + if not (ptype, extport) in mappings: + break + + if (ptype, extport) in mappings: + existing = mappings[ptype, extport] + + local_ip, local_port = existing + if local_ip == ip and local_port == intport: + # Existing binding for this host/port/proto - replace it + break + + # Triggers the creation of the mapping on the device + result = self._upnp.add_port_mapping(ip, intport, extport, 'pynupnp', ptype) + + # We also need the external IP, so we queue first an + # External IP Discovery, then we add the mapping. + return result.addCallback( + lambda x: self._upnp.get_external_ip()).addCallback( + self._port_mapping_added, extport, port) + + def _port_mapping_added(self, extaddr, extport, port): + """ + The port mapping was added in the device, this means:: + + Internet NAT LAN + | + > IP:extaddr |> IP:local ip + > Port:extport |> Port:port + | + + @param extaddr: The exernal ip address + @param extport: The external port as number + @param port: The internal port as a + L{twisted.internet.interfaces.IListeningPort} object, that has been + mapped + """ + self._mapped[port] = (extaddr, extport) + return (extaddr, extport) + +class UPnPDevice: + """ + Represents an UPnP device, with the associated infos, and remote methods. + """ + def __init__(self, soap_proxy, info): + """ + Build the device, with the given SOAP proxy, and the meta-infos. + + @param soap_proxy: an initialized L{SoapProxy} to the device + @param info: a dictionnary of various infos concerning the + device extracted with L{UPnPXml} + """ + self._soap_proxy = soap_proxy + self._info = info + + def get_external_ip(self): + """ + Triggers an external ip discovery on the upnp device. Returns + a deferred called with the external ip of this host. + + @return: A deferred called with the ip address, as "x.x.x.x" + @rtype: L{twisted.internet.defer.Deferred} + """ + result = self._soap_proxy.call('GetExternalIPAddress') + result.addCallback(self._on_external_ip) + return result + + def get_port_mappings(self): + """ + Retreive the existing port mappings + + @see: L{portmapper.NATMapper.get_port_mappings} + @return: A deferred called with the dictionnary as defined + in the interface L{portmapper.NATMapper.get_port_mappings} + @rtype: L{twisted.internet.defer.Deferred} + """ + return self._get_port_mapping() + + def add_port_mapping(self, local_ip, intport, extport, desc, proto, lease=0): + """ + Add a port mapping in the upnp device. Returns a deferred. + + @param local_ip: the LAN ip of this host as "x.x.x.x" + @param intport: the internal port number + @param extport: the external port number + @param desc: the description of this mapping (string) + @param proto: "UDP" or "TCP" + @param lease: The duration of the lease in (mili)seconds(??) + @return: A deferred called with None when the mapping is done + @rtype: L{twisted.internet.defer.Deferred} + """ + result = self._soap_proxy.call('AddPortMapping', NewRemoteHost="", + NewExternalPort=extport, + NewProtocol=proto, + NewInternalPort=intport, + NewInternalClient=local_ip, + NewEnabled=1, + NewPortMappingDescription=desc, + NewLeaseDuration=lease) + + return result.addCallbacks(self._on_port_mapping_added, + self._on_no_port_mapping_added) + + def remove_port_mapping(self, extport, proto): + """ + Remove an existing port mapping on the device. Returns a deferred + + @param extport: the external port number associated to the mapping + to be removed + @param proto: either "UDP" or "TCP" + @return: A deferred called with None when the mapping is done + @rtype: L{twisted.internet.defer.Deferred} + """ + result = self._soap_proxy.call('DeletePortMapping', NewRemoteHost="", + NewExternalPort=extport, + NewProtocol=proto) + + return result.addCallbacks(self._on_port_mapping_removed, + self._on_no_port_mapping_removed) + + # Private -------- + def _on_external_ip(self, res): + """ + Called when we received the external ip address from the device. + + @param res: the SOAPpy structure of the result + @return: the external ip string, as "x.x.x.x" + """ + logging.debug("Got external ip struct: %r", res) + return res['NewExternalIPAddress'] + + def _get_port_mapping(self, mapping_id=0, mappings=None): + """ + Fetch the existing mappings starting at index + "mapping_id" from the device. + + To retreive all the mappings call this without parameters. + + @param mapping_id: The index of the mapping to start fetching from + @param mappings: the dictionnary of already fetched mappings + @return: A deferred called with the existing mappings when all have been + retreived, see L{get_port_mappings} + @rtype: L{twisted.internet.defer.Deferred} + """ + if mappings == None: + mappings = {} + + result = self._soap_proxy.call('GetGenericPortMappingEntry', + NewPortMappingIndex=mapping_id) + return result.addCallbacks( + lambda x: self._on_port_mapping_received(x, mapping_id+1, mappings), + lambda x: self._on_no_port_mapping_received( x, mappings)) + + def _on_port_mapping_received(self, response, mapping_id, mappings): + """ + Called we we receive a single mapping from the device. + + @param response: a SOAPpy structure, representing the device's answer + @param mapping_id: The index of the next mapping in the device + @param mappings: the already fetched mappings, see L{get_port_mappings} + @return: A deferred called with the existing mappings when all have been + retreived, see L{get_port_mappings} + @rtype: L{twisted.internet.defer.Deferred} + """ + logging.debug("Got mapping struct: %r", response) + mappings[ + response['NewProtocol'], response['NewExternalPort'] + ] = (response['NewInternalClient'], response['NewInternalPort']) + return self._get_port_mapping(mapping_id, mappings) + + def _on_no_port_mapping_received(self, failure, mappings): + """ + Called when we have no more port mappings to retreive, or an + error occured while retreiving them. + + Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok, + it just means we have finished. If it returns some other error, then we + fail with an UPnPError. + + @param mappings: the already retreived mappings + @param failure: the failure + @return: The existing mappings as defined in L{get_port_mappings} + @raise UPnPError: When we got any other error + than "SpecifiedArrayIndexInvalid" + """ + logging.debug("_on_no_port_mapping_received: %s", failure) + err = failure.value + try: + message = err.args[0]["UPnPError"]["errorDescription"] + if "SpecifiedArrayIndexInvalid" == message: + return mappings + else: + raise UPnPError("GetGenericPortMappingEntry got %s"%(message)) + except: + raise UPnPError("GetGenericPortMappingEntry got %s"%(err.args[0])) + + + def _on_port_mapping_added(self, response): + """ + The port mapping was successfully added, return None to the deferred. + """ + return None + + def _on_no_port_mapping_added(self, failure): + """ + Called when the port mapping could not be added. Immediately + raise an UPnPError, with the SOAPpy structure inside. + + @raise UPnPError: When the port mapping could not be added + """ + raise UPnPError(failure.value.args[0]) + + def _on_port_mapping_removed(self, response): + """ + The port mapping was successfully removed, return None to the deferred. + """ + return None + + def _on_no_port_mapping_removed(self, failure): + """ + Called when the port mapping could not be removed. Immediately + raise an UPnPError, with the SOAPpy structure inside. + + @raise UPnPError: When the port mapping could not be deleted + """ + raise UPnPError(failure.value.args[0]) + +# UPNP multicast address, port and request string +_UPNP_MCAST = '239.255.255.250' +_UPNP_PORT = 1900 +_UPNP_SEARCH_REQUEST = """M-SEARCH * HTTP/1.1\r +Host:%s:%s\r +ST:urn:schemas-upnp-org:device:InternetGatewayDevice:1\r +Man:"ssdp:discover"\r +MX:3\r +\r +""" % (_UPNP_MCAST, _UPNP_PORT) + +class UPnPProtocol(DatagramProtocol, object): + """ + The UPnP Device discovery udp multicast twisted protocol. + """ + + def __init__(self, *args, **kwargs): + """ + Init the protocol, no parameters needed. + """ + super(UPnPProtocol, self).__init__(*args, **kwargs) + + # Url to use to talk to upnp device + self._control_url = None + self._device = None + + #Device discovery deferred + self._discovery = None + self._discovery_timeout = None + + # Public methods + def search_device(self): + """ + Triggers a UPnP device discovery. + + The returned deferred will be called with the L{UPnPDevice} that has + been found in the LAN. + + @return: A deferred called with the detected L{UPnPDevice} instance. + @rtype: L{twisted.internet.defer.Deferred} + """ + self._discovery = defer.Deferred() + + attempt = 0 + mcast = None + while True: + try: + mcast = reactor.listenMulticast(1900+attempt, self) + break + except CannotListenError: + attempt = random.randint(0, 500) + + # joined multicast group, starting upnp search + mcast.joinGroup('239.255.255.250', socket.INADDR_ANY) + + self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT)) + self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT)) + self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT)) + + self._discovery_timeout = reactor.callLater( + 6, self._on_discovery_timeout) + + return self._discovery + + #Private methods + def datagramReceived(self, dgram, address): + """ + This is private, handle the multicast answer from the upnp device. + """ + logging.debug("Got UPNP multicast search answer:\n%s", dgram) + + #This is an HTTP response + response, message = dgram.split('\r\n', 1) + + # Prepare status line + version, status, textstatus = response.split(None, 2) + + if not version.startswith('HTTP') or self._control_url != None: + return + if status != "200": + return + + # We had a timeout pending, cancel it + if self._discovery_timeout != None: + self._discovery_timeout.cancel() + self._discovery_timeout = None + + # Launch the info fetching + def parse_discovery_response(message): + """Separate headers and body from the received http answer.""" + hdict = {} + body = '' + remaining = message + while remaining: + line, remaining = remaining.split('\r\n', 1) + line = line.strip() + if not line: + body = remaining + break + key, val = line.split(':', 1) + key = key.lower() + hdict.setdefault(key, []).append(val.strip()) + return hdict, body + + headers, body = parse_discovery_response(message) + + if not 'location' in headers: + self._on_discovery_failed( + UPnPError( + "No location header in response to M-SEARCH!: %r"%headers)) + return + + loc = headers['location'][0] + result = client.getPage(url=loc) + result.addCallback(self._on_gateway_response, loc).addErrback( + self._on_discovery_failed) + + def _on_gateway_response(self, body, loc): + """ + Called with the UPnP device XML description fetched via HTTP. + + If the device has suitable services for ip discovery and port mappings, + the callback returned in L{search_device} is called with + the discovered L{UPnPDevice}. + + @raise UPnPError: When no suitable service has been + found in the description, or another error occurs. + @param body: The xml description of the device. + @param loc: the url used to retreive the xml description + """ + if self._control_url != None: + return + + # Parse answer + upnpinfo = UPnPXml(body) + + # Check if we have a base url, if not consider location as base url + urlbase = upnpinfo.urlbase + if urlbase == None: + urlbase = loc + + # Check the control url, if None, then the device cannot do what we want + controlurl = upnpinfo.controlurl + if controlurl == None: + self._on_discovery_failed( + UPnPError("upnp response showed no WANConnections")) + return + + self._control_url = urlparse.urljoin(urlbase, controlurl) + + soap_proxy = SoapProxy(self._control_url, upnpinfo.wanservice) + if self._discovery != None: + self._device = UPnPDevice(soap_proxy, upnpinfo.deviceinfos) + self._discovery.callback(self._device) + self._discovery = None + + def _on_discovery_failed(self, err): + """ + Called when the UPnP Device discovery has failed. + + The callback returned in L{search_device} is called with + an error, corresponding to the cause of the failure. + """ + self._control_url = None + if self._discovery != None: + self._discovery.errback(err) + self._discovery = None + + def _on_discovery_timeout(self): + """ + Called when the UPnP Device discovery has timed out. + + Calls L{_on_discovery_failed}. + """ + self._discovery_timeout = None + self._on_discovery_failed(UPnPError())