diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.cc b/cpp/src/parquet/encryption/file_system_key_material_store.cc index cbecf2645d3..7a7db3fa625 100644 --- a/cpp/src/parquet/encryption/file_system_key_material_store.cc +++ b/cpp/src/parquet/encryption/file_system_key_material_store.cc @@ -25,7 +25,6 @@ #include "parquet/encryption/file_system_key_material_store.h" #include "parquet/encryption/key_material.h" -#include "parquet/exception.h" namespace parquet::encryption { diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.h b/cpp/src/parquet/encryption/file_system_key_material_store.h index 3babfdbf82d..ecbadb90a94 100644 --- a/cpp/src/parquet/encryption/file_system_key_material_store.h +++ b/cpp/src/parquet/encryption/file_system_key_material_store.h @@ -24,6 +24,7 @@ #include "arrow/filesystem/filesystem.h" #include "parquet/encryption/file_key_material_store.h" +#include "parquet/exception.h" namespace parquet::encryption { @@ -59,6 +60,9 @@ class PARQUET_EXPORT FileSystemKeyMaterialStore : public FileKeyMaterialStore { LoadKeyMaterialMap(); } auto found = key_material_map_.find(key_id_in_file); + if (found == key_material_map_.end()) { + throw ParquetException("Invalid key id"); + } return found->second; } diff --git a/python/pyarrow/_parquet_encryption.pxd b/python/pyarrow/_parquet_encryption.pxd index d52669501a4..48939fe277f 100644 --- a/python/pyarrow/_parquet_encryption.pxd +++ b/python/pyarrow/_parquet_encryption.pxd @@ -49,6 +49,14 @@ cdef class KmsConnectionConfig(_Weakrefable): @staticmethod cdef wrap(const CKmsConnectionConfig& config) +cdef class KeyMaterial(_Weakrefable): + cdef shared_ptr[CKeyMaterial] key_material + + @staticmethod + cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material) + +cdef class FileSystemKeyMaterialStore(_Weakrefable): + cdef shared_ptr[CFileSystemKeyMaterialStore] store cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except * cdef shared_ptr[CKmsConnectionConfig] pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except * diff --git a/python/pyarrow/_parquet_encryption.pyx b/python/pyarrow/_parquet_encryption.pyx index f95464e3031..6185d5f2392 100644 --- a/python/pyarrow/_parquet_encryption.pyx +++ b/python/pyarrow/_parquet_encryption.pyx @@ -25,9 +25,11 @@ from cython.operator cimport dereference as deref from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * +from pyarrow.lib cimport check_status from pyarrow.lib cimport _Weakrefable from pyarrow.lib import tobytes, frombytes - +from pyarrow._fs cimport FileSystem +from pyarrow.fs import _resolve_filesystem_and_path cdef ParquetCipher cipher_from_name(name): name = name.upper() @@ -364,6 +366,13 @@ cdef void _cb_create_kms_client( out[0] = ( result).unwrap() +cdef inline shared_ptr[CFileSystem] _unwrap_fs(filesystem: FileSystem | None): + if isinstance(filesystem, FileSystem): + return filesystem.unwrap() + else: + return nullptr + + cdef class CryptoFactory(_Weakrefable): """ A factory that produces the low-level FileEncryptionProperties and FileDecryptionProperties objects, from the high-level parameters.""" @@ -402,7 +411,9 @@ cdef class CryptoFactory(_Weakrefable): def file_encryption_properties(self, KmsConnectionConfig kms_connection_config, - EncryptionConfiguration encryption_config): + EncryptionConfiguration encryption_config, + parquet_file_path=None, + FileSystem filesystem=None): """Create file encryption properties. Parameters @@ -413,6 +424,17 @@ cdef class CryptoFactory(_Weakrefable): encryption_config : EncryptionConfiguration Configuration of the encryption, such as which columns to encrypt + parquet_file_path : str, pathlib.Path, or None, default None + Path to the parquet file to be encrypted. Only required when the + internal_key_material attribute of EncryptionConfiguration is set + to False. Used to derive the path for storing key material + specific to this parquet file. + + filesystem : FileSystem or None, default None + Used only when internal_key_material is set to False on + EncryptionConfiguration. If None, the file system will be inferred + based on parquet_file_path. + Returns ------- file_encryption_properties : FileEncryptionProperties @@ -421,11 +443,23 @@ cdef class CryptoFactory(_Weakrefable): cdef: CResult[shared_ptr[CFileEncryptionProperties]] \ file_encryption_properties_result + c_string c_parquet_file_path + shared_ptr[CFileSystem] c_filesystem + + filesystem, parquet_file_path = _resolve_filesystem_and_path( + parquet_file_path, filesystem) + if parquet_file_path is not None: + c_parquet_file_path = tobytes(parquet_file_path) + else: + c_parquet_file_path = tobytes("") + c_filesystem = _unwrap_fs(filesystem) + with nogil: file_encryption_properties_result = \ self.factory.get().SafeGetFileEncryptionProperties( deref(kms_connection_config.unwrap().get()), - deref(encryption_config.unwrap().get())) + deref(encryption_config.unwrap().get()), + c_parquet_file_path, c_filesystem) file_encryption_properties = GetResultValue( file_encryption_properties_result) return FileEncryptionProperties.wrap(file_encryption_properties) @@ -433,7 +467,9 @@ cdef class CryptoFactory(_Weakrefable): def file_decryption_properties( self, KmsConnectionConfig kms_connection_config, - DecryptionConfiguration decryption_config=None): + DecryptionConfiguration decryption_config=None, + parquet_file_path=None, + FileSystem filesystem=None): """Create file decryption properties. Parameters @@ -445,6 +481,15 @@ cdef class CryptoFactory(_Weakrefable): Configuration of the decryption, such as cache timeout. Can be None. + parquet_file_path : str, pathlib.Path, or None, default None + Path to the parquet file to be decrypted. Only required when + the parquet file uses external key material. Used to derive + the path to the external key material file. + + filesystem : FileSystem or None, default None + Used only when the parquet file uses external key material. If + None, the file system will be inferred based on parquet_file_path. + Returns ------- file_decryption_properties : FileDecryptionProperties @@ -454,6 +499,17 @@ cdef class CryptoFactory(_Weakrefable): CDecryptionConfiguration c_decryption_config CResult[shared_ptr[CFileDecryptionProperties]] \ c_file_decryption_properties + c_string c_parquet_file_path + shared_ptr[CFileSystem] c_filesystem + + filesystem, parquet_file_path = _resolve_filesystem_and_path( + parquet_file_path, filesystem) + if parquet_file_path is not None: + c_parquet_file_path = tobytes(parquet_file_path) + else: + c_parquet_file_path = tobytes("") + c_filesystem = _unwrap_fs(filesystem) + if decryption_config is None: c_decryption_config = CDecryptionConfiguration() else: @@ -462,7 +518,7 @@ cdef class CryptoFactory(_Weakrefable): c_file_decryption_properties = \ self.factory.get().SafeGetFileDecryptionProperties( deref(kms_connection_config.unwrap().get()), - c_decryption_config) + c_decryption_config, c_parquet_file_path, c_filesystem) file_decryption_properties = GetResultValue( c_file_decryption_properties) return FileDecryptionProperties.wrap(file_decryption_properties) @@ -473,9 +529,161 @@ cdef class CryptoFactory(_Weakrefable): def remove_cache_entries_for_all_tokens(self): self.factory.get().RemoveCacheEntriesForAllTokens() + def rotate_master_keys( + self, + KmsConnectionConfig kms_connection_config, + parquet_file_path, + FileSystem filesystem=None, + double_wrapping=True, + cache_lifetime_seconds=600): + """ Rotates master encryption keys for a Parquet file that uses + external key material. + + Parameters + ---------- + kms_connection_config : KmsConnectionConfig + Configuration of connection to KMS + + parquet_file_path : str or pathlib.Path + Path to a parquet file using external key material. + + filesystem : FileSystem or None, default None + Used only when the parquet file uses external key material. If + None, the file system will be inferred based on parquet_file_path. + + double_wrapping : bool, default True + In the single wrapping mode, encrypts data encryption keys with + new master keys. In the double wrapping mode, generates new + KEKs (key encryption keys) and uses these to encrypt the data keys, + and encrypts the KEKs with the new master keys. + + cache_lifetime_seconds : int or float, default 600 + During key rotation, KMS Client and Key Encryption Keys will be + cached for this duration. + """ + cdef: + c_string c_parquet_file_path + shared_ptr[CFileSystem] c_filesystem + + if parquet_file_path != "": + filesystem, parquet_file_path = _resolve_filesystem_and_path( + parquet_file_path, filesystem) + + c_parquet_file_path = tobytes(parquet_file_path) + c_filesystem = _unwrap_fs(filesystem) + + status = self.factory.get().SafeRotateMasterKeys( + deref(kms_connection_config.unwrap().get()), + c_parquet_file_path, + c_filesystem, + double_wrapping, + cache_lifetime_seconds) + + check_status(status) + cdef inline shared_ptr[CPyCryptoFactory] unwrap(self): return self.factory +cdef class KeyMaterial(_Weakrefable): + + @property + def is_footer_key(self): + return self.key_material.get().is_footer_key() + + @property + def is_double_wrapped(self): + return self.key_material.get().is_double_wrapped() + + @property + def master_key_id(self): + return frombytes(self.key_material.get().master_key_id()) + + @property + def wrapped_dek(self): + return frombytes(self.key_material.get().wrapped_dek()) + + @property + def kek_id(self): + return frombytes(self.key_material.get().kek_id()) + + @property + def wrapped_kek(self): + return frombytes(self.key_material.get().wrapped_kek()) + + @property + def kms_instance_id(self): + return frombytes(self.key_material.get().kms_instance_id()) + + @property + def kms_instance_url(self): + return frombytes(self.key_material.get().kms_instance_url()) + + @staticmethod + cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material): + wrapper = KeyMaterial() + wrapper.key_material = key_material + return wrapper + + @staticmethod + def parse( + const c_string key_material_string): + cdef: + shared_ptr[CKeyMaterial] c_key_material + c_key_material = make_shared[CKeyMaterial](move( + CKeyMaterial.Parse(key_material_string) + )) + return KeyMaterial.wrap(c_key_material) + +cdef class FileSystemKeyMaterialStore(_Weakrefable): + + def get_key_material(self, key_id): + cdef: + c_string c_key_id = tobytes(key_id) + c_string c_key_material_string + + c_key_material_string = self.store.get().GetKeyMaterial(c_key_id) + if c_key_material_string.empty(): + raise KeyError("Invalid key id") + return KeyMaterial.parse(c_key_material_string) + + def get_key_id_set(self): + return self.store.get().GetKeyIDSet() + + @classmethod + def for_file(cls, parquet_file_path, + FileSystem filesystem=None): + """Creates a FileSystemKeyMaterialStore for a parquet file that + was created with external key material. + + Parameters + ---------- + parquet_file_path : str or pathlib.Path + Path to a parquet file using external key material. + + filesystem : FileSystem, default None + FileSystem where the parquet file is located. If None, + will be inferred based on parquet_file_path. + + Returns + ------- + FileSystemKeyMaterialStore + A FileSystemKeyMaterialStore wrapping the external key material. + """ + cdef: + c_string c_parquet_file_path + shared_ptr[CFileSystem] c_filesystem + shared_ptr[CFileSystemKeyMaterialStore] c_store + FileSystemKeyMaterialStore store = cls() + + filesystem, parquet_file_path = _resolve_filesystem_and_path( + parquet_file_path, filesystem) + c_parquet_file_path = tobytes(parquet_file_path) + c_filesystem = _unwrap_fs(filesystem) + + c_store = CFileSystemKeyMaterialStore.Make( + c_parquet_file_path, c_filesystem, False) + store.store = c_store + return store cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except *: if isinstance(crypto_factory, CryptoFactory): diff --git a/python/pyarrow/includes/libparquet_encryption.pxd b/python/pyarrow/includes/libparquet_encryption.pxd index 7e031925af6..7024f14ac27 100644 --- a/python/pyarrow/includes/libparquet_encryption.pxd +++ b/python/pyarrow/includes/libparquet_encryption.pxd @@ -19,6 +19,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport CSecureString +from pyarrow.includes.libarrow_fs cimport CFileSystem from pyarrow._parquet cimport (ParquetCipher, CFileEncryptionProperties, CFileDecryptionProperties, @@ -91,12 +92,57 @@ cdef extern from "parquet/encryption/crypto_factory.h" \ shared_ptr[CKmsClientFactory] kms_client_factory) except + shared_ptr[CFileEncryptionProperties] GetFileEncryptionProperties( const CKmsConnectionConfig& kms_connection_config, - const CEncryptionConfiguration& encryption_config) except +* + const CEncryptionConfiguration& encryption_config, + const c_string parquet_file_path, + const shared_ptr[CFileSystem] file_system) except +* shared_ptr[CFileDecryptionProperties] GetFileDecryptionProperties( const CKmsConnectionConfig& kms_connection_config, - const CDecryptionConfiguration& decryption_config) except +* + const CDecryptionConfiguration& decryption_config, + const c_string parquet_file_path, + const shared_ptr[CFileSystem] file_system) except +* void RemoveCacheEntriesForToken(const c_string& access_token) except + void RemoveCacheEntriesForAllTokens() except + + void RotateMasterKeys(const CKmsConnectionConfig& kms_connection_config, + const c_string parquet_file_path, + const shared_ptr[CFileSystem] file_system, + c_bool double_wrapping, + double cache_lifetime_seconds) + +cdef extern from "parquet/encryption/file_key_material_store.h" \ + namespace "parquet::encryption" nogil: + cdef cppclass CFileKeyMaterialStore\ + "parquet::encryption::FileKeyMaterialStore": + @staticmethod + c_string GetKeyMaterial(c_string key_id_in_file) except + + vector[c_string] GetKeyIDSet() except + + +cdef extern from "parquet/encryption/file_system_key_material_store.h" \ + namespace "parquet::encryption" nogil: + cdef cppclass CFileSystemKeyMaterialStore\ + "parquet::encryption::FileSystemKeyMaterialStore": + + @staticmethod + shared_ptr[CFileSystemKeyMaterialStore] Make(c_string parquet_file_path, + shared_ptr[CFileSystem] file_system, + c_bool use_tmp_prefix) except + + + c_string GetKeyMaterial(c_string key_id_in_file) except + + + vector[c_string] GetKeyIDSet() except + + +cdef extern from "parquet/encryption/key_material.h" \ + namespace "parquet::encryption" nogil: + cdef cppclass CKeyMaterial "parquet::encryption::KeyMaterial": + @staticmethod + CKeyMaterial Parse(const c_string& key_material_string) + c_bool is_footer_key() + c_bool is_double_wrapped() + const c_string& master_key_id() + const c_string& wrapped_dek() + const c_string& kek_id() + const c_string& wrapped_kek() + const c_string& kms_instance_id() + const c_string& kms_instance_url() cdef extern from "arrow/python/parquet_encryption.h" \ namespace "arrow::py::parquet::encryption" nogil: @@ -125,8 +171,17 @@ cdef extern from "arrow/python/parquet_encryption.h" \ CResult[shared_ptr[CFileEncryptionProperties]] \ SafeGetFileEncryptionProperties( const CKmsConnectionConfig& kms_connection_config, - const CEncryptionConfiguration& encryption_config) + const CEncryptionConfiguration& encryption_config, + const c_string parquet_file_path, + const shared_ptr[CFileSystem] filesystem) CResult[shared_ptr[CFileDecryptionProperties]] \ SafeGetFileDecryptionProperties( const CKmsConnectionConfig& kms_connection_config, - const CDecryptionConfiguration& decryption_config) + const CDecryptionConfiguration& decryption_config, + const c_string parquet_file_path, + const shared_ptr[CFileSystem] filesystem) + CStatus SafeRotateMasterKeys(const CKmsConnectionConfig& kms_connection_config, + const c_string parquet_file_path, + const shared_ptr[CFileSystem] filesystem, + c_bool double_wrapping, + double cache_lifetime_seconds) diff --git a/python/pyarrow/src/arrow/python/parquet_encryption.cc b/python/pyarrow/src/arrow/python/parquet_encryption.cc index 1016cdd3a37..4fcce64cdbe 100644 --- a/python/pyarrow/src/arrow/python/parquet_encryption.cc +++ b/python/pyarrow/src/arrow/python/parquet_encryption.cc @@ -79,17 +79,32 @@ std::shared_ptr<::parquet::encryption::KmsClient> PyKmsClientFactory::CreateKmsC arrow::Result> PyCryptoFactory::SafeGetFileEncryptionProperties( const ::parquet::encryption::KmsConnectionConfig& kms_connection_config, - const ::parquet::encryption::EncryptionConfiguration& encryption_config) { - PARQUET_CATCH_AND_RETURN( - this->GetFileEncryptionProperties(kms_connection_config, encryption_config)); + const ::parquet::encryption::EncryptionConfiguration& encryption_config, + const std::string& parquet_file_path, + const std::shared_ptr<::arrow::fs::FileSystem>& filesystem) { + PARQUET_CATCH_AND_RETURN(this->GetFileEncryptionProperties( + kms_connection_config, encryption_config, parquet_file_path, filesystem)); } arrow::Result> PyCryptoFactory::SafeGetFileDecryptionProperties( const ::parquet::encryption::KmsConnectionConfig& kms_connection_config, - const ::parquet::encryption::DecryptionConfiguration& decryption_config) { - PARQUET_CATCH_AND_RETURN( - this->GetFileDecryptionProperties(kms_connection_config, decryption_config)); + const ::parquet::encryption::DecryptionConfiguration& decryption_config, + const std::string& parquet_file_path, + const std::shared_ptr<::arrow::fs::FileSystem>& filesystem) { + PARQUET_CATCH_AND_RETURN(this->GetFileDecryptionProperties( + kms_connection_config, decryption_config, parquet_file_path, filesystem)); +} + +arrow::Status PyCryptoFactory::SafeRotateMasterKeys( + const ::parquet::encryption::KmsConnectionConfig& kms_connection_config, + const std::string& parquet_file_path, + const std::shared_ptr<::arrow::fs::FileSystem>& filesystem, bool double_wrapping, + double cache_lifetime_seconds) { + PARQUET_CATCH_NOT_OK(this->RotateMasterKeys(kms_connection_config, parquet_file_path, + filesystem, double_wrapping, + cache_lifetime_seconds)); + return arrow::Status::OK(); } } // namespace encryption diff --git a/python/pyarrow/src/arrow/python/parquet_encryption.h b/python/pyarrow/src/arrow/python/parquet_encryption.h index 3e57a761945..b485b8b1153 100644 --- a/python/pyarrow/src/arrow/python/parquet_encryption.h +++ b/python/pyarrow/src/arrow/python/parquet_encryption.h @@ -18,12 +18,14 @@ #pragma once #include - #include "arrow/python/common.h" #include "arrow/python/visibility.h" +#include "arrow/result.h" #include "arrow/util/macros.h" #include "arrow/util/secure_string.h" #include "parquet/encryption/crypto_factory.h" +#include "parquet/encryption/file_system_key_material_store.h" +#include "parquet/encryption/key_material.h" #include "parquet/encryption/kms_client.h" #include "parquet/encryption/kms_client_factory.h" @@ -116,7 +118,9 @@ class ARROW_PYTHON_PARQUET_ENCRYPTION_EXPORT PyCryptoFactory arrow::Result> SafeGetFileEncryptionProperties( const ::parquet::encryption::KmsConnectionConfig& kms_connection_config, - const ::parquet::encryption::EncryptionConfiguration& encryption_config); + const ::parquet::encryption::EncryptionConfiguration& encryption_config, + const std::string& parquet_file_path, + const std::shared_ptr<::arrow::fs::FileSystem>& filesystem); /// The returned FileDecryptionProperties object will use the cache inside this /// CryptoFactory object, so please keep this @@ -125,7 +129,15 @@ class ARROW_PYTHON_PARQUET_ENCRYPTION_EXPORT PyCryptoFactory arrow::Result> SafeGetFileDecryptionProperties( const ::parquet::encryption::KmsConnectionConfig& kms_connection_config, - const ::parquet::encryption::DecryptionConfiguration& decryption_config); + const ::parquet::encryption::DecryptionConfiguration& decryption_config, + const std::string& parquet_file_path, + const std::shared_ptr<::arrow::fs::FileSystem>& filesystem); + + arrow::Status SafeRotateMasterKeys( + const ::parquet::encryption::KmsConnectionConfig& kms_connection_config, + const std::string& parquet_file_path, + const std::shared_ptr<::arrow::fs::FileSystem>& filesystem, bool double_wrapping, + double cache_lifetime_seconds); }; } // namespace encryption diff --git a/python/pyarrow/tests/parquet/conftest.py b/python/pyarrow/tests/parquet/conftest.py index b5d2216d70e..d9685d6b8bb 100644 --- a/python/pyarrow/tests/parquet/conftest.py +++ b/python/pyarrow/tests/parquet/conftest.py @@ -103,3 +103,8 @@ def s3_example_fs(s3_server): fs.create_dir("mybucket") yield fs, uri, path + + +@pytest.fixture(scope="class") +def reusable_tempdir(tmp_path_factory): + return tmp_path_factory.mktemp('pyarrow-parquet') diff --git a/python/pyarrow/tests/parquet/encryption.py b/python/pyarrow/tests/parquet/encryption.py index d07f8ae2735..a103404995d 100644 --- a/python/pyarrow/tests/parquet/encryption.py +++ b/python/pyarrow/tests/parquet/encryption.py @@ -15,8 +15,9 @@ # specific language governing permissions and limitations # under the License. import base64 - import pyarrow.parquet.encryption as pe +from pyarrow._parquet_encryption import FileSystemKeyMaterialStore +import re class InMemoryKmsClient(pe.KmsClient): @@ -51,6 +52,58 @@ def unwrap_key(self, wrapped_key, master_key_identifier): master_key_bytes, decrypted_key) +def parse_wrapped_key(wrapped_key: str) -> tuple[str, int, bytes]: + """Parses a wrapped key string into a tuple: (key id, version, key) given + input in the form: :v:""" + ptn = re.compile("(.+?):v([0-9]+?):(.+)") + if m := ptn.fullmatch(wrapped_key): + id, version, b64key = m.groups() + version = int(version) + key = base64.b64decode(b64key) + return (id, version, key) + else: + raise ValueError("Cannot parse wrapped key", wrapped_key) + + +MASTER_KEY_VERSION = "master_key_version" + + +class MockVersioningKmsClient(pe.KmsClient): + """This is a mock class implementation of KmsClient, built for testing + only. + + During tests that involve CryptoFactory.rotate_master_keys, separate + instances of this client will be created when writing, rotating keys, and + reading back parquet data. To help unit tests verify that external key + material was stored correctly at each step, this client wraps keys with a + master_key_identifier and a version number. To ensure each client wraps + with the correct version, the current version is persisted in the + key_access_token attribute of the KmsConnectionConfig shared by all clients + """ + + def __init__(self, connection_config) -> None: + pe.KmsClient.__init__(self) + self.connection_config = connection_config + + @property + def master_key_version(self) -> int: + return int(self.connection_config.key_access_token) + + def wrap_key(self, key_bytes: bytes, master_key_identifier: str) -> str: + b64key = base64.b64encode(key_bytes).decode('utf-8') + return f"{master_key_identifier}:v{self.master_key_version}:{b64key}" + + def unwrap_key( + self, + wrapped_key: str, + master_key_identifier: str) -> bytes: + key_id, _, key = parse_wrapped_key(wrapped_key) + if key_id != master_key_identifier: + raise ValueError("Mismatched master key identifiers:", + key_id, master_key_identifier) + return key + + def verify_file_encrypted(path): """Verify that the file is encrypted by looking at its first 4 bytes. If it's the magic string PARE @@ -59,3 +112,14 @@ def verify_file_encrypted(path): magic_str = file.read(4) # Verify magic string for parquet with encrypted footer is PARE assert magic_str == b'PARE' + + +def read_external_keys_to_dict(path): + """Reads an external key material store given a parquet file path and + returns a dict mapping master_key_id to KeyMaterial objects""" + store = FileSystemKeyMaterialStore.for_file(path) + keys = dict() + for id in store.get_key_id_set(): + key_material = store.get_key_material(id) + keys[key_material.master_key_id] = key_material + return keys diff --git a/python/pyarrow/tests/parquet/test_encryption.py b/python/pyarrow/tests/parquet/test_encryption.py index a11a4935a1c..4e2fb069bd0 100644 --- a/python/pyarrow/tests/parquet/test_encryption.py +++ b/python/pyarrow/tests/parquet/test_encryption.py @@ -16,7 +16,6 @@ # under the License. import pytest from datetime import timedelta - import pyarrow as pa try: import pyarrow.parquet as pq @@ -25,8 +24,11 @@ pq = None pe = None else: - from pyarrow.tests.parquet.encryption import ( - InMemoryKmsClient, verify_file_encrypted) + from pyarrow.tests.parquet.encryption import (InMemoryKmsClient, + MockVersioningKmsClient, + verify_file_encrypted, + read_external_keys_to_dict, + parse_wrapped_key) PARQUET_NAME = 'encrypted_table.in_mem.parquet' @@ -65,6 +67,17 @@ def basic_encryption_config(): return basic_encryption_config +@pytest.fixture(scope='module') +def external_encryption_config(): + external_encryption_config = pe.EncryptionConfiguration( + footer_key=FOOTER_KEY_NAME, + column_keys={ + COL_KEY_NAME: ["a", "b"], + }, + internal_key_material=False) + return external_encryption_config + + def setup_encryption_environment(custom_kms_conf): """ Sets up and returns the KMS connection configuration and crypto factory @@ -163,8 +176,12 @@ def test_uniform_encrypted_parquet_write_read(tempdir, data_table): def write_encrypted_parquet(path, table, encryption_config, kms_connection_config, crypto_factory): - file_encryption_properties = crypto_factory.file_encryption_properties( - kms_connection_config, encryption_config) + if encryption_config.internal_key_material: + file_encryption_properties = crypto_factory.file_encryption_properties( + kms_connection_config, encryption_config) + else: + file_encryption_properties = crypto_factory.file_encryption_properties( + kms_connection_config, encryption_config, path) assert file_encryption_properties is not None with pq.ParquetWriter( path, table.schema, @@ -173,9 +190,15 @@ def write_encrypted_parquet(path, table, encryption_config, def read_encrypted_parquet(path, decryption_config, - kms_connection_config, crypto_factory): - file_decryption_properties = crypto_factory.file_decryption_properties( - kms_connection_config, decryption_config) + kms_connection_config, crypto_factory, + internal_key_material=True): + if internal_key_material: + file_decryption_properties = crypto_factory.file_decryption_properties( + kms_connection_config, decryption_config) + else: + file_decryption_properties = crypto_factory.file_decryption_properties( + kms_connection_config, decryption_config, path) + assert file_decryption_properties is not None meta = pq.read_metadata( path, decryption_properties=file_decryption_properties) @@ -514,31 +537,112 @@ def kms_factory(kms_connection_configuration): # assert table.num_rows == result_table.num_rows -@pytest.mark.xfail(reason="External key material not supported yet") -def test_encrypted_parquet_write_external(tempdir, data_table): - """Write an encrypted parquet, with external key - material. - Currently it's not implemented, so should throw - an exception""" +def test_encrypted_parquet_write_read_external(tempdir, data_table, + external_encryption_config): + """Write an encrypted parquet file with external key material, verify + it's encrypted, then read both the table and external store. + """ path = tempdir / PARQUET_NAME - # Encrypt the file with the footer key + kms_connection_config, crypto_factory = write_encrypted_file( + path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY, + external_encryption_config) + + verify_file_encrypted(path) + + decryption_config = pe.DecryptionConfiguration() + result_table = read_encrypted_parquet( + path, decryption_config, kms_connection_config, crypto_factory, + internal_key_material=False) + store = pa._parquet_encryption.FileSystemKeyMaterialStore.for_file(path) + + assert len(key_ids := store.get_key_id_set()) == ( + len(external_encryption_config.column_keys[COL_KEY_NAME]) + 1) + assert all([store.get_key_material(k) is not None for k in key_ids]) + assert data_table.equals(result_table) + + +@pytest.mark.parametrize( + ("double_wrap_initial", "double_wrap_rotated"), [ + pytest.param(True, True, id="double wrapping"), + pytest.param(False, True, id="single to double wrapped"), + pytest.param(True, False, id="double to singe wrapped"), + pytest.param(False, False, id="single wrapping")]) +def test_external_key_material_rotation( + reusable_tempdir, + data_table, + double_wrap_initial, + double_wrap_rotated): + """Tests CryptoFactory.rotate_master_keys + + Note: The CryptoFactory.rotate_master_keys() double_wrapping keword arg + may be either True (the default) or False regardless of whether + EncryptionConfig.double_wrapping was set to true (also the default) when + the external key material store was written. This means double wrapping may + be set one way initially and then applied or removed during rotation. + """ + path = reusable_tempdir / PARQUET_NAME encryption_config = pe.EncryptionConfiguration( footer_key=FOOTER_KEY_NAME, - column_keys={}, - internal_key_material=False) + column_keys={COL_KEY_NAME: ["a", "b"]}, + internal_key_material=False, + double_wrapping=double_wrap_initial) - kms_connection_config = pe.KmsConnectionConfig( - custom_kms_conf={FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8")} - ) + # initial master key version - see MockVersioningKmsClient docstring + kms_connection_config = pe.KmsConnectionConfig(key_access_token="1") def kms_factory(kms_connection_configuration): - return InMemoryKmsClient(kms_connection_configuration) - + return MockVersioningKmsClient(kms_connection_configuration) crypto_factory = pe.CryptoFactory(kms_factory) - # Write with encryption properties - write_encrypted_parquet(path, data_table, encryption_config, - kms_connection_config, crypto_factory) + write_encrypted_parquet( + path, + data_table, + encryption_config, + kms_connection_config, + crypto_factory) + before_keys = read_external_keys_to_dict(path) + + # "rotate" kms master key + kms_connection_config.refresh_key_access_token("2") + + crypto_factory.rotate_master_keys( + kms_connection_config, + path, + double_wrapping=double_wrap_rotated) + + after_keys = read_external_keys_to_dict(path) + verify_file_encrypted(path) + table_read_after_rotation = read_encrypted_parquet( + path, + pe.DecryptionConfiguration(), + kms_connection_config, + crypto_factory, + internal_key_material=False) + assert FOOTER_KEY_NAME in before_keys + assert COL_KEY_NAME in before_keys + assert FOOTER_KEY_NAME in after_keys + assert COL_KEY_NAME in after_keys + + def check_rotated_external_keys(master_key_id: str) -> None: + before_key_mat = before_keys[master_key_id] + if double_wrap_initial: + before_key_wrapped = before_key_mat.wrapped_kek + else: + before_key_wrapped = before_key_mat.wrapped_dek + _, before_ver, _ = parse_wrapped_key(before_key_wrapped) + + after_key_mat = after_keys[master_key_id] + if double_wrap_rotated: + after_key_wrapped = after_key_mat.wrapped_kek + else: + after_key_wrapped = after_key_mat.wrapped_dek + _, after_ver, _ = parse_wrapped_key(after_key_wrapped) + + # CryptoFactory rewrapped keys if after version is later than before + assert before_ver < after_ver + check_rotated_external_keys(FOOTER_KEY_NAME) + check_rotated_external_keys(COL_KEY_NAME) + assert data_table.equals(table_read_after_rotation) def test_encrypted_parquet_loop(tempdir, data_table, basic_encryption_config):