Skip to content

Commit

Permalink
CertificateCredential supports PKCS12 certs (Azure#16384)
Browse files Browse the repository at this point in the history
  • Loading branch information
chlowell authored and hildurhodd committed Aug 26, 2021
1 parent 1ad60b7 commit 41db3c3
Show file tree
Hide file tree
Showing 19 changed files with 217 additions and 187 deletions.
12 changes: 12 additions & 0 deletions sdk/identity/azure-identity/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## 1.7.0b4 (Unreleased)

### Features Added
- `CertificateCredential` accepts certificates in PKCS12 format
([#13540](https://github.com/Azure/azure-sdk-for-python/issues/13540))

### Breaking Changes

Expand All @@ -15,6 +17,16 @@
([#18798](https://github.com/Azure/azure-sdk-for-python/issues/18798))



## 1.6.1 (2021-08-19)

### Other Changes
- Persistent cache implementations are now loaded on demand, enabling
workarounds when importing transitive dependencies such as pywin32
fails
([#19989](https://github.com/Azure/azure-sdk-for-python/issues/19989))


## 1.7.0b3 (2021-08-10)

### Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion sdk/identity/azure-identity/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ variables:
|-|-
|`AZURE_CLIENT_ID`|id of an Azure Active Directory application
|`AZURE_TENANT_ID`|id of the application's Azure Active Directory tenant
|`AZURE_CLIENT_CERTIFICATE_PATH`|path to a PEM-encoded certificate file including private key (without password protection)
|`AZURE_CLIENT_CERTIFICATE_PATH`|path to a PEM or PKCS12 certificate file including private key (without password protection)

#### Username and password
|variable name|value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License.
# ------------------------------------
from binascii import hexlify
from typing import TYPE_CHECKING
from typing import cast, NamedTuple, TYPE_CHECKING

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
Expand All @@ -15,6 +15,7 @@
from .._internal.client_credential_base import ClientCredentialBase

if TYPE_CHECKING:
# pylint:disable=ungrouped-imports
from typing import Any, Optional, Union


Expand All @@ -28,13 +29,13 @@ class CertificateCredential(ClientCredentialBase):
:param str tenant_id: ID of the service principal's tenant. Also called its "directory" ID.
:param str client_id: the service principal's client ID
:param str certificate_path: path to a PEM-encoded certificate file including the private key. If not provided,
**certificate_data** is required.
:param str certificate_path: Optional path to a certificate file in PEM or PKCS12 format, including the private
key. If not provided, **certificate_data** is required.
:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
:keyword str authority: Authority of an Azure Active Directory endpoint, for example "login.microsoftonline.com",
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds.
:keyword bytes certificate_data: the bytes of a certificate in PEM format, including the private key
:keyword bytes certificate_data: the bytes of a certificate in PEM or PKCS12 format, including the private key
:keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the certificate
requires a different encoding, pass appropriately encoded bytes instead.
:paramtype password: str or bytes
Expand Down Expand Up @@ -76,6 +77,42 @@ def extract_cert_chain(pem_bytes):
return b"".join(chain.splitlines())


_Cert = NamedTuple("_Cert", [("pem_bytes", bytes), ("private_key", "Any"), ("fingerprint", bytes)])


def load_pem_certificate(certificate_data, password):
# type: (bytes, Optional[bytes]) -> _Cert
private_key = serialization.load_pem_private_key(certificate_data, password, backend=default_backend())
cert = x509.load_pem_x509_certificate(certificate_data, default_backend())
fingerprint = cert.fingerprint(hashes.SHA1()) # nosec
return _Cert(certificate_data, private_key, fingerprint)


def load_pkcs12_certificate(certificate_data, password):
# type: (bytes, Optional[bytes]) -> _Cert
from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, pkcs12, PrivateFormat

private_key, cert, additional_certs = pkcs12.load_key_and_certificates(
certificate_data, password, backend=default_backend()
)
if not private_key:
raise ValueError("The certificate must include its private key")
if not cert:
# mentioning PEM here because we raise this error when certificate_data is garbage
raise ValueError("Failed to deserialize certificate in PEM or PKCS12 format")

# This serializes the private key without any encryption it may have had. Doing so doesn't violate security
# boundaries because this representation of the key is kept in memory. We already have the key and its
# password, if any, in memory.
key_bytes = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
pem_sections = [key_bytes] + [c.public_bytes(Encoding.PEM) for c in [cert] + additional_certs]
pem_bytes = b"".join(pem_sections)

fingerprint = cert.fingerprint(hashes.SHA1()) # nosec

return _Cert(pem_bytes, private_key, fingerprint)


def get_client_credential(certificate_path, password=None, certificate_data=None, send_certificate_chain=False, **_):
# type: (Optional[str], Optional[Union[bytes, str]], Optional[bytes], bool, **Any) -> dict
"""Load a certificate from a filesystem path or bytes, return it as a dict suitable for msal.ClientApplication"""
Expand All @@ -88,24 +125,28 @@ def get_client_credential(certificate_path, password=None, certificate_data=None
elif not certificate_data:
raise ValueError('CertificateCredential requires a value for either "certificate_path" or "certificate_data"')

if isinstance(password, six.text_type):
password = password.encode(encoding="utf-8")
if password:
# if password is already bytes, this won't change its encoding
password = six.ensure_binary(password, "utf-8")
password = cast("Optional[bytes]", password)

private_key = serialization.load_pem_private_key(certificate_data, password=password, backend=default_backend())
if not isinstance(private_key, RSAPrivateKey):
raise ValueError("CertificateCredential requires an RSA private key because it uses RS256 for signing")
if certificate_data.startswith(b"-----"):
cert = load_pem_certificate(certificate_data, password)
else:
cert = load_pkcs12_certificate(certificate_data, password)
password = None # load_pkcs12_certificate returns cert.pem_bytes decrypted

cert = x509.load_pem_x509_certificate(certificate_data, default_backend())
fingerprint = cert.fingerprint(hashes.SHA1()) # nosec
if not isinstance(cert.private_key, RSAPrivateKey):
raise ValueError("CertificateCredential requires an RSA private key because it uses RS256 for signing")

client_credential = {"private_key": certificate_data, "thumbprint": hexlify(fingerprint).decode("utf-8")}
client_credential = {"private_key": cert.pem_bytes, "thumbprint": hexlify(cert.fingerprint).decode("utf-8")}
if password:
client_credential["passphrase"] = password

if send_certificate_chain:
try:
# the JWT needs the whole chain but load_pem_x509_certificate deserializes only the signing cert
chain = extract_cert_chain(certificate_data)
chain = extract_cert_chain(cert.pem_bytes)
client_credential["public_certificate"] = six.ensure_str(chain)
except ValueError as ex:
# we shouldn't land here--cryptography already loaded the cert and would have raised if it were malformed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class EnvironmentCredential(object):
Service principal with certificate:
- **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its 'directory' ID.
- **AZURE_CLIENT_ID**: the service principal's client ID
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM-encoded certificate file including the private key. The
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM or PKCS12 certificate file including the private key. The
certificate must not be password-protected.
User with username and password:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class EnvironmentCredential(AsyncContextManager):
Service principal with certificate:
- **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its 'directory' ID.
- **AZURE_CLIENT_ID**: the service principal's client ID
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM-encoded certificate file including the private key. The
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM or PKCS12 certificate file including the private key. The
certificate must not be password-protected.
:keyword bool allow_multitenant_authentication: when True, enables the credential to acquire tokens from any tenant
Expand Down
61 changes: 43 additions & 18 deletions sdk/identity/azure-identity/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def record_imds_test(request):


@pytest.fixture()
def live_service_principal(): # pylint:disable=inconsistent-return-statements
def live_service_principal():
"""Fixture for live Identity tests. Skips them when environment configuration is incomplete."""

missing_variables = [
Expand All @@ -77,33 +77,58 @@ def live_service_principal(): # pylint:disable=inconsistent-return-statements
}


def get_certificate_parameters(content, password_protected_content, password, extension):
# type: (bytes, bytes, str, str) -> dict
current_directory = os.path.dirname(__file__)
parameters = {
"cert_bytes": six.ensure_binary(content),
"cert_path": os.path.join(current_directory, "certificate." + extension),
"cert_with_password_bytes": six.ensure_binary(password_protected_content),
"cert_with_password_path": os.path.join(current_directory, "certificate-with-password." + extension),
"password": password,
}

try:
with open(parameters["cert_path"], "wb") as f:
f.write(parameters["cert_bytes"])
with open(parameters["cert_with_password_path"], "wb") as f:
f.write(parameters["cert_with_password_bytes"])
except IOError as ex:
pytest.skip("Failed to write a file: {}".format(ex))

return parameters


@pytest.fixture()
def live_certificate(live_service_principal):
def live_pem_certificate(live_service_principal):
content = os.environ.get("PEM_CONTENT")
password_protected_content = os.environ.get("PEM_CONTENT_PASSWORD_PROTECTED")
password = os.environ.get("CERTIFICATE_PASSWORD")

if content and password_protected_content and password:
current_directory = os.path.dirname(__file__)
parameters = {
"cert_bytes": six.ensure_binary(content),
"cert_path": os.path.join(current_directory, "certificate.pem"),
"cert_with_password_bytes": six.ensure_binary(password_protected_content),
"cert_with_password_path": os.path.join(current_directory, "certificate-with-password.pem"),
"password": password,
}
parameters = get_certificate_parameters(content, password_protected_content, password, "pem")
return dict(live_service_principal, **parameters)

try:
with open(parameters["cert_path"], "wb") as f:
f.write(parameters["cert_bytes"])
with open(parameters["cert_with_password_path"], "wb") as f:
f.write(parameters["cert_with_password_bytes"])
except IOError as ex:
pytest.skip("Failed to write a file: {}".format(ex))
pytest.skip("Missing PEM certificate configuration")


@pytest.fixture()
def live_pfx_certificate(live_service_principal):
# PFX bytes arrive base64 encoded because Key Vault secrets have string values
encoded_content = os.environ.get("PFX_CONTENT")
encoded_password_protected_content = os.environ.get("PFX_CONTENT_PASSWORD_PROTECTED")
password = os.environ.get("CERTIFICATE_PASSWORD")

if encoded_content and encoded_password_protected_content and password:
import base64

content = base64.b64decode(six.ensure_binary(encoded_content))
password_protected_content = base64.b64decode(six.ensure_binary(encoded_password_protected_content))

parameters = get_certificate_parameters(content, password_protected_content, password, "pfx")
return dict(live_service_principal, **parameters)

pytest.skip("Missing PEM certificate configuration")
pytest.skip("Missing PFX certificate configuration")


@pytest.fixture()
Expand Down
96 changes: 18 additions & 78 deletions sdk/identity/azure-identity/samples/key_vault_cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@
import os

from azure.identity import CertificateCredential, DefaultAzureCredential
from azure.keyvault.certificates import (
CertificateClient,
CertificateContentType,
CertificatePolicy,
WellKnownIssuerNames,
)
from azure.keyvault.certificates import CertificateClient, CertificateContentType, CertificatePolicy
from azure.keyvault.secrets import SecretClient

VAULT_URL = os.environ["VAULT_URL"]
Expand All @@ -31,79 +26,24 @@
# Key Vault stores certificate private keys as secrets, so we use a SecretClient to retrieve them
SECRET_CLIENT = SecretClient(VAULT_URL, credential)

# Creating a self-signed cert to work with
create_cert_poller = CERT_CLIENT.begin_create_certificate("azure-identity-sample", CertificatePolicy.get_default())
cert = create_cert_poller.result()

def pkcs12_cert():
"""Demonstrates creating a CertificateCredential with a Key Vault certificate stored in PKCS12 (default) format"""
# The certificate as returned by begin_create_certificate() or get_certificate() contains
# only the public portion of the certificate. Key Vault will release the private key only
# if the certificate's policy indicates it's exportable (certs are exportable by default).
policy = CERT_CLIENT.get_certificate_policy(cert.name)
assert policy.exportable, "Expected an exportable certificate because that's Key Vault's default"

# Creating a self-signed cert to work with
create_cert_poller = CERT_CLIENT.begin_create_certificate(
"azure-identity-sample-default", CertificatePolicy.get_default()
)
cert = create_cert_poller.result()
# The policy's content_type indicates whether the certificate is stored in PEM or PKCS12 format
assert policy.content_type == CertificateContentType.pkcs12, "Expected PKCS12 because that's Key Vault's default"

# CertificateCredential requires the certificate and its private key in PEM format.
# The certificate as returned by begin_create_certificate() or get_certificate() contains
# only the public portion of the certificate. Key Vault will release the private key only
# if the certificate's policy indicates it's exportable (certs are exportable by default).
policy = CERT_CLIENT.get_certificate_policy(cert.name)
assert policy.exportable, "Expected an exportable certificate because that's Key Vault's default"
# Key Vault stores the complete certificate, with its private key, as a secret sharing the certificate's name
# Because this certificate is stored in PKCS12 format, the secret's value is base64 encoded bytes
encoded_cert = SECRET_CLIENT.get_secret(cert.name).value
pkcs12_bytes = base64.b64decode(encoded_cert)

# The policy's content_type indicates whether the certificate is stored in PEM or PKCS12 format
assert policy.content_type == CertificateContentType.pkcs12, "Expected PKCS12 because that's Key Vault's default"

# Key Vault stores the complete certificate, with its private key, as a secret sharing the certificate's name
# Because this certificate is stored in PKCS12 format, the secret's value is base64 encoded bytes
encoded_cert = SECRET_CLIENT.get_secret(cert.name).value
pkcs12_bytes = base64.b64decode(encoded_cert)

# cryptography can convert PKCS12 to PEM
def pkcs12_to_pem(pkcs12_bytes):
"""Convert certificate bytes from PKCS12 format to PEM using the "cryptography" library"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding, pkcs12, PrivateFormat, NoEncryption

private_key, cert, additional_certs = pkcs12.load_key_and_certificates(
pkcs12_bytes, password=None, backend=default_backend()
)

# using NoEncryption because the certificate created above is not password protected
private_bytes = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
pem_sections = [private_bytes] + [c.public_bytes(Encoding.PEM) for c in [cert] + additional_certs]
return b"".join(pem_sections)

pem_bytes = pkcs12_to_pem(pkcs12_bytes)

# This credential will load the certificate but can't actually authenticate. Authentication requires real
# tenant and client IDs for a service principal configured to accept the certificate.
CertificateCredential("tenant-id", "client-id", certificate_data=pem_bytes)


def pem_cert():
"""Demonstrates creating a CertificateCredential with a Key Vault certificate stored in PEM format"""

# creating a self-signed certificate stored in PEM format (PKCS12 is Key Vault's default format)
pem_policy = CertificatePolicy(
WellKnownIssuerNames.self, subject="CN=localhost", content_type=CertificateContentType.pem
)
pem_cert = CERT_CLIENT.begin_create_certificate("azure-identity-sample-pem", pem_policy).result()

# verifying the certificate is exportable and stored in PEM format, to
# demonstrate how you would do so when you don't already have its policy
policy = CERT_CLIENT.get_certificate_policy(pem_cert.name)
assert policy.exportable, "Expected an exportable certificate because that's Key Vault's default"
assert policy.content_type == CertificateContentType.pem

# Because the certificate is exportable, it's available (with its private key) as a secret
pem_cert_secret = SECRET_CLIENT.get_secret(pem_cert.name)

# The secret's value is a string; CertificateCredential requires bytes
pem_bytes = pem_cert_secret.value.encode()

# This credential will load the certificate but can't actually authenticate. Authentication requires real
# tenant and client IDs for a service principal configured to accept the certificate.
CertificateCredential("tenant-id", "client-id", certificate_data=pem_bytes)


if __name__ == "__main__":
pkcs12_cert()
pem_cert()
# This credential will load the certificate but can't actually authenticate. Authentication requires real
# tenant and client IDs for a service principal configured to accept the certificate.
CertificateCredential("tenant-id", "client-id", certificate_data=pkcs12_bytes)
2 changes: 1 addition & 1 deletion sdk/identity/azure-identity/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
),
install_requires=[
"azure-core<2.0.0,>=1.11.0",
"cryptography>=2.1.4",
"cryptography>=2.5",
"msal<2.0.0,>=1.12.0",
"msal-extensions~=0.3.0",
"six>=1.12.0",
Expand Down
Binary file not shown.
Binary file added sdk/identity/azure-identity/tests/certificate.pfx
Binary file not shown.
4 changes: 2 additions & 2 deletions sdk/identity/azure-identity/tests/test_aad_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from six.moves.urllib_parse import urlparse

from helpers import build_aad_response, mock_response
from test_certificate_credential import CERT_PATH
from test_certificate_credential import PEM_CERT_PATH

try:
from unittest.mock import Mock, patch
Expand Down Expand Up @@ -222,7 +222,7 @@ def test_retries_token_requests():
transport.send.reset_mock()

with pytest.raises(ServiceRequestError, match=message):
client.obtain_token_by_client_certificate("", AadClientCertificate(open(CERT_PATH, "rb").read()))
client.obtain_token_by_client_certificate("", AadClientCertificate(open(PEM_CERT_PATH, "rb").read()))
assert transport.send.call_count > 1
transport.send.reset_mock()

Expand Down
Loading

0 comments on commit 41db3c3

Please sign in to comment.