Commit 63b6a61

first commit

rok committed Jun 2, 2024
1 parent 19044ee commit 63b6a61

Showing 9 changed files with 283 additions and 6 deletions.
90 changes: 90 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
@@ -33,10 +33,12 @@
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/test_in_memory_kms.h"
#include "parquet/file_writer.h"

constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
@@ -240,6 +242,94 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) {
// Reuse the file-level encryption constants (kFooterKeyName, kColumnMasterKeyId,
// kColumnKeyMapping, ...) instead of shadowing them with local copies.

auto table_schema = schema({field("a", int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
auto table = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "c" ],
[ 3, 6, 2, "c" ],
[ 4, 5, 1, "e" ],
[ 5, 4, 2, "e" ],
[ 6, 3, 1, "g" ],
[ 7, 2, 2, "g" ],
[ 8, 1, 1, "i" ],
[ 9, 0, 2, "i" ]
])"});

// Prepare encryption properties.
std::unordered_map<std::string, std::string> key_map;
key_map.emplace(kColumnMasterKeyId, kColumnMasterKey);
key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);

auto crypto_factory = std::make_shared<parquet::encryption::CryptoFactory>();
auto kms_client_factory =
std::make_shared<::parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
/*wrap_locally=*/true, key_map);
crypto_factory->RegisterKmsClientFactory(std::move(kms_client_factory));
auto kms_connection_config =
std::make_shared<parquet::encryption::KmsConnectionConfig>();

auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;

auto file_encryption_properties = crypto_factory->GetFileEncryptionProperties(
*kms_connection_config, *encryption_config);
auto writer_properties = ::parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();

// Create the ReaderProperties object using the FileDecryptionProperties object
auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();

auto file_decryption_properties = crypto_factory->GetFileDecryptionProperties(
*kms_connection_config, *decryption_config);
auto reader_properties = parquet::default_reader_properties();
reader_properties.file_decryption_properties(file_decryption_properties);

PARQUET_ASSIGN_OR_THROW(
auto stream, ::arrow::io::BufferOutputStream::Create(1024, default_memory_pool()));
ASSERT_OK_NO_THROW(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
stream, 10, writer_properties));

ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish());

// Read entire file as a single Arrow table
std::unique_ptr<::parquet::arrow::FileReader> reader;
::parquet::arrow::FileReaderBuilder reader_builder;
ASSERT_OK(reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer),
reader_properties));
ASSERT_OK(reader_builder.Build(&reader));
auto metadata = reader->parquet_reader()->metadata();

PARQUET_ASSIGN_OR_THROW(
stream, ::arrow::io::BufferOutputStream::Create(1024, default_memory_pool()));
file_encryption_properties = crypto_factory->GetFileEncryptionProperties(
*kms_connection_config, *encryption_config);
::parquet::WriteEncryptedMetadataFile(*metadata, stream,
file_encryption_properties);
EXPECT_OK_AND_ASSIGN(buffer, stream->Finish());

ASSERT_OK(reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer),
reader_properties));
ASSERT_OK(reader_builder.Build(&reader));

// The rewritten file is metadata-only: schema and column count round-trip,
// but it carries no row groups and therefore no rows.
ASSERT_TRUE(
metadata->schema()->Equals(*reader->parquet_reader()->metadata()->schema()));
ASSERT_EQ(metadata->num_columns(), reader->parquet_reader()->metadata()->num_columns());
ASSERT_EQ(reader->parquet_reader()->metadata()->num_rows(), 0);
ASSERT_EQ(reader->parquet_reader()->metadata()->num_row_groups(), 0);
}

// GH-39444: This test covers the case where parquet dataset scanner crashes when
// processing encrypted datasets over 2^15 rows in multi-threaded mode.
class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
8 changes: 8 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
@@ -601,6 +601,14 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
return Status::OK();
}

Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink,
file_encryption_properties));
return Status::OK();
}

Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
std::shared_ptr<WriterProperties> properties,
6 changes: 6 additions & 0 deletions cpp/src/parquet/arrow/writer.h
@@ -157,6 +157,12 @@ PARQUET_EXPORT
::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream
PARQUET_EXPORT
::arrow::Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

/// \brief Write a Table to Parquet.
///
/// This writes one table in a single shot. To write a Parquet file with
49 changes: 49 additions & 0 deletions cpp/src/parquet/file_writer.cc
@@ -567,6 +567,55 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
}
}

void WriteEncryptedMetadataFile(
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto schema = ::arrow::internal::checked_pointer_cast<GroupNode>(
metadata.schema()->schema_root());
auto writer_props =
WriterProperties::Builder().encryption(file_encryption_properties)->build();
// Opening a writer over the schema and closing it immediately produces an
// encrypted, metadata-only file: the footer carries the schema but no row groups.
auto writer = ParquetFileWriter::Open(sink, schema, writer_props);
writer->Close();
}

void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
ArrowOutputStream* sink) {
crypto_metadata.WriteTo(sink);
5 changes: 5 additions & 0 deletions cpp/src/parquet/file_writer.h
@@ -117,6 +117,11 @@ PARQUET_EXPORT
void WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

PARQUET_EXPORT
void WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

PARQUET_EXPORT
void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
ArrowOutputStream* sink,
5 changes: 5 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -561,6 +561,11 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
const CFileMetaData& file_metadata,
const COutputStream* sink)

CStatus WriteEncryptedMetadataFile(
const CFileMetaData& file_metadata,
shared_ptr[COutputStream] sink,
shared_ptr[CFileEncryptionProperties] encryption_properties)

cdef class FileEncryptionProperties:
"""File-level encryption properties for the low-level API"""
cdef:
15 changes: 12 additions & 3 deletions python/pyarrow/_parquet.pyx
@@ -1045,7 +1045,7 @@ cdef class FileMetaData(_Weakrefable):
c_metadata = other.sp_metadata
self._metadata.AppendRowGroups(deref(c_metadata))

def write_metadata_file(self, where):
def write_metadata_file(self, where, encryption_properties=None):
"""
Write the metadata to a metadata-only Parquet file.
@@ -1054,10 +1054,13 @@
where : path or file-like object
Where to write the metadata. Should be a writable path on
the local filesystem, or a writable file-like object.
encryption_properties : FileEncryptionProperties, default None
Optional encryption properties used to encrypt the metadata file.
"""
cdef:
shared_ptr[COutputStream] sink
c_string c_where
shared_ptr[CFileEncryptionProperties] properties

try:
where = _stringify_path(where)
@@ -1068,9 +1071,15 @@
with nogil:
sink = GetResultValue(FileOutputStream.Open(c_where))

if encryption_properties is not None:
    properties = (<FileEncryptionProperties> encryption_properties).unwrap()
    with nogil:
        check_status(
            WriteEncryptedMetadataFile(
                deref(self._metadata), sink, properties))
else:
    with nogil:
        check_status(WriteMetaDataFile(deref(self._metadata), sink.get()))


cdef class ParquetSchema(_Weakrefable):
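For orientation, here is a minimal sketch of the low-level path this binding exposes. It assumes a configured crypto_factory, kms_connection_config, and encryption_config, as set up in the tests further below; the file names are illustrative.

import pyarrow.parquet as pq

# Derive file encryption properties from the same crypto factory that
# was used to encrypt the data files (assumed configured elsewhere).
encryption_properties = crypto_factory.file_encryption_properties(
    kms_connection_config, encryption_config)

# Rewrite an existing file's metadata as an encrypted metadata-only file.
metadata = pq.read_metadata("part-0.parquet")
metadata.write_metadata_file("_metadata", encryption_properties=encryption_properties)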
7 changes: 4 additions & 3 deletions python/pyarrow/parquet/core.py
@@ -2147,7 +2147,7 @@ def file_visitor(written_file):


def write_metadata(schema, where, metadata_collector=None, filesystem=None,
**kwargs):
encryption_properties=None, **kwargs):
"""
Write metadata-only Parquet file from schema. This can be used with
`write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
@@ -2162,6 +2162,7 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None,
filesystem : FileSystem, default None
If nothing passed, will be inferred from `where` if path-like, else
`where` is already a file-like object so no filesystem is needed.
encryption_properties : FileEncryptionProperties, default None
Optional encryption properties used to encrypt the metadata file.
**kwargs : dict,
Additional kwargs for ParquetWriter class. See docstring for
`ParquetWriter` for more information.
@@ -2213,9 +2214,9 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None,
metadata.append_row_groups(m)
if filesystem is not None:
with filesystem.open_output_stream(where) as f:
metadata.write_metadata_file(f)
metadata.write_metadata_file(f, encryption_properties)
else:
metadata.write_metadata_file(where)
metadata.write_metadata_file(where, encryption_properties)


def read_metadata(where, memory_map=False, decryption_properties=None,
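As an end-to-end usage sketch of the new keyword (not part of the diff): it assumes crypto_factory, kms_connection_config, encryption_config, and decryption_config are configured as in the test below, and the paths are illustrative.

import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write an encrypted dataset, collecting per-file metadata as we go.
parquet_encryption_cfg = ds.ParquetEncryptionConfig(
    crypto_factory, kms_connection_config, encryption_config)
metadata_collector = []
pq.write_to_dataset(
    table, "dataset_root",
    encryption_config=parquet_encryption_cfg,
    metadata_collector=metadata_collector)

# Encrypt the combined _metadata sidecar with properties derived from
# the same crypto factory used for the data files.
encryption_properties = crypto_factory.file_encryption_properties(
    kms_connection_config, encryption_config)
pq.write_metadata(
    table.schema, "dataset_root/_metadata",
    metadata_collector=metadata_collector,
    encryption_properties=encryption_properties)

# Reading the sidecar back requires matching decryption properties.
decryption_properties = crypto_factory.file_decryption_properties(
    kms_connection_config, decryption_config)
metadata = pq.read_metadata(
    "dataset_root/_metadata", decryption_properties=decryption_properties)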
104 changes: 104 additions & 0 deletions python/pyarrow/tests/test_dataset_encryption.py
@@ -227,3 +227,107 @@ def unwrap_key(self, wrapped_key: bytes, _: str) -> bytes:
dataset = ds.dataset(path, format=file_format, filesystem=mockfs)
new_table = dataset.to_table()
assert table == new_table


def test_dataset_metadata_encryption_decryption(tempdir):
table = create_sample_table()

encryption_config = create_encryption_config()
decryption_config = create_decryption_config()
kms_connection_config = create_kms_connection_config()

crypto_factory = pe.CryptoFactory(kms_factory)
parquet_encryption_cfg = ds.ParquetEncryptionConfig(
crypto_factory, kms_connection_config, encryption_config
)
parquet_decryption_cfg = ds.ParquetDecryptionConfig(
crypto_factory, kms_connection_config, decryption_config
)

# create write_options with dataset encryption config
pformat = ds.ParquetFileFormat()
write_options = pformat.make_write_options(encryption_config=parquet_encryption_cfg)

path = str(tempdir / "sample_dataset")
metadata_file = str(tempdir / "_metadata")
mockfs = fs._MockFileSystem()
mockfs.create_dir(path)

partitioning = ds.partitioning(
schema=pa.schema([
pa.field("year", pa.int64())
]),
flavor="hive"
)

ds.write_dataset(
data=table,
base_dir=path,
format=pformat,
file_options=write_options,
filesystem=mockfs,
)

# read without decryption config -> should error if the dataset was properly encrypted
pformat = ds.ParquetFileFormat()
with pytest.raises(IOError, match=r"no decryption"):
ds.dataset(path, format=pformat, filesystem=mockfs)

# set decryption config for parquet fragment scan options
pq_scan_opts = ds.ParquetFragmentScanOptions(
decryption_config=parquet_decryption_cfg
)
pformat = ds.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts)
dataset = ds.dataset(path, format=pformat, filesystem=mockfs)

assert table.equals(dataset.to_table())

metadata_collector = []

pq.write_to_dataset(
table,
path,
partitioning=partitioning,
encryption_config=parquet_encryption_cfg,
metadata_collector=metadata_collector,
filesystem=mockfs
)

encryption_properties = crypto_factory.file_encryption_properties(
kms_connection_config, encryption_config)
decryption_properties = crypto_factory.file_decryption_properties(
kms_connection_config, decryption_config)

metadata_schema = pa.schema(
field
for field in table.schema
if field.name != "year"
)
pq.write_metadata(
metadata_schema,
metadata_file,
metadata_collector,
encryption_properties=encryption_properties,
filesystem=mockfs,
)

dataset = ds.parquet_dataset(
metadata_file,
format=pformat,
partitioning=partitioning,
filesystem=mockfs
)

new_table = dataset.to_table()

for field in table.schema:
    # Schema order is not preserved by the dataset read; compare field by field
    assert field == new_table.schema.field(field.name)

metadata = pq.read_metadata(
metadata_file, decryption_properties=decryption_properties, filesystem=mockfs)

assert metadata.num_columns == 2
assert metadata.num_rows == 0
assert metadata.num_row_groups == 0
assert metadata.schema.to_arrow_schema() == metadata_schema
