Add timestamp offset for block header
[p2pool.git] / nattraverso / pynupnp / upnp.py
1 """
2 This module is the heart of the upnp support. Device discover, ip discovery
3 and port mappings are implemented here.
4
5 @author: Raphael Slinckx
6 @author: Anthony Baxter
7 @copyright: Copyright 2005
8 @license: LGPL
9 @contact: U{raphael@slinckx.net<mailto:raphael@slinckx.net>}
10 @version: 0.1.0
11 """
12 __revision__ = "$id"
13
14 import socket, random, urlparse, logging
15
16 from twisted.internet import reactor, defer
17 from twisted.web import client
18 from twisted.internet.protocol import DatagramProtocol
19 from twisted.internet.error import CannotListenError
20 from twisted.python import failure
21
22 from nattraverso.pynupnp.soap import SoapProxy
23 from nattraverso.pynupnp.upnpxml import UPnPXml
24 from nattraverso import ipdiscover, portmapper
25
26 class UPnPError(Exception):
27     """
28     A generic UPnP error, with a descriptive message as content.
29     """
30     pass
31
32 class UPnPMapper(portmapper.NATMapper):
33     """
34     This is the UPnP port mapper implementing the
35     L{NATMapper<portmapper.NATMapper>} interface.
36     
37     @see: L{NATMapper<portmapper.NATMapper>}
38     """
39     
40     def __init__(self, upnp):
41         """
42         Creates the mapper, with the given L{UPnPDevice} instance.
43         
44         @param upnp: L{UPnPDevice} instance
45         """
46         self._mapped = {}
47         self._upnp = upnp
48     
49     def map(self, port):
50         """
51         See interface
52         """
53         self._check_valid_port(port)
54         
55         #Port is already mapped
56         if port in self._mapped:
57             return defer.succeed(self._mapped[port])
58         
59         #Trigger a new mapping creation, first fetch local ip.
60         result = ipdiscover.get_local_ip()
61         self._mapped[port] = result
62         return result.addCallback(self._map_got_local_ip, port)
63     
64     def info(self, port):
65         """
66         See interface
67         """
68         # If the mapping exists, everything's ok
69         if port in self._mapped:
70             return self._mapped[port]
71         else:
72             raise ValueError('Port %r is not currently mapped'%(port))
73     
74     def unmap(self, port):
75         """
76         See interface
77         """
78         if port in self._mapped:
79             existing = self._mapped[port]
80             
81             #Pending mapping, queue an unmap,return existing deferred
82             if type(existing) is not tuple:
83                 existing.addCallback(lambda x: self.unmap(port))
84                 return existing
85             
86             #Remove our local mapping
87             del self._mapped[port]
88             
89             #Ask the UPnP to remove the mapping
90             extaddr, extport = existing
91             return self._upnp.remove_port_mapping(extport, port.getHost().type)
92         else:
93             raise ValueError('Port %r is not currently mapped'%(port))
94     
95     def get_port_mappings(self):
96         """
97         See interface
98         """
99         return self._upnp.get_port_mappings()
100     
101     def _map_got_local_ip(self, ip_result, port):
102         """
103         We got the local ip address, retreive the existing port mappings
104         in the device.
105         
106         @param ip_result: result of L{ipdiscover.get_local_ip}
107         @param port: a L{twisted.internet.interfaces.IListeningPort} we
108             want to map
109         """
110         local, ip = ip_result
111         return self._upnp.get_port_mappings().addCallback(
112             self._map_got_port_mappings, ip, port)
113     
114     def _map_got_port_mappings(self, mappings, ip, port):
115         """
116         We got all the existing mappings in the device, find an unused one
117         and assign it for the requested port.
118         
119         @param ip: The local ip of this host "x.x.x.x"
120         @param port: a L{twisted.internet.interfaces.IListeningPort} we
121             want to map
122         @param mappings: result of L{UPnPDevice.get_port_mappings}
123         """
124         
125         #Get the requested mapping's info
126         ptype = port.getHost().type
127         intport = port.getHost().port
128         
129         for extport in [random.randrange(1025, 65536) for val in range(20)]:
130             # Check if there is an existing mapping, if it does not exist, bingo
131             if not (ptype, extport) in mappings:
132                 break
133             
134             if (ptype, extport) in mappings:
135                 existing = mappings[ptype, extport]
136             
137             local_ip, local_port = existing
138             if local_ip == ip and local_port == intport:
139                 # Existing binding for this host/port/proto - replace it
140                 break
141         
142         # Triggers the creation of the mapping on the device
143         result = self._upnp.add_port_mapping(ip, intport, extport, 'pynupnp', ptype)
144         
145         # We also need the external IP, so we queue first an
146         # External IP Discovery, then we add the mapping.
147         return result.addCallback(
148             lambda x: self._upnp.get_external_ip()).addCallback(
149                 self._port_mapping_added, extport, port)
150     
151     def _port_mapping_added(self, extaddr, extport, port):
152         """
153         The port mapping was added in the device, this means::
154             
155             Internet        NAT         LAN
156                 |
157         > IP:extaddr       |>       IP:local ip
158             > Port:extport     |>       Port:port
159                 |
160         
161         @param extaddr: The exernal ip address
162         @param extport: The external port as number
163         @param port: The internal port as a
164             L{twisted.internet.interfaces.IListeningPort} object, that has been
165             mapped
166         """
167         self._mapped[port] = (extaddr, extport)
168         return (extaddr, extport)
169
170 class UPnPDevice:
171     """
172     Represents an UPnP device, with the associated infos, and remote methods.
173     """
174     def __init__(self, soap_proxy, info):
175         """
176         Build the device, with the given SOAP proxy, and the meta-infos.
177         
178         @param soap_proxy: an initialized L{SoapProxy} to the device
179         @param info: a dictionnary of various infos concerning the
180             device extracted with L{UPnPXml}
181         """
182         self._soap_proxy = soap_proxy
183         self._info = info
184     
185     def get_external_ip(self):
186         """
187         Triggers an external ip discovery on the upnp device. Returns
188         a deferred called with the external ip of this host.
189         
190         @return: A deferred called with the ip address, as "x.x.x.x"
191         @rtype: L{twisted.internet.defer.Deferred}
192         """
193         result = self._soap_proxy.call('GetExternalIPAddress')
194         result.addCallback(self._on_external_ip)
195         return result
196     
197     def get_port_mappings(self):
198         """
199         Retreive the existing port mappings
200         
201         @see: L{portmapper.NATMapper.get_port_mappings}
202         @return: A deferred called with the dictionnary as defined
203             in the interface L{portmapper.NATMapper.get_port_mappings}
204         @rtype: L{twisted.internet.defer.Deferred}
205         """
206         return self._get_port_mapping()
207     
208     def add_port_mapping(self, local_ip, intport, extport, desc, proto, lease=0):
209         """
210         Add a port mapping in the upnp device. Returns a deferred.
211         
212         @param local_ip: the LAN ip of this host as "x.x.x.x"
213         @param intport: the internal port number
214         @param extport: the external port number
215         @param desc: the description of this mapping (string)
216         @param proto: "UDP" or "TCP"
217         @param lease: The duration of the lease in (mili)seconds(??)
218         @return: A deferred called with None when the mapping is done
219         @rtype: L{twisted.internet.defer.Deferred}
220         """
221         result = self._soap_proxy.call('AddPortMapping', NewRemoteHost="",
222             NewExternalPort=extport,
223             NewProtocol=proto,
224             NewInternalPort=intport,
225             NewInternalClient=local_ip,
226             NewEnabled=1,
227             NewPortMappingDescription=desc,
228             NewLeaseDuration=lease)
229         
230         return result.addCallbacks(self._on_port_mapping_added,
231             self._on_no_port_mapping_added)
232     
233     def remove_port_mapping(self, extport, proto):
234         """
235         Remove an existing port mapping on the device. Returns a deferred
236         
237         @param extport: the external port number associated to the mapping
238             to be removed
239         @param proto: either "UDP" or "TCP"
240         @return: A deferred called with None when the mapping is done
241         @rtype: L{twisted.internet.defer.Deferred}
242         """
243         result = self._soap_proxy.call('DeletePortMapping', NewRemoteHost="",
244             NewExternalPort=extport,
245             NewProtocol=proto)
246         
247         return result.addCallbacks(self._on_port_mapping_removed,
248             self._on_no_port_mapping_removed)
249     
250     # Private --------
251     def _on_external_ip(self, res):
252         """
253         Called when we received the external ip address from the device.
254         
255         @param res: the SOAPpy structure of the result
256         @return: the external ip string, as "x.x.x.x"
257         """
258         logging.debug("Got external ip struct: %r", res)
259         return res['NewExternalIPAddress']
260     
261     def _get_port_mapping(self, mapping_id=0, mappings=None):
262         """
263         Fetch the existing mappings starting at index
264         "mapping_id" from the device.
265         
266         To retreive all the mappings call this without parameters.
267         
268         @param mapping_id: The index of the mapping to start fetching from
269         @param mappings: the dictionnary of already fetched mappings
270         @return: A deferred called with the existing mappings when all have been
271             retreived, see L{get_port_mappings}
272         @rtype: L{twisted.internet.defer.Deferred}
273         """
274         if mappings == None:
275             mappings = {}
276         
277         result = self._soap_proxy.call('GetGenericPortMappingEntry',
278             NewPortMappingIndex=mapping_id)
279         return result.addCallbacks(
280             lambda x: self._on_port_mapping_received(x, mapping_id+1, mappings),
281             lambda x: self._on_no_port_mapping_received(        x, mappings))
282     
283     def _on_port_mapping_received(self, response, mapping_id, mappings):
284         """
285         Called we we receive a single mapping from the device.
286         
287         @param response: a SOAPpy structure, representing the device's answer
288         @param mapping_id: The index of the next mapping in the device
289         @param mappings: the already fetched mappings, see L{get_port_mappings}
290         @return: A deferred called with the existing mappings when all have been
291             retreived, see L{get_port_mappings}
292         @rtype: L{twisted.internet.defer.Deferred}
293         """
294         logging.debug("Got mapping struct: %r", response)
295         mappings[
296             response['NewProtocol'], response['NewExternalPort']
297         ] = (response['NewInternalClient'], response['NewInternalPort'])
298         return self._get_port_mapping(mapping_id, mappings)
299     
300     def _on_no_port_mapping_received(self, failure, mappings):
301         """
302         Called when we have no more port mappings to retreive, or an
303         error occured while retreiving them.
304         
305         Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
306         it just means we have finished. If it returns some other error, then we
307         fail with an UPnPError.
308         
309         @param mappings: the already retreived mappings
310         @param failure: the failure
311         @return: The existing mappings as defined in L{get_port_mappings}
312         @raise UPnPError: When we got any other error
313             than "SpecifiedArrayIndexInvalid"
314         """
315         logging.debug("_on_no_port_mapping_received: %s", failure)
316         err = failure.value
317         message = err.args[0]["UPnPError"]["errorDescription"]
318         if "SpecifiedArrayIndexInvalid" == message:
319             return mappings
320         else:
321             return failure
322     
323     
324     def _on_port_mapping_added(self, response):
325         """
326         The port mapping was successfully added, return None to the deferred.
327         """
328         return None
329     
330     def _on_no_port_mapping_added(self, failure):
331         """
332         Called when the port mapping could not be added. Immediately
333         raise an UPnPError, with the SOAPpy structure inside.
334         
335         @raise UPnPError: When the port mapping could not be added
336         """
337         return failure
338     
339     def _on_port_mapping_removed(self, response):
340         """
341         The port mapping was successfully removed, return None to the deferred.
342         """
343         return None
344     
345     def _on_no_port_mapping_removed(self, failure):
346         """
347         Called when the port mapping could not be removed. Immediately
348         raise an UPnPError, with the SOAPpy structure inside.
349         
350         @raise UPnPError: When the port mapping could not be deleted
351         """
352         return failure
353
354 # UPNP multicast address, port and request string
355 _UPNP_MCAST = '239.255.255.250'
356 _UPNP_PORT = 1900
357 _UPNP_SEARCH_REQUEST = """M-SEARCH * HTTP/1.1\r
358 Host:%s:%s\r
359 ST:urn:schemas-upnp-org:device:InternetGatewayDevice:1\r
360 Man:"ssdp:discover"\r
361 MX:3\r
362 \r
363 """ % (_UPNP_MCAST, _UPNP_PORT)
364
365 class UPnPProtocol(DatagramProtocol, object):
366     """
367     The UPnP Device discovery udp multicast twisted protocol.
368     """
369     
370     def __init__(self, *args, **kwargs):
371         """
372         Init the protocol, no parameters needed.
373         """
374         super(UPnPProtocol, self).__init__(*args, **kwargs)
375         
376         #Device discovery deferred
377         self._discovery = None
378         self._discovery_timeout = None
379         self.mcast = None
380         self._done = False
381     
382     # Public methods
383     def search_device(self):
384         """
385         Triggers a UPnP device discovery.
386         
387         The returned deferred will be called with the L{UPnPDevice} that has
388         been found in the LAN.
389         
390         @return: A deferred called with the detected L{UPnPDevice} instance.
391         @rtype: L{twisted.internet.defer.Deferred}
392         """
393         if self._discovery is not None:
394             raise ValueError('already used')
395         self._discovery = defer.Deferred()
396         self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
397         
398         attempt = 0
399         mcast = None
400         while True:
401             try:
402                 self.mcast = reactor.listenMulticast(1900+attempt, self)
403                 break
404             except CannotListenError:
405                 attempt = random.randint(0, 500)
406         
407         # joined multicast group, starting upnp search
408         self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
409         
410         self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
411         self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
412         self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
413         
414         return self._discovery
415     
416     #Private methods
417     def datagramReceived(self, dgram, address):
418         if self._done:
419             return
420         """
421         This is private, handle the multicast answer from the upnp device.
422         """
423         logging.debug("Got UPNP multicast search answer:\n%s", dgram)
424         
425         #This is an HTTP response
426         response, message = dgram.split('\r\n', 1)
427         
428         # Prepare status line
429         version, status, textstatus = response.split(None, 2)
430         
431         if not version.startswith('HTTP'):
432             return
433         if status != "200":
434             return
435         
436         # Launch the info fetching
437         def parse_discovery_response(message):
438             """Separate headers and body from the received http answer."""
439             hdict = {}
440             body = ''
441             remaining = message
442             while remaining:
443                 line, remaining = remaining.split('\r\n', 1)
444                 line = line.strip()
445                 if not line:
446                     body = remaining
447                     break
448                 key, val = line.split(':', 1)
449                 key = key.lower()
450                 hdict.setdefault(key, []).append(val.strip())
451             return hdict, body
452         
453         headers, body = parse_discovery_response(message)
454         
455         if not 'location' in headers:
456             self._on_discovery_failed(
457                 UPnPError(
458                     "No location header in response to M-SEARCH!: %r"%headers))
459             return
460         
461         loc = headers['location'][0]
462         result = client.getPage(url=loc)
463         result.addCallback(self._on_gateway_response, loc).addErrback(self._on_discovery_failed)
464     
465     def _on_gateway_response(self, body, loc):
466         if self._done:
467             return
468         """
469         Called with the UPnP device XML description fetched via HTTP.
470         
471         If the device has suitable services for ip discovery and port mappings,
472         the callback returned in L{search_device} is called with
473         the discovered L{UPnPDevice}.
474         
475         @raise UPnPError: When no suitable service has been
476             found in the description, or another error occurs.
477         @param body: The xml description of the device.
478         @param loc: the url used to retreive the xml description
479         """
480         
481         # Parse answer
482         upnpinfo = UPnPXml(body)
483         
484         # Check if we have a base url, if not consider location as base url
485         urlbase = upnpinfo.urlbase
486         if urlbase == None:
487             urlbase = loc
488         
489         # Check the control url, if None, then the device cannot do what we want
490         controlurl = upnpinfo.controlurl
491         if controlurl == None:
492             self._on_discovery_failed(UPnPError("upnp response showed no WANConnections"))
493             return
494         
495         control_url2 = urlparse.urljoin(urlbase, controlurl)
496         soap_proxy = SoapProxy(control_url2, upnpinfo.wanservice)
497         self._on_discovery_succeeded(UPnPDevice(soap_proxy, upnpinfo.deviceinfos))
498     
499     def _on_discovery_succeeded(self, res):
500         if self._done:
501             return
502         self._done = True
503         self.mcast.stopListening()
504         self._discovery_timeout.cancel()
505         self._discovery.callback(res)
506     
507     def _on_discovery_failed(self, err):
508         if self._done:
509             return
510         self._done = True
511         self.mcast.stopListening()
512         self._discovery_timeout.cancel()
513         self._discovery.errback(err)
514     
515     def _on_discovery_timeout(self):
516         if self._done:
517             return
518         self._done = True
519         self.mcast.stopListening()
520         self._discovery.errback(failure.Failure(defer.TimeoutError('in _on_discovery_timeout')))
521
522 def search_upnp_device ():
523     """
524     Check the network for an UPnP device. Returns a deferred
525     with the L{UPnPDevice} instance as result, if found.
526     
527     @return: A deferred called with the L{UPnPDevice} instance
528     @rtype: L{twisted.internet.defer.Deferred}
529     """
530     return defer.maybeDeferred(UPnPProtocol().search_device)