--- /dev/null
+"""
+This module is the heart of the upnp support. Device discover, ip discovery
+and port mappings are implemented here.
+
+@author: Raphael Slinckx
+@author: Anthony Baxter
+@copyright: Copyright 2005
+@license: LGPL
+@contact: U{raphael@slinckx.net<mailto:raphael@slinckx.net>}
+@version: 0.1.0
+"""
+__revision__ = "$id"
+
+import socket, random, urlparse, logging
+
+from twisted.internet import reactor, defer
+from twisted.web import client
+from twisted.internet.protocol import DatagramProtocol
+from twisted.internet.error import CannotListenError
+
+from nattraverso.pynupnp.soap import SoapProxy
+from nattraverso.pynupnp.upnpxml import UPnPXml
+from nattraverso import ipdiscover, portmapper
+
+class UPnPError(Exception):
+ """
+ A generic UPnP error, with a descriptive message as content.
+ """
+ pass
+
+def search_upnp_device ():
+ """
+ Check the network for an UPnP device. Returns a deferred
+ with the L{UPnPDevice} instance as result, if found.
+
+ @return: A deferred called with the L{UPnPDevice} instance
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ try:
+ return UPnPProtocol().search_device()
+ except Exception, msg:
+ return defer.fail(UPnPError(msg))
+
+class UPnPMapper(portmapper.NATMapper):
+ """
+ This is the UPnP port mapper implementing the
+ L{NATMapper<portmapper.NATMapper>} interface.
+
+ @see: L{NATMapper<portmapper.NATMapper>}
+ """
+
+ def __init__(self, upnp):
+ """
+ Creates the mapper, with the given L{UPnPDevice} instance.
+
+ @param upnp: L{UPnPDevice} instance
+ """
+ self._mapped = {}
+ self._upnp = upnp
+
+ def map(self, port):
+ """
+ See interface
+ """
+ self._check_valid_port(port)
+
+ #Port is already mapped
+ if port in self._mapped:
+ return defer.succeed(self._mapped[port])
+
+ #Trigger a new mapping creation, first fetch local ip.
+ result = ipdiscover.get_local_ip()
+ self._mapped[port] = result
+ return result.addCallback(self._map_got_local_ip, port)
+
+ def info(self, port):
+ """
+ See interface
+ """
+ # If the mapping exists, everything's ok
+ if port in self._mapped:
+ return self._mapped[port]
+ else:
+ raise ValueError('Port %r is not currently mapped'%(port))
+
+ def unmap(self, port):
+ """
+ See interface
+ """
+ if port in self._mapped:
+ existing = self._mapped[port]
+
+ #Pending mapping, queue an unmap,return existing deferred
+ if type(existing) is not tuple:
+ existing.addCallback(lambda x: self.unmap(port))
+ return existing
+
+ #Remove our local mapping
+ del self._mapped[port]
+
+ #Ask the UPnP to remove the mapping
+ extaddr, extport = existing
+ return self._upnp.remove_port_mapping(extport, port.getHost().type)
+ else:
+ raise ValueError('Port %r is not currently mapped'%(port))
+
+ def get_port_mappings(self):
+ """
+ See interface
+ """
+ return self._upnp.get_port_mappings()
+
+ def _map_got_local_ip(self, ip_result, port):
+ """
+ We got the local ip address, retreive the existing port mappings
+ in the device.
+
+ @param ip_result: result of L{ipdiscover.get_local_ip}
+ @param port: a L{twisted.internet.interfaces.IListeningPort} we
+ want to map
+ """
+ local, ip = ip_result
+ return self._upnp.get_port_mappings().addCallback(
+ self._map_got_port_mappings, ip, port)
+
+ def _map_got_port_mappings(self, mappings, ip, port):
+ """
+ We got all the existing mappings in the device, find an unused one
+ and assign it for the requested port.
+
+ @param ip: The local ip of this host "x.x.x.x"
+ @param port: a L{twisted.internet.interfaces.IListeningPort} we
+ want to map
+ @param mappings: result of L{UPnPDevice.get_port_mappings}
+ """
+
+ #Get the requested mapping's info
+ ptype = port.getHost().type
+ intport = port.getHost().port
+
+ for extport in [random.randrange(1025, 65536) for val in range(20)]:
+ # Check if there is an existing mapping, if it does not exist, bingo
+ if not (ptype, extport) in mappings:
+ break
+
+ if (ptype, extport) in mappings:
+ existing = mappings[ptype, extport]
+
+ local_ip, local_port = existing
+ if local_ip == ip and local_port == intport:
+ # Existing binding for this host/port/proto - replace it
+ break
+
+ # Triggers the creation of the mapping on the device
+ result = self._upnp.add_port_mapping(ip, intport, extport, 'pynupnp', ptype)
+
+ # We also need the external IP, so we queue first an
+ # External IP Discovery, then we add the mapping.
+ return result.addCallback(
+ lambda x: self._upnp.get_external_ip()).addCallback(
+ self._port_mapping_added, extport, port)
+
+ def _port_mapping_added(self, extaddr, extport, port):
+ """
+ The port mapping was added in the device, this means::
+
+ Internet NAT LAN
+ |
+ > IP:extaddr |> IP:local ip
+ > Port:extport |> Port:port
+ |
+
+ @param extaddr: The exernal ip address
+ @param extport: The external port as number
+ @param port: The internal port as a
+ L{twisted.internet.interfaces.IListeningPort} object, that has been
+ mapped
+ """
+ self._mapped[port] = (extaddr, extport)
+ return (extaddr, extport)
+
+class UPnPDevice:
+ """
+ Represents an UPnP device, with the associated infos, and remote methods.
+ """
+ def __init__(self, soap_proxy, info):
+ """
+ Build the device, with the given SOAP proxy, and the meta-infos.
+
+ @param soap_proxy: an initialized L{SoapProxy} to the device
+ @param info: a dictionnary of various infos concerning the
+ device extracted with L{UPnPXml}
+ """
+ self._soap_proxy = soap_proxy
+ self._info = info
+
+ def get_external_ip(self):
+ """
+ Triggers an external ip discovery on the upnp device. Returns
+ a deferred called with the external ip of this host.
+
+ @return: A deferred called with the ip address, as "x.x.x.x"
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ result = self._soap_proxy.call('GetExternalIPAddress')
+ result.addCallback(self._on_external_ip)
+ return result
+
+ def get_port_mappings(self):
+ """
+ Retreive the existing port mappings
+
+ @see: L{portmapper.NATMapper.get_port_mappings}
+ @return: A deferred called with the dictionnary as defined
+ in the interface L{portmapper.NATMapper.get_port_mappings}
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ return self._get_port_mapping()
+
+ def add_port_mapping(self, local_ip, intport, extport, desc, proto, lease=0):
+ """
+ Add a port mapping in the upnp device. Returns a deferred.
+
+ @param local_ip: the LAN ip of this host as "x.x.x.x"
+ @param intport: the internal port number
+ @param extport: the external port number
+ @param desc: the description of this mapping (string)
+ @param proto: "UDP" or "TCP"
+ @param lease: The duration of the lease in (mili)seconds(??)
+ @return: A deferred called with None when the mapping is done
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ result = self._soap_proxy.call('AddPortMapping', NewRemoteHost="",
+ NewExternalPort=extport,
+ NewProtocol=proto,
+ NewInternalPort=intport,
+ NewInternalClient=local_ip,
+ NewEnabled=1,
+ NewPortMappingDescription=desc,
+ NewLeaseDuration=lease)
+
+ return result.addCallbacks(self._on_port_mapping_added,
+ self._on_no_port_mapping_added)
+
+ def remove_port_mapping(self, extport, proto):
+ """
+ Remove an existing port mapping on the device. Returns a deferred
+
+ @param extport: the external port number associated to the mapping
+ to be removed
+ @param proto: either "UDP" or "TCP"
+ @return: A deferred called with None when the mapping is done
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ result = self._soap_proxy.call('DeletePortMapping', NewRemoteHost="",
+ NewExternalPort=extport,
+ NewProtocol=proto)
+
+ return result.addCallbacks(self._on_port_mapping_removed,
+ self._on_no_port_mapping_removed)
+
+ # Private --------
+ def _on_external_ip(self, res):
+ """
+ Called when we received the external ip address from the device.
+
+ @param res: the SOAPpy structure of the result
+ @return: the external ip string, as "x.x.x.x"
+ """
+ logging.debug("Got external ip struct: %r", res)
+ return res['NewExternalIPAddress']
+
+ def _get_port_mapping(self, mapping_id=0, mappings=None):
+ """
+ Fetch the existing mappings starting at index
+ "mapping_id" from the device.
+
+ To retreive all the mappings call this without parameters.
+
+ @param mapping_id: The index of the mapping to start fetching from
+ @param mappings: the dictionnary of already fetched mappings
+ @return: A deferred called with the existing mappings when all have been
+ retreived, see L{get_port_mappings}
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ if mappings == None:
+ mappings = {}
+
+ result = self._soap_proxy.call('GetGenericPortMappingEntry',
+ NewPortMappingIndex=mapping_id)
+ return result.addCallbacks(
+ lambda x: self._on_port_mapping_received(x, mapping_id+1, mappings),
+ lambda x: self._on_no_port_mapping_received( x, mappings))
+
+ def _on_port_mapping_received(self, response, mapping_id, mappings):
+ """
+ Called we we receive a single mapping from the device.
+
+ @param response: a SOAPpy structure, representing the device's answer
+ @param mapping_id: The index of the next mapping in the device
+ @param mappings: the already fetched mappings, see L{get_port_mappings}
+ @return: A deferred called with the existing mappings when all have been
+ retreived, see L{get_port_mappings}
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ logging.debug("Got mapping struct: %r", response)
+ mappings[
+ response['NewProtocol'], response['NewExternalPort']
+ ] = (response['NewInternalClient'], response['NewInternalPort'])
+ return self._get_port_mapping(mapping_id, mappings)
+
+ def _on_no_port_mapping_received(self, failure, mappings):
+ """
+ Called when we have no more port mappings to retreive, or an
+ error occured while retreiving them.
+
+ Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
+ it just means we have finished. If it returns some other error, then we
+ fail with an UPnPError.
+
+ @param mappings: the already retreived mappings
+ @param failure: the failure
+ @return: The existing mappings as defined in L{get_port_mappings}
+ @raise UPnPError: When we got any other error
+ than "SpecifiedArrayIndexInvalid"
+ """
+ logging.debug("_on_no_port_mapping_received: %s", failure)
+ err = failure.value
+ try:
+ message = err.args[0]["UPnPError"]["errorDescription"]
+ if "SpecifiedArrayIndexInvalid" == message:
+ return mappings
+ else:
+ raise UPnPError("GetGenericPortMappingEntry got %s"%(message))
+ except:
+ raise UPnPError("GetGenericPortMappingEntry got %s"%(err.args[0]))
+
+
+ def _on_port_mapping_added(self, response):
+ """
+ The port mapping was successfully added, return None to the deferred.
+ """
+ return None
+
+ def _on_no_port_mapping_added(self, failure):
+ """
+ Called when the port mapping could not be added. Immediately
+ raise an UPnPError, with the SOAPpy structure inside.
+
+ @raise UPnPError: When the port mapping could not be added
+ """
+ raise UPnPError(failure.value.args[0])
+
+ def _on_port_mapping_removed(self, response):
+ """
+ The port mapping was successfully removed, return None to the deferred.
+ """
+ return None
+
+ def _on_no_port_mapping_removed(self, failure):
+ """
+ Called when the port mapping could not be removed. Immediately
+ raise an UPnPError, with the SOAPpy structure inside.
+
+ @raise UPnPError: When the port mapping could not be deleted
+ """
+ raise UPnPError(failure.value.args[0])
+
+# UPNP multicast address, port and request string
+_UPNP_MCAST = '239.255.255.250'
+_UPNP_PORT = 1900
+_UPNP_SEARCH_REQUEST = """M-SEARCH * HTTP/1.1\r
+Host:%s:%s\r
+ST:urn:schemas-upnp-org:device:InternetGatewayDevice:1\r
+Man:"ssdp:discover"\r
+MX:3\r
+\r
+""" % (_UPNP_MCAST, _UPNP_PORT)
+
+class UPnPProtocol(DatagramProtocol, object):
+ """
+ The UPnP Device discovery udp multicast twisted protocol.
+ """
+
+ def __init__(self, *args, **kwargs):
+ """
+ Init the protocol, no parameters needed.
+ """
+ super(UPnPProtocol, self).__init__(*args, **kwargs)
+
+ # Url to use to talk to upnp device
+ self._control_url = None
+ self._device = None
+
+ #Device discovery deferred
+ self._discovery = None
+ self._discovery_timeout = None
+
+ # Public methods
+ def search_device(self):
+ """
+ Triggers a UPnP device discovery.
+
+ The returned deferred will be called with the L{UPnPDevice} that has
+ been found in the LAN.
+
+ @return: A deferred called with the detected L{UPnPDevice} instance.
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ self._discovery = defer.Deferred()
+
+ attempt = 0
+ mcast = None
+ while True:
+ try:
+ mcast = reactor.listenMulticast(1900+attempt, self)
+ break
+ except CannotListenError:
+ attempt = random.randint(0, 500)
+
+ # joined multicast group, starting upnp search
+ mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
+
+ self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
+ self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
+ self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
+
+ self._discovery_timeout = reactor.callLater(
+ 6, self._on_discovery_timeout)
+
+ return self._discovery
+
+ #Private methods
+ def datagramReceived(self, dgram, address):
+ """
+ This is private, handle the multicast answer from the upnp device.
+ """
+ logging.debug("Got UPNP multicast search answer:\n%s", dgram)
+
+ #This is an HTTP response
+ response, message = dgram.split('\r\n', 1)
+
+ # Prepare status line
+ version, status, textstatus = response.split(None, 2)
+
+ if not version.startswith('HTTP') or self._control_url != None:
+ return
+ if status != "200":
+ return
+
+ # We had a timeout pending, cancel it
+ if self._discovery_timeout != None:
+ self._discovery_timeout.cancel()
+ self._discovery_timeout = None
+
+ # Launch the info fetching
+ def parse_discovery_response(message):
+ """Separate headers and body from the received http answer."""
+ hdict = {}
+ body = ''
+ remaining = message
+ while remaining:
+ line, remaining = remaining.split('\r\n', 1)
+ line = line.strip()
+ if not line:
+ body = remaining
+ break
+ key, val = line.split(':', 1)
+ key = key.lower()
+ hdict.setdefault(key, []).append(val.strip())
+ return hdict, body
+
+ headers, body = parse_discovery_response(message)
+
+ if not 'location' in headers:
+ self._on_discovery_failed(
+ UPnPError(
+ "No location header in response to M-SEARCH!: %r"%headers))
+ return
+
+ loc = headers['location'][0]
+ result = client.getPage(url=loc)
+ result.addCallback(self._on_gateway_response, loc).addErrback(
+ self._on_discovery_failed)
+
+ def _on_gateway_response(self, body, loc):
+ """
+ Called with the UPnP device XML description fetched via HTTP.
+
+ If the device has suitable services for ip discovery and port mappings,
+ the callback returned in L{search_device} is called with
+ the discovered L{UPnPDevice}.
+
+ @raise UPnPError: When no suitable service has been
+ found in the description, or another error occurs.
+ @param body: The xml description of the device.
+ @param loc: the url used to retreive the xml description
+ """
+ if self._control_url != None:
+ return
+
+ # Parse answer
+ upnpinfo = UPnPXml(body)
+
+ # Check if we have a base url, if not consider location as base url
+ urlbase = upnpinfo.urlbase
+ if urlbase == None:
+ urlbase = loc
+
+ # Check the control url, if None, then the device cannot do what we want
+ controlurl = upnpinfo.controlurl
+ if controlurl == None:
+ self._on_discovery_failed(
+ UPnPError("upnp response showed no WANConnections"))
+ return
+
+ self._control_url = urlparse.urljoin(urlbase, controlurl)
+
+ soap_proxy = SoapProxy(self._control_url, upnpinfo.wanservice)
+ if self._discovery != None:
+ self._device = UPnPDevice(soap_proxy, upnpinfo.deviceinfos)
+ self._discovery.callback(self._device)
+ self._discovery = None
+
+ def _on_discovery_failed(self, err):
+ """
+ Called when the UPnP Device discovery has failed.
+
+ The callback returned in L{search_device} is called with
+ an error, corresponding to the cause of the failure.
+ """
+ self._control_url = None
+ if self._discovery != None:
+ self._discovery.errback(err)
+ self._discovery = None
+
+ def _on_discovery_timeout(self):
+ """
+ Called when the UPnP Device discovery has timed out.
+
+ Calls L{_on_discovery_failed}.
+ """
+ self._discovery_timeout = None
+ self._on_discovery_failed(UPnPError())