Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Changes to decryption codepath to allow v2.1 #37455

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@

_ENCRYPTION_PROTOCOL_V1 = '1.0'
_ENCRYPTION_PROTOCOL_V2 = '2.0'
_ENCRYPTION_PROTOCOL_V2_1 = '2.1'
_VALID_ENCRYPTION_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
vincenttran-msft marked this conversation as resolved.
Show resolved Hide resolved
_ENCRYPTION_V2_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
_GCM_REGION_DATA_LENGTH = 4 * 1024 * 1024
_GCM_NONCE_LENGTH = 12
_GCM_TAG_LENGTH = 16
Expand Down Expand Up @@ -293,14 +296,14 @@ def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:

def is_encryption_v2(encryption_data: Optional[_EncryptionData]) -> bool:
"""
Determine whether the given encryption data signifies version 2.0.
Determine whether the given encryption data signifies version 2.0 or 2.1.

:param Optional[_EncryptionData] encryption_data: The encryption data. Will return False if this is None.
:return: True, if the encryption data indicates encryption V2, false otherwise.
:rtype: bool
"""
# If encryption_data is None, assume no encryption
return bool(encryption_data and (encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2))
return bool(encryption_data and (encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS))


def modify_user_agent_for_encryption(
Expand Down Expand Up @@ -405,7 +408,7 @@ def get_adjusted_download_range_and_offset(
end_offset = 15 - (end % 16)
end += end_offset

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
start_offset, end_offset = 0, end

if encryption_data.encrypted_region_info is None:
Expand Down Expand Up @@ -550,7 +553,7 @@ def _dict_to_encryption_data(encryption_data_dict: Dict[str, Any]) -> _Encryptio
"""
try:
protocol = encryption_data_dict['EncryptionAgent']['Protocol']
if protocol not in [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2]:
if protocol not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError("Unsupported encryption version.")
except KeyError as exc:
raise ValueError("Unsupported encryption version.") from exc
Expand Down Expand Up @@ -636,7 +639,7 @@ def _validate_and_unwrap_cek(
# Validate we have the right info for the specified version
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
_validate_not_none('content_encryption_IV', encryption_data.content_encryption_IV)
elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
_validate_not_none('encrypted_region_info', encryption_data.encrypted_region_info)
else:
raise ValueError('Specified encryption version is not supported.')
Expand All @@ -662,8 +665,8 @@ def _validate_and_unwrap_cek(

# For V2, the version is included with the cek. We need to validate it
# and remove it from the actual cek.
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
version_2_bytes = _ENCRYPTION_PROTOCOL_V2.encode().ljust(8, b'\0')
if encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
version_2_bytes = encryption_data.encryption_agent.protocol.encode().ljust(8, b'\0')
cek_version_bytes = content_encryption_key[:len(version_2_bytes)]
if cek_version_bytes != version_2_bytes:
raise ValueError('The encryption metadata is not valid and may have been modified.')
vincenttran-msft marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -722,7 +725,7 @@ def _decrypt_message(
unpadder = PKCS7(128).unpadder()
decrypted_data = (unpadder.update(decrypted_data) + unpadder.finalize())

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
vincenttran-msft marked this conversation as resolved.
Show resolved Hide resolved
block_info = encryption_data.encrypted_region_info
if not block_info or not block_info.nonce_length:
raise ValueError("Missing required metadata for decryption.")
Expand Down Expand Up @@ -894,7 +897,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements
raise ValueError('Specified encryption algorithm is not supported.')

version = encryption_data.encryption_agent.protocol
if version not in (_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2):
if version not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError('Specified encryption version is not supported.')

content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
Expand Down Expand Up @@ -945,7 +948,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements

return content[start_offset: len(content) - end_offset]

if version == _ENCRYPTION_PROTOCOL_V2:
if version in _ENCRYPTION_V2_PROTOCOLS:
# We assume the content contains only full encryption regions
total_size = len(content)
offset = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@

_ENCRYPTION_PROTOCOL_V1 = '1.0'
_ENCRYPTION_PROTOCOL_V2 = '2.0'
_ENCRYPTION_PROTOCOL_V2_1 = '2.1'
_VALID_ENCRYPTION_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
_ENCRYPTION_V2_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
_GCM_REGION_DATA_LENGTH = 4 * 1024 * 1024
_GCM_NONCE_LENGTH = 12
_GCM_TAG_LENGTH = 16

_ERROR_OBJECT_INVALID = \
'{0} does not define a complete interface. Value of {1} is either missing or invalid.'

_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION = (
'The require_encryption flag is set, but encryption is not supported'
' for this method.')


class KeyEncryptionKey(Protocol):

Expand Down Expand Up @@ -289,14 +296,14 @@ def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:

def is_encryption_v2(encryption_data: Optional[_EncryptionData]) -> bool:
"""
Determine whether the given encryption data signifies version 2.0.
Determine whether the given encryption data signifies version 2.0 or 2.1.

:param Optional[_EncryptionData] encryption_data: The encryption data. Will return False if this is None.
:return: True, if the encryption data indicates encryption V2, false otherwise.
:rtype: bool
"""
# If encryption_data is None, assume no encryption
return bool(encryption_data and (encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2))
return bool(encryption_data and (encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS))


def modify_user_agent_for_encryption(
Expand Down Expand Up @@ -357,7 +364,7 @@ def get_adjusted_upload_size(length: int, encryption_version: str) -> int:
def get_adjusted_download_range_and_offset(
start: int,
end: int,
length: int,
length: Optional[int],
encryption_data: Optional[_EncryptionData]) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
Gets the new download range and offsets into the decrypted data for
Expand All @@ -374,7 +381,7 @@ def get_adjusted_download_range_and_offset(

:param int start: The user-requested start index.
:param int end: The user-requested end index.
:param int length: The user-requested length. Only used for V1.
:param Optional[int] length: The user-requested length. Only used for V1.
:param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
:return: (new start, new end), (start offset, end offset)
:rtype: Tuple[Tuple[int, int], Tuple[int, int]]
Expand All @@ -401,7 +408,7 @@ def get_adjusted_download_range_and_offset(
end_offset = 15 - (end % 16)
end += end_offset

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
start_offset, end_offset = 0, end

if encryption_data.encrypted_region_info is None:
Expand Down Expand Up @@ -451,17 +458,20 @@ def parse_encryption_data(metadata: Dict[str, Any]) -> Optional[_EncryptionData]
return None


def adjust_blob_size_for_encryption(size: int, encryption_data: _EncryptionData) -> int:
def adjust_blob_size_for_encryption(size: int, encryption_data: Optional[_EncryptionData]) -> int:
"""
Adjusts the given blob size for encryption by subtracting the size of
the encryption data (nonce + tag). This only has an affect for encryption V2.

:param int size: The original blob size.
:param _EncryptionData encryption_data: The encryption data to determine version and sizes.
:param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
:return: The new blob size.
:rtype: int
"""
if is_encryption_v2(encryption_data) and encryption_data.encrypted_region_info is not None:
if (encryption_data is not None and
encryption_data.encrypted_region_info is not None and
is_encryption_v2(encryption_data)):

nonce_length = encryption_data.encrypted_region_info.nonce_length
data_length = encryption_data.encrypted_region_info.data_length
tag_length = encryption_data.encrypted_region_info.tag_length
Expand Down Expand Up @@ -543,7 +553,7 @@ def _dict_to_encryption_data(encryption_data_dict: Dict[str, Any]) -> _Encryptio
"""
try:
protocol = encryption_data_dict['EncryptionAgent']['Protocol']
if protocol not in [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2]:
if protocol not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError("Unsupported encryption version.")
except KeyError as exc:
raise ValueError("Unsupported encryption version.") from exc
Expand Down Expand Up @@ -629,7 +639,7 @@ def _validate_and_unwrap_cek(
# Validate we have the right info for the specified version
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
_validate_not_none('content_encryption_IV', encryption_data.content_encryption_IV)
elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
_validate_not_none('encrypted_region_info', encryption_data.encrypted_region_info)
else:
raise ValueError('Specified encryption version is not supported.')
Expand All @@ -655,8 +665,8 @@ def _validate_and_unwrap_cek(

# For V2, the version is included with the cek. We need to validate it
# and remove it from the actual cek.
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
version_2_bytes = _ENCRYPTION_PROTOCOL_V2.encode().ljust(8, b'\0')
if encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
version_2_bytes = encryption_data.encryption_agent.protocol.encode().ljust(8, b'\0')
cek_version_bytes = content_encryption_key[:len(version_2_bytes)]
if cek_version_bytes != version_2_bytes:
raise ValueError('The encryption metadata is not valid and may have been modified.')
Expand Down Expand Up @@ -715,7 +725,7 @@ def _decrypt_message(
unpadder = PKCS7(128).unpadder()
decrypted_data = (unpadder.update(decrypted_data) + unpadder.finalize())

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
block_info = encryption_data.encrypted_region_info
if not block_info or not block_info.nonce_length:
raise ValueError("Missing required metadata for decryption.")
Expand Down Expand Up @@ -887,7 +897,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements
raise ValueError('Specified encryption algorithm is not supported.')

version = encryption_data.encryption_agent.protocol
if version not in (_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2):
if version not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError('Specified encryption version is not supported.')

content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
Expand Down Expand Up @@ -938,7 +948,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements

return content[start_offset: len(content) - end_offset]

if version == _ENCRYPTION_PROTOCOL_V2:
if version in _ENCRYPTION_V2_PROTOCOLS:
# We assume the content contains only full encryption regions
total_size = len(content)
offset = 0
Expand Down