try pyasn1-modules as well
[electrum-nvc.git] / lib / x509.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 from datetime import datetime
21 import sys
22
# Fail early with an actionable message when a required third-party
# package is missing, instead of surfacing a bare ImportError traceback.
try:
    import pyasn1
except ImportError:
    sys.exit("Error: pyasn1 does not seem to be installed. Try 'sudo pip install pyasn1'")

try:
    import pyasn1_modules
except ImportError:
    # The message must name pyasn1-modules (not pyasn1) so that it agrees
    # with the install hint and with the package that actually failed.
    sys.exit("Error: pyasn1-modules does not seem to be installed. Try 'sudo pip install pyasn1-modules'")

try:
    import tlslite
except ImportError:
    sys.exit("Error: tlslite does not seem to be installed. Try 'sudo pip install tlslite'")
37
38
39
40 from pyasn1.codec.der import decoder, encoder
41 from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
42 from pyasn1.type.char import BMPString, IA5String, UTF8String
43 from pyasn1.type.useful import GeneralizedTime
44 from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
45                                     SubjectAltName, GeneralNames,
46                                     GeneralName)
47 from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
48 from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
49 from pyasn1_modules.rfc2459 import id_at_organizationalUnitName as OU_NAME
50 from pyasn1_modules.rfc2459 import id_ce_basicConstraints, BasicConstraints
# Object identifiers that pyasn1_modules.rfc2459 does not export by name.
# otherName type-id for an XMPP address (id-on-xmppAddr, RFC 6120).
XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
# otherName type-id for a DNS SRV service name (id-on-dnsSRV, RFC 4985).
SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
# Signature algorithm sha1WithRSAEncryption (PKCS #1); not referenced in
# the rest of this module as shown, presumably used by importers.
ALGO_RSA_SHA1 = ObjectIdentifier('1.2.840.113549.1.1.5')
54
55
class CertificateError(Exception):
    """Raised when a certificate fails a date or host name check."""
58
def decode_str(data):
    """Return the text held by a pyasn1 string value.

    BMPString values are decoded as big-endian UTF-16; every other value
    is decoded as UTF-8.
    """
    raw = bytes(data)
    if isinstance(data, BMPString):
        return raw.decode('utf-16-be')
    return raw.decode('utf-8')
62
63
class X509(tlslite.X509):
    """Child class of tlslite.X509 that uses pyasn1 to parse cert
    information. Note: pyasn1 is a lot slower than tlslite, so we
    should try to do everything in tlslite.

    Every other method of this class reads the attributes that
    ``slow_parse()`` sets (``cert``, ``tbs``, ``subject``,
    ``extensions``), so ``slow_parse()`` has to be called first.
    """

    def slow_parse(self):
        """Decode the certificate bytes with pyasn1 and cache the result.

        Sets ``self.cert`` (the decoded Certificate), ``self.tbs`` (its
        tbsCertificate), ``self.subject`` and ``self.extensions`` (an
        empty list when the certificate carries no extensions).
        """
        self.cert = decoder.decode(str(self.bytes), asn1Spec=Certificate())[0]
        self.tbs = self.cert.getComponentByName('tbsCertificate')
        self.subject = self.tbs.getComponentByName('subject')
        self.extensions = self.tbs.getComponentByName('extensions') or []

    def extract_names(self):
        """Collect the names the certificate was issued for.

        Returns a dict with these keys:
          'CN'       -- subject commonName, or None (last one seen wins)
          'OU'       -- subject organizationalUnitName, or None
          'DNS'      -- set of subjectAltName dNSName values
          'URI'      -- set of subjectAltName URIs that start with
                        'xmpp:', stored without that prefix
          'XMPPAddr' -- set of otherName values of type XMPP_ADDR
          'SRV'      -- set of otherName values of type SRV_NAME
        """
        results = {'CN': None,
                   'DNS': set(),
                   'SRV': set(),
                   'URI': set(),
                   'XMPPAddr': set(),
                   'OU': None}

        # Extract the CommonName / OrganizationalUnit from the subject.
        for rdnss in self.subject:
            for rdns in rdnss:
                for name in rdns:
                    oid = name.getComponentByName('type')
                    value = name.getComponentByName('value')

                    if oid == COMMON_NAME:
                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
                        results['CN'] = decode_str(value.getComponent())
                    elif oid == OU_NAME:
                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
                        results['OU'] = decode_str(value.getComponent())

        # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
        for extension in self.extensions:
            oid = extension.getComponentByName('extnID')
            if oid != SUBJECT_ALT_NAME:
                continue

            # extnValue wraps the DER-encoded SubjectAltName in an
            # OCTET STRING, hence the two decoding steps.
            value = decoder.decode(extension.getComponentByName('extnValue'),
                                   asn1Spec=OctetString())[0]
            sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0]
            for name in sa_names:
                name_type = name.getName()
                if name_type == 'dNSName':
                    results['DNS'].add(decode_str(name.getComponent()))
                elif name_type == 'uniformResourceIdentifier':
                    value = decode_str(name.getComponent())
                    if value.startswith('xmpp:'):
                        results['URI'].add(value[5:])
                elif name_type == 'otherName':
                    other = name.getComponent()
                    oid = other.getComponentByName('type-id')
                    value = other.getComponentByName('value')

                    if oid == XMPP_ADDR:
                        value = decoder.decode(value, asn1Spec=UTF8String())[0]
                        results['XMPPAddr'].add(decode_str(value))
                    elif oid == SRV_NAME:
                        value = decoder.decode(value, asn1Spec=IA5String())[0]
                        results['SRV'].add(decode_str(value))
        return results

    def check_ca(self):
        """Return True if the basicConstraints extension marks a CA.

        The result is the truth value of the first component of the
        decoded BasicConstraints (the cA flag). Returns False when the
        certificate has no basicConstraints extension.
        """
        for extension in self.extensions:
            oid = extension.getComponentByName('extnID')
            if oid != id_ce_basicConstraints:
                continue
            value = decoder.decode(extension.getComponentByName('extnValue'),
                                   asn1Spec=OctetString())[0]
            constraints = decoder.decode(value, asn1Spec=BasicConstraints())[0]
            return bool(constraints[0])
        return False

    def extract_sig(self):
        """Return what is needed to verify the certificate signature.

        Returns a tuple (signatureAlgorithm, DER-encoded signatureValue,
        DER-encoded tbsCertificate).
        """
        signature = self.cert.getComponentByName('signatureValue')
        algorithm = self.cert.getComponentByName('signatureAlgorithm')
        data = encoder.encode(self.tbs)
        s = encoder.encode(signature)
        return algorithm, s, data

    def extract_pubkey(self):
        """Return (algorithm OID, algorithm parameters, DER-encoded
        subjectPublicKey) taken from subjectPublicKeyInfo.
        """
        pki = self.tbs.getComponentByName('subjectPublicKeyInfo')
        algo = pki.getComponentByName('algorithm')
        algorithm = algo.getComponentByName('algorithm')
        parameters = algo.getComponentByName('parameters')
        subjectPublicKey = pki.getComponentByName('subjectPublicKey')
        return algorithm, parameters, encoder.encode(subjectPublicKey)

    @staticmethod
    def _parse_asn1_time(choice):
        """Convert one side of the validity field to a naive datetime."""
        value = choice.getComponent()
        # The format has to be chosen from the ASN.1 type *before* the
        # value is turned into a plain string: GeneralizedTime carries a
        # four-digit year, the alternative (UTCTime) a two-digit one.
        if isinstance(value, GeneralizedTime):
            fmt = '%Y%m%d%H%M%SZ'
        else:
            fmt = '%y%m%d%H%M%SZ'
        return datetime.strptime(str(value), fmt)

    def extract_dates(self):
        """Return (not_before, not_after) as naive datetime objects.

        Only timestamps with seconds and a literal trailing 'Z' are
        accepted; anything else makes datetime.strptime raise ValueError.
        """
        validity = self.tbs.getComponentByName('validity')
        not_before = self._parse_asn1_time(validity.getComponentByName('notBefore'))
        not_after = self._parse_asn1_time(validity.getComponentByName('notAfter'))
        return not_before, not_after

    def get_ttl(self):
        """Return the timedelta from now (UTC) until notAfter.

        The value is negative once the certificate has expired. Returns
        None if extract_dates() yields no notAfter value.
        """
        not_before, not_after = self.extract_dates()
        if not_after is None:
            return None
        return not_after - datetime.utcnow()

    def check_date(self):
        """Raise CertificateError unless the current UTC time lies within
        the validity period (notBefore <= now < notAfter).
        """
        not_before, not_after = self.extract_dates()
        now = datetime.utcnow()
        if not_before > now:
            raise CertificateError(
                'Certificate has not entered its valid date range.')
        if not_after <= now:
            raise CertificateError(
                'Certificate has expired.')

    def check_name(self, expected):
        """Check that the certificate was issued for ``expected``.

        Returns True when ``expected`` equals an XMPPAddr, SRV (with or
        without the '_xmpp-client.' prefix), DNS or URI name or the
        CommonName, or when a DNS name starting with '*' has the same
        suffix (from the first '.') as ``expected``.
        Raises CertificateError when nothing matches.
        """
        cert_names = self.extract_names()
        if '.' in expected:
            expected_wild = expected[expected.index('.'):]
        else:
            expected_wild = expected
        expected_srv = '_xmpp-client.%s' % expected

        if expected in cert_names['XMPPAddr']:
            return True
        if expected_srv in cert_names['SRV'] or expected in cert_names['SRV']:
            return True
        if expected in cert_names['DNS']:
            return True
        for name in cert_names['DNS']:
            if not name.startswith('*'):
                continue
            # Compare everything from the first dot on, so that
            # '*.example.com' matches 'host.example.com'.
            if '.' in name:
                name_wild = name[name.index('.'):]
            else:
                name_wild = name
            if expected_wild == name_wild:
                return True
        if expected in cert_names['URI']:
            return True
        if cert_names['CN'] == expected:
            return True
        raise CertificateError(
            'Could not match certficate against hostname: %s' % expected)
222
223
class X509CertChain(tlslite.X509CertChain):
    """Certificate chain; adds nothing to tlslite.X509CertChain."""