From 0c941f53eefc82f64d0fe661489dbe200b3d4722 Mon Sep 17 00:00:00 2001 From: Paul Van Eck Date: Sat, 30 Mar 2024 00:57:39 +0000 Subject: [PATCH] [Identity] Update live test config Signed-off-by: Paul Van Eck --- sdk/identity/azure-identity/tests/conftest.py | 56 ++++++++++++++----- .../azure-identity/tests/test_live.py | 19 ++++--- .../azure-identity/tests/test_live_async.py | 19 ++++--- sdk/identity/tests.yml | 12 ++-- 4 files changed, 67 insertions(+), 39 deletions(-) diff --git a/sdk/identity/azure-identity/tests/conftest.py b/sdk/identity/azure-identity/tests/conftest.py index 1c227c06bc64b..12d78cdcdfbc5 100644 --- a/sdk/identity/azure-identity/tests/conftest.py +++ b/sdk/identity/azure-identity/tests/conftest.py @@ -2,6 +2,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ +import base64 import os import sys @@ -82,20 +83,31 @@ def live_service_principal(): } -def get_certificate_parameters(content, password_protected_content, password, extension): - # type: (bytes, bytes, str, str) -> dict +def get_certificate_parameters(content: bytes, extension: str) -> dict: current_directory = os.path.dirname(__file__) parameters = { "cert_bytes": content, "cert_path": os.path.join(current_directory, "certificate." + extension), + } + + try: + with open(parameters["cert_path"], "wb") as f: + f.write(parameters["cert_bytes"]) + except IOError as ex: + pytest.skip("Failed to write a file: {}".format(ex)) + + return parameters + + +def get_certificate_with_password_parameters(password_protected_content: bytes, password: str, extension: str) -> dict: + current_directory = os.path.dirname(__file__) + parameters = { "cert_with_password_bytes": password_protected_content, "cert_with_password_path": os.path.join(current_directory, "certificate-with-password." + extension), "password": password, } try: - with open(parameters["cert_path"], "wb") as f: - f.write(parameters["cert_bytes"]) with open(parameters["cert_with_password_path"], "wb") as f: f.write(parameters["cert_with_password_bytes"]) except IOError as ex: @@ -106,35 +118,49 @@ def get_certificate_parameters(content, password_protected_content, password, ex @pytest.fixture() def live_pem_certificate(live_service_principal): - content = os.environ.get("PEM_CONTENT") + content = os.environ.get("PEM_CONTENTS") password_protected_content = os.environ.get("PEM_CONTENT_PASSWORD_PROTECTED") password = os.environ.get("CERTIFICATE_PASSWORD") - if content and password_protected_content and password: - parameters = get_certificate_parameters( - content.encode("utf-8"), password_protected_content.encode("utf-8"), password, "pem" + cert_info = {} + + if content: + # content = content.replace("\\n", "\r\n") + parameters = get_certificate_parameters(content.encode("utf-8"), "pem") + cert_info.update(parameters) + + if password_protected_content and password: + parameters = get_certificate_with_password_parameters( + password_protected_content.encode("utf-8"), password, "pem" ) - return dict(live_service_principal, **parameters) + cert_info.update(parameters) + if cert_info: + return dict(live_service_principal, **cert_info) pytest.skip("Missing PEM certificate configuration") @pytest.fixture() def live_pfx_certificate(live_service_principal): # PFX bytes arrive base64 encoded because Key Vault secrets have string values - encoded_content = os.environ.get("PFX_CONTENT") + encoded_content = os.environ.get("PFX_CONTENTS") encoded_password_protected_content = os.environ.get("PFX_CONTENT_PASSWORD_PROTECTED") password = os.environ.get("CERTIFICATE_PASSWORD") - if encoded_content and encoded_password_protected_content and password: - import base64 + cert_info = {} + if encoded_content: content = base64.b64decode(encoded_content.encode("utf-8")) - password_protected_content = base64.b64decode(encoded_password_protected_content.encode("utf-8")) + parameters = get_certificate_parameters(content, "pfx") + cert_info.update(parameters) - parameters = get_certificate_parameters(content, password_protected_content, password, "pfx") - return dict(live_service_principal, **parameters) + if encoded_password_protected_content and password: + password_protected_content = base64.b64decode(encoded_password_protected_content.encode("utf-8")) + parameters = get_certificate_with_password_parameters(password_protected_content, password, "pfx") + cert_info.update(parameters) + if cert_info: + return dict(live_service_principal, **cert_info) pytest.skip("Missing PFX certificate configuration") diff --git a/sdk/identity/azure-identity/tests/test_live.py b/sdk/identity/azure-identity/tests/test_live.py index 9ac2c94bf282c..f57244dd843df 100644 --- a/sdk/identity/azure-identity/tests/test_live.py +++ b/sdk/identity/azure-identity/tests/test_live.py @@ -39,19 +39,22 @@ def test_certificate_credential(certificate_fixture, request): credential = CertificateCredential(tenant_id, client_id, cert["cert_path"]) get_token(credential) - credential = CertificateCredential(tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"]) - get_token(credential) - credential = CertificateCredential(tenant_id, client_id, certificate_data=cert["cert_bytes"]) - get_token(credential) - - credential = CertificateCredential( - tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"] - ) token = get_token(credential, enable_cae=True) parsed_payload = get_token_payload_contents(token.token) assert "xms_cc" in parsed_payload and "CP1" in parsed_payload["xms_cc"] + if "password" in cert: + credential = CertificateCredential( + tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"] + ) + get_token(credential) + + credential = CertificateCredential( + tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"] + ) + get_token(credential) + def test_client_secret_credential(live_service_principal): credential = ClientSecretCredential( diff --git a/sdk/identity/azure-identity/tests/test_live_async.py b/sdk/identity/azure-identity/tests/test_live_async.py index bf4ad7b35dee6..0786f7dfc2795 100644 --- a/sdk/identity/azure-identity/tests/test_live_async.py +++ b/sdk/identity/azure-identity/tests/test_live_async.py @@ -37,19 +37,22 @@ async def test_certificate_credential(certificate_fixture, request): credential = CertificateCredential(tenant_id, client_id, cert["cert_path"]) await get_token(credential) - credential = CertificateCredential(tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"]) - await get_token(credential) - credential = CertificateCredential(tenant_id, client_id, certificate_data=cert["cert_bytes"]) - await get_token(credential) - - credential = CertificateCredential( - tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"] - ) token = await get_token(credential, enable_cae=True) parsed_payload = get_token_payload_contents(token.token) assert "xms_cc" in parsed_payload and "CP1" in parsed_payload["xms_cc"] + if "password" in cert: + credential = CertificateCredential( + tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"] + ) + await get_token(credential) + + credential = CertificateCredential( + tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"] + ) + await get_token(credential, enable_cae=True) + @pytest.mark.asyncio async def test_client_secret_credential(live_service_principal): diff --git a/sdk/identity/tests.yml b/sdk/identity/tests.yml index 0257c5a426aa1..b304b416159c1 100644 --- a/sdk/identity/tests.yml +++ b/sdk/identity/tests.yml @@ -5,20 +5,16 @@ extends: parameters: ServiceDirectory: identity EnvVars: - AZURE_CLIENT_ID: $(python-identity-client-id) - AZURE_CLIENT_SECRET: $(python-identity-client-secret) - AZURE_TENANT_ID: $(aad-azure-sdk-test-tenant-id) - CERTIFICATE_PASSWORD: $(python-identity-certificate-password) - PEM_CONTENT: $(python-identity-certificate) - PEM_CONTENT_PASSWORD_PROTECTED: $(python-identity-certificate-with-password) - PFX_CONTENT: $(python-identity-certificate-pfx) - PFX_CONTENT_PASSWORD_PROTECTED: $(python-identity-certificate-with-password-pfx) + AZURE_CLIENT_ID: $(IDENTITY_SP_CLIENT_ID) + AZURE_CLIENT_SECRET: $(IDENTITY_SP_CLIENT_SECRET) + AZURE_TENANT_ID: $(IDENTITY_SP_TENANT_ID) AZURE_TEST_RUN_LIVE: true AZURE_SKIP_LIVE_RECORDING: 'True' CloudConfig: Public: SubscriptionConfigurations: - $(sub-config-azure-cloud-test-resources) + - $(sub-config-identity-test-resources) ${{ if contains(variables['Build.DefinitionName'], 'tests-weekly') }}: # Test Managed Identity integrations tests on weekly tests pipeline. AdditionalMatrixConfigs: