0656820ae2d63401a576bc9701ec0e3d744eb151
[electrum-nvc.git] / lib / x509.py
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
import sys
from datetime import datetime, timedelta

try:
    import pyasn1
except ImportError:
    sys.exit("Error: pyasn1 does not seem to be installed. Try 'sudo pip install pyasn1'")

try:
    import tlslite
except ImportError:
    sys.exit("Error: tlslite does not seem to be installed. Try 'sudo pip install tlslite'")

from pyasn1.codec.der import decoder, encoder
from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
from pyasn1.type.char import BMPString, IA5String, UTF8String
from pyasn1.type.useful import GeneralizedTime
from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
                                    SubjectAltName, GeneralNames,
                                    GeneralName)
from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
from pyasn1_modules.rfc2459 import id_at_organizationalUnitName as OU_NAME
from pyasn1_modules.rfc2459 import id_ce_basicConstraints, BasicConstraints
# Object identifiers that are not exported by pyasn1_modules.rfc2459.
# XMPP_ADDR and SRV_NAME are compared against the 'type-id' of otherName
# entries in the subjectAltName extension (see X509.extract_names).
XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
# Signature algorithm identifier; not referenced elsewhere in this module.
ALGO_RSA_SHA1 = ObjectIdentifier('1.2.840.113549.1.1.5')
48
49
class CertificateError(Exception):
    """Raised when a certificate fails date or name validation."""
52
def decode_str(data):
    """Decode an ASN.1 string value into text.

    BMPString values are decoded as UTF-16 big-endian; any other value is
    decoded as UTF-8.
    """
    if isinstance(data, BMPString):
        codec = 'utf-16-be'
    else:
        codec = 'utf-8'
    raw = bytes(data)
    return raw.decode(codec)
56
57
class X509(tlslite.X509):
    """Child class of tlslite.X509 that uses pyasn1.

    ``slow_parse`` must be called before any other method defined here: it
    populates ``self.cert``, ``self.tbs``, ``self.subject`` and
    ``self.extensions``, which every ``extract_*``/``check_*`` method reads.
    """

    # strptime formats for the two ASN.1 time encodings used in 'validity'.
    _UTC_TIME_FORMAT = '%y%m%d%H%M%SZ'
    _GENERALIZED_TIME_FORMAT = '%Y%m%d%H%M%SZ'

    def slow_parse(self):
        """Decode ``self.bytes`` as a DER certificate and cache its parts."""
        # NOTE(review): str() on the raw buffer assumes Python 2 semantics
        # (str is a byte string) -- confirm before running under Python 3.
        self.cert = decoder.decode(str(self.bytes), asn1Spec=Certificate())[0]
        self.tbs = self.cert.getComponentByName('tbsCertificate')
        self.subject = self.tbs.getComponentByName('subject')
        # 'extensions' is optional; fall back to an empty list so callers
        # can iterate unconditionally.
        self.extensions = self.tbs.getComponentByName('extensions') or []

    @staticmethod
    def _directory_string(raw):
        """Decode a DER DirectoryString attribute value into text."""
        decoded = decoder.decode(raw, asn1Spec=DirectoryString())[0]
        return decode_str(decoded.getComponent())

    @staticmethod
    def _extension_value(extension, spec):
        """Unwrap an extension's OCTET STRING and decode it as ``spec``."""
        octets = decoder.decode(extension.getComponentByName('extnValue'),
                                asn1Spec=OctetString())[0]
        return decoder.decode(octets, asn1Spec=spec)[0]

    def extract_names(self):
        """Collect the names this certificate was issued for.

        Returns a dict with keys:
          'CN', 'OU'  -- text or None (if several are present, the last one
                         encountered wins);
          'DNS', 'SRV', 'URI', 'XMPPAddr' -- sets of text values taken from
                         the subjectAltName extension. Only URIs starting
                         with 'xmpp:' are kept, with that prefix removed.
        """
        results = {'CN': None,
                   'DNS': set(),
                   'SRV': set(),
                   'URI': set(),
                   'XMPPAddr': set(),
                   'OU': None}

        # Extract the CommonName / OrganizationalUnit from the subject.
        for rdnss in self.subject:
            for rdns in rdnss:
                for attribute in rdns:
                    oid = attribute.getComponentByName('type')
                    raw = attribute.getComponentByName('value')
                    if oid == COMMON_NAME:
                        results['CN'] = self._directory_string(raw)
                    elif oid == OU_NAME:
                        results['OU'] = self._directory_string(raw)

        # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr).
        for extension in self.extensions:
            if extension.getComponentByName('extnID') != SUBJECT_ALT_NAME:
                continue

            sa_names = self._extension_value(extension, SubjectAltName())
            for general_name in sa_names:
                name_type = general_name.getName()
                if name_type == 'dNSName':
                    results['DNS'].add(decode_str(general_name.getComponent()))
                elif name_type == 'uniformResourceIdentifier':
                    uri = decode_str(general_name.getComponent())
                    if uri.startswith('xmpp:'):
                        results['URI'].add(uri[5:])
                elif name_type == 'otherName':
                    other = general_name.getComponent()
                    oid = other.getComponentByName('type-id')
                    raw = other.getComponentByName('value')
                    if oid == XMPP_ADDR:
                        decoded = decoder.decode(raw, asn1Spec=UTF8String())[0]
                        results['XMPPAddr'].add(decode_str(decoded))
                    elif oid == SRV_NAME:
                        decoded = decoder.decode(raw, asn1Spec=IA5String())[0]
                        results['SRV'].add(decode_str(decoded))
        return results

    def check_ca(self):
        """Report the CA flag from the basicConstraints extension.

        Returns the truth value of the first BasicConstraints component for
        the first basicConstraints extension found, or None when the
        certificate carries no such extension.
        """
        for extension in self.extensions:
            if extension.getComponentByName('extnID') != id_ce_basicConstraints:
                continue
            constraints = self._extension_value(extension, BasicConstraints())
            return bool(constraints[0])
        return None

    def extract_sig(self):
        """Return ``(algorithm, signature, data)`` for signature checking.

        ``algorithm`` is the certificate's signatureAlgorithm component,
        ``signature`` the DER encoding of signatureValue and ``data`` the
        DER encoding of tbsCertificate.
        """
        signature = self.cert.getComponentByName('signatureValue')
        algorithm = self.cert.getComponentByName('signatureAlgorithm')
        data = encoder.encode(self.tbs)
        return algorithm, encoder.encode(signature), data

    def extract_pubkey(self):
        """Return ``(algorithm, parameters, key)`` from subjectPublicKeyInfo.

        ``key`` is the DER encoding of the subjectPublicKey component.
        """
        pki = self.tbs.getComponentByName('subjectPublicKeyInfo')
        algo = pki.getComponentByName('algorithm')
        algorithm = algo.getComponentByName('algorithm')
        parameters = algo.getComponentByName('parameters')
        subject_public_key = pki.getComponentByName('subjectPublicKey')
        return algorithm, parameters, encoder.encode(subject_public_key)

    @classmethod
    def _parse_time(cls, time_choice):
        """Convert a certificate Time choice into a naive datetime.

        The ASN.1 type must be inspected *before* the value is turned into
        a string: once stringified, a GeneralizedTime can no longer be told
        apart from a UTCTime and would be parsed with the wrong format.
        """
        value = time_choice.getComponent()
        if isinstance(value, GeneralizedTime):
            return datetime.strptime(str(value), cls._GENERALIZED_TIME_FORMAT)
        parsed = datetime.strptime(str(value), cls._UTC_TIME_FORMAT)
        # strptime's %y maps 00-68 to 20xx, but RFC 5280 assigns two-digit
        # years 50-99 to 19xx; shift the 50-68 range back a century.
        if parsed.year >= 2050:
            parsed = parsed.replace(year=parsed.year - 100)
        return parsed

    def extract_dates(self):
        """Return ``(not_before, not_after)`` as naive datetimes.

        Raises ValueError if either value does not match the expected
        ``...HHMMSSZ`` layout.
        """
        validity = self.tbs.getComponentByName('validity')
        not_before = self._parse_time(validity.getComponentByName('notBefore'))
        not_after = self._parse_time(validity.getComponentByName('notAfter'))
        return not_before, not_after

    def get_ttl(self):
        """Return the timedelta between now (UTC) and the notAfter date."""
        not_after = self.extract_dates()[1]
        # Defensive: extract_dates() in this class always yields a datetime,
        # but an overriding implementation might not.
        if not_after is None:
            return None
        return not_after - datetime.utcnow()

    @staticmethod
    def _wildcard_matches(pattern, expected):
        """Tell whether wildcard DNS name ``pattern`` covers ``expected``.

        Only the left-most label may contain the wildcard, and it must
        start with '*'. The remaining labels must be equal, and the
        left-most label of ``expected`` must end with whatever follows the
        '*' (nothing, for the usual '*.example.com' form).
        """
        if not pattern.startswith('*'):
            return False
        if '.' not in pattern or '.' not in expected:
            return False
        pattern_label, pattern_rest = pattern.split('.', 1)
        expected_label, expected_rest = expected.split('.', 1)
        if pattern_rest != expected_rest:
            return False
        return expected_label.endswith(pattern_label[1:])

    def check_name(self, expected):
        """Validate the certificate dates and match it against ``expected``.

        Names are tried in this order: XMPPAddr, SRV (also with an
        '_xmpp-client.' prefix), DNS (exact or wildcard), URI, then CN.

        Returns True on the first match.
        Raises CertificateError if the certificate is not yet valid, has
        expired, or none of its names match ``expected``.
        """
        not_before, not_after = self.extract_dates()
        cert_names = self.extract_names()
        now = datetime.utcnow()
        if not_before > now:
            raise CertificateError(
                'Certificate has not entered its valid date range.')
        if not_after <= now:
            raise CertificateError(
                'Certificate has expired.')

        if expected in cert_names['XMPPAddr']:
            return True

        expected_srv = '_xmpp-client.%s' % expected
        srv_names = cert_names['SRV']
        if expected_srv in srv_names or expected in srv_names:
            return True

        for name in cert_names['DNS']:
            if name == expected or self._wildcard_matches(name, expected):
                return True

        if expected in cert_names['URI']:
            return True
        if cert_names['CN'] == expected:
            return True
        raise CertificateError(
            'Could not match certficate against hostname: %s' % expected)
211
212
class X509CertChain(tlslite.X509CertChain):
    """Thin subclass of tlslite.X509CertChain; adds no behaviour of its own."""