Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Key Vault] Add support for key rotation #20416

Merged
merged 7 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/keyvault/azure-keyvault-keys/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
## 4.5.0b4 (Unreleased)

### Features Added
- Added support for automated and on-demand key rotation in Azure Key Vault
([#19840](https://github.com/Azure/azure-sdk-for-python/issues/19840))
- Added `KeyClient.rotate_key` to rotate a key on-demand
- Added `KeyClient.update_key_rotation_policy` to update a key's automated rotation policy

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# -------------------------------------
from ._enums import KeyCurveName, KeyExportEncryptionAlgorithm, KeyOperation, KeyType
from ._enums import KeyCurveName, KeyExportEncryptionAlgorithm, KeyOperation, KeyRotationPolicyAction, KeyType
from ._shared.client_base import ApiVersion
from ._models import (
DeletedKey,
JsonWebKey,
KeyProperties,
KeyReleasePolicy,
KeyRotationLifetimeAction,
KeyRotationPolicy,
KeyVaultKey,
KeyVaultKeyIdentifier,
RandomBytes,
Expand All @@ -25,10 +27,13 @@
"KeyCurveName",
"KeyExportEncryptionAlgorithm",
"KeyOperation",
"KeyRotationPolicyAction",
"KeyType",
"DeletedKey",
"KeyProperties",
"KeyReleasePolicy",
"KeyRotationLifetimeAction",
"KeyRotationPolicy",
"RandomBytes",
"ReleaseKeyResult",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ._shared import KeyVaultClientBase
from ._shared.exceptions import error_map as _error_map
from ._shared._polling import DeleteRecoverPollingMethod, KeyVaultOperationPoller
from ._models import DeletedKey, KeyVaultKey, KeyProperties, RandomBytes, ReleaseKeyResult
from ._models import DeletedKey, KeyVaultKey, KeyProperties, KeyRotationPolicy, RandomBytes, ReleaseKeyResult

try:
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -727,3 +727,71 @@ def get_random_bytes(self, count, **kwargs):
parameters = self._models.GetRandomBytesRequest(count=count)
result = self._client.get_random_bytes(vault_base_url=self._vault_url, parameters=parameters, **kwargs)
return RandomBytes(value=result.value)

@distributed_trace
def get_key_rotation_policy(self, name, **kwargs):
# type: (str, **Any) -> Optional[KeyRotationPolicy]
"""Get the rotation policy of a Key Vault key.

:param str name: The name of the key.

:return: The key rotation policy, or None if there is no policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy or None
"""
policy = self._client.get_key_rotation_policy(vault_base_url=self._vault_url, key_name=name, **kwargs)
if policy.id:
return KeyRotationPolicy._from_generated(policy)
return None

@distributed_trace
def rotate_key(self, name, **kwargs):
# type: (str, **Any) -> KeyVaultKey
"""Rotate the key based on the key policy by generating a new version of the key.

This operation requires the keys/rotate permission.

:param str name: The name of the key to rotate.

:return: The new version of the rotated key.
:rtype: ~azure.keyvault.keys.KeyVaultKey
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
bundle = self._client.rotate_key(vault_base_url=self._vault_url, key_name=name, **kwargs)
return KeyVaultKey._from_key_bundle(bundle)

@distributed_trace
def update_key_rotation_policy(self, name, **kwargs):
# type: (str, **Any) -> KeyRotationPolicy
"""Updates the rotation policy of a Key Vault key.

This operation requires the keys/update permission.

:param str name: The name of the key in the given vault.

:keyword lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key.
:paramtype lifetime_actions: Iterable[~azure.keyvault.keys.KeyRotationLifetimeAction]
:keyword str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an
ISO 8601 duration. For example: 90 days is "P90D", 3 months is "P3M", and 48 hours is "PT48H".

:return: The updated rotation policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
lifetime_actions = kwargs.pop("lifetime_actions", None)
if lifetime_actions:
lifetime_actions = [
self._models.LifetimeActions(
action=self._models.LifetimeActionsType(type=action.action),
trigger=self._models.LifetimeActionsTrigger(
time_after_create=action.time_after_create, time_before_expiry=action.time_before_expiry
),
)
for action in lifetime_actions
]

attributes = self._models.KeyRotationPolicyAttributes(expiry_time=kwargs.pop("expires_in", None))
policy = self._models.KeyRotationPolicy(lifetime_actions=lifetime_actions, attributes=attributes)
result = self._client.update_key_rotation_policy(
vault_base_url=self._vault_url, key_name=name, key_rotation_policy=policy
)
return KeyRotationPolicy._from_generated(result)
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class KeyOperation(with_metaclass(CaseInsensitiveEnumMeta, str, Enum)):
export = "export"


class KeyRotationPolicyAction(with_metaclass(CaseInsensitiveEnumMeta, str, Enum)):
"""The action that will be executed in a key rotation policy"""

ROTATE = "Rotate" #: Rotate the key based on the key policy.
NOTIFY = "Notify" #: Trigger Event Grid events. Not configurable in preview; defaults to 30 days before expiry.


class KeyType(with_metaclass(CaseInsensitiveEnumMeta, str, Enum)):
"""Supported key types"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Any, Dict, Optional, List
from datetime import datetime
from ._generated.v7_0 import models as _models
from ._enums import KeyOperation, KeyType
from ._enums import KeyOperation, KeyRotationPolicyAction, KeyType

KeyOperationResult = namedtuple("KeyOperationResult", ["id", "value"])

Expand Down Expand Up @@ -279,6 +279,71 @@ def __init__(self, value):
self.value = value


class KeyRotationLifetimeAction(object):
"""An action and its corresponding trigger that will be performed by Key Vault over the lifetime of a key.

:param action: The action that will be executed.
:type action: ~azure.keyvault.keys.KeyRotationPolicyAction or str

:keyword str time_after_create: Time after creation to attempt the specified action, as an ISO 8601 duration.
For example, 90 days is "P90D".
:keyword str time_before_expiry: Time before expiry to attempt the specified action, as an ISO 8601 duration.
For example, 90 days is "P90D".
"""

def __init__(self, action, **kwargs):
# type: (KeyRotationPolicyAction, **Any) -> None
self.action = action
self.time_after_create = kwargs.get("time_after_create", None)
self.time_before_expiry = kwargs.get("time_before_expiry", None)

@classmethod
def _from_generated(cls, lifetime_action):
if lifetime_action.trigger:
return cls(
action=lifetime_action.action,
time_after_create=lifetime_action.trigger.time_after_create,
time_before_expiry=lifetime_action.trigger.time_before_expiry,
)
return cls(action=lifetime_action.action)


class KeyRotationPolicy(object):
"""The key rotation policy that belongs to a key.

:ivar str id: The identifier of the key rotation policy.
:ivar lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key.
:type lifetime_actions: list[~azure.keyvault.keys.KeyRotationLifetimeAction]
:ivar str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an ISO
8601 duration. For example, 90 days is "P90D".
:ivar created_on: When the policy was created, in UTC
:type created_on: ~datetime.datetime
:ivar updated_on: When the policy was last updated, in UTC
:type updated_on: ~datetime.datetime
"""

def __init__(self, policy_id, **kwargs):
# type: (str, **Any) -> None
self.id = policy_id
self.lifetime_actions = kwargs.get("lifetime_actions", None)
self.expires_in = kwargs.get("expires_in", None)
self.created_on = kwargs.get("created_on", None)
self.updated_on = kwargs.get("updated_on", None)

@classmethod
def _from_generated(cls, policy):
lifetime_actions = [KeyRotationLifetimeAction._from_generated(action) for action in policy.lifetime_actions]
if policy.attributes:
return cls(
policy_id=policy.id,
lifetime_actions=lifetime_actions,
expires_on=policy.attributes.expiry_time,
created_on=policy.attributes.created,
updated_on=policy.attributes.updated,
)
return cls(policy_id=policy.id, lifetime_actions=lifetime_actions)


class KeyVaultKey(object):
"""A key's attributes and cryptographic material.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@
from .._shared._polling_async import AsyncDeleteRecoverPollingMethod
from .._shared import AsyncKeyVaultClientBase
from .._shared.exceptions import error_map as _error_map
from .. import DeletedKey, JsonWebKey, KeyProperties, KeyVaultKey, RandomBytes, ReleaseKeyResult
from .. import (
DeletedKey,
JsonWebKey,
KeyProperties,
KeyRotationPolicy,
KeyVaultKey,
RandomBytes,
ReleaseKeyResult,
)

if TYPE_CHECKING:
# pylint:disable=ungrouped-imports
from azure.core.async_paging import AsyncItemPaged
from typing import Any, Optional, Union
from typing import Any, Iterable, Optional, Union
from .. import KeyType


Expand Down Expand Up @@ -702,3 +710,69 @@ async def get_random_bytes(self, count: int, **kwargs: "Any") -> RandomBytes:
parameters = self._models.GetRandomBytesRequest(count=count)
result = await self._client.get_random_bytes(vault_base_url=self._vault_url, parameters=parameters, **kwargs)
return RandomBytes(value=result.value)

@distributed_trace_async
async def get_key_rotation_policy(self, name: str, **kwargs: "Any") -> "Optional[KeyRotationPolicy]":
"""Get the rotation policy of a Key Vault key.

:param str name: The name of the key.

:return: The key rotation policy, or None if there is no policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy or None
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
policy = await self._client.get_key_rotation_policy(vault_base_url=self._vault_url, key_name=name, **kwargs)
if policy.id:
return KeyRotationPolicy._from_generated(policy)
return None

@distributed_trace_async
async def rotate_key(self, name: str, **kwargs: "Any") -> KeyVaultKey:
"""Rotate the key based on the key policy by generating a new version of the key.

This operation requires the keys/rotate permission.

:param str name: The name of the key to rotate.

:return: The new version of the rotated key.
:rtype: ~azure.keyvault.keys.KeyVaultKey
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
bundle = await self._client.rotate_key(vault_base_url=self._vault_url, key_name=name, **kwargs)
return KeyVaultKey._from_key_bundle(bundle)

@distributed_trace_async
async def update_key_rotation_policy(self, name: str, **kwargs: "Any") -> KeyRotationPolicy:
"""Updates the rotation policy of a Key Vault key.

This operation requires the keys/update permission.

:param str name: The name of the key in the given vault.

:keyword lifetime_actions: Actions that will be performed by Key Vault over the lifetime of a key.
:paramtype lifetime_actions: Iterable[~azure.keyvault.keys.KeyRotationLifetimeAction]
:keyword str expires_in: The expiry time of the policy that will be applied on new key versions, defined as an
ISO 8601 duration. For example: 90 days is "P90D", 3 months is "P3M", and 48 hours is "PT48H".

:return: The updated rotation policy.
:rtype: ~azure.keyvault.keys.KeyRotationPolicy
:raises: :class:`~azure.core.exceptions.HttpResponseError`
"""
lifetime_actions = kwargs.pop("lifetime_actions", None)
if lifetime_actions:
lifetime_actions = [
self._models.LifetimeActions(
action=self._models.LifetimeActionsType(type=action.action),
trigger=self._models.LifetimeActionsTrigger(
time_after_create=action.time_after_create, time_before_expiry=action.time_before_expiry
),
)
for action in lifetime_actions
]

attributes = self._models.KeyRotationPolicyAttributes(expiry_time=kwargs.pop("expires_in", None))
policy = self._models.KeyRotationPolicy(lifetime_actions=lifetime_actions, attributes=attributes)
result = await self._client.update_key_rotation_policy(
vault_base_url=self._vault_url, key_name=name, key_rotation_policy=policy
)
return KeyRotationPolicy._from_generated(result)
Loading