Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Identity] Update live test config #35019

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions sdk/identity/azure-identity/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import base64
import os
import sys

Expand Down Expand Up @@ -82,20 +83,31 @@ def live_service_principal():
}


def get_certificate_parameters(content, password_protected_content, password, extension):
# type: (bytes, bytes, str, str) -> dict
def get_certificate_parameters(content: bytes, extension: str) -> dict:
current_directory = os.path.dirname(__file__)
parameters = {
"cert_bytes": content,
"cert_path": os.path.join(current_directory, "certificate." + extension),
}

try:
with open(parameters["cert_path"], "wb") as f:
f.write(parameters["cert_bytes"])
except IOError as ex:
pytest.skip("Failed to write a file: {}".format(ex))

return parameters


def get_certificate_with_password_parameters(password_protected_content: bytes, password: str, extension: str) -> dict:
current_directory = os.path.dirname(__file__)
parameters = {
"cert_with_password_bytes": password_protected_content,
"cert_with_password_path": os.path.join(current_directory, "certificate-with-password." + extension),
"password": password,
}

try:
with open(parameters["cert_path"], "wb") as f:
f.write(parameters["cert_bytes"])
with open(parameters["cert_with_password_path"], "wb") as f:
f.write(parameters["cert_with_password_bytes"])
except IOError as ex:
Expand All @@ -110,31 +122,45 @@ def live_pem_certificate(live_service_principal):
password_protected_content = os.environ.get("PEM_CONTENT_PASSWORD_PROTECTED")
password = os.environ.get("CERTIFICATE_PASSWORD")

if content and password_protected_content and password:
parameters = get_certificate_parameters(
content.encode("utf-8"), password_protected_content.encode("utf-8"), password, "pem"
cert_info = {}

if content:
content = content.replace("\\n", "\r\n")
parameters = get_certificate_parameters(content.encode("utf-8"), "pem")
cert_info.update(parameters)

if password_protected_content and password:
parameters = get_certificate_with_password_parameters(
password_protected_content.encode("utf-8"), password, "pem"
)
return dict(live_service_principal, **parameters)
cert_info.update(parameters)

if cert_info:
return dict(live_service_principal, **cert_info)
pytest.skip("Missing PEM certificate configuration")


@pytest.fixture()
def live_pfx_certificate(live_service_principal):
# PFX bytes arrive base64 encoded because Key Vault secrets have string values
encoded_content = os.environ.get("PFX_CONTENT")
encoded_content = os.environ.get("PFX_CONTENTS")
encoded_password_protected_content = os.environ.get("PFX_CONTENT_PASSWORD_PROTECTED")
password = os.environ.get("CERTIFICATE_PASSWORD")

if encoded_content and encoded_password_protected_content and password:
import base64
cert_info = {}

if encoded_content:
content = base64.b64decode(encoded_content.encode("utf-8"))
password_protected_content = base64.b64decode(encoded_password_protected_content.encode("utf-8"))
parameters = get_certificate_parameters(content, "pfx")
cert_info.update(parameters)

parameters = get_certificate_parameters(content, password_protected_content, password, "pfx")
return dict(live_service_principal, **parameters)
if encoded_password_protected_content and password:
password_protected_content = base64.b64decode(encoded_password_protected_content.encode("utf-8"))
parameters = get_certificate_with_password_parameters(password_protected_content, password, "pfx")
cert_info.update(parameters)

if cert_info:
return dict(live_service_principal, **cert_info)
pytest.skip("Missing PFX certificate configuration")


Expand Down
19 changes: 11 additions & 8 deletions sdk/identity/azure-identity/tests/test_live.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,22 @@ def test_certificate_credential(certificate_fixture, request):
credential = CertificateCredential(tenant_id, client_id, cert["cert_path"])
get_token(credential)

credential = CertificateCredential(tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"])
get_token(credential)

credential = CertificateCredential(tenant_id, client_id, certificate_data=cert["cert_bytes"])
get_token(credential)

credential = CertificateCredential(
tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"]
)
token = get_token(credential, enable_cae=True)
parsed_payload = get_token_payload_contents(token.token)
assert "xms_cc" in parsed_payload and "CP1" in parsed_payload["xms_cc"]

if "password" in cert:
credential = CertificateCredential(
tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"]
)
get_token(credential)

credential = CertificateCredential(
tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"]
)
get_token(credential)


def test_client_secret_credential(live_service_principal):
credential = ClientSecretCredential(
Expand Down
19 changes: 11 additions & 8 deletions sdk/identity/azure-identity/tests/test_live_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@ async def test_certificate_credential(certificate_fixture, request):
credential = CertificateCredential(tenant_id, client_id, cert["cert_path"])
await get_token(credential)

credential = CertificateCredential(tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"])
await get_token(credential)

credential = CertificateCredential(tenant_id, client_id, certificate_data=cert["cert_bytes"])
await get_token(credential)

credential = CertificateCredential(
tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"]
)
token = await get_token(credential, enable_cae=True)
parsed_payload = get_token_payload_contents(token.token)
assert "xms_cc" in parsed_payload and "CP1" in parsed_payload["xms_cc"]

if "password" in cert:
credential = CertificateCredential(
tenant_id, client_id, cert["cert_with_password_path"], password=cert["password"]
)
await get_token(credential)

credential = CertificateCredential(
tenant_id, client_id, certificate_data=cert["cert_with_password_bytes"], password=cert["password"]
)
await get_token(credential, enable_cae=True)


@pytest.mark.asyncio
async def test_client_secret_credential(live_service_principal):
Expand Down
11 changes: 4 additions & 7 deletions sdk/identity/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,17 @@ extends:
parameters:
ServiceDirectory: identity
EnvVars:
AZURE_CLIENT_ID: $(python-identity-client-id)
AZURE_CLIENT_SECRET: $(python-identity-client-secret)
AZURE_TENANT_ID: $(aad-azure-sdk-test-tenant-id)
CERTIFICATE_PASSWORD: $(python-identity-certificate-password)
AZURE_CLIENT_ID: $(IDENTITY_SP_CLIENT_ID)
AZURE_CLIENT_SECRET: $(IDENTITY_SP_CLIENT_SECRET)
AZURE_TENANT_ID: $(IDENTITY_SP_TENANT_ID)
PEM_CONTENT: $(python-identity-certificate)
PEM_CONTENT_PASSWORD_PROTECTED: $(python-identity-certificate-with-password)
PFX_CONTENT: $(python-identity-certificate-pfx)
PFX_CONTENT_PASSWORD_PROTECTED: $(python-identity-certificate-with-password-pfx)
AZURE_TEST_RUN_LIVE: true
AZURE_SKIP_LIVE_RECORDING: 'True'
CloudConfig:
Public:
SubscriptionConfigurations:
- $(sub-config-azure-cloud-test-resources)
- $(sub-config-identity-test-resources)
${{ if contains(variables['Build.DefinitionName'], 'tests-weekly') }}:
# Test Managed Identity integrations tests on weekly tests pipeline.
AdditionalMatrixConfigs:
Expand Down
Loading