e2021df47dc45d8b484a640bea81c6c5fbab9139
[electrum-nvc.git] / lib / x509.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 from datetime import datetime
21 import sys
22
23 try:
24     import pyasn1
25 except ImportError:
26     sys.exit("Error: pyasn1 does not seem to be installed. Try 'sudo pip install pyasn1'")
27
28 try:
29     import tlslite
30 except ImportError:
31     sys.exit("Error: tlslite does not seem to be installed. Try 'sudo pip install tlslite'")
32
33
34
35 from pyasn1.codec.der import decoder, encoder
36 from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
37 from pyasn1.type.char import BMPString, IA5String, UTF8String
38 from pyasn1.type.useful import GeneralizedTime
39 from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
40                                     SubjectAltName, GeneralNames,
41                                     GeneralName)
42 from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
43 from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
44 from pyasn1_modules.rfc2459 import id_at_organizationalUnitName as OU_NAME
45 from pyasn1_modules.rfc2459 import id_ce_basicConstraints, BasicConstraints
46 XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
47 SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
48 ALGO_RSA_SHA1 = ObjectIdentifier('1.2.840.113549.1.1.5')
49
50
class CertificateError(Exception):
    """Raised when a certificate fails a validation check.

    In this module it signals a certificate that is outside its validity
    period (X509.check_date) or whose names do not match the expected
    host name (X509.check_name).
    """
53
def decode_str(data):
    """Return the text held by an ASN.1 string value.

    BMPString values are decoded as big-endian UTF-16; every other
    value is decoded as UTF-8.
    """
    if isinstance(data, BMPString):
        codec = 'utf-16-be'
    else:
        codec = 'utf-8'
    raw = bytes(data)
    return raw.decode(codec)
57
58
class X509(tlslite.X509):
    """Child class of tlslite.X509 that uses pyasn1 to parse cert
    information. Note: pyasn1 is a lot slower than tlslite, so we
    should try to do everything in tlslite.

    Call slow_parse() first: every extract_* / check_* / get_ttl method
    reads the attributes (cert, tbs, subject, extensions) that it sets.
    """

    def slow_parse(self):
        """Decode the certificate with pyasn1 and cache its main parts.

        Sets ``self.cert`` (the decoded Certificate), ``self.tbs`` (its
        tbsCertificate), ``self.subject`` and ``self.extensions`` (an
        empty list when the certificate carries no extensions).
        """
        # bytes is an alias of str on Python 2, so behaviour is unchanged
        # there; on Python 3 it yields the raw DER instead of the repr
        # that str() would produce.
        der = bytes(self.bytes)
        self.cert = decoder.decode(der, asn1Spec=Certificate())[0]
        self.tbs = self.cert.getComponentByName('tbsCertificate')
        self.subject = self.tbs.getComponentByName('subject')
        self.extensions = self.tbs.getComponentByName('extensions') or []

    def extract_names(self):
        """Collect the names this certificate was issued for.

        Returns a dict with the keys:
          'CN'       -- subject commonName (last one seen) or None
          'OU'       -- subject organizationalUnitName (last seen) or None
          'DNS'      -- set of subjectAltName dNSName values
          'URI'      -- set of subjectAltName URIs that start with
                        'xmpp:', with that prefix removed
          'XMPPAddr' -- set of otherName values of type XMPP_ADDR
          'SRV'      -- set of otherName values of type SRV_NAME
        """
        results = {'CN': None,
                   'DNS': set(),
                   'SRV': set(),
                   'URI': set(),
                   'XMPPAddr': set(),
                   'OU': None}

        # Extract the CommonName / OrganizationalUnit from the subject.
        for rdnss in self.subject:
            for rdns in rdnss:
                for name in rdns:
                    oid = name.getComponentByName('type')
                    if oid == COMMON_NAME:
                        key = 'CN'
                    elif oid == OU_NAME:
                        key = 'OU'
                    else:
                        continue
                    value = decoder.decode(name.getComponentByName('value'),
                                           asn1Spec=DirectoryString())[0]
                    results[key] = decode_str(value.getComponent())

        # Extract the Subject Alternative Names (DNS, SRV, URI, XMPPAddr).
        for extension in self.extensions:
            if extension.getComponentByName('extnID') != SUBJECT_ALT_NAME:
                continue

            # extnValue is an OCTET STRING wrapping the DER of the extension.
            octets = decoder.decode(extension.getComponentByName('extnValue'),
                                    asn1Spec=OctetString())[0]
            sa_names = decoder.decode(octets, asn1Spec=SubjectAltName())[0]
            for general_name in sa_names:
                name_type = general_name.getName()
                if name_type == 'dNSName':
                    results['DNS'].add(
                        decode_str(general_name.getComponent()))
                elif name_type == 'uniformResourceIdentifier':
                    uri = decode_str(general_name.getComponent())
                    if uri.startswith('xmpp:'):
                        results['URI'].add(uri[5:])
                elif name_type == 'otherName':
                    other = general_name.getComponent()
                    oid = other.getComponentByName('type-id')
                    raw = other.getComponentByName('value')
                    if oid == XMPP_ADDR:
                        decoded = decoder.decode(raw,
                                                 asn1Spec=UTF8String())[0]
                        results['XMPPAddr'].add(decode_str(decoded))
                    elif oid == SRV_NAME:
                        decoded = decoder.decode(raw,
                                                 asn1Spec=IA5String())[0]
                        results['SRV'].add(decode_str(decoded))
        return results

    def check_ca(self):
        """Return True if the basicConstraints extension marks a CA.

        The first field of the first basicConstraints extension found is
        returned as a bool. Returns False when the certificate has no
        basicConstraints extension.
        """
        for extension in self.extensions:
            if extension.getComponentByName('extnID') != id_ce_basicConstraints:
                continue
            octets = decoder.decode(extension.getComponentByName('extnValue'),
                                    asn1Spec=OctetString())[0]
            constraints = decoder.decode(octets,
                                         asn1Spec=BasicConstraints())[0]
            return bool(constraints[0])
        # No basicConstraints extension: explicit falsy result instead of
        # silently falling off the end of the function.
        return False

    def extract_sig(self):
        """Return ``(algorithm, signature, data)`` for signature checking.

        ``algorithm`` is the certificate's signatureAlgorithm component,
        ``signature`` is the DER encoding of its signatureValue and
        ``data`` is the DER encoding of the tbsCertificate.
        """
        signature = self.cert.getComponentByName('signatureValue')
        algorithm = self.cert.getComponentByName('signatureAlgorithm')
        data = encoder.encode(self.tbs)
        s = encoder.encode(signature)
        return algorithm, s, data

    def extract_pubkey(self):
        """Return ``(algorithm, parameters, key)`` of the subject key.

        ``algorithm`` and ``parameters`` come from the algorithm
        identifier of subjectPublicKeyInfo; ``key`` is the DER encoding
        of its subjectPublicKey component.
        """
        pki = self.tbs.getComponentByName('subjectPublicKeyInfo')
        algo = pki.getComponentByName('algorithm')
        algorithm = algo.getComponentByName('algorithm')
        parameters = algo.getComponentByName('parameters')
        subjectPublicKey = pki.getComponentByName('subjectPublicKey')
        return algorithm, parameters, encoder.encode(subjectPublicKey)

    @staticmethod
    def _parse_asn1_time(time_choice):
        """Convert one Time choice (notBefore/notAfter) to a datetime."""
        value = time_choice.getComponent()
        # The type test has to look at the pyasn1 object: once it has
        # been turned into a plain string the two encodings can no
        # longer be told apart.
        if isinstance(value, GeneralizedTime):
            return datetime.strptime(str(value), '%Y%m%d%H%M%SZ')
        parsed = datetime.strptime(str(value), '%y%m%d%H%M%SZ')
        # strptime's %y maps 00-68 to 20xx, but X.509 UTCTime only
        # covers 1950-2049 (YY >= 50 means 19YY), so pull 2050-2068
        # back one century.
        if parsed.year >= 2050:
            parsed = parsed.replace(year=parsed.year - 100)
        return parsed

    def extract_dates(self):
        """Return ``(not_before, not_after)`` as naive datetimes.

        Both UTCTime (two-digit year) and GeneralizedTime (four-digit
        year) encodings in the 'YYMMDDHHMMSSZ' / 'YYYYMMDDHHMMSSZ' form
        are understood; any other layout raises ValueError from strptime.
        """
        validity = self.tbs.getComponentByName('validity')
        not_before = self._parse_asn1_time(
            validity.getComponentByName('notBefore'))
        not_after = self._parse_asn1_time(
            validity.getComponentByName('notAfter'))
        return not_before, not_after

    def get_ttl(self):
        """Return the timedelta from now (UTC) until notAfter.

        The result is negative once the certificate has expired. Returns
        None if extract_dates() yields no notAfter value.
        """
        not_before, not_after = self.extract_dates()
        if not_after is None:
            return None
        return not_after - datetime.utcnow()

    def check_date(self):
        """Raise CertificateError unless the current UTC time lies in
        the half-open interval [notBefore, notAfter)."""
        not_before, not_after = self.extract_dates()
        now = datetime.utcnow()
        if not_before > now:
            raise CertificateError(
                'Certificate has not entered its valid date range.')
        if not_after <= now:
            raise CertificateError(
                'Certificate has expired.')

    @staticmethod
    def _wildcard_matches(pattern, hostname):
        """Return True if wildcard DNS name *pattern* covers *hostname*.

        Only a '*' at the very start of the left-most label is honoured.
        Everything after the first dot must be identical, and whatever
        follows the '*' inside the first label must be the tail of the
        host's first label ('*.example.com' and '*foo.example.com' both
        cover 'xfoo.example.com'; only the former covers
        'bar.example.com').
        """
        if not pattern.startswith('*'):
            return False
        pattern_label, pattern_dot, pattern_rest = pattern.partition('.')
        host_label, host_dot, host_rest = hostname.partition('.')
        # A wildcard never spans a dot-less name on either side.
        if not pattern_dot or not host_dot:
            return False
        if pattern_rest != host_rest:
            return False
        return host_label.endswith(pattern_label[1:])

    def check_name(self, expected):
        """Return True if the certificate was issued for *expected*.

        *expected* is compared against the XMPPAddr, SRV (also as
        '_xmpp-client.<expected>'), DNS (exact or wildcard), URI and
        commonName entries returned by extract_names().

        Raises CertificateError when nothing matches.
        """
        cert_names = self.extract_names()
        expected_srv = '_xmpp-client.%s' % expected

        if any(name == expected for name in cert_names['XMPPAddr']):
            return True
        if any(name == expected_srv or name == expected
               for name in cert_names['SRV']):
            return True
        for name in cert_names['DNS']:
            if name == expected or self._wildcard_matches(name, expected):
                return True
        if any(name == expected for name in cert_names['URI']):
            return True
        if cert_names['CN'] == expected:
            return True
        raise CertificateError(
            'Could not match certficate against hostname: %s' % expected)
217
218
class X509CertChain(tlslite.X509CertChain):
    """tlslite.X509CertChain exposed under this module's name.

    Adds no attributes or behaviour of its own.
    """