Python wrapper
rok committed Oct 8, 2024
1 parent cef1bae commit 2ebaf3a
Showing 6 changed files with 44 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_parquet_encryption_test.cc
@@ -311,7 +311,7 @@ Result<std::vector<std::shared_ptr<parquet::FileMetaData>>> ReadMetadata(
     PARQUET_ASSIGN_OR_THROW(auto input, file_system->OpenInputFile(path));
     auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
     auto file_metadata = parquet_reader->metadata();
-    // file_metadata->set_file_path(path);
+    file_metadata->set_file_path(path);
     metadata.push_back(file_metadata);
   }
   return metadata;
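Re-enabling set_file_path here makes each per-file footer carry its path relative to the dataset root, which a combined _metadata file needs in order to point back at the data files. A minimal sketch of the same idea in plain pyarrow (root and file names are hypothetical):

import pyarrow.parquet as pq

# Hypothetical dataset layout; adjust the root and file names as needed.
root = "/tmp/dataset"
paths = ["part-0.parquet", "part-1.parquet"]

collected = []
for path in paths:
    md = pq.read_metadata(f"{root}/{path}")  # per-file Parquet footer
    md.set_file_path(path)                   # path relative to the dataset root
    collected.append(md)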
20 changes: 11 additions & 9 deletions cpp/src/parquet/metadata.cc
@@ -1097,32 +1097,34 @@ void FileMetaData::WriteTo(::arrow::io::OutputStream* dst,
 }

 ::arrow::Result<std::shared_ptr<parquet::FileMetaData>> FileMetaData::CoalesceMetadata(
-    const std::vector<std::shared_ptr<parquet::FileMetaData>>& metadata_list,
-    const std::shared_ptr<parquet::WriterProperties>& writer_props) {
+    std::vector<std::shared_ptr<parquet::FileMetaData>>& metadata_list,
+    std::shared_ptr<parquet::WriterProperties>& writer_props) {
   if (metadata_list.empty()) {
     return ::arrow::Status::Invalid("No metadata to coalesce");
   }

   std::vector<std::string> values, keys;

-  const auto& metadata = metadata_list[0];
   // Read metadata from all dataset files and store AADs and paths as key-value metadata.
-  for (size_t i = 1; i < metadata_list.size(); i++) {
+  for (size_t i = 0; i < metadata_list.size(); i++) {
     const auto& file_metadata = metadata_list[i];
     keys.push_back("row_group_aad_" + std::to_string(i));
     values.push_back(file_metadata->file_aad());
-    metadata->AppendRowGroups(*file_metadata);
+    if (i > 0) {
+      metadata_list[0]->AppendRowGroups(*file_metadata);
+    }
   }

   // Create a new FileMetadata object with the created AADs and paths as
   // key_value_metadata.
-  auto fmd_builder = parquet::FileMetaDataBuilder::Make(metadata->schema(), writer_props);
+  auto fmd_builder =
+      parquet::FileMetaDataBuilder::Make(metadata_list[0]->schema(), writer_props);
   const std::shared_ptr<const KeyValueMetadata> file_aad_metadata =
       ::arrow::key_value_metadata(keys, values);
-  auto metadata2 = fmd_builder->Finish(file_aad_metadata);
-  metadata2->AppendRowGroups(*metadata);
+  auto metadata = fmd_builder->Finish(file_aad_metadata);
+  metadata->AppendRowGroups(*metadata_list[0]);

-  return metadata2;
+  return metadata;
 }

 class FileCryptoMetaData::FileCryptoMetaDataImpl {
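The coalescing works in two steps: the row groups of every footer are appended onto the first one, and each file's AAD is recorded under a row_group_aad_<i> key; a fresh FileMetaData is then rebuilt with those keys as key-value metadata. A hedged sketch of inspecting such a coalesced footer from Python once it has been written out as _metadata (path hypothetical; key names follow the C++ above):

import pyarrow.parquet as pq

md = pq.read_metadata("/tmp/dataset/_metadata")

print(md.num_row_groups)          # row groups from all coalesced files
kv = md.metadata or {}            # key-value metadata, bytes keys and values
aad_keys = sorted(k for k in kv if k.startswith(b"row_group_aad_"))
print(aad_keys)                   # e.g. [b'row_group_aad_0', b'row_group_aad_1']

# Each row group still points at its data file via the file_path set earlier.
print(md.row_group(0).column(0).file_path)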
4 changes: 2 additions & 2 deletions cpp/src/parquet/metadata.h
@@ -399,8 +399,8 @@ class PARQUET_EXPORT FileMetaData {
   /// \param[in] writer_props
   /// \return
   static ::arrow::Result<std::shared_ptr<FileMetaData>> CoalesceMetadata(
-      const std::vector<std::shared_ptr<FileMetaData>>& metadata_list,
-      const std::shared_ptr<WriterProperties>& writer_props);
+      std::vector<std::shared_ptr<FileMetaData>>& metadata_list,
+      std::shared_ptr<WriterProperties>& writer_props);

   /// \brief Set the AAD of decryptor of the file.
   ///
4 changes: 4 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -365,6 +365,10 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         inline EncryptionAlgorithm encryption_algorithm() const
         inline const c_string& footer_signing_key_metadata() const

+    cdef CResult[shared_ptr[CFileMetaData]] CFileMetaData_CoalesceMetadata \
+        " parquet::FileMetaData::CoalesceMetadata"(const vector[shared_ptr[CFileMetaData]]& metadata_list,
+                                                   const shared_ptr[WriterProperties]& properties)
+
     cdef shared_ptr[CFileMetaData] CFileMetaData_Make \
         " parquet::FileMetaData::Make"(const void* serialized_metadata,
                                        uint32_t* metadata_len)
19 changes: 19 additions & 0 deletions python/pyarrow/_parquet.pyx
@@ -1059,6 +1059,25 @@ cdef class FileMetaData(_Weakrefable):
         c_metadata = other.sp_metadata
         self._metadata.AppendRowGroups(deref(c_metadata))

+    @classmethod
+    def coalesce_metadata(cls, metadata_list):
+        """
+        Coalesce a list of FileMetaData objects into a single FileMetaData,
+        appending all row groups and recording each file's AAD as
+        'row_group_aad_<i>' key-value metadata.
+        """
+        cdef:
+            FileMetaData metadata = FileMetaData.__new__(FileMetaData)
+            vector[shared_ptr[CFileMetaData]] c_metadata_list
+            shared_ptr[WriterProperties] c_properties = _create_writer_properties()
+            shared_ptr[CFileMetaData] c_metadata
+
+        # Use a separate loop variable so the result object is not overwritten.
+        for md in metadata_list:
+            c_metadata_list.push_back((<FileMetaData> md).sp_metadata)
+
+        c_metadata = GetResultValue(
+            CFileMetaData_CoalesceMetadata(c_metadata_list, c_properties))
+        metadata.init(c_metadata)
+        return metadata
+
     def write_metadata_file(self, where, encryption_properties=None):
         """
         Write the metadata to a metadata-only Parquet file.
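With this binding the merge is exposed as a classmethod on FileMetaData. A minimal sketch of calling it directly, assuming a build that includes this commit (input files hypothetical):

import pyarrow.parquet as pq

# Per-file footers, e.g. gathered by a metadata_collector during writing.
collected = [pq.read_metadata(p) for p in ["part-0.parquet", "part-1.parquet"]]

combined = pq.FileMetaData.coalesce_metadata(collected)
combined.write_metadata_file("_metadata")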
12 changes: 7 additions & 5 deletions python/pyarrow/parquet/core.py
@@ -28,6 +28,7 @@
 import warnings

 import pyarrow as pa
+# from pyarrow._parquet cimport _create_writer_properties

 try:
     import pyarrow._parquet as _parquet
@@ -2053,7 +2054,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     The metadata attribute will be the parquet metadata of the file.
     This metadata will have the file path attribute set and can be used
-    to build a _metadata file. The metadata attribute will be None if
+    to build a _metadata file. The metadata attribute will be None if
     the format is not parquet.

     Example visitor which simple collects the filenames created::
@@ -2149,7 +2150,10 @@ def file_visitor(written_file):

     if metadata_collector is not None:
         def file_visitor(written_file):
-            metadata_collector.append(written_file.metadata)
+            metadata = written_file.metadata
+            # TODO: is set_file_path needed?
+            metadata.set_file_path(written_file.path)
+            metadata_collector.append(metadata)

     # map format arguments
     parquet_format = ds.ParquetFileFormat()
@@ -2251,12 +2255,10 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None,
     writer.close()

     if metadata_collector is not None:
-        metadata = read_metadata(where, filesystem=filesystem, **read_metadata_kwargs)
         if hasattr(where, "seek"):
             where.seek(cursor_position)  # file-like, set cursor back.

-        for m in metadata_collector:
-            metadata.append_row_groups(m)
+        metadata = FileMetaData.coalesce_metadata(metadata_collector)
         if filesystem is not None:
             with filesystem.open_output_stream(where) as f:
                 metadata.write_metadata_file(f, **write_metadata_kwargs)
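From the user's side the existing _metadata workflow is unchanged: collect footers while writing the dataset, then hand them to write_metadata(), which now merges them through FileMetaData.coalesce_metadata() instead of re-reading the written footer and appending row groups in Python. A sketch of that flow (table contents and root path are placeholders):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})
root = "/tmp/dataset"

collected = []
pq.write_to_dataset(table, root, metadata_collector=collected)

# Internally coalesces 'collected' into a single footer and writes it out.
pq.write_metadata(table.schema, f"{root}/_metadata", metadata_collector=collected)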
