Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
'paste',
'zope.interface',
'repoze.who',
'pycryptodomex',
'cryptography',
'pytz',
'pyOpenSSL',
'python-dateutil',
Expand Down
37 changes: 11 additions & 26 deletions src/saml2/cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from OpenSSL import crypto
from os.path import join
from os import remove
from Cryptodome.Util import asn1

from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate

backend = default_backend()

class WrongInput(Exception):
pass
Expand Down Expand Up @@ -194,9 +198,8 @@ def write_str_to_file(self, file, str_data):
f.close()

def read_str_from_file(self, file, type="pem"):
f = open(file, 'rt')
str_data = f.read()
f.close()
with open(file, 'rb') as f:
str_data = f.read()

if type == "pem":
return str_data
Expand Down Expand Up @@ -336,31 +339,13 @@ def verify(self, signing_cert_str, cert_str):
cert_algorithm = cert.get_signature_algorithm()
if six.PY3:
cert_algorithm = cert_algorithm.decode('ascii')
cert_str = cert_str.encode('ascii')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the certificate contains umlauts e.g. "ä"? Are they encoded in a special format?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cert_str is a PEM formatted string here, which is guaranteed to be ascii. The requirement for this is because load_pem_x509_certificate takes bytes, not text.


cert_asn1 = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)

der_seq = asn1.DerSequence()
der_seq.decode(cert_asn1)

cert_certificate = der_seq[0]
#cert_signature_algorithm=der_seq[1]
cert_signature = der_seq[2]

cert_signature_decoded = asn1.DerObject()
cert_signature_decoded.decode(cert_signature)

signature_payload = cert_signature_decoded.payload

sig_pay0 = signature_payload[0]
if ((isinstance(sig_pay0, int) and sig_pay0 != 0) or
(isinstance(sig_pay0, str) and sig_pay0 != '\x00')):
return (False,
"The certificate should not contain any unused bits.")

signature = signature_payload[1:]
cert_crypto = load_pem_x509_certificate(cert_str, backend)

try:
crypto.verify(ca_cert, signature, cert_certificate,
crypto.verify(ca_cert, cert_crypto.signature,
cert_crypto.tbs_certificate_bytes,
cert_algorithm)
return True, "Signed certificate is valid and correctly signed by CA certificate."
except crypto.Error as e:
Expand Down
3 changes: 0 additions & 3 deletions src/saml2/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
from binascii import hexlify
from hashlib import sha1

# from Crypto.PublicKey import RSA
from Cryptodome.PublicKey import RSA

from saml2.metadata import ENDPOINTS
from saml2.profile import paos, ecp
from saml2.soap import parse_soap_enveloped_saml_artifact_resolve
Expand Down
86 changes: 30 additions & 56 deletions src/saml2/sigver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,13 @@

from future.backports.urllib.parse import urlencode

# from Crypto.PublicKey.RSA import importKey
# from Crypto.Signature import PKCS1_v1_5
# from Crypto.Util.asn1 import DerSequence
# from Crypto.PublicKey import RSA
# from Crypto.Hash import SHA
# from Crypto.Hash import SHA224
# from Crypto.Hash import SHA256
# from Crypto.Hash import SHA384
# from Crypto.Hash import SHA512

from Cryptodome.PublicKey.RSA import importKey
from Cryptodome.Signature import PKCS1_v1_5
from Cryptodome.Util.asn1 import DerSequence
from Cryptodome.PublicKey import RSA
from Cryptodome.Hash import SHA
from Cryptodome.Hash import SHA224
from Cryptodome.Hash import SHA256
from Cryptodome.Hash import SHA384
from Cryptodome.Hash import SHA512
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import load_pem_x509_certificate

from tempfile import NamedTemporaryFile
from subprocess import Popen
Expand Down Expand Up @@ -87,6 +75,8 @@
PREFIX1 = "<?xml version='1.0' encoding='UTF-8'?>"
PREFIX2 = '<?xml version="1.0" encoding="UTF-8"?>'

backend = default_backend()


class SigverError(SAMLError):
pass
Expand Down Expand Up @@ -406,18 +396,10 @@ def active_cert(key):
"""
try:
cert_str = pem_format(key)
try:
certificate = importKey(cert_str)
not_before = to_time(str(certificate.get_not_before()))
not_after = to_time(str(certificate.get_not_after()))
assert not_before < utc_now()
assert not_after > utc_now()
return True
except:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
assert cert.has_expired() == 0
assert not OpenSSLWrapper().certificate_not_valid_yet(cert)
return True
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
assert cert.has_expired() == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertion in production code might be optimized out.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm simply replacing an existing assertion. I expect @rohe already knows that about this code.

assert not OpenSSLWrapper().certificate_not_valid_yet(cert)
return True
except AssertionError:
return False
except AttributeError:
Expand Down Expand Up @@ -555,19 +537,8 @@ def rsa_eq(key1, key2):


def extract_rsa_key_from_x509_cert(pem):
# Convert from PEM to DER
der = ssl.PEM_cert_to_DER_cert(pem.decode('ascii'))

# Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
cert = DerSequence()
cert.decode(der)
tbsCertificate = DerSequence()
tbsCertificate.decode(cert[0])
subjectPublicKeyInfo = tbsCertificate[6]

# Initialize RSA key
rsa_key = RSA.importKey(subjectPublicKeyInfo)
return rsa_key
cert = load_pem_x509_certificate(pem, backend)
return cert.public_key()


def pem_format(key):
Expand All @@ -576,7 +547,7 @@ def pem_format(key):


def import_rsa_key_from_file(filename):
return RSA.importKey(read_file(filename, 'r'))
return load_pem_private_key(read_file(filename, 'rb'), None, backend)


def parse_xmlsec_output(output):
Expand Down Expand Up @@ -622,25 +593,28 @@ def sign(self, msg, key=None):
if key is None:
key = self.key

h = self.digest.new(msg)
signer = PKCS1_v1_5.new(key)
return signer.sign(h)
return key.sign(msg, PKCS1v15(), self.digest)

def verify(self, msg, sig, key=None):
if key is None:
key = self.key

h = self.digest.new(msg)
verifier = PKCS1_v1_5.new(key)
return verifier.verify(h, sig)
try:
if isinstance(key, rsa.RSAPrivateKey):
key = key.public_key()

key.verify(sig, msg, PKCS1v15(), self.digest)
return True
except InvalidSignature:
return False


SIGNER_ALGS = {
SIG_RSA_SHA1: RSASigner(SHA),
SIG_RSA_SHA224: RSASigner(SHA224),
SIG_RSA_SHA256: RSASigner(SHA256),
SIG_RSA_SHA384: RSASigner(SHA384),
SIG_RSA_SHA512: RSASigner(SHA512),
SIG_RSA_SHA1: RSASigner(hashes.SHA1()),
SIG_RSA_SHA224: RSASigner(hashes.SHA224()),
SIG_RSA_SHA256: RSASigner(hashes.SHA256()),
SIG_RSA_SHA384: RSASigner(hashes.SHA384()),
SIG_RSA_SHA512: RSASigner(hashes.SHA512()),
}

REQ_ORDER = ["SAMLRequest", "RelayState", "SigAlg"]
Expand Down