53b2d6b76a1e7cf57f488ceb63e0218e19b2b690
[electrum-nvc.git] / lib / x509.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
import sys
from datetime import datetime, timedelta

# Fail early with an actionable message when a required third-party package
# is missing. `sys` must be imported before these guards run: without it the
# handlers themselves raise NameError instead of printing the install hint.
try:
    import pyasn1
except ImportError:
    sys.exit("Error: pyasn1 does not seem to be installed. Try 'sudo pip install pyasn1'")

try:
    import tlslite
except ImportError:
    sys.exit("Error: tlslite does not seem to be installed. Try 'sudo pip install tlslite'")
31
32
33
34 from pyasn1.codec.der import decoder, encoder
35 from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
36 from pyasn1.type.char import BMPString, IA5String, UTF8String
37 from pyasn1.type.useful import GeneralizedTime
38 from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
39                                     SubjectAltName, GeneralNames,
40                                     GeneralName)
41 from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
42 from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
43 from pyasn1_modules.rfc2459 import id_at_organizationalUnitName as OU_NAME
44 from pyasn1_modules.rfc2459 import id_ce_basicConstraints, BasicConstraints
# Object identifiers that are not provided by pyasn1_modules.rfc2459.
# id-on-xmppAddr: subjectAltName otherName type carrying an XMPP address.
XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
# id-on-dnsSRV: subjectAltName otherName type carrying a DNS SRV service name.
SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
# sha1WithRSAEncryption signature algorithm. Not referenced in the visible
# part of this file; presumably used by importers -- verify before removing.
ALGO_RSA_SHA1 = ObjectIdentifier('1.2.840.113549.1.1.5')
48
49
class CertificateError(Exception):
    """Raised when a certificate fails a validity-date or host name check."""
52
def decode_str(data):
    """Return the text held by an ASN.1 string value.

    BMPString values are decoded as big-endian UTF-16; every other value
    is decoded as UTF-8.
    """
    if isinstance(data, BMPString):
        codec = 'utf-16-be'
    else:
        codec = 'utf-8'
    raw = bytes(data)
    return raw.decode(codec)
56
57
class X509(tlslite.X509):
    """Child class of tlslite.X509 that uses pyasn1 to parse cert
    information. Note: pyasn1 is a lot slower than tlslite, so we
    should try to do everything in tlslite.

    slow_parse() must be called before any extract_* / check_* method:
    it sets self.cert, self.tbs, self.subject and self.extensions, which
    all of those methods read.
    """

    def slow_parse(self):
        """Decode the certificate with pyasn1 and cache the parsed parts."""
        # bytes() is the very same call as str() on Python 2 and matches
        # what decode_str() uses; unlike str() it also yields the raw
        # octets (not a repr) on Python 3.
        raw = bytes(self.bytes)
        self.cert = decoder.decode(raw, asn1Spec=Certificate())[0]
        self.tbs = self.cert.getComponentByName('tbsCertificate')
        self.subject = self.tbs.getComponentByName('subject')
        # The extensions field may be absent; normalise to an empty list
        # so callers can iterate unconditionally.
        self.extensions = self.tbs.getComponentByName('extensions') or []

    def extract_names(self):
        """Collect the names the certificate was issued for.

        Returns a dict with the keys:
          'CN'       -- last commonName found in the subject, or None
          'OU'       -- last organizationalUnitName in the subject, or None
          'DNS'      -- set of subjectAltName dNSName values
          'URI'      -- set of subjectAltName URIs that start with 'xmpp:',
                        with that prefix removed (other URIs are ignored)
          'XMPPAddr' -- set of otherName values of type XMPP_ADDR
          'SRV'      -- set of otherName values of type SRV_NAME
        """
        results = {'CN': None,
                   'DNS': set(),
                   'SRV': set(),
                   'URI': set(),
                   'XMPPAddr': set(),
                   'OU': None}

        # Extract the CommonName / OrganizationalUnit from the subject.
        for rdnss in self.subject:
            for rdns in rdnss:
                for attribute in rdns:
                    oid = attribute.getComponentByName('type')
                    if oid == COMMON_NAME:
                        key = 'CN'
                    elif oid == OU_NAME:
                        key = 'OU'
                    else:
                        continue
                    value = decoder.decode(
                        attribute.getComponentByName('value'),
                        asn1Spec=DirectoryString())[0]
                    results[key] = decode_str(value.getComponent())

        # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
        for extension in self.extensions:
            if extension.getComponentByName('extnID') != SUBJECT_ALT_NAME:
                continue

            # extnValue is an OCTET STRING wrapping the DER of the
            # extension itself, hence the two decoding steps.
            octets = decoder.decode(extension.getComponentByName('extnValue'),
                                    asn1Spec=OctetString())[0]
            sa_names = decoder.decode(octets, asn1Spec=SubjectAltName())[0]
            for general_name in sa_names:
                name_type = general_name.getName()
                if name_type == 'dNSName':
                    results['DNS'].add(
                        decode_str(general_name.getComponent()))
                elif name_type == 'uniformResourceIdentifier':
                    uri = decode_str(general_name.getComponent())
                    if uri.startswith('xmpp:'):
                        results['URI'].add(uri[5:])
                elif name_type == 'otherName':
                    other = general_name.getComponent()
                    oid = other.getComponentByName('type-id')
                    value = other.getComponentByName('value')
                    if oid == XMPP_ADDR:
                        value = decoder.decode(value,
                                               asn1Spec=UTF8String())[0]
                        results['XMPPAddr'].add(decode_str(value))
                    elif oid == SRV_NAME:
                        value = decoder.decode(value,
                                               asn1Spec=IA5String())[0]
                        results['SRV'].add(decode_str(value))
        return results

    def check_ca(self):
        """Return True if the basicConstraints extension marks a CA.

        The result is the truth value of the first BasicConstraints field
        (the cA flag). Returns False when the certificate carries no
        basicConstraints extension.
        """
        for extension in self.extensions:
            if extension.getComponentByName('extnID') != id_ce_basicConstraints:
                continue
            octets = decoder.decode(extension.getComponentByName('extnValue'),
                                    asn1Spec=OctetString())[0]
            constraints = decoder.decode(octets,
                                         asn1Spec=BasicConstraints())[0]
            return bool(constraints[0])
        # No basicConstraints extension: be explicit instead of falling
        # off the end with None.
        return False

    def extract_sig(self):
        """Return (algorithm, signature, data) for signature checking.

        algorithm is the certificate's signatureAlgorithm component,
        signature is the DER encoding of the signatureValue component and
        data is the DER encoding of the tbsCertificate.
        """
        signature = self.cert.getComponentByName('signatureValue')
        algorithm = self.cert.getComponentByName('signatureAlgorithm')
        data = encoder.encode(self.tbs)
        s = encoder.encode(signature)
        return algorithm, s, data

    def extract_pubkey(self):
        """Return (algorithm, parameters, key) from subjectPublicKeyInfo.

        algorithm and parameters are the pyasn1 components of the
        AlgorithmIdentifier; key is the DER encoding of subjectPublicKey.
        """
        pki = self.tbs.getComponentByName('subjectPublicKeyInfo')
        algo = pki.getComponentByName('algorithm')
        algorithm = algo.getComponentByName('algorithm')
        parameters = algo.getComponentByName('parameters')
        subjectPublicKey = pki.getComponentByName('subjectPublicKey')
        return algorithm, parameters, encoder.encode(subjectPublicKey)

    @staticmethod
    def _parse_time(time_choice):
        """Convert one Validity field (UTCTime or GeneralizedTime) to datetime."""
        value = time_choice.getComponent()
        # The type test must happen on the pyasn1 object, before it is
        # turned into a plain string, otherwise it can never be true.
        if isinstance(value, GeneralizedTime):
            return datetime.strptime(str(value), '%Y%m%d%H%M%SZ')
        parsed = datetime.strptime(str(value), '%y%m%d%H%M%SZ')
        # strptime maps two-digit years 00-68 to 2000-2068, whereas
        # RFC 5280 defines UTCTime years 50-99 as 1950-1999.
        if parsed.year >= 2050:
            parsed = parsed.replace(year=parsed.year - 100)
        return parsed

    def extract_dates(self):
        """Return (not_before, not_after) as naive datetime objects.

        Raises ValueError if a date is not in the 'YYMMDDHHMMSSZ' /
        'YYYYMMDDHHMMSSZ' form.
        """
        validity = self.tbs.getComponentByName('validity')
        not_before = self._parse_time(validity.getComponentByName('notBefore'))
        not_after = self._parse_time(validity.getComponentByName('notAfter'))
        return not_before, not_after

    def get_ttl(self):
        """Return the timedelta from now (UTC) until notAfter.

        The result is negative once the certificate has expired.
        """
        _, not_after = self.extract_dates()
        return not_after - datetime.utcnow()

    def check_date(self):
        """Raise CertificateError unless now (UTC) is within the validity range."""
        not_before, not_after = self.extract_dates()
        now = datetime.utcnow()
        if not_before > now:
            raise CertificateError(
                'Certificate has not entered its valid date range.')
        if not_after <= now:
            raise CertificateError(
                'Certificate has expired.')

    def check_name(self, expected):
        """Return True if the certificate was issued for `expected`.

        Names are tried in this order: XMPPAddr, SRV (either `expected`
        or '_xmpp-client.<expected>'), DNS (exact, or a name starting
        with '*' whose part from the first dot onwards equals that of
        `expected`), xmpp: URI, and finally the subject CN.

        Raises CertificateError when nothing matches.
        """
        cert_names = self.extract_names()

        # Everything from the first dot onwards, used for wildcard
        # comparison; a name without a dot is compared as a whole.
        if '.' in expected:
            expected_wild = expected[expected.index('.'):]
        else:
            expected_wild = expected
        expected_srv = '_xmpp-client.%s' % expected

        if expected in cert_names['XMPPAddr']:
            return True
        if expected_srv in cert_names['SRV'] or expected in cert_names['SRV']:
            return True
        for name in cert_names['DNS']:
            if name == expected:
                return True
            if name.startswith('*'):
                if '.' in name:
                    name_wild = name[name.index('.'):]
                else:
                    name_wild = name
                if expected_wild == name_wild:
                    return True
        if expected in cert_names['URI']:
            return True
        if cert_names['CN'] == expected:
            return True
        raise CertificateError(
            'Could not match certficate against hostname: %s' % expected)
216
217
class X509CertChain(tlslite.X509CertChain):
    """tlslite.X509CertChain subclass that adds no behaviour of its own."""