+#!/usr/bin/env python
+#
+# Electrum - lightweight Bitcoin client
+# Copyright (C) 2014 Thomas Voegtlin
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
import hashlib
import httplib
import os.path
import bitcoin
import util
import transaction
+import x509
REQUEST_HEADERS = {'Accept': 'application/bitcoin-paymentrequest', 'User-Agent': 'Electrum'}
ACK_HEADERS = {'Content-Type':'application/bitcoin-payment','Accept':'application/bitcoin-paymentack','User-Agent':'Electrum'}
-# status can be:
-PR_UNPAID = 0
-PR_EXPIRED = 1
-PR_SENT = 2 # sent but not propagated
-PR_PAID = 3 # send and propagated
-PR_ERROR = 4 # could not parse
-
ca_list = {}
+ca_path = requests.certs.where()
+
+
-def load_certificates():
- try:
- from M2Crypto import X509
- except:
- print_error("ERROR: Could not import M2Crypto")
- return False
- ca_path = os.path.expanduser("~/.electrum/ca/ca-bundle.crt")
+def load_certificates():
try:
ca_f = open(ca_path, 'r')
except Exception:
else:
c += line
if line == "-----END CERTIFICATE-----\n":
- x = X509.load_cert_string(c)
- ca_list[x.get_fingerprint()] = x
+ x = x509.X509()
+ try:
+ x.parse(c)
+ except Exception as e:
+ util.print_error("cannot parse cert:", e)
+ ca_list[x.getFingerprint()] = x
ca_f.close()
+ util.print_error("%d certificates"%len(ca_list))
return True
load_certificates()
+
class PaymentRequest:
def __init__(self, config):
self.config = config
with open(filename,'r') as f:
r = f.read()
+ assert key == bitcoin.sha256(r)[0:16].encode('hex')
+ self.id = key
self.parse(r)
def verify(self):
- try:
- from M2Crypto import X509
- except:
- self.error = "cannot import M2Crypto"
- return False
if not ca_list:
self.error = "Trusted certificate authorities list not found"
return False
paymntreq = self.data
- sig = paymntreq.signature
- if not sig:
+ if not paymntreq.signature:
self.error = "No signature"
return
cert.ParseFromString(paymntreq.pki_data)
cert_num = len(cert.certificate)
- x509_1 = X509.load_cert_der_string(cert.certificate[0])
- if self.domain != x509_1.get_subject().CN:
- validcert = False
- try:
- SANs = x509_1.get_ext("subjectAltName").get_value().split(",")
- for s in SANs:
- s = s.strip()
- if s.startswith("DNS:") and s[4:] == self.domain:
- validcert = True
- print "Match SAN DNS"
- elif s.startswith("IP:") and s[3:] == self.domain:
- validcert = True
- print "Match SAN IP"
- elif s.startswith("email:") and s[6:] == self.domain:
- validcert = True
- print "Match SAN email"
- except Exception, e:
- print "ERROR: No SAN data"
- if not validcert:
- ###TODO: check for wildcards
- self.error = "ERROR: Certificate Subject Domain Mismatch and SAN Mismatch"
- return
-
- x509 = []
- CA_OU = ''
-
- if cert_num > 1:
- for i in range(cert_num - 1):
- x509.append(X509.load_cert_der_string(cert.certificate[i+1]))
- if x509[i].check_ca() == 0:
+ x509_chain = []
+ for i in range(cert_num):
+ x = x509.X509()
+ x.parseBinary(bytearray(cert.certificate[i]))
+ x.slow_parse()
+ x509_chain.append(x)
+ if i == 0:
+ try:
+ x.check_date()
+ x.check_name(self.domain)
+ except Exception as e:
+ self.error = str(e)
+ return
+ else:
+ if not x.check_ca():
self.error = "ERROR: Supplied CA Certificate Error"
return
- for i in range(cert_num - 1):
- if i == 0:
- if x509_1.verify(x509[i].get_pubkey()) != 1:
- self.error = "ERROR: Certificate not Signed by Provided CA Certificate Chain"
- return
- else:
- if x509[i-1].verify(x509[i].get_pubkey()) != 1:
- self.error = "ERROR: CA Certificate not Signed by Provided CA Certificate Chain"
- return
-
- supplied_CA_fingerprint = x509[cert_num-2].get_fingerprint()
- supplied_CA_CN = x509[cert_num-2].get_subject().CN
- CA_match = False
- x = ca_list.get(supplied_CA_fingerprint)
- if x:
- CA_OU = x.get_subject().OU
- CA_match = True
- if x.get_subject().CN != supplied_CA_CN:
- print "ERROR: Trusted CA CN Mismatch; however CA has trusted fingerprint"
- print "Payment will continue with manual verification."
+ if not cert_num > 1:
+ self.error = "ERROR: CA Certificate Chain Not Provided by Payment Processor"
+ return False
+
+ for i in range(1, cert_num):
+ x = x509_chain[i]
+ prev_x = x509_chain[i-1]
+
+ algo, sig, data = prev_x.extract_sig()
+ sig = bytearray(sig[5:])
+ pubkey = x.publicKey
+ if algo.getComponentByName('algorithm') == x509.ALGO_RSA_SHA1:
+ verify = pubkey.hashAndVerify(sig, data)
+ elif algo.getComponentByName('algorithm') == x509.ALGO_RSA_SHA256:
+ hashBytes = bytearray(hashlib.sha256(data).digest())
+ prefixBytes = bytearray([0x30,0x31,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x01,0x05,0x00,0x04,0x20])
+ verify = pubkey.verify(sig, prefixBytes + hashBytes)
else:
- print "ERROR: Supplied CA Not Found in Trusted CA Store."
+ self.error = "Algorithm not supported"
+ util.print_error(self.error, algo.getComponentByName('algorithm'))
+ return
+
+ if not verify:
+ self.error = "Certificate not Signed by Provided CA Certificate Chain"
+ return
+
+ ca = x509_chain[cert_num-1]
+ supplied_CA_fingerprint = ca.getFingerprint()
+ supplied_CA_names = ca.extract_names()
+ CA_OU = supplied_CA_names['OU']
+
+ x = ca_list.get(supplied_CA_fingerprint)
+ if x:
+ x.slow_parse()
+ names = x.extract_names()
+ CA_match = True
+ if names['CN'] != supplied_CA_names['CN']:
+ print "ERROR: Trusted CA CN Mismatch; however CA has trusted fingerprint"
print "Payment will continue with manual verification."
else:
- self.error = "ERROR: CA Certificate Chain Not Provided by Payment Processor"
- return False
+ CA_match = False
+ pubkey0 = x509_chain[0].publicKey
+ sig = paymntreq.signature
paymntreq.signature = ''
s = paymntreq.SerializeToString()
- pubkey_1 = x509_1.get_pubkey()
+ sigBytes = bytearray(sig)
+ msgBytes = bytearray(s)
if paymntreq.pki_type == "x509+sha256":
- pubkey_1.reset_context(md="sha256")
+ hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
+ prefixBytes = bytearray([0x30,0x31,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x01,0x05,0x00,0x04,0x20])
+ verify = pubkey0.verify(sigBytes, prefixBytes + hashBytes)
elif paymntreq.pki_type == "x509+sha1":
- pubkey_1.reset_context(md="sha1")
+ verify = pubkey0.hashAndVerify(sigBytes, msgBytes)
else:
self.error = "ERROR: Unsupported PKI Type for Message Signature"
return False
- pubkey_1.verify_init()
- pubkey_1.verify_update(s)
- if pubkey_1.verify_final(sig) != 1:
+ if not verify:
self.error = "ERROR: Invalid Signature for Payment Request Data"
return False
### SIG Verified
-
self.details = pay_det = paymentrequest_pb2.PaymentDetails()
self.details.ParseFromString(paymntreq.serialized_payment_details)
if CA_match:
self.status = 'Signed by Trusted CA:\n' + CA_OU
+ else:
+ self.status = "Supplied CA Not Found in Trusted CA Store."
self.payment_url = self.details.payment_url
- if self.has_expired():
- self.error = "ERROR: Payment Request has Expired."
- return False
-
return True
def has_expired(self):
return self.details.expires and self.details.expires < int(time.time())
+ def get_expiration_date(self):
+ return self.details.expires
+
def get_amount(self):
return sum(map(lambda x:x[1], self.outputs))
def get_domain(self):
return self.domain
+ def get_memo(self):
+ return self.memo
+
def get_id(self):
return self.id
def get_outputs(self):
- return self.outputs
+ return self.outputs[:]
def send_ack(self, raw_tx, refund_addr):
- if self.has_expired():
- return False, "has expired"
-
pay_det = self.details
if not self.details.payment_url:
return False, "no url"
uri = sys.argv[1]
except:
print "usage: %s url"%sys.argv[0]
- print "example url: \"bitcoin:mpu3yTLdqA1BgGtFUwkVJmhnU3q5afaFkf?r=https%3A%2F%2Fbitcoincore.org%2F%7Egavin%2Ff.php%3Fh%3D2a828c05b8b80dc440c80a5d58890298&amount=1\""
+ print "example url: \"bitcoin:17KjQgnXC96jakzJe9yo8zxqerhqNptmhq?amount=0.0018&r=https%3A%2F%2Fbitpay.com%2Fi%2FMXc7qTM5f87EC62SWiS94z\""
sys.exit(1)
- address, amount, label, message, request_url, url = util.parse_url(uri)
- pr = PaymentRequest(request_url)
+ address, amount, label, message, request_url = util.parse_URI(uri)
+ from simple_config import SimpleConfig
+ config = SimpleConfig()
+ pr = PaymentRequest(config)
+ pr.read(request_url)
if not pr.verify():
+ print 'verify failed'
+ print pr.error
sys.exit(1)
print 'Payment Request Verified Domain: ', pr.domain