Skip to content

Commit

Permalink
Minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Jun 2, 2024
1 parent a0d14e8 commit 7be8f5e
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 68 deletions.
8 changes: 5 additions & 3 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/writer.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption_internal.h"
Expand Down Expand Up @@ -316,7 +315,8 @@ TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) {
stream, ::arrow::io::BufferOutputStream::Create(1024, default_memory_pool()));
file_encryption_properties = crypto_factory->GetFileEncryptionProperties(
*kms_connection_config, *encryption_config);
::parquet::WriteEncryptedMetadataFile(*metadata, stream, file_encryption_properties);
::parquet::WriteEncryptedMetadataFile(*metadata.get(), stream,
file_encryption_properties);
EXPECT_OK_AND_ASSIGN(buffer, stream->Finish());

ASSERT_OK(reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer),
Expand All @@ -325,7 +325,9 @@ TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) {

ASSERT_TRUE(
metadata->schema()->Equals(*reader->parquet_reader()->metadata()->schema()));
ASSERT_TRUE(reader->parquet_reader()->metadata()->Equals(*metadata));
ASSERT_EQ(metadata->num_columns(), reader->parquet_reader()->metadata()->num_columns());
ASSERT_EQ(reader->parquet_reader()->metadata()->num_rows(), 0);
ASSERT_EQ(reader->parquet_reader()->metadata()->num_row_groups(), 0);
}

// GH-39444: This test covers the case where parquet dataset scanner crashes when
Expand Down
93 changes: 42 additions & 51 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "parquet/arrow/schema.h"
#include "parquet/column_writer.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_encryptor.h"
Expand Down Expand Up @@ -568,61 +567,53 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
}
}

void WriteEncryptedMetadataFile2(
// TODO: remove the commented-out WriteEncryptedMetadataFile2 implementation below.
// void WriteEncryptedMetadataFile2(
// const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
// std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
// auto file_encryptor = std::make_unique<InternalFileEncryptor>(
// file_encryption_properties.get(), ::arrow::default_memory_pool());
//
// if (file_encryption_properties->encrypted_footer()) {
// PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
// } else {
// // Encrypted file with plaintext footer mode.
// PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
// }
//
// if (file_encryption_properties->encrypted_footer()) {
// PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell());
// uint64_t metadata_start = static_cast<uint64_t>(position);
//
// auto writer_props = parquet::WriterProperties::Builder()
// .encryption(file_encryption_properties)
// ->build();
// auto crypto_metadata =
// FileMetaDataBuilder::Make(metadata.schema(), writer_props)->GetCryptoMetaData();
// WriteFileCryptoMetaData(*crypto_metadata, sink.get());
//
// auto footer_encryptor = file_encryptor->GetFooterEncryptor();
// WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true);
// PARQUET_ASSIGN_OR_THROW(position, sink->Tell());
// uint32_t footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
// PARQUET_THROW_NOT_OK(
// sink->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
// PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
// } else { // Encrypted file with plaintext footer
// auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor();
// WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false);
// }
//}

void WriteEncryptedMetadataFile(
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
// std::shared_ptr<::arrow::Schema> arrow_schema;
// ArrowReaderProperties props;
// PARQUET_THROW_NOT_OK(::parquet::arrow::FromParquetSchema(metadata.schema(), props,
// &arrow_schema));

auto gn = ::arrow::internal::checked_pointer_cast<GroupNode>(
auto schema = ::arrow::internal::checked_pointer_cast<GroupNode>(
metadata.schema()->schema_root());
auto writer_props =
WriterProperties::Builder().encryption(file_encryption_properties)->build();
std::unique_ptr<ParquetFileWriter> pw = ParquetFileWriter::Open(sink, gn, writer_props);
pw->AddKeyValueMetadata(metadata.key_value_metadata());
pw->AppendBufferedRowGroup();
// pw->AppendRowGroup();
// pw->AddKeyValueMetadata(metadata.key_value_metadata());
pw->Close();
}

void WriteEncryptedMetadataFile(
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto file_encryptor = std::make_unique<InternalFileEncryptor>(
file_encryption_properties.get(), ::arrow::default_memory_pool());

if (file_encryption_properties->encrypted_footer()) {
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else {
// Encrypted file with plaintext footer mode.
PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
}

if (file_encryption_properties->encrypted_footer()) {
PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell());
uint64_t metadata_start = static_cast<uint64_t>(position);

auto writer_props = parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();
auto crypto_metadata =
FileMetaDataBuilder::Make(metadata.schema(), writer_props)->GetCryptoMetaData();
WriteFileCryptoMetaData(*crypto_metadata, sink.get());

auto footer_encryptor = file_encryptor->GetFooterEncryptor();
WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true);
PARQUET_ASSIGN_OR_THROW(position, sink->Tell());
uint32_t footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
PARQUET_THROW_NOT_OK(
sink->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else { // Encrypted file with plaintext footer
auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false);
}
auto writer = ParquetFileWriter::Open(sink, schema, writer_props);
writer->Close();
}

void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
Expand Down
4 changes: 2 additions & 2 deletions python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:

CStatus WriteEncryptedMetadataFile(
const CFileMetaData& file_metadata,
const shared_ptr[COutputStream]& sink,
const shared_ptr[CFileEncryptionProperties]& encryption_properties)
shared_ptr[COutputStream]& sink,
shared_ptr[CFileEncryptionProperties]& encryption_properties)

cdef class FileEncryptionProperties:
"""File-level encryption properties for the low-level API"""
Expand Down
49 changes: 37 additions & 12 deletions python/pyarrow/tests/test_dataset_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,16 @@ def test_dataset_metadata_encryption_decryption(tempdir):
write_options = pformat.make_write_options(encryption_config=parquet_encryption_cfg)

path = str(tempdir / "sample_dataset")
metadata_path = str(tempdir / "metadata")
metadata_file = str(tempdir / "metadata" / "_metadata")
metadata_file = str(tempdir / "_metadata")
mockfs = fs._MockFileSystem()
mockfs.create_dir(path)
mockfs.create_dir(metadata_path)

partitioning = ds.partitioning(
schema=pa.schema([
pa.field("year", pa.int64())
]),
flavor="hive"
)

ds.write_dataset(
data=table,
Expand All @@ -279,19 +284,27 @@ def test_dataset_metadata_encryption_decryption(tempdir):

metadata_collector = []

# pq.write_to_dataset(
# table,
# metadata_path,
# encryption_config=parquet_encryption_cfg,
# metadata_collector=metadata_collector,
# filesystem=mockfs
# )
pq.write_to_dataset(
table,
path,
partitioning=partitioning,
encryption_config=parquet_encryption_cfg,
metadata_collector=metadata_collector,
filesystem=mockfs
)

encryption_properties = crypto_factory.file_encryption_properties(
kms_connection_config, encryption_config)
decryption_properties = crypto_factory.file_decryption_properties(
kms_connection_config, decryption_config)

metadata_schema = pa.schema(
field
for field in table.schema
if field.name != "year"
)
pq.write_metadata(
table.schema,
metadata_schema,
metadata_file,
metadata_collector,
encryption_properties=encryption_properties,
Expand All @@ -301,8 +314,20 @@ def test_dataset_metadata_encryption_decryption(tempdir):
dataset = ds.parquet_dataset(
metadata_file,
format=pformat,
partitioning=partitioning,
filesystem=mockfs
)

new_table = dataset.to_table()
assert table.schema == new_table.schema

for field in table.schema:
# Schema order is not preserved
assert field == new_table.schema.field_by_name(field.name)

metadata = pq.read_metadata(
metadata_file, decryption_properties=decryption_properties, filesystem=mockfs)

assert metadata.num_columns == 2
assert metadata.num_rows == 0
assert metadata.num_row_groups == 0
assert metadata.schema.to_arrow_schema() == metadata_schema

0 comments on commit 7be8f5e

Please sign in to comment.