Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable authority for DefaultAzureCredential #8154

Merged
merged 2 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions sdk/identity/azure-identity/azure/identity/_credentials/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ class DefaultAzureCredential(ChainedTokenCredential):
3. On Windows only: a user who has signed in with a Microsoft application, such as Visual Studio. This requires a
value for the environment variable ``AZURE_USERNAME``. See :class:`~azure.identity.SharedTokenCacheCredential`
for more details.

Keyword arguments
- **authority** (str): Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds. Managed identities ignore this because they reside in a single cloud.
"""

def __init__(self, **kwargs):
credentials = [EnvironmentCredential(**kwargs), ManagedIdentityCredential(**kwargs)]
authority = kwargs.pop("authority", None)
credentials = [EnvironmentCredential(authority=authority, **kwargs), ManagedIdentityCredential(**kwargs)]

# SharedTokenCacheCredential is part of the default only on supported platforms, when $AZURE_USERNAME has a
# value (because the cache may contain tokens for multiple identities and we can only choose one arbitrarily
Expand All @@ -38,7 +44,9 @@ def __init__(self, **kwargs):
and EnvironmentVariables.AZURE_PASSWORD not in os.environ
):
credentials.append(
SharedTokenCacheCredential(username=os.environ.get(EnvironmentVariables.AZURE_USERNAME), **kwargs)
SharedTokenCacheCredential(
username=os.environ.get(EnvironmentVariables.AZURE_USERNAME), authority=authority, **kwargs
)
)

super(DefaultAzureCredential, self).__init__(*credentials)
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ class DefaultAzureCredential(ChainedTokenCredential):
3. On Windows only: a user who has signed in with a Microsoft application, such as Visual Studio. This requires a
value for the environment variable ``AZURE_USERNAME``. See
:class:`~azure.identity.aio.SharedTokenCacheCredential` for more details.

Keyword arguments
- **authority** (str): Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds. Managed identities ignore this because they reside in a single cloud.
"""

def __init__(self, **kwargs):
credentials = [EnvironmentCredential(**kwargs), ManagedIdentityCredential(**kwargs)]
authority = kwargs.pop("authority", None)
credentials = [EnvironmentCredential(authority=authority, **kwargs), ManagedIdentityCredential(**kwargs)]

# SharedTokenCacheCredential is part of the default only on supported platforms, when $AZURE_USERNAME has a
# value (because the cache may contain tokens for multiple identities and we can only choose one arbitrarily
Expand All @@ -38,7 +44,9 @@ def __init__(self, **kwargs):
and EnvironmentVariables.AZURE_PASSWORD not in os.environ
):
credentials.append(
SharedTokenCacheCredential(username=os.environ.get(EnvironmentVariables.AZURE_USERNAME), **kwargs)
SharedTokenCacheCredential(
username=os.environ.get(EnvironmentVariables.AZURE_USERNAME), authority=authority, **kwargs
)
)

super().__init__(*credentials)
61 changes: 61 additions & 0 deletions sdk/identity/azure-identity/tests/test_default.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from azure.core.credentials import AccessToken
from azure.identity import DefaultAzureCredential, KnownAuthorities, SharedTokenCacheCredential
from azure.identity._constants import EnvironmentVariables
from six.moves.urllib_parse import urlparse

from helpers import mock_response

try:
from unittest.mock import Mock, patch
except ImportError: # python < 3.3
from mock import Mock, patch # type: ignore


def test_default_credential_authority():
# TODO need a mock cache to test SharedTokenCacheCredential
tenant_id = "expected_tenant"
expected_access_token = "***"
response = mock_response(
json_payload={
"access_token": expected_access_token,
"expires_in": 0,
"expires_on": 42,
"not_before": 0,
"resource": "scope",
"token_type": "Bearer",
}
)

def exercise_credentials(authority_kwarg, expected_authority=None):
expected_authority = expected_authority or authority_kwarg
def send(request, **_):
scheme, netloc, path, _, _, _ = urlparse(request.url)
assert scheme == "https"
assert netloc == expected_authority
assert path.startswith("/" + tenant_id)
return response

# environment credential configured with client secret should respect authority
environment = {
EnvironmentVariables.AZURE_CLIENT_ID: "client_id",
EnvironmentVariables.AZURE_CLIENT_SECRET: "secret",
EnvironmentVariables.AZURE_TENANT_ID: tenant_id,
}
with patch("os.environ", environment):
transport=Mock(send=send)
access_token, _ = DefaultAzureCredential(authority=authority_kwarg, transport=transport).get_token("scope")
assert access_token == expected_access_token

# managed identity credential should ignore authority
with patch("os.environ", {EnvironmentVariables.MSI_ENDPOINT: "https://some.url"}):
transport = Mock(send=lambda *_, **__: response)
access_token, _ = DefaultAzureCredential(authority=authority_kwarg, transport=transport).get_token("scope")
assert access_token == expected_access_token

# all credentials not representing managed identities should use a specified authority or default to public cloud
exercise_credentials("authority.com")
exercise_credentials(None, KnownAuthorities.AZURE_PUBLIC_CLOUD)
72 changes: 72 additions & 0 deletions sdk/identity/azure-identity/tests/test_default_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
from unittest.mock import Mock, patch
from urllib.parse import urlparse

from azure.core.credentials import AccessToken
from azure.identity import KnownAuthorities
from azure.identity.aio import DefaultAzureCredential, SharedTokenCacheCredential
from azure.identity._constants import EnvironmentVariables
import pytest

from helpers import mock_response


@pytest.mark.asyncio
async def test_default_credential_authority():
# TODO need a mock cache to test SharedTokenCacheCredential
you_shall_not_pass = "sentinel"
authority = "authority.com"
tenant_id = "expected_tenant"
expected_access_token = "***"
response = mock_response(
json_payload={
"access_token": expected_access_token,
"expires_in": 0,
"expires_on": 42,
"not_before": 0,
"resource": "scope",
"token_type": "Bearer",
}
)

async def exercise_credentials(authority_kwarg, expected_authority=None):
expected_authority = expected_authority or authority_kwarg
async def send(request, **_):
scheme, netloc, path, _, _, _ = urlparse(request.url)
assert scheme == "https"
assert netloc == expected_authority
assert path.startswith("/" + tenant_id)
return response

# environment credential configured with client secret should respect authority
environment = {
EnvironmentVariables.AZURE_CLIENT_ID: "client_id",
EnvironmentVariables.AZURE_CLIENT_SECRET: "secret",
EnvironmentVariables.AZURE_TENANT_ID: tenant_id,
}
with patch("os.environ", environment):
transport = Mock(send=send)
if authority_kwarg:
credential = DefaultAzureCredential(authority=authority_kwarg, transport=transport)
else:
credential = DefaultAzureCredential(transport=transport)
access_token, _ = await credential.get_token("scope")
assert access_token == expected_access_token

# managed identity credential should ignore authority
with patch("os.environ", {EnvironmentVariables.MSI_ENDPOINT: "https://some.url"}):
transport = Mock(send=asyncio.coroutine(lambda *_, **__: response))
if authority_kwarg:
credential = DefaultAzureCredential(authority=authority_kwarg, transport=transport)
else:
credential = DefaultAzureCredential(transport=transport)
access_token, _ = await credential.get_token("scope")
assert access_token == expected_access_token

# all credentials not representing managed identities should use a specified authority or default to public cloud
await exercise_credentials("authority.com")
await exercise_credentials(None, KnownAuthorities.AZURE_PUBLIC_CLOUD)