diff --git a/src/azure-cli-core/azure/cli/core/_profile.py b/src/azure-cli-core/azure/cli/core/_profile.py index 0d03195a464..78419d5b42b 100644 --- a/src/azure-cli-core/azure/cli/core/_profile.py +++ b/src/azure-cli-core/azure/cli/core/_profile.py @@ -11,6 +11,7 @@ from azure.cli.core._session import ACCOUNT from azure.cli.core.azclierror import AuthenticationError from azure.cli.core.cloud import get_active_cloud, set_cloud_subscription +from azure.cli.core.auth.credential_adaptor import CredentialAdaptor from azure.cli.core.util import in_cloud_console, can_launch_browser, is_github_codespaces from knack.log import get_logger from knack.util import CLIError @@ -313,9 +314,10 @@ def login_with_managed_identity_azure_arc(self, identity_id=None, allow_no_subsc import jwt identity_type = MsiAccountTypes.system_assigned from .auth.msal_credentials import ManagedIdentityCredential + from .auth.constants import ACCESS_TOKEN cred = ManagedIdentityCredential() - token = cred.get_token(*self._arm_scope).token + token = cred.acquire_token(self._arm_scope)[ACCESS_TOKEN] logger.info('Managed identity: token was retrieved. Now trying to initialize local accounts...') decode = jwt.decode(token, algorithms=['RS256'], options={"verify_signature": False}) tenant = decode['tid'] @@ -339,9 +341,10 @@ def login_with_managed_identity_azure_arc(self, identity_id=None, allow_no_subsc def login_in_cloud_shell(self): import jwt from .auth.msal_credentials import CloudShellCredential + from .auth.constants import ACCESS_TOKEN cred = CloudShellCredential() - token = cred.get_token(*self._arm_scope).token + token = cred.acquire_token(self._arm_scope)[ACCESS_TOKEN] logger.info('Cloud Shell token was retrieved. Now trying to initialize local accounts...') decode = jwt.decode(token, algorithms=['RS256'], options={"verify_signature": False}) tenant = decode['tid'] @@ -397,21 +400,19 @@ def get_login_credentials(self, subscription_id=None, aux_subscriptions=None, au if in_cloud_console() and account[_USER_ENTITY].get(_CLOUD_SHELL_ID): # Cloud Shell from .auth.msal_credentials import CloudShellCredential - from azure.cli.core.auth.credential_adaptor import CredentialAdaptor # The credential must be wrapped by CredentialAdaptor so that it can work with Track 1 SDKs. - cred = CredentialAdaptor(CloudShellCredential()) + sdk_cred = CredentialAdaptor(CloudShellCredential()) elif managed_identity_type: # managed identity if _on_azure_arc(): from .auth.msal_credentials import ManagedIdentityCredential - from azure.cli.core.auth.credential_adaptor import CredentialAdaptor # The credential must be wrapped by CredentialAdaptor so that it can work with Track 1 SDKs. - cred = CredentialAdaptor(ManagedIdentityCredential()) + sdk_cred = CredentialAdaptor(ManagedIdentityCredential()) else: # The resource is merely used by msrestazure to get the first access token. # It is not actually used in an API invocation. - cred = MsiAccountTypes.msi_auth_factory( + sdk_cred = MsiAccountTypes.msi_auth_factory( managed_identity_type, managed_identity_id, self.cli_ctx.cloud.endpoints.active_directory_resource_id) @@ -431,10 +432,9 @@ def get_login_credentials(self, subscription_id=None, aux_subscriptions=None, au external_credentials = [] for external_tenant in external_tenants: external_credentials.append(self._create_credential(account, tenant_id=external_tenant)) - from azure.cli.core.auth.credential_adaptor import CredentialAdaptor - cred = CredentialAdaptor(credential, auxiliary_credentials=external_credentials) + sdk_cred = CredentialAdaptor(credential, auxiliary_credentials=external_credentials) - return (cred, + return (sdk_cred, str(account[_SUBSCRIPTION_ID]), str(account[_TENANT_ID])) @@ -460,7 +460,7 @@ def get_raw_token(self, resource=None, scopes=None, subscription=None, tenant=No if tenant: raise CLIError("Tenant shouldn't be specified for Cloud Shell account") from .auth.msal_credentials import CloudShellCredential - cred = CloudShellCredential() + sdk_cred = CredentialAdaptor(CloudShellCredential()) elif managed_identity_type: # managed identity @@ -468,16 +468,16 @@ def get_raw_token(self, resource=None, scopes=None, subscription=None, tenant=No raise CLIError("Tenant shouldn't be specified for managed identity account") if _on_azure_arc(): from .auth.msal_credentials import ManagedIdentityCredential - cred = ManagedIdentityCredential() + sdk_cred = CredentialAdaptor(ManagedIdentityCredential()) else: from .auth.util import scopes_to_resource - cred = MsiAccountTypes.msi_auth_factory(managed_identity_type, managed_identity_id, - scopes_to_resource(scopes)) + sdk_cred = MsiAccountTypes.msi_auth_factory(managed_identity_type, managed_identity_id, + scopes_to_resource(scopes)) else: - cred = self._create_credential(account, tenant_id=tenant) + sdk_cred = CredentialAdaptor(self._create_credential(account, tenant_id=tenant)) - sdk_token = cred.get_token(*scopes) + sdk_token = sdk_cred.get_token(*scopes) # Convert epoch int 'expires_on' to datetime string 'expiresOn' for backward compatibility # WARNING: expiresOn is deprecated and will be removed in future release. import datetime @@ -856,7 +856,6 @@ def find_using_common_tenant(self, username, credential=None): specific_tenant_credential = identity.get_user_credential(username) try: - subscriptions = self.find_using_specific_tenant(tenant_id, specific_tenant_credential, tenant_id_description=t) except AuthenticationError as ex: @@ -927,9 +926,9 @@ def _create_subscription_client(self, credential): raise CLIInternalError("Unable to get '{}' in profile '{}'" .format(ResourceType.MGMT_RESOURCE_SUBSCRIPTIONS, self.cli_ctx.cloud.profile)) api_version = get_api_version(self.cli_ctx, ResourceType.MGMT_RESOURCE_SUBSCRIPTIONS) - client_kwargs = _prepare_mgmt_client_kwargs_track2(self.cli_ctx, credential) - - client = client_type(credential, api_version=api_version, + sdk_cred = CredentialAdaptor(credential) + client_kwargs = _prepare_mgmt_client_kwargs_track2(self.cli_ctx, sdk_cred) + client = client_type(sdk_cred, api_version=api_version, base_url=self.cli_ctx.cloud.endpoints.resource_manager, **client_kwargs) return client diff --git a/src/azure-cli-core/azure/cli/core/auth/constants.py b/src/azure-cli-core/azure/cli/core/auth/constants.py index b011d7816ec..9d964dca08f 100644 --- a/src/azure-cli-core/azure/cli/core/auth/constants.py +++ b/src/azure-cli-core/azure/cli/core/auth/constants.py @@ -4,3 +4,6 @@ # -------------------------------------------------------------------------------------------- AZURE_CLI_CLIENT_ID = '04b07795-8ddb-461a-bbee-02f9e1bf7b46' + +ACCESS_TOKEN = 'access_token' +EXPIRES_IN = "expires_in" diff --git a/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py b/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py index 7fc03bd2749..15963bf44f6 100644 --- a/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py +++ b/src/azure-cli-core/azure/cli/core/auth/credential_adaptor.py @@ -4,19 +4,19 @@ # -------------------------------------------------------------------------------------------- from knack.log import get_logger +from .util import build_sdk_access_token logger = get_logger(__name__) class CredentialAdaptor: def __init__(self, credential, auxiliary_credentials=None): - """Cross-tenant credential adaptor. It takes a main credential and auxiliary credentials. - + """Credential adaptor between MSAL credential and SDK credential. It implements Track 2 SDK's azure.core.credentials.TokenCredential by exposing get_token. - :param credential: Main credential from .msal_authentication - :param auxiliary_credentials: Credentials from .msal_authentication for cross tenant authentication. - Details about cross tenant authentication: + :param credential: MSAL credential from ._msal_credentials + :param auxiliary_credentials: MSAL credentials for cross-tenant authentication. + Details about cross-tenant authentication: https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/authenticate-multi-tenant """ @@ -32,11 +32,12 @@ def get_token(self, *scopes, **kwargs): if 'data' in kwargs: filtered_kwargs['data'] = kwargs['data'] - return self._credential.get_token(*scopes, **filtered_kwargs) + return build_sdk_access_token(self._credential.acquire_token(list(scopes), **filtered_kwargs)) def get_auxiliary_tokens(self, *scopes, **kwargs): """Get access tokens from auxiliary credentials.""" # To test cross-tenant authentication, see https://github.com/Azure/azure-cli/issues/16691 if self._auxiliary_credentials: - return [cred.get_token(*scopes, **kwargs) for cred in self._auxiliary_credentials] + return [build_sdk_access_token(cred.acquire_token(list(scopes), **kwargs)) + for cred in self._auxiliary_credentials] return None diff --git a/src/azure-cli-core/azure/cli/core/auth/identity.py b/src/azure-cli-core/azure/cli/core/auth/identity.py index 69f853a4a36..5fdcc99e040 100644 --- a/src/azure-cli-core/azure/cli/core/auth/identity.py +++ b/src/azure-cli-core/azure/cli/core/auth/identity.py @@ -192,9 +192,8 @@ def login_with_service_principal(self, client_id, credential, scopes): """ sp_auth = ServicePrincipalAuth.build_from_credential(self.tenant_id, client_id, credential) client_credential = sp_auth.get_msal_client_credential() - cca = ConfidentialClientApplication(client_id, client_credential=client_credential, **self._msal_app_kwargs) - result = cca.acquire_token_for_client(scopes) - check_result(result) + cred = ServicePrincipalCredential(client_id, client_credential, **self._msal_app_kwargs) + cred.acquire_token(scopes) # Only persist the service principal after a successful login entry = sp_auth.get_entry_to_persist() diff --git a/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py b/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py index c15d7ea5f7b..c8f568df2c3 100644 --- a/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py +++ b/src/azure-cli-core/azure/cli/core/auth/msal_credentials.py @@ -4,16 +4,7 @@ # -------------------------------------------------------------------------------------------- """ -Credentials defined in this module are alternative implementations of credentials provided by Azure Identity. - -These credentials implement azure.core.credentials.TokenCredential by exposing `get_token` method for Track 2 -SDK invocation. - -If you want to implement your own credential, the credential must also expose `get_token` method. - -`get_token` method takes `scopes` as positional arguments and other optional `kwargs`, such as `claims`, `data`. -The return value should be a named tuple containing two elements: token (str), expires_on (int). You may simply use -azure.cli.core.auth.util.AccessToken to build the return value. See below credentials as examples. +Credentials to acquire tokens from MSAL. """ from knack.log import get_logger @@ -22,7 +13,7 @@ ManagedIdentityClient, SystemAssignedManagedIdentity) from .constants import AZURE_CLI_CLIENT_ID -from .util import check_result, build_sdk_access_token +from .util import check_result logger = get_logger(__name__) @@ -30,7 +21,7 @@ class UserCredential: # pylint: disable=too-few-public-methods def __init__(self, client_id, username, **kwargs): - """User credential implementing get_token interface. + """User credential wrapping msal.application.PublicClientApplication :param client_id: Client ID of the CLI. :param username: The username for user credential. @@ -52,14 +43,16 @@ def __init__(self, client_id, username, **kwargs): self._account = accounts[0] - def get_token(self, *scopes, claims=None, **kwargs): - # scopes = ['https://pas.windows.net/CheckMyAccess/Linux/.default'] - logger.debug("UserCredential.get_token: scopes=%r, claims=%r, kwargs=%r", scopes, claims, kwargs) + def acquire_token(self, scopes, claims=None, **kwargs): + # scopes must be a list. + # For acquiring SSH certificate, scopes is ['https://pas.windows.net/CheckMyAccess/Linux/.default'] + # kwargs is already sanitized by CredentialAdaptor, so it can be safely passed to MSAL + logger.debug("UserCredential.acquire_token: scopes=%r, claims=%r, kwargs=%r", scopes, claims, kwargs) if claims: logger.warning('Acquiring new access token silently for tenant %s with claims challenge: %s', self._msal_app.authority.tenant, claims) - result = self._msal_app.acquire_token_silent_with_error(list(scopes), self._account, claims_challenge=claims, + result = self._msal_app.acquire_token_silent_with_error(scopes, self._account, claims_challenge=claims, **kwargs) from azure.cli.core.azclierror import AuthenticationError @@ -82,7 +75,7 @@ def get_token(self, *scopes, claims=None, **kwargs): success_template, error_template = read_response_templates() result = self._msal_app.acquire_token_interactive( - list(scopes), login_hint=self._account['username'], + scopes, login_hint=self._account['username'], port=8400 if self._msal_app.authority.is_adfs else None, success_template=success_template, error_template=error_template, **kwargs) check_result(result) @@ -91,25 +84,24 @@ def get_token(self, *scopes, claims=None, **kwargs): # launch browser, but show the error message and `az login` command instead. else: raise - return build_sdk_access_token(result) + return result class ServicePrincipalCredential: # pylint: disable=too-few-public-methods def __init__(self, client_id, client_credential, **kwargs): - """Service principal credential implementing get_token interface. + """Service principal credential wrapping msal.application.ConfidentialClientApplication. :param client_id: The service principal's client ID. :param client_credential: client_credential that will be passed to MSAL. """ - self._msal_app = ConfidentialClientApplication(client_id, client_credential, **kwargs) - - def get_token(self, *scopes, **kwargs): - logger.debug("ServicePrincipalCredential.get_token: scopes=%r, kwargs=%r", scopes, kwargs) + self._msal_app = ConfidentialClientApplication(client_id, client_credential=client_credential, **kwargs) - result = self._msal_app.acquire_token_for_client(list(scopes), **kwargs) + def acquire_token(self, scopes, **kwargs): + logger.debug("ServicePrincipalCredential.acquire_token: scopes=%r, kwargs=%r", scopes, kwargs) + result = self._msal_app.acquire_token_for_client(scopes, **kwargs) check_result(result) - return build_sdk_access_token(result) + return result class CloudShellCredential: # pylint: disable=too-few-public-methods @@ -126,12 +118,11 @@ def __init__(self): # token_cache=... ) - def get_token(self, *scopes, **kwargs): - logger.debug("CloudShellCredential.get_token: scopes=%r, kwargs=%r", scopes, kwargs) - # kwargs is already sanitized by CredentialAdaptor, so it can be safely passed to MSAL - result = self._msal_app.acquire_token_interactive(list(scopes), prompt="none", **kwargs) + def acquire_token(self, scopes, **kwargs): + logger.debug("CloudShellCredential.acquire_token: scopes=%r, kwargs=%r", scopes, kwargs) + result = self._msal_app.acquire_token_interactive(scopes, prompt="none", **kwargs) check_result(result, scopes=scopes) - return build_sdk_access_token(result) + return result class ManagedIdentityCredential: # pylint: disable=too-few-public-methods @@ -143,10 +134,10 @@ def __init__(self): import requests self._msal_client = ManagedIdentityClient(SystemAssignedManagedIdentity(), http_client=requests.Session()) - def get_token(self, *scopes, **kwargs): - logger.debug("ManagedIdentityCredential.get_token: scopes=%r, kwargs=%r", scopes, kwargs) + def acquire_token(self, scopes, **kwargs): + logger.debug("ManagedIdentityCredential.acquire_token: scopes=%r, kwargs=%r", scopes, kwargs) from .util import scopes_to_resource result = self._msal_client.acquire_token_for_client(resource=scopes_to_resource(scopes)) check_result(result) - return build_sdk_access_token(result) + return result diff --git a/src/azure-cli-core/azure/cli/core/auth/util.py b/src/azure-cli-core/azure/cli/core/auth/util.py index 58a3fdd8d02..43679c34616 100644 --- a/src/azure-cli-core/azure/cli/core/auth/util.py +++ b/src/azure-cli-core/azure/cli/core/auth/util.py @@ -140,9 +140,6 @@ def check_result(result, **kwargs): def build_sdk_access_token(token_entry): - import time - request_time = int(time.time()) - # MSAL token entry sample: # { # 'access_token': 'eyJ0eXAiOiJKV...', @@ -153,7 +150,8 @@ def build_sdk_access_token(token_entry): # Importing azure.core.credentials.AccessToken is expensive. # This can slow down commands that doesn't need azure.core, like `az account get-access-token`. # So We define our own AccessToken. - return AccessToken(token_entry["access_token"], request_time + token_entry["expires_in"]) + from .constants import ACCESS_TOKEN, EXPIRES_IN + return AccessToken(token_entry[ACCESS_TOKEN], _now_timestamp() + token_entry[EXPIRES_IN]) def decode_access_token(access_token): @@ -177,3 +175,8 @@ def read_response_templates(): error_template = f.read() return success_template, error_template + + +def _now_timestamp(): + import time + return int(time.time()) diff --git a/src/azure-cli-core/azure/cli/core/tests/test_profile.py b/src/azure-cli-core/azure/cli/core/tests/test_profile.py index 5954ec8f283..dc797de79ed 100644 --- a/src/azure-cli-core/azure/cli/core/tests/test_profile.py +++ b/src/azure-cli-core/azure/cli/core/tests/test_profile.py @@ -39,23 +39,48 @@ def _build_test_jwt(claims): return '.'.join(base64.urlsafe_b64encode(p.encode('utf-8')).decode('utf-8').replace('=', '') for p in parts) -class CredentialMock: +def _now_timestamp_mock(): + # 2021-09-06 08:55:23 + return 1630918523 + + +class MsalCredentialStub: def __init__(self, *args, **kwargs): - # If get_token_scopes is checked, make sure to create a new instance of CredentialMock + # If acquire_token_scopes is checked, make sure to create a new instance of MsalCredentialStub # to avoid interference from other tests. - self.get_token_scopes = None + self.acquire_token_scopes = None super().__init__() - def get_token(self, *scopes, **kwargs): - self.get_token_scopes = scopes - return AccessToken(MOCK_ACCESS_TOKEN, MOCK_EXPIRES_ON_INT) + def acquire_token(self, scopes, **kwargs): + self.acquire_token_scopes = scopes + return { + 'access_token': MOCK_ACCESS_TOKEN, + 'token_type': 'Bearer', + 'expires_in': 1800, + 'token_source': 'cache' + } # Used as the return_value of azure.cli.core.auth.identity.Identity.get_user_credential -# If we directly patch azure.cli.core.auth.msal_authentication.UserCredential with CredentialMock, +# If we directly patch azure.cli.core.auth.msal_authentication.UserCredential with MsalCredentialStub, # get_user_credential will prepare MSAL token cache and HTTP cache which is time-consuming and unnecessary. -credential_mock = CredentialMock() +credential_mock = MsalCredentialStub() + + +class CloudShellCredentialStub: + def __init__(self): + self.acquire_token_scopes = None + super().__init__() + + def acquire_token(self, scopes, **kwargs): + self.acquire_token_scopes = scopes + return { + 'access_token': TestProfile.test_cloud_shell_access_token, + 'token_type': 'Bearer', + 'expires_in': 1800, + 'token_source': 'cache' + } class MSRestAzureAuthStub: @@ -91,15 +116,6 @@ def get_token(self, *args, **kwargs): return AccessToken(self.token['access_token'], int(self.token['expires_on'])) -class CloudShellCredentialStub: - def __init__(self): - self.get_token_scopes = None - - def get_token(self, *scopes, **kwargs): - self.get_token_scopes = scopes - return AccessToken(TestProfile.test_cloud_shell_access_token, MOCK_EXPIRES_ON_INT) - - class TestProfile(unittest.TestCase): @classmethod @@ -485,7 +501,7 @@ def test_login_in_cloud_shell(self, cloud_shell_credential_mock, create_subscrip # Verify correct scopes are passed to get_token credential_instance = create_subscription_client_mock.call_args.args[1] - assert credential_instance.get_token_scopes == ('https://management.core.windows.net//.default',) + assert credential_instance.acquire_token_scopes == ['https://management.core.windows.net//.default'] self.assertEqual(len(subscriptions), 1) s = subscriptions[0] @@ -1127,9 +1143,10 @@ def test_get_login_credentials_mi_user_assigned_with_res_id(self): self.assertTrue(cred.token_read_count) self.assertTrue(cred.msi_res_id, test_res_id) + @mock.patch('azure.cli.core.auth.util._now_timestamp', new=_now_timestamp_mock) @mock.patch('azure.cli.core.auth.identity.Identity.get_user_credential') def test_get_raw_token(self, get_user_credential_mock): - credential_mock_temp = CredentialMock() + credential_mock_temp = MsalCredentialStub() get_user_credential_mock.return_value = credential_mock_temp cli = DummyCli() # setup @@ -1163,7 +1180,7 @@ def test_get_raw_token(self, get_user_credential_mock): creds, sub, tenant = profile.get_raw_token(resource=self.adal_resource, tenant=self.tenant_id) # verify - assert list(credential_mock_temp.get_token_scopes) == self.msal_scopes + assert credential_mock_temp.acquire_token_scopes == self.msal_scopes self.assertEqual(creds[0], 'Bearer') self.assertEqual(creds[1], MOCK_ACCESS_TOKEN) @@ -1174,9 +1191,10 @@ def test_get_raw_token(self, get_user_credential_mock): self.assertIsNone(sub) self.assertEqual(tenant, self.tenant_id) + @mock.patch('azure.cli.core.auth.util._now_timestamp', new=_now_timestamp_mock) @mock.patch('azure.cli.core.auth.identity.Identity.get_service_principal_credential') def test_get_raw_token_for_sp(self, get_service_principal_credential_mock): - credential_mock_temp = CredentialMock() + credential_mock_temp = MsalCredentialStub() get_service_principal_credential_mock.return_value = credential_mock_temp cli = DummyCli() # setup @@ -1190,7 +1208,7 @@ def test_get_raw_token_for_sp(self, get_service_principal_credential_mock): creds, sub, tenant = profile.get_raw_token(resource=self.adal_resource) # verify - assert list(credential_mock_temp.get_token_scopes) == self.msal_scopes + assert credential_mock_temp.acquire_token_scopes == self.msal_scopes self.assertEqual(creds[0], BEARER) self.assertEqual(creds[1], MOCK_ACCESS_TOKEN) @@ -1252,6 +1270,7 @@ def mi_auth_factory(*args, **kwargs): with self.assertRaisesRegex(CLIError, "Tenant shouldn't be specified"): cred, subscription_id, _ = profile.get_raw_token(resource='http://test_resource', tenant=self.tenant_id) + @mock.patch('azure.cli.core.auth.util._now_timestamp', new=_now_timestamp_mock) @mock.patch('azure.cli.core._profile.in_cloud_console', autospec=True) @mock.patch('azure.cli.core.auth.msal_credentials.CloudShellCredential', autospec=True) def test_get_raw_token_in_cloud_shell(self, cloud_shell_credential_mock, mock_in_cloud_console): @@ -1293,7 +1312,7 @@ def cloud_shell_credential_factory(): # Verify only one credential is created assert len(credential_instances) == 1 # Verify correct scopes are passed to get_token - assert list(credential_instances[0].get_token_scopes) == self.msal_scopes + assert credential_instances[0].acquire_token_scopes == self.msal_scopes self.assertEqual(subscription_id, test_subscription_id) self.assertEqual(token_tuple[0], 'Bearer') diff --git a/src/azure-cli-testsdk/azure/cli/testsdk/patches.py b/src/azure-cli-testsdk/azure/cli/testsdk/patches.py index ec401ed9cb0..e4a415585d3 100644 --- a/src/azure-cli-testsdk/azure/cli/testsdk/patches.py +++ b/src/azure-cli-testsdk/azure/cli/testsdk/patches.py @@ -68,16 +68,16 @@ class UserCredentialMock: def __init__(self, *args, **kwargs): super().__init__() - def get_token(self, *scopes, **kwargs): # pylint: disable=unused-argument + def acquire_token(self, scopes, **kwargs): # pylint: disable=unused-argument # Old Track 2 SDKs are no longer supported. https://github.com/Azure/azure-cli/pull/29690 assert len(scopes) == 1, "'scopes' must contain only one element." - - from azure.core.credentials import AccessToken - import time fake_raw_token = 'top-secret-token-for-you' - now = int(time.time()) - return AccessToken(fake_raw_token, now + 3600) - + return { + 'access_token': fake_raw_token, + 'token_type': 'Bearer', + 'expires_in': 1800, + 'token_source': 'cache' + } return UserCredentialMock() mock_in_unit_test(unit_test, 'azure.cli.core.auth.identity.Identity.get_user_credential', get_user_credential_mock)