Skip to content

Commit

Permalink
changes to WriteEncryptedFileMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Jul 29, 2024
1 parent f734012 commit 02e2e21
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 89 deletions.
188 changes: 159 additions & 29 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,62 @@ using arrow::internal::checked_pointer_cast;
namespace arrow {
namespace dataset {

// Test fixture that writes a small Hive-partitioned Parquet dataset, using the
// format's default write options, into a mock filesystem. Tests derived from it
// can read the files back through file_system_ and compare against table_.
class DatasetTestBase : public ::testing::Test {
 public:
  // Builds the mock filesystem, prepares table_ and partitioning_, and writes the
  // table below kBaseDir using the "part{i}.parquet" basename template.
  //
  // Every step uses a fatal (ASSERT) check: each result is dereferenced by the
  // following statement, so setup must stop at the first failed step rather than
  // carry on with a value that was never produced.
  void SetUp() override {
    // Creates a mock file system using the current time point.
    ASSERT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make(
                                           std::chrono::system_clock::now(), {}));
    ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir)));

    // Init dataset and partitioning.
    ASSERT_NO_FATAL_FAILURE(PrepareTableAndPartitioning());

    auto file_format = std::make_shared<ParquetFileFormat>();
    auto parquet_file_write_options =
        checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());

    // Write dataset.
    auto dataset = std::make_shared<InMemoryDataset>(table_);
    ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
    ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

    FileSystemDatasetWriteOptions write_options;
    write_options.file_write_options = parquet_file_write_options;
    write_options.filesystem = file_system_;
    write_options.base_dir = kBaseDir;
    write_options.partitioning = partitioning_;
    write_options.basename_template = "part{i}.parquet";
    ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
  }

  // Fills table_ with ten rows over the columns (a, c, e, part) and sets
  // partitioning_ to a Hive-style scheme keyed on the "part" column.
  void PrepareTableAndPartitioning() {
    // Prepare table data.
    auto table_schema = schema({field("a", int64()), field("c", int64()),
                                field("e", int64()), field("part", utf8())});
    table_ = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "a" ],
[ 3, 6, 2, "a" ],
[ 4, 5, 1, "a" ],
[ 5, 4, 2, "a" ],
[ 6, 3, 1, "b" ],
[ 7, 2, 2, "b" ],
[ 8, 1, 1, "b" ],
[ 9, 0, 2, "b" ]
])"});

    // Use a Hive-style partitioning scheme.
    partitioning_ = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));
  }

 protected:
  // Mock filesystem that holds the written dataset.
  std::shared_ptr<fs::FileSystem> file_system_;
  // Source table; also the expected result when the dataset is read back.
  std::shared_ptr<Table> table_;
  // Partitioning used when writing (and, in tests, when re-discovering) the dataset.
  std::shared_ptr<Partitioning> partitioning_;
};

// Base class to test writing and reading encrypted dataset.
class DatasetEncryptionTestBase : public ::testing::Test {
public:
Expand Down Expand Up @@ -242,10 +298,67 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) {
// Writes a "_metadata" file that aggregates the footers of the dataset's data
// files, discovers the dataset from that file alone, and checks that scanning it
// yields the original table.
TEST_F(DatasetTestBase, ReadDatasetFromMetadata) {
  // Data files expected to have been produced by the fixture's SetUp().
  const std::vector<std::string> data_files = {"part=a/part0.parquet",
                                               "part=b/part0.parquet"};
  auto props = parquet::default_reader_properties();

  // Collect each file's footer, tagging it with the path it was read from so the
  // aggregated metadata records which file every row group belongs to.
  std::vector<std::shared_ptr<parquet::FileMetaData>> footers;
  for (const std::string& data_file : data_files) {
    ASSERT_OK_AND_ASSIGN(auto source, file_system_->OpenInputFile(data_file));
    auto reader = parquet::ParquetFileReader::Open(source, props);
    auto footer = reader->metadata();
    footer->set_file_path(data_file);
    footers.push_back(footer);
  }

  // Merge the second file's row groups into the first footer.
  footers.front()->AppendRowGroups(*footers.back());

  // Persist the merged footer as the metadata-only file.
  const std::string metadata_file = "_metadata";
  ASSERT_OK_AND_ASSIGN(auto sink, file_system_->OpenOutputStream(metadata_file));
  WriteMetaDataFile(*footers.front(), sink.get());
  ARROW_EXPECT_OK(sink->Close());

  // Discover the dataset from the metadata file.
  ParquetFactoryOptions options;
  options.partitioning = partitioning_;
  options.partition_base_dir = kBaseDir;
  auto format = std::make_shared<ParquetFileFormat>();
  ASSERT_OK_AND_ASSIGN(
      auto factory,
      ParquetDatasetFactory::Make(metadata_file, file_system_, format, options));
  ASSERT_OK_AND_ASSIGN(auto dataset, factory->Finish());

  // Scan the whole dataset into a single table.
  ASSERT_OK_AND_ASSIGN(auto builder, dataset->NewScan());
  ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());
  ASSERT_OK_AND_ASSIGN(auto scanned, scanner->ToTable());

  // Combine chunks before comparing against the source table.
  ASSERT_OK_AND_ASSIGN(auto result, scanned->CombineChunks());
  ASSERT_OK(result->ValidateFull());
  AssertTablesEqual(*result, *table_);
}

TEST_F(DatasetTestBase, ReadDatasetFromEncryptedMetadata) {
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;
auto crypto_factory_ = std::make_shared<parquet::encryption::CryptoFactory>();

// Prepare encryption properties.
std::unordered_map<std::string, std::string> key_map;
key_map.emplace(kColumnMasterKeyId, kColumnMasterKey);
key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);
auto kms_client_factory =
std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
/*wrap_locally=*/true, key_map);
crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory));
auto kms_connection_config_ =
std::make_shared<parquet::encryption::KmsConnectionConfig>();

auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();
Expand All @@ -257,42 +370,59 @@ TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) {
auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);
auto file_decryption_properties = crypto_factory_->GetFileDecryptionProperties(
*kms_connection_config_, *decryption_config.get());
*kms_connection_config_, *decryption_config);

auto reader_properties = parquet::default_reader_properties();
reader_properties.file_decryption_properties(file_decryption_properties);

auto writer_properties = ::parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();
std::vector<std::string> paths = {"part=a/part0.parquet", "part=b/part0.parquet"};
std::vector<std::shared_ptr<parquet::FileMetaData>> metadata;

EXPECT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream("/foo.parquet"));
ASSERT_OK_NO_THROW(::parquet::arrow::WriteTable(*table_, ::arrow::default_memory_pool(),
stream, 3, writer_properties));
ARROW_EXPECT_OK(stream->Close());

std::shared_ptr<Table> out;
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("/foo.parquet"));
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::Open(input, reader_properties);

auto metadata = parquet_reader->metadata();

file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);
for (const auto& path : paths) {
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();
// Make sure file_paths are stored in metadata
file_metadata->set_file_path(path);
metadata.push_back(file_metadata);
}
metadata[0]->AppendRowGroups(*metadata[1]);

ASSERT_OK_AND_ASSIGN(stream, file_system_->OpenOutputStream("/_metadata"));
WriteEncryptedMetadataFile(*metadata.get(), stream.get(), file_encryption_properties);
std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteEncryptedMetadataFile(*metadata[0], stream, file_encryption_properties);
ARROW_EXPECT_OK(stream->Close());

ASSERT_OK_AND_ASSIGN(auto input2, file_system_->OpenInputFile("/_metadata"));
parquet_reader = parquet::ParquetFileReader::Open(input2, reader_properties);
auto metadata2 = parquet_reader->metadata();

ASSERT_EQ(metadata2->num_row_groups(), 4);
ASSERT_EQ(metadata2->num_rows(), 10);
ASSERT_EQ(metadata2->num_columns(), 4);
ASSERT_TRUE(metadata->Equals(*metadata2.get()));
// Set scan options.
auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

auto file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

ParquetFactoryOptions factory_options;
factory_options.partitioning = partitioning_;
factory_options.partition_base_dir = kBaseDir;
ASSERT_OK_AND_ASSIGN(auto dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());

// Reuse the dataset above to scan it twice to make sure decryption works correctly.
for (size_t i = 0; i < 2; ++i) {
// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*combined_table, *table_);
}
}

// GH-39444: This test covers the case where parquet dataset scanner crashes when
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
}

Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, ArrowOutputStream* sink,
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink,
file_encryption_properties));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ ::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream
PARQUET_EXPORT
::arrow::Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink,
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

/// \brief Write a Table to Parquet.
Expand Down
38 changes: 13 additions & 25 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,53 +567,41 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
}
}

// TODO: remove
void WriteEncryptedMetadataFile(
const FileMetaData& metadata, ::arrow::io::OutputStream* sink,
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto file_encryptor = std::make_unique<InternalFileEncryptor>(
file_encryption_properties.get(), ::arrow::default_memory_pool());

if (file_encryption_properties->encrypted_footer()) {
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else {
// Encrypted file with plaintext footer mode.
PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
}

if (file_encryption_properties->encrypted_footer()) {
PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell());
uint64_t metadata_start = static_cast<uint64_t>(position);
auto metadata_start = static_cast<uint64_t>(position);

auto writer_props = parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();
auto crypto_metadata =
FileMetaDataBuilder::Make(metadata.schema(), writer_props)->GetCryptoMetaData();
WriteFileCryptoMetaData(*crypto_metadata, sink);

auto builder = FileMetaDataBuilder::Make(metadata.schema(), writer_props);
auto crypto_metadata = builder->GetCryptoMetaData();
WriteFileCryptoMetaData(*crypto_metadata, sink.get());

auto footer_encryptor = file_encryptor->GetFooterEncryptor();
WriteEncryptedFileMetadata(metadata, sink, footer_encryptor, true);
WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true);
PARQUET_ASSIGN_OR_THROW(position, sink->Tell());
uint32_t footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
auto footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
PARQUET_THROW_NOT_OK(
sink->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else { // Encrypted file with plaintext footer
} else {
// Encrypted file with plaintext footer mode.
PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(metadata, sink, footer_signing_encryptor, false);
WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false);
}
}

void WriteEncryptedMetadataFile2(
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto schema = ::arrow::internal::checked_pointer_cast<GroupNode>(
metadata.schema()->schema_root());
auto writer_props =
WriterProperties::Builder().encryption(file_encryption_properties)->build();
auto writer = ParquetFileWriter::Open(sink, schema, writer_props);
writer->Close();
file_encryptor->WipeOutEncryptionKeys();
}

void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void WriteMetaDataFile(const FileMetaData& file_metadata,

PARQUET_EXPORT
void WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink,
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

PARQUET_EXPORT
Expand All @@ -130,9 +130,10 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,

PARQUET_EXPORT
void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink,
std::shared_ptr<::arrow::io::OutputStream> sink,
const std::shared_ptr<Encryptor>& encryptor = NULLPTR,
bool encrypt_footer = false);

PARQUET_EXPORT
void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
::arrow::io::OutputStream* sink);
Expand Down
4 changes: 2 additions & 2 deletions python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:

CStatus WriteEncryptedMetadataFile(
const CFileMetaData& file_metadata,
const COutputStream* sink,
shared_ptr[CFileEncryptionProperties]& encryption_properties)
shared_ptr[COutputStream] sink,
shared_ptr[CFileEncryptionProperties] encryption_properties)

cdef class FileEncryptionProperties:
"""File-level encryption properties for the low-level API"""
Expand Down
9 changes: 6 additions & 3 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,10 @@ cdef class FileMetaData(_Weakrefable):
cdef:
shared_ptr[COutputStream] sink
c_string c_where
shared_ptr[CFileEncryptionProperties] properties
shared_ptr[CFileEncryptionProperties] c_properties
# Builder* encryption(
# shared_ptr[CFileEncryptionProperties]
# file_encryption_properties)

try:
where = _stringify_path(where)
Expand All @@ -1073,12 +1076,12 @@ cdef class FileMetaData(_Weakrefable):
sink = GetResultValue(FileOutputStream.Open(c_where))

if encryption_properties is not None:
properties = (<FileEncryptionProperties> encryption_properties).unwrap()
c_properties = (<FileEncryptionProperties> encryption_properties).unwrap()

with nogil:
if encryption_properties is not None:
check_status(
WriteEncryptedMetadataFile(deref(self._metadata), sink.get(), properties))
WriteEncryptedMetadataFile(deref(self._metadata), sink, c_properties))
else:
check_status(WriteMetaDataFile(deref(self._metadata), sink.get()))

Expand Down
Loading

0 comments on commit 02e2e21

Please sign in to comment.