Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

#include "parquet/encryption/file_system_key_material_store.h"
#include "parquet/encryption/key_material.h"
#include "parquet/exception.h"


namespace parquet::encryption {

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/parquet/encryption/file_system_key_material_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/filesystem/filesystem.h"

#include "parquet/encryption/file_key_material_store.h"
#include "parquet/exception.h"

namespace parquet::encryption {

Expand Down Expand Up @@ -59,6 +60,9 @@ class PARQUET_EXPORT FileSystemKeyMaterialStore : public FileKeyMaterialStore {
LoadKeyMaterialMap();
}
auto found = key_material_map_.find(key_id_in_file);
if (found == key_material_map_.end()) {
throw ParquetException("Invalid key id");
}
return found->second;
}

Expand Down
8 changes: 8 additions & 0 deletions python/pyarrow/_parquet_encryption.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ cdef class KmsConnectionConfig(_Weakrefable):
@staticmethod
cdef wrap(const CKmsConnectionConfig& config)

# Declaration of the KeyMaterial extension type, published in this .pxd so
# that other Cython modules can cimport it.
cdef class KeyMaterial(_Weakrefable):
# Shared handle to the wrapped C++ CKeyMaterial object.
cdef shared_ptr[CKeyMaterial] key_material

# Builds a KeyMaterial wrapper from an existing C++ handle.
@staticmethod
cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material)

# Declaration of the FileSystemKeyMaterialStore extension type, published in
# this .pxd so that other Cython modules can cimport it.
cdef class FileSystemKeyMaterialStore(_Weakrefable):
# Shared handle to the wrapped C++ CFileSystemKeyMaterialStore object.
cdef shared_ptr[CFileSystemKeyMaterialStore] store

cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except *
cdef shared_ptr[CKmsConnectionConfig] pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except *
Expand Down
218 changes: 213 additions & 5 deletions python/pyarrow/_parquet_encryption.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ from cython.operator cimport dereference as deref

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport check_status
from pyarrow.lib cimport _Weakrefable
from pyarrow.lib import tobytes, frombytes

from pyarrow._fs cimport FileSystem
from pyarrow.fs import _resolve_filesystem_and_path

cdef ParquetCipher cipher_from_name(name):
name = name.upper()
Expand Down Expand Up @@ -364,6 +366,13 @@ cdef void _cb_create_kms_client(
out[0] = (<KmsClient> result).unwrap()


cdef inline shared_ptr[CFileSystem] _unwrap_fs(filesystem: FileSystem | None):
    """Return the C++ filesystem handle behind an optional FileSystem.

    Anything that is not a ``FileSystem`` instance (including ``None``)
    yields a null ``shared_ptr``.
    """
    if not isinstance(filesystem, FileSystem):
        return <shared_ptr[CFileSystem]>nullptr
    return filesystem.unwrap()


cdef class CryptoFactory(_Weakrefable):
""" A factory that produces the low-level FileEncryptionProperties and
FileDecryptionProperties objects, from the high-level parameters."""
Expand Down Expand Up @@ -400,9 +409,12 @@ cdef class CryptoFactory(_Weakrefable):
static_pointer_cast[CKmsClientFactory, CPyKmsClientFactory](
kms_client_factory))


def file_encryption_properties(self,
KmsConnectionConfig kms_connection_config,
EncryptionConfiguration encryption_config):
EncryptionConfiguration encryption_config,
parquet_file_path="",
FileSystem filesystem=None):
"""Create file encryption properties.

Parameters
Expand All @@ -413,6 +425,17 @@ cdef class CryptoFactory(_Weakrefable):
encryption_config : EncryptionConfiguration
Configuration of the encryption, such as which columns to encrypt

parquet_file_path : str or pathlib.Path, default ""
Path to the parquet file to be encrypted. Only required when the
internal_key_material attribute of EncryptionConfiguration is set
to False. Used to derive the path for storing key material
specific to this parquet file.

filesystem : FileSystem, default None
Used only when internal_key_material is set to False on
EncryptionConfiguration. If None, the file system will be inferred
based on parquet_file_path.

Returns
-------
file_encryption_properties : FileEncryptionProperties
Expand All @@ -421,19 +444,33 @@ cdef class CryptoFactory(_Weakrefable):
cdef:
CResult[shared_ptr[CFileEncryptionProperties]] \
file_encryption_properties_result
c_string c_parquet_file_path
shared_ptr[CFileSystem] c_filesystem

if parquet_file_path != "":
filesystem, parquet_file_path = _resolve_filesystem_and_path(
parquet_file_path, filesystem)

c_parquet_file_path = tobytes(parquet_file_path)
c_filesystem = _unwrap_fs(filesystem)

with nogil:
file_encryption_properties_result = \
self.factory.get().SafeGetFileEncryptionProperties(
deref(kms_connection_config.unwrap().get()),
deref(encryption_config.unwrap().get()))
deref(encryption_config.unwrap().get()),
c_parquet_file_path, c_filesystem)
file_encryption_properties = GetResultValue(
file_encryption_properties_result)
return FileEncryptionProperties.wrap(file_encryption_properties)

def file_decryption_properties(
self,
KmsConnectionConfig kms_connection_config,
DecryptionConfiguration decryption_config=None):
DecryptionConfiguration decryption_config=None,
parquet_file_path="",
FileSystem filesystem=None):

"""Create file decryption properties.

Parameters
Expand All @@ -445,6 +482,15 @@ cdef class CryptoFactory(_Weakrefable):
Configuration of the decryption, such as cache timeout.
Can be None.

parquet_file_path : str or pathlib.Path, default ""
Path to the parquet file to be decrypted. Only required when
the parquet file uses external key material. Used to derive
the path to the external key material file.

filesystem : FileSystem, default None
Used only when the parquet file uses external key material. If
None, the file system will be inferred based on parquet_file_path.

Returns
-------
file_decryption_properties : FileDecryptionProperties
Expand All @@ -454,6 +500,16 @@ cdef class CryptoFactory(_Weakrefable):
CDecryptionConfiguration c_decryption_config
CResult[shared_ptr[CFileDecryptionProperties]] \
c_file_decryption_properties
c_string c_parquet_file_path
shared_ptr[CFileSystem] c_filesystem

if parquet_file_path != "":
filesystem, parquet_file_path = _resolve_filesystem_and_path(
parquet_file_path, filesystem)

c_parquet_file_path = tobytes(parquet_file_path)
c_filesystem = _unwrap_fs(filesystem)

if decryption_config is None:
c_decryption_config = CDecryptionConfiguration()
else:
Expand All @@ -462,7 +518,7 @@ cdef class CryptoFactory(_Weakrefable):
c_file_decryption_properties = \
self.factory.get().SafeGetFileDecryptionProperties(
deref(kms_connection_config.unwrap().get()),
c_decryption_config)
c_decryption_config, c_parquet_file_path, c_filesystem)
file_decryption_properties = GetResultValue(
c_file_decryption_properties)
return FileDecryptionProperties.wrap(file_decryption_properties)
Expand All @@ -473,9 +529,161 @@ cdef class CryptoFactory(_Weakrefable):
def remove_cache_entries_for_all_tokens(self):
    """Call ``RemoveCacheEntriesForAllTokens`` on the wrapped C++ crypto
    factory."""
    self.factory.get().RemoveCacheEntriesForAllTokens()

def rotate_master_keys(
        self,
        KmsConnectionConfig kms_connection_config,
        parquet_file_path,
        FileSystem filesystem=None,
        double_wrapping=True,
        cache_lifetime_seconds=600):
    """Rotate the master encryption keys of a Parquet file whose key
    material is stored externally.

    Parameters
    ----------
    kms_connection_config : KmsConnectionConfig
        Configuration of the connection to the KMS.

    parquet_file_path : str or pathlib.Path
        Location of the Parquet file that uses external key material.

    filesystem : FileSystem, default None
        File system holding the Parquet file. When None, it is inferred
        from parquet_file_path.

    double_wrapping : bool, default True
        With single wrapping, the data encryption keys are encrypted
        directly with the new master keys. With double wrapping, new
        KEKs (key encryption keys) are generated to encrypt the data
        keys, and the KEKs are encrypted with the new master keys.

    cache_lifetime_seconds : int or float, default 600
        How long the KMS client and the key encryption keys are cached
        while the rotation runs.
    """
    cdef:
        c_string c_path
        shared_ptr[CFileSystem] c_fs

    # An empty path is forwarded as-is, without resolving a filesystem.
    if parquet_file_path != "":
        filesystem, parquet_file_path = _resolve_filesystem_and_path(
            parquet_file_path, filesystem)

    c_fs = _unwrap_fs(filesystem)
    c_path = tobytes(parquet_file_path)

    rotation_status = self.factory.get().SafeRotateMasterKeys(
        deref(kms_connection_config.unwrap().get()),
        c_path,
        c_fs,
        double_wrapping,
        cache_lifetime_seconds)

    # Hand the returned status to check_status so a failure reaches the
    # caller.
    check_status(rotation_status)

cdef inline shared_ptr[CPyCryptoFactory] unwrap(self):
    # Give other Cython code access to the underlying C++ factory handle.
    cdef shared_ptr[CPyCryptoFactory] handle = self.factory
    return handle

cdef class KeyMaterial(_Weakrefable):
    """Python wrapper around a C++ ``KeyMaterial`` object.

    Each property forwards to the accessor of the same name on the wrapped
    object; string-valued accessors are decoded with ``frombytes``.
    Use :meth:`parse` to build an instance from a serialized string.
    """

    @property
    def is_footer_key(self):
        """Result of ``is_footer_key()`` on the wrapped object."""
        return self.key_material.get().is_footer_key()

    @property
    def is_double_wrapped(self):
        """Result of ``is_double_wrapped()`` on the wrapped object."""
        return self.key_material.get().is_double_wrapped()

    @property
    def master_key_id(self):
        """Result of ``master_key_id()``, decoded to ``str``."""
        return frombytes(self.key_material.get().master_key_id())

    @property
    def wrapped_dek(self):
        """Result of ``wrapped_dek()``, decoded to ``str``."""
        return frombytes(self.key_material.get().wrapped_dek())

    @property
    def kek_id(self):
        """Result of ``kek_id()``, decoded to ``str``."""
        return frombytes(self.key_material.get().kek_id())

    @property
    def wrapped_kek(self):
        """Result of ``wrapped_kek()``, decoded to ``str``."""
        return frombytes(self.key_material.get().wrapped_kek())

    @property
    def kms_instance_id(self):
        """Result of ``kms_instance_id()``, decoded to ``str``."""
        return frombytes(self.key_material.get().kms_instance_id())

    @property
    def kms_instance_url(self):
        """Result of ``kms_instance_url()``, decoded to ``str``."""
        return frombytes(self.key_material.get().kms_instance_url())

    @staticmethod
    cdef inline KeyMaterial wrap(shared_ptr[CKeyMaterial] key_material):
        # Build a Python wrapper around an existing C++ handle. The local is
        # explicitly typed so the attribute assignment is a C-level store.
        cdef KeyMaterial wrapper = KeyMaterial()
        wrapper.key_material = key_material
        return wrapper

    @staticmethod
    def parse(key_material_string):
        """Parse a serialized key material string.

        Parameters
        ----------
        key_material_string : str or bytes
            Serialized key material in the format accepted by the C++
            ``KeyMaterial::Parse``. A ``str`` is encoded with ``tobytes``;
            ``bytes`` are passed through unchanged.

        Returns
        -------
        KeyMaterial
            Wrapper around the parsed key material.
        """
        cdef:
            # Going through tobytes lets callers pass text as well as bytes,
            # matching how the rest of this module accepts string arguments.
            c_string c_key_material_string = tobytes(key_material_string)
            shared_ptr[CKeyMaterial] c_key_material
        c_key_material = make_shared[CKeyMaterial](move(
            CKeyMaterial.Parse(c_key_material_string)
        ))
        return KeyMaterial.wrap(c_key_material)

cdef class FileSystemKeyMaterialStore(_Weakrefable):
    """Python wrapper around the C++ ``FileSystemKeyMaterialStore``.

    Instances are obtained through :meth:`for_file`.
    """

    def get_key_material(self, key_id):
        """Fetch and parse the key material stored under ``key_id``.

        Parameters
        ----------
        key_id : str
            Identifier of the key inside the store.

        Returns
        -------
        KeyMaterial
            The parsed key material.

        Raises
        ------
        KeyError
            If the store returns an empty string for ``key_id``.
        """
        cdef:
            c_string c_id = tobytes(key_id)
            c_string c_serialized

        c_serialized = self.store.get().GetKeyMaterial(c_id)
        if not c_serialized.empty():
            return KeyMaterial.parse(c_serialized)
        raise KeyError("Invalid key id")

    def get_key_id_set(self):
        """Return the result of ``GetKeyIDSet()`` on the wrapped store."""
        # NOTE(review): the elements are presumably bytes rather than str,
        # since no frombytes conversion is applied here -- confirm.
        return self.store.get().GetKeyIDSet()

    @classmethod
    def for_file(cls, parquet_file_path,
                 FileSystem filesystem=None):
        """Create a FileSystemKeyMaterialStore for a Parquet file written
        with external key material.

        Parameters
        ----------
        parquet_file_path : str or pathlib.Path
            Path to a parquet file using external key material.

        filesystem : FileSystem, default None
            FileSystem where the parquet file is located. If None,
            will be inferred based on parquet_file_path.

        Returns
        -------
        FileSystemKeyMaterialStore
            A FileSystemKeyMaterialStore wrapping the external key material.
        """
        cdef:
            FileSystemKeyMaterialStore instance = cls()
            c_string c_path
            shared_ptr[CFileSystem] c_fs

        filesystem, parquet_file_path = _resolve_filesystem_and_path(
            parquet_file_path, filesystem)
        c_path = tobytes(parquet_file_path)
        c_fs = _unwrap_fs(filesystem)

        # NOTE(review): the meaning of the third argument (False) is defined
        # by the C++ Make signature, which is not visible here -- confirm.
        instance.store = CFileSystemKeyMaterialStore.Make(c_path, c_fs, False)
        return instance

cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except *:
if isinstance(crypto_factory, CryptoFactory):
Expand Down
Loading
Loading