From b5d7464b61be594307fe7ae0bb788ef189dea564 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 24 May 2024 23:39:12 +0200 Subject: [PATCH] first commit --- cpp/src/parquet/arrow/writer.h | 6 ++ cpp/src/parquet/file_writer.cc | 11 +++ cpp/src/parquet/file_writer.h | 5 ++ python/pyarrow/_parquet.pxd | 5 ++ python/pyarrow/_parquet.pyx | 15 +++- python/pyarrow/parquet/core.py | 7 +- .../pyarrow/tests/test_dataset_encryption.py | 87 +++++++++++++++++++ 7 files changed, 130 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index 1decafedc97fd..53538dfdf4760 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -157,6 +157,12 @@ PARQUET_EXPORT ::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink); +/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream +PARQUET_EXPORT +::arrow::Status WriteEncryptedMetadataFile( + const FileMetaData& file_metadata, ArrowOutputStream* sink, + FileEncryptionProperties* file_encryption_properties); + /// \brief Write a Table to Parquet. /// /// This writes one table in a single shot. 
To write a Parquet file with diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index baa9e00da2351..939e8c88a1c9d 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -567,6 +567,17 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, } } +void WriteEncryptedMetadataFile( + const FileMetaData& metadata, ArrowOutputStream* sink, + FileEncryptionProperties* file_encryption_properties, + const std::shared_ptr<WriterProperties> writer_properties) { + auto encryptor = std::make_shared<InternalFileEncryptor>( + file_encryption_properties, ::arrow::default_memory_pool()); + + return WriteEncryptedFileMetadata(metadata, sink, encryptor->GetFooterEncryptor(), + file_encryption_properties->encrypted_footer()); +} + void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata, ArrowOutputStream* sink) { crypto_metadata.WriteTo(sink); diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index 31706af86dbde..860e3ac6564e3 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -117,6 +117,11 @@ PARQUET_EXPORT void WriteMetaDataFile(const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink); +PARQUET_EXPORT +void WriteEncryptedMetadataFile(const FileMetaData& file_metadata, + ArrowOutputStream* sink, + FileEncryptionProperties* file_encryption_properties); + PARQUET_EXPORT void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, ArrowOutputStream* sink, diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index ae4094d8b4b5f..aca9c24630726 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -561,6 +561,11 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil: const CFileMetaData& file_metadata, const COutputStream* sink) + CStatus WriteEncryptedMetadataFile( + const CFileMetaData& file_metadata, + COutputStream* sink, + CFileEncryptionProperties* encryption_properties) + cdef class
FileEncryptionProperties: """File-level encryption properties for the low-level API""" cdef: diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index f7724b9b1fdc7..6f6a97d3f3624 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -1045,7 +1045,7 @@ cdef class FileMetaData(_Weakrefable): c_metadata = other.sp_metadata self._metadata.AppendRowGroups(deref(c_metadata)) - def write_metadata_file(self, where): + def write_metadata_file(self, where, encryption_properties=None): """ Write the metadata to a metadata-only Parquet file. @@ -1054,10 +1054,13 @@ where : path or file-like object Where to write the metadata. Should be a writable path on the local filesystem, or a writable file-like object. + encryption_properties : FileEncryptionProperties, default None + Optional encryption properties to use when encrypting metadata. """ cdef: shared_ptr[COutputStream] sink c_string c_where + shared_ptr[CFileEncryptionProperties] properties try: where = _stringify_path(where) @@ -1068,9 +1071,15 @@ with nogil: sink = GetResultValue(FileOutputStream.Open(c_where)) + if encryption_properties is not None: + properties = (<FileEncryptionProperties> encryption_properties).unwrap() + with nogil: - check_status( - WriteMetaDataFile(deref(self._metadata), sink.get())) + if encryption_properties is not None: + check_status( + WriteEncryptedMetadataFile(deref(self._metadata), sink.get(), properties.get())) + else: + check_status(WriteMetaDataFile(deref(self._metadata), sink.get())) cdef class ParquetSchema(_Weakrefable): diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 81798b1544474..658472c8e312b 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -2147,7 +2147,7 @@ def file_visitor(written_file): def write_metadata(schema, where, metadata_collector=None, filesystem=None, - **kwargs): + encryption_properties=None, **kwargs): """ Write
metadata-only Parquet file from schema. This can be used with `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar @@ -2162,6 +2162,7 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None, filesystem : FileSystem, default None If nothing passed, will be inferred from `where` if path-like, else `where` is already a file-like object so no filesystem is needed. + encryption_properties : FileEncryptionProperties, default None **kwargs : dict, Additional kwargs for ParquetWriter class. See docstring for `ParquetWriter` for more information. @@ -2213,9 +2214,9 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None, metadata.append_row_groups(m) if filesystem is not None: with filesystem.open_output_stream(where) as f: - metadata.write_metadata_file(f) + metadata.write_metadata_file(f, encryption_properties) else: - metadata.write_metadata_file(where) + metadata.write_metadata_file(where, encryption_properties) def read_metadata(where, memory_map=False, decryption_properties=None, diff --git a/python/pyarrow/tests/test_dataset_encryption.py b/python/pyarrow/tests/test_dataset_encryption.py index 0d8b4a152ab9f..5659736b7eaef 100644 --- a/python/pyarrow/tests/test_dataset_encryption.py +++ b/python/pyarrow/tests/test_dataset_encryption.py @@ -227,3 +227,90 @@ def unwrap_key(self, wrapped_key: bytes, _: str) -> bytes: dataset = ds.dataset(path, format=file_format, filesystem=mockfs) new_table = dataset.to_table() assert table == new_table + + +def test_dataset_metadata_encryption_decryption(tempdir): + directory = tempdir / "data_dir" + directory.mkdir() + metadata_path = directory / "_metadata" + + table = pa.table( + { + "col1": [1, 2, 3], + "col2": [1, 2, 3], + "year": [2020, 2020, 2021] + } + ) + + class KmsClient(pe.KmsClient): + def unwrap_key(self, wrapped_key, master_key_identifier): + return base64.b64decode(wrapped_key) + + def wrap_key(self, key_bytes, master_key_identifier): + return 
base64.b64encode(key_bytes) + + crypto_factory = pe.CryptoFactory(lambda *a, **k: KmsClient()) + encryption_config = pe.EncryptionConfiguration( + footer_key="TEST", + column_keys={ + "TEST": ["col2"] + }, + double_wrapping=False, + plaintext_footer=False, + ) + kms_connection_config = pe.KmsConnectionConfig() + parquet_encryption_cfg = ds.ParquetEncryptionConfig( + crypto_factory, kms_connection_config, encryption_config + ) + encryption_properties = crypto_factory.file_encryption_properties( + kms_connection_config, encryption_config) + + metadata_collector = [] + + pq.write_to_dataset( + table, + directory, + partitioning=ds.partitioning( + schema=pa.schema([ + pa.field("year", pa.int16()) + ]), + flavor="hive" + ), + encryption_config=parquet_encryption_cfg, + metadata_collector=metadata_collector + ) + + pq.write_metadata( + pa.schema( + field + for field in table.schema + if field.name != "year" + ), + metadata_path, + metadata_collector, + encryption_properties=encryption_properties + ) + + decryption_config = pe.DecryptionConfiguration(cache_lifetime=300) + kms_connection_config = pe.KmsConnectionConfig() + + decryption_properties = crypto_factory.file_decryption_properties( + kms_connection_config, decryption_config) + pq_scan_opts = ds.ParquetFragmentScanOptions( + decryption_properties=decryption_properties + ) + pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts) + + dataset = ds.parquet_dataset( + metadata_path, + format=pformat, + partitioning=ds.partitioning( + schema=pa.schema([ + pa.field("year", pa.int16()) + ]), + flavor="hive" + ) + ) + + new_table = dataset.to_table() + assert table == new_table