From 767898250a2ba9b0d372c06bd64ea3970c594ef9 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 22 Jul 2024 20:28:36 +0200 Subject: [PATCH] changes to WriteEncryptedFileMetadata --- .../dataset/file_parquet_encryption_test.cc | 187 +++++++++++++++--- cpp/src/parquet/arrow/writer.cc | 2 +- cpp/src/parquet/file_writer.cc | 38 ++-- cpp/src/parquet/file_writer.h | 5 +- python/pyarrow/_parquet.pxd | 2 +- python/pyarrow/_parquet.pyx | 9 +- .../pyarrow/tests/test_dataset_encryption.py | 69 ++++++- 7 files changed, 242 insertions(+), 70 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc index b9cca81286fa1..6092ea50cafe6 100644 --- a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc @@ -53,6 +53,62 @@ using arrow::internal::checked_pointer_cast; namespace arrow { namespace dataset { +class DatasetTestBase : public ::testing::Test { + public: + void SetUp() override { + // Creates a mock file system using the current time point. + EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make( + std::chrono::system_clock::now(), {})); + ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir))); + + // Init dataset and partitioning. + ASSERT_NO_FATAL_FAILURE(PrepareTableAndPartitioning()); + + auto file_format = std::make_shared(); + auto parquet_file_write_options = + checked_pointer_cast(file_format->DefaultWriteOptions()); + + // Write dataset. + auto dataset = std::make_shared(table_); + EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + + FileSystemDatasetWriteOptions write_options; + write_options.file_write_options = parquet_file_write_options; + write_options.filesystem = file_system_; + write_options.base_dir = kBaseDir; + write_options.partitioning = partitioning_; + write_options.basename_template = "part{i}.parquet"; + ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner))); + } + + void PrepareTableAndPartitioning() { + // Prepare table data. + auto table_schema = schema({field("a", int64()), field("c", int64()), + field("e", int64()), field("part", utf8())}); + table_ = TableFromJSON(table_schema, {R"([ + [ 0, 9, 1, "a" ], + [ 1, 8, 2, "a" ], + [ 2, 7, 1, "a" ], + [ 3, 6, 2, "a" ], + [ 4, 5, 1, "a" ], + [ 5, 4, 2, "a" ], + [ 6, 3, 1, "b" ], + [ 7, 2, 2, "b" ], + [ 8, 1, 1, "b" ], + [ 9, 0, 2, "b" ] + ])"}); + + // Use a Hive-style partitioning scheme. + partitioning_ = std::make_shared(schema({field("part", utf8())})); + } + + protected: + std::shared_ptr file_system_; + std::shared_ptr table_; + std::shared_ptr partitioning_; +}; + // Base class to test writing and reading encrypted dataset. class DatasetEncryptionTestBase : public ::testing::Test { public: @@ -242,10 +298,67 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) { ASSERT_EQ(checked_pointer_cast(table->column(2)->chunk(0))->GetView(0), 1); } -TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) { +TEST_F(DatasetTestBase, ReadDatasetFromMetadata) { + auto reader_properties = parquet::default_reader_properties(); + + std::vector paths = {"part=a/part0.parquet", "part=b/part0.parquet"}; + std::vector> metadata; + + for (const auto& path : paths) { + ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path)); + auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties); + auto file_metadata = parquet_reader->metadata(); + // Make sure file_paths are stored in metadata + file_metadata->set_file_path(path); + metadata.push_back(file_metadata); + } + metadata[0]->AppendRowGroups(*metadata[1]); + + std::string metadata_path = "_metadata"; + ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path)); + WriteMetaDataFile(*metadata[0], stream.get()); + ARROW_EXPECT_OK(stream->Close()); + + auto file_format = std::make_shared(); + ParquetFactoryOptions factory_options; + factory_options.partitioning = partitioning_; + factory_options.partition_base_dir = kBaseDir; + ASSERT_OK_AND_ASSIGN(auto dataset_factory, + ParquetDatasetFactory::Make(metadata_path, file_system_, + file_format, factory_options)); + + // Create the dataset + ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish()); + + // Read dataset into table + ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable()); + + // Verify the data was read correctly + ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks()); + + // Validate the table + ASSERT_OK(combined_table->ValidateFull()); + AssertTablesEqual(*combined_table, *table_); +} + +TEST_F(DatasetTestBase, ReadDatasetFromEncryptedMetadata) { auto encryption_config = std::make_shared( std::string(kFooterKeyName)); encryption_config->column_keys = kColumnKeyMapping; + auto crypto_factory_ = std::make_shared(); + + // Prepare encryption properties. + std::unordered_map key_map; + key_map.emplace(kColumnMasterKeyId, kColumnMasterKey); + key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey); + auto kms_client_factory = + std::make_shared( + /*wrap_locally=*/true, key_map); + crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory)); + auto kms_connection_config_ = std::make_shared(); + auto decryption_config = std::make_shared(); @@ -257,42 +370,60 @@ TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) { auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties( *kms_connection_config_, *encryption_config); auto file_decryption_properties = crypto_factory_->GetFileDecryptionProperties( - *kms_connection_config_, *decryption_config.get()); + *kms_connection_config_, *decryption_config); auto reader_properties = parquet::default_reader_properties(); reader_properties.file_decryption_properties(file_decryption_properties); - auto writer_properties = ::parquet::WriterProperties::Builder() - .encryption(file_encryption_properties) - ->build(); - - EXPECT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream("/foo.parquet")); - ASSERT_OK_NO_THROW(::parquet::arrow::WriteTable(*table_, ::arrow::default_memory_pool(), - stream, 3, writer_properties)); - ARROW_EXPECT_OK(stream->Close()); - - std::shared_ptr
out; - ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("/foo.parquet")); - std::unique_ptr parquet_reader = - parquet::ParquetFileReader::Open(input, reader_properties); - auto metadata = parquet_reader->metadata(); + std::vector paths = {"part=a/part0.parquet", "part=b/part0.parquet"}; + std::vector> metadata; - file_encryption_properties = crypto_factory_->GetFileEncryptionProperties( - *kms_connection_config_, *encryption_config); + for (const auto& path : paths) { + ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path)); + auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties); + auto file_metadata = parquet_reader->metadata(); + // Make sure file_paths are stored in metadata + file_metadata->set_file_path(path); + metadata.push_back(file_metadata); + } + metadata[0]->AppendRowGroups(*metadata[1]); - ASSERT_OK_AND_ASSIGN(stream, file_system_->OpenOutputStream("/_metadata")); - WriteEncryptedMetadataFile(*metadata.get(), stream.get(), file_encryption_properties); + std::string metadata_path = "_metadata"; + ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path)); + WriteEncryptedMetadataFile(*metadata[0], stream, file_encryption_properties); ARROW_EXPECT_OK(stream->Close()); - ASSERT_OK_AND_ASSIGN(auto input2, file_system_->OpenInputFile("/_metadata")); - parquet_reader = parquet::ParquetFileReader::Open(input2, reader_properties); - auto metadata2 = parquet_reader->metadata(); - - ASSERT_EQ(metadata2->num_row_groups(), 4); - ASSERT_EQ(metadata2->num_rows(), 10); - ASSERT_EQ(metadata2->num_columns(), 4); - ASSERT_TRUE(metadata->Equals(*metadata2.get())); + // Set scan options. + auto parquet_scan_options = std::make_shared(); + parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config); + + auto file_format = std::make_shared(); + file_format->default_fragment_scan_options = std::move(parquet_scan_options); + + ParquetFactoryOptions factory_options; + factory_options.partitioning = partitioning_; + factory_options.partition_base_dir = kBaseDir; + ASSERT_OK_AND_ASSIGN(auto dataset_factory, + ParquetDatasetFactory::Make(metadata_path, file_system_, + file_format, factory_options)); + + // Create the dataset + ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish()); + + // Reuse the dataset above to scan it twice to make sure decryption works correctly. + for (size_t i = 0; i < 2; ++i) { + // Read dataset into table + ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable()); + + // Verify the data was read correctly + ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks()); + // Validate the table + ASSERT_OK(combined_table->ValidateFull()); + AssertTablesEqual(*combined_table, *table_); + } } // GH-39444: This test covers the case where parquet dataset scanner crashes when diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 2d8dd4a7997da..e49df7d651c76 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -610,7 +610,7 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata, } Status WriteEncryptedMetadataFile( - const FileMetaData& file_metadata, ArrowOutputStream* sink, + const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink, std::shared_ptr file_encryption_properties) { PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink, file_encryption_properties)); diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index 8728edf78ce75..88c65e5f9a43a 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -567,53 +567,41 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, } } -// TODO: remove void WriteEncryptedMetadataFile( - const FileMetaData& metadata, ::arrow::io::OutputStream* sink, + const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink, std::shared_ptr file_encryption_properties) { auto file_encryptor = std::make_unique( file_encryption_properties.get(), ::arrow::default_memory_pool()); if (file_encryption_properties->encrypted_footer()) { PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4)); - } else { - // Encrypted file with plaintext footer mode. - PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4)); - } - if (file_encryption_properties->encrypted_footer()) { PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell()); - uint64_t metadata_start = static_cast(position); + auto metadata_start = static_cast(position); auto writer_props = parquet::WriterProperties::Builder() .encryption(file_encryption_properties) ->build(); - auto crypto_metadata = - FileMetaDataBuilder::Make(metadata.schema(), writer_props)->GetCryptoMetaData(); - WriteFileCryptoMetaData(*crypto_metadata, sink); + + auto builder = FileMetaDataBuilder::Make(metadata.schema(), writer_props); + auto crypto_metadata = builder->GetCryptoMetaData(); + WriteFileCryptoMetaData(*crypto_metadata, sink.get()); auto footer_encryptor = file_encryptor->GetFooterEncryptor(); - WriteEncryptedFileMetadata(metadata, sink, footer_encryptor, true); + WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true); PARQUET_ASSIGN_OR_THROW(position, sink->Tell()); - uint32_t footer_and_crypto_len = static_cast(position - metadata_start); + auto footer_and_crypto_len = static_cast(position - metadata_start); PARQUET_THROW_NOT_OK( sink->Write(reinterpret_cast(&footer_and_crypto_len), 4)); PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4)); - } else { // Encrypted file with plaintext footer + } else { + // Encrypted file with plaintext footer mode. + PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4)); auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor(); - WriteEncryptedFileMetadata(metadata, sink, footer_signing_encryptor, false); + WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false); } -} -void WriteEncryptedMetadataFile2( - const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink, - std::shared_ptr file_encryption_properties) { - auto schema = ::arrow::internal::checked_pointer_cast( - metadata.schema()->schema_root()); - auto writer_props = - WriterProperties::Builder().encryption(file_encryption_properties)->build(); - auto writer = ParquetFileWriter::Open(sink, schema, writer_props); - writer->Close(); + file_encryptor->WipeOutEncryptionKeys(); } void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata, diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index 8a80bbeedd60f..60b8474df90c8 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -119,7 +119,7 @@ void WriteMetaDataFile(const FileMetaData& file_metadata, PARQUET_EXPORT void WriteEncryptedMetadataFile( - const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink, + const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink, std::shared_ptr file_encryption_properties); PARQUET_EXPORT @@ -130,9 +130,10 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, PARQUET_EXPORT void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, - ::arrow::io::OutputStream* sink, + std::shared_ptr<::arrow::io::OutputStream> sink, const std::shared_ptr& encryptor = NULLPTR, bool encrypt_footer = false); + PARQUET_EXPORT void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata, ::arrow::io::OutputStream* sink); diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index a66e7e74096cd..a0eff09a8c889 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -568,7 +568,7 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil: CStatus WriteEncryptedMetadataFile( const CFileMetaData& file_metadata, const COutputStream* sink, - shared_ptr[CFileEncryptionProperties]& encryption_properties) + shared_ptr[CFileEncryptionProperties] encryption_properties) cdef class FileEncryptionProperties: """File-level encryption properties for the low-level API""" diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 4838d2f51d6f7..5aba6903a485b 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -1061,7 +1061,10 @@ cdef class FileMetaData(_Weakrefable): cdef: shared_ptr[COutputStream] sink c_string c_where - shared_ptr[CFileEncryptionProperties] properties + shared_ptr[CFileEncryptionProperties] c_properties + # Builder* encryption( + # shared_ptr[CFileEncryptionProperties] + # file_encryption_properties) try: where = _stringify_path(where) @@ -1073,12 +1076,12 @@ cdef class FileMetaData(_Weakrefable): sink = GetResultValue(FileOutputStream.Open(c_where)) if encryption_properties is not None: - properties = ( encryption_properties).unwrap() + c_properties = ( encryption_properties).unwrap() with nogil: if encryption_properties is not None: check_status( - WriteEncryptedMetadataFile(deref(self._metadata), sink.get(), properties)) + WriteEncryptedMetadataFile(deref(self._metadata), sink.get(), c_properties)) else: check_status(WriteMetaDataFile(deref(self._metadata), sink.get())) diff --git a/python/pyarrow/tests/test_dataset_encryption.py b/python/pyarrow/tests/test_dataset_encryption.py index acc1863fca590..4968b95aab986 100644 --- a/python/pyarrow/tests/test_dataset_encryption.py +++ b/python/pyarrow/tests/test_dataset_encryption.py @@ -306,23 +306,71 @@ def test_dataset_metadata_encryption_decryption(tempdir): pq.write_metadata( metadata_schema, metadata_file, - metadata_collector, + metadata_collector=metadata_collector, encryption_properties=encryption_properties, filesystem=mockfs, ) - dataset = ds.parquet_dataset( - metadata_file, - format=pformat, - partitioning=partitioning, - filesystem=mockfs + pq_scan_opts = ds.ParquetFragmentScanOptions( + # decryption_config=parquet_decryption_cfg, + # If using build from master + decryption_properties=decryption_properties ) + pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts) - new_table = dataset.to_table() + from pyarrow._dataset_parquet import ( # noqa + ParquetDatasetFactory, + ParquetFactoryOptions) + + options = ParquetFactoryOptions( + partition_base_dir=path, + partitioning=partitioning + ) - for field in table.schema: - # Schema order is not persevered - assert field == new_table.schema.field_by_name(field.name) + factory = ParquetDatasetFactory( + metadata_file, filesystem=mockfs, format=pformat, options=options) + # return factory.finish(schema) + + # dataset = ds.parquet_dataset(metadata_file, partitioning=partitioning, filesystem=mockfs, format=pformat) + + # dataset = ds.parquet_dataset( + # metadata_file, + # format=pformat, + # partitioning=partitioning, + # filesystem=mockfs + # ) + # pq_scan_opts = ds.ParquetFragmentScanOptions( + # decryption_config=parquet_decryption_cfg + # ) + # pq_scan_opts = ds.ParquetFragmentScanOptions( + # # decryption_config=parquet_decryption_cfg, + # # If using build from master + # decryption_properties=decryption_properties + # ) + # dataset = ds.dataset(metadata_file, format=pformat, partitioning=partitioning, filesystem=mockfs) + + # decryption_properties = cf.file_decryption_properties( + # kms_connection_config, decryption_config) + + # pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts) + # + # dataset = ds.parquet_dataset( + # os.path.join(location, "_metadata"), + # format=pformat, + # partitioning=ds.partitioning( + # schema=pa.schema([ + # pa.field("year", pa.int16()) + # ]), + # flavor="hive" + # ) + # ) + + # new_table = dataset.to_table() + # + # for field in table.schema: + # # Schema order is not persevered + # assert field == new_table.schema.field_by_name(field.name) + # metadata = pq.read_metadata( metadata_file, decryption_properties=decryption_properties, filesystem=mockfs) @@ -331,3 +379,4 @@ def test_dataset_metadata_encryption_decryption(tempdir): assert metadata.num_rows == 6 assert metadata.num_row_groups == 4 assert metadata.schema.to_arrow_schema() == metadata_schema + assert False