diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py index 498d6afb32b21..ce401364619fd 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py @@ -8,7 +8,7 @@ from ._shared import KeyVaultClientBase from ._shared.exceptions import error_map as _error_map from ._shared._polling import DeleteRecoverPollingMethod, KeyVaultOperationPoller -from ._models import DeletedKey, KeyVaultKey, KeyProperties, RandomBytes, ReleaseKeyResult +from ._models import DeletedKey, KeyVaultKey, KeyProperties, KeyRotationLifetimeAction, KeyRotationPolicy, RandomBytes, ReleaseKeyResult try: from typing import TYPE_CHECKING @@ -725,3 +725,71 @@ def get_random_bytes(self, count, **kwargs): parameters = self._models.GetRandomBytesRequest(count=count) result = self._client.get_random_bytes(vault_base_url=self._vault_url, parameters=parameters, **kwargs) return RandomBytes(value=result.value) + + @distributed_trace + async def get_key_rotation_policy(self, name, **kwargs): + # type: (str, **Any) -> Optional[KeyRotationPolicy] + """Get the rotation policy of a Key Vault key. + + :param str name: The name of the key. + + :return: The key rotation policy, or None if there is no policy. + :rtype: ~azure.keyvault.keys.KeyRotationPolicy or None + """ + policy = self._client.get_key_rotation_policy(vault_base_url=self._vault_url, key_name=name, **kwargs) + if policy.id: + return KeyRotationPolicy._from_generated(policy) + return None + + @distributed_trace + def rotate_key(self, name, **kwargs): + # type: (str, **Any) -> KeyVaultKey + """Rotate the key based on the key policy by generating a new version of the key. + + This operation requires the keys/rotate permission. + + :param str name: The name of the key to rotate. + + :return: The new version of the rotated key. + :rtype: ~azure.keyvault.keys.KeyVaultKey + :raises: :class:`~azure.core.exceptions.HttpResponseError` + """ + bundle = self._client.rotate_key(vault_base_url=self._vault_url, key_name=name, **kwargs) + return KeyVaultKey._from_key_bundle(bundle) + + @distributed_trace + def update_key_rotation_policy(self, name, **kwargs): + # type: (str, **Any) -> KeyRotationPolicy + """Updates the rotation policy of a Key Vault key. + + This operation requires the keys/update permission. + + :param str name: The name of the key in the given vault. + + :keyword lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key. + :paramtype lifetime_actions: Iterable[~azure.keyvault.keys.KeyRotationLifetimeAction] + :keyword str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an + ISO 8601 duration. For example: 90 days is "P90D", 3 months is "P3M", and 48 hours is "PT48H". + + :return: The updated rotation policy. + :rtype: ~azure.keyvault.keys.KeyRotationPolicy + :raises: :class:`~azure.core.exceptions.HttpResponseError` + """ + lifetime_actions = kwargs.pop("lifetime_actions", None) + if lifetime_actions: + lifetime_actions = [ + self._models.LifetimeActions( + action=action.action, + trigger=self._models.LifetimeActionsTrigger( + time_after_create=action.time_after_create, time_before_expiry=action.time_before_expiry + ), + ) + for action in lifetime_actions + ] + + attributes = self._models.KeyRotationPolicyAttributes(expiry_time=kwargs.pop("exires_in", None)) + policy = self._models.KeyRotationPolicy(lifetime_actions=lifetime_actions, attributes=attributes) + result = self._client.update_key_rotation_policy( + vault_base_url=self._vault_url, key_name=name, key_rotation_policy=policy + ) + return KeyRotationPolicy._from_generated(result) diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py index 496844d1aa339..a03168a73a21e 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py @@ -11,12 +11,21 @@ from .._shared._polling_async import AsyncDeleteRecoverPollingMethod from .._shared import AsyncKeyVaultClientBase from .._shared.exceptions import error_map as _error_map -from .. import DeletedKey, JsonWebKey, KeyProperties, KeyVaultKey, RandomBytes, ReleaseKeyResult +from .. import ( + DeletedKey, + JsonWebKey, + KeyProperties, + KeyRotationLifetimeAction, + KeyRotationPolicy, + KeyVaultKey, + RandomBytes, + ReleaseKeyResult, +) if TYPE_CHECKING: # pylint:disable=ungrouped-imports from azure.core.async_paging import AsyncItemPaged - from typing import Any, Optional, Union + from typing import Any, Iterable, Optional, Union from .. import KeyType @@ -702,3 +711,69 @@ async def get_random_bytes(self, count: int, **kwargs: "Any") -> RandomBytes: parameters = self._models.GetRandomBytesRequest(count=count) result = await self._client.get_random_bytes(vault_base_url=self._vault_url, parameters=parameters, **kwargs) return RandomBytes(value=result.value) + + @distributed_trace_async + async def get_key_rotation_policy(self, name: str, **kwargs: "Any") -> "Optional[KeyRotationPolicy]": + """Get the rotation policy of a Key Vault key. + + :param str name: The name of the key. + + :return: The key rotation policy, or None if there is no policy. + :rtype: ~azure.keyvault.keys.KeyRotationPolicy or None + :raises: :class:`~azure.core.exceptions.HttpResponseError` + """ + policy = await self._client.get_key_rotation_policy(vault_base_url=self._vault_url, key_name=name, **kwargs) + if policy.id: + return KeyRotationPolicy._from_generated(policy) + return None + + @distributed_trace_async + async def rotate_key(self, name: str, **kwargs: "Any") -> KeyVaultKey: + """Rotate the key based on the key policy by generating a new version of the key. + + This operation requires the keys/rotate permission. + + :param str name: The name of the key to rotate. + + :return: The new version of the rotated key. + :rtype: ~azure.keyvault.keys.KeyVaultKey + :raises: :class:`~azure.core.exceptions.HttpResponseError` + """ + bundle = await self._client.rotate_key(vault_base_url=self._vault_url, key_name=name, **kwargs) + return KeyVaultKey._from_key_bundle(bundle) + + @distributed_trace_async + async def update_key_rotation_policy(self, name: str, **kwargs: "Any") -> KeyRotationPolicy: + """Updates the rotation policy of a Key Vault key. + + This operation requires the keys/update permission. + + :param str name: The name of the key in the given vault. + + :keyword lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key. + :paramtype lifetime_actions: Iterable[~azure.keyvault.keys.KeyRotationLifetimeAction] + :keyword str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an + ISO 8601 duration. For example: 90 days is "P90D", 3 months is "P3M", and 48 hours is "PT48H". + + :return: The updated rotation policy. + :rtype: ~azure.keyvault.keys.KeyRotationPolicy + :raises: :class:`~azure.core.exceptions.HttpResponseError` + """ + lifetime_actions = kwargs.pop("lifetime_actions", None) + if lifetime_actions: + lifetime_actions = [ + self._models.LifetimeActions( + action=action.action, + trigger=self._models.LifetimeActionsTrigger( + time_after_create=action.time_after_create, time_before_expiry=action.time_before_expiry + ), + ) + for action in lifetime_actions + ] + + attributes = self._models.KeyRotationPolicyAttributes(expiry_time=kwargs.pop("exires_in", None)) + policy = self._models.KeyRotationPolicy(lifetime_actions=lifetime_actions, attributes=attributes) + result = await self._client.update_key_rotation_policy( + vault_base_url=self._vault_url, key_name=name, key_rotation_policy=policy + ) + return KeyRotationPolicy._from_generated(result)