From 8a2db99b63f4db8692b2dac1684659480c1eee45 Mon Sep 17 00:00:00 2001 From: nappelson Date: Mon, 12 Jul 2021 16:45:46 -0400 Subject: [PATCH] Add option to enforce kms key_id upon decrypting with AwsKmsCryptographicMaterialsProvider --- src/dynamodb_encryption_sdk/material_providers/aws_kms.py | 8 ++++++++ test/unit/material_providers/test_aws_kms.py | 1 + 2 files changed, 9 insertions(+) diff --git a/src/dynamodb_encryption_sdk/material_providers/aws_kms.py b/src/dynamodb_encryption_sdk/material_providers/aws_kms.py index ea7a55f2..994aa3a6 100644 --- a/src/dynamodb_encryption_sdk/material_providers/aws_kms.py +++ b/src/dynamodb_encryption_sdk/material_providers/aws_kms.py @@ -154,6 +154,10 @@ class AwsKmsCryptographicMaterialsProvider(CryptographicMaterialsProvider): :param dict material_description: Material description to use as default state for this CMP (optional) :param dict regional_clients: Dictionary mapping AWS region names to pre-configured boto3 KMS clients (optional) + #param bool enforce_decrypt_with_key_id: The KeyId parameter is optional when decrypting ciphertext + that was encrypted under a symmetric CMK. This is because KMS can get information from metadata + added to symmetric ciphertext blob. By default, we do not enforce that the key_id is used for + decryption. When set to true, we ensure KMS only uses the specified CMK (key_id) for encryption. """ _key_id = attr.ib(validator=attr.validators.instance_of(six.string_types)) @@ -175,6 +179,7 @@ def __init__( grant_tokens=None, # type: Optional[Tuple[Text]] material_description=None, # type: Optional[Dict[Text, Text]] regional_clients=None, # type: Optional[Dict[Text, botocore.client.BaseClient]] + enforce_decrypt_with_key_id=False, # type: bool ): # noqa=D107 # type: (...) -> None # Workaround pending resolution of attrs/mypy interaction. @@ -195,6 +200,7 @@ def __init__( self._grant_tokens = grant_tokens self._material_description = material_description self._regional_clients = regional_clients + self._enforce_decrypt_with_key_id = enforce_decrypt_with_key_id attr.validate(self) self.__attrs_post_init__() @@ -386,6 +392,8 @@ def _decrypt_initial_material(self, encryption_context): kms_params = dict(CiphertextBlob=encrypted_initial_material, EncryptionContext=kms_encryption_context) if self._grant_tokens: kms_params["GrantTokens"] = self._grant_tokens + if self._enforce_decrypt_with_key_id: + kms_params["KeyId"] = key_id # Catch any boto3 errors and normalize to expected UnwrappingError try: response = self._client(key_id).decrypt(**kms_params) diff --git a/test/unit/material_providers/test_aws_kms.py b/test/unit/material_providers/test_aws_kms.py index edcd301d..8cad4de2 100644 --- a/test/unit/material_providers/test_aws_kms.py +++ b/test/unit/material_providers/test_aws_kms.py @@ -250,6 +250,7 @@ def test_kms_cmp_values_set(kwargs): assert cmp._grant_tokens == kwargs.get("grant_tokens", ()) assert cmp._material_description == kwargs.get("material_description", {}) assert cmp._regional_clients == kwargs.get("regional_clients", {}) + assert cmp._enforce_decrypt_with_key_id == kwargs.get("enforce_decrypt_with_key_id", False) def test_add_regional_client_known_region(default_kms_cmp, patch_boto3_session):