diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pxd b/python/pylibcudf/pylibcudf/contiguous_split.pxd index 3745e893c3e..14ad84709d5 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pxd +++ b/python/pylibcudf/pylibcudf/contiguous_split.pxd @@ -1,12 +1,25 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from libc.stdint cimport uint8_t from libcpp.memory cimport unique_ptr +from libcpp.vector cimport vector from pylibcudf.libcudf.contiguous_split cimport packed_columns from .gpumemoryview cimport gpumemoryview from .table cimport Table +cdef class HostBuffer: + cdef unique_ptr[vector[uint8_t]] c_obj + cdef size_t nbytes + cdef Py_ssize_t[1] shape + cdef Py_ssize_t[1] strides + + @staticmethod + cdef HostBuffer from_unique_ptr( + unique_ptr[vector[uint8_t]] vec + ) + cdef class PackedColumns: cdef unique_ptr[packed_columns] c_obj diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 2a40d42e6e9..b24b7a21af1 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -29,18 +29,14 @@ __all__ = [ cdef class HostBuffer: """Owning host buffer that implements the buffer protocol""" - cdef unique_ptr[vector[uint8_t]] c_obj - cdef size_t nbytes - cdef Py_ssize_t[1] shape - cdef Py_ssize_t[1] strides - @staticmethod cdef HostBuffer from_unique_ptr( unique_ptr[vector[uint8_t]] vec ): - cdef HostBuffer out = HostBuffer() + cdef HostBuffer out = HostBuffer.__new__(HostBuffer) + # Allow construction from nullptr + out.nbytes = 0 if vec.get() == NULL else dereference(vec).size() out.c_obj = move(vec) - out.nbytes = dereference(out.c_obj).size() out.shape[0] = out.nbytes out.strides[0] = 1 return out @@ -48,7 +44,8 @@ cdef class HostBuffer: __hash__ = None def __getbuffer__(self, Py_buffer *buffer, int flags): - buffer.buf = dereference(self.c_obj).data() + # Empty vec produces empty buffer + buffer.buf = NULL if self.nbytes == 0 else 
dereference(self.c_obj).data() buffer.format = NULL # byte buffer.internal = NULL buffer.itemsize = 1 diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 9c476030ded..1a61c20d783 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -1,14 +1,26 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from libc.stdint cimport int64_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.memory cimport unique_ptr +from libcpp.vector cimport vector from pylibcudf.expressions cimport Expression -from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.io.types cimport ( + compression_type, + dictionary_policy, + statistics_freq, + SinkInfo, + SourceInfo, + TableInputMetadata, + TableWithMetadata, +) from pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_reader as cpp_chunked_parquet_reader, + parquet_writer_options, + parquet_writer_options_builder, ) from pylibcudf.libcudf.types cimport size_type +from pylibcudf.table cimport Table from pylibcudf.types cimport DataType @@ -33,3 +45,49 @@ cpdef read_parquet( # ReaderColumnSchema reader_column_schema = *, # DataType timestamp_type = * ) + +cdef class ParquetWriterOptions: + cdef parquet_writer_options c_obj + cdef Table table_ref + cdef SinkInfo sink_ref + + cpdef void set_partitions(self, list partitions) + + cpdef void set_column_chunks_file_paths(self, list file_paths) + + cpdef void set_row_group_size_bytes(self, size_t size_bytes) + + cpdef void set_row_group_size_rows(self, size_type size_rows) + + cpdef void set_max_page_size_bytes(self, size_t size_bytes) + + cpdef void set_max_page_size_rows(self, size_type size_rows) + + cpdef void set_max_dictionary_size(self, size_t size_bytes) + +cdef class ParquetWriterOptionsBuilder: + cdef parquet_writer_options_builder c_obj + cdef Table table_ref + cdef SinkInfo sink_ref + + cpdef ParquetWriterOptionsBuilder metadata(self, 
TableInputMetadata metadata) + + cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata) + + cpdef ParquetWriterOptionsBuilder compression(self, compression_type compression) + + cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf) + + cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled) + + cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled) + + cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy val) + + cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled) + + cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled) + + cpdef ParquetWriterOptions build(self) + +cpdef memoryview write_parquet(ParquetWriterOptions options) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index bcf1d1cce09..eb2ca68109b 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -1,7 +1,20 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from collections.abc import Mapping +from typing import Self + from pylibcudf.expressions import Expression -from pylibcudf.io.types import SourceInfo, TableWithMetadata +from pylibcudf.io.types import ( + CompressionType, + DictionaryPolicy, + PartitionInfo, + SinkInfo, + SourceInfo, + StatisticsFreq, + TableInputMetadata, + TableWithMetadata, +) +from pylibcudf.table import Table class ChunkedParquetReader: def __init__( @@ -34,3 +47,34 @@ def read_parquet( # reader_column_schema: ReaderColumnSchema = *, # timestamp_type: DataType = * ) -> TableWithMetadata: ... + +class ParquetWriterOptions: + def __init__(self): ... + @staticmethod + def builder( + sink: SinkInfo, table: Table + ) -> ParquetWriterOptionsBuilder: ... + def set_partitions(self, partitions: list[PartitionInfo]) -> None: ... + def set_column_chunks_file_paths(self, file_paths: list[str]) -> None: ... 
+ def set_row_group_size_bytes(self, size_bytes: int) -> None: ... + def set_row_group_size_rows(self, size_rows: int) -> None: ... + def set_max_page_size_bytes(self, size_bytes: int) -> None: ... + def set_max_page_size_rows(self, size_rows: int) -> None: ... + def set_max_dictionary_size(self, size_bytes: int) -> None: ... + +class ParquetWriterOptionsBuilder: + def __init__(self): ... + def metadata(self, metadata: TableInputMetadata) -> Self: ... + def key_value_metadata( + self, metadata: list[Mapping[str, str]] + ) -> Self: ... + def compression(self, compression: CompressionType) -> Self: ... + def stats_level(self, sf: StatisticsFreq) -> Self: ... + def int96_timestamps(self, enabled: bool) -> Self: ... + def write_v2_headers(self, enabled: bool) -> Self: ... + def dictionary_policy(self, val: DictionaryPolicy) -> Self: ... + def utc_timestamps(self, enabled: bool) -> Self: ... + def write_arrow_schema(self, enabled: bool) -> Self: ... + def build(self) -> ParquetWriterOptions: ... + +def write_parquet(options: ParquetWriterOptions) -> memoryview: ... diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index b76a352d633..b95b1f39de1 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -1,22 +1,45 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
from cython.operator cimport dereference -from libc.stdint cimport int64_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool +from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector +from pylibcudf.contiguous_split cimport HostBuffer from pylibcudf.expressions cimport Expression -from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.io.types cimport ( + SinkInfo, + SourceInfo, + PartitionInfo, + TableInputMetadata, + TableWithMetadata +) from pylibcudf.libcudf.expressions cimport expression from pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_reader as cpp_chunked_parquet_reader, parquet_reader_options, read_parquet as cpp_read_parquet, + write_parquet as cpp_write_parquet, + parquet_writer_options, +) +from pylibcudf.libcudf.io.types cimport ( + compression_type, + dictionary_policy as dictionary_policy_t, + partition_info, + statistics_freq, + table_with_metadata, ) -from pylibcudf.libcudf.io.types cimport table_with_metadata from pylibcudf.libcudf.types cimport size_type +from pylibcudf.table cimport Table -__all__ = ["ChunkedParquetReader", "read_parquet"] +__all__ = [ + "ChunkedParquetReader", + "ParquetWriterOptions", + "ParquetWriterOptionsBuilder", + "read_parquet", + "write_parquet" +] cdef parquet_reader_options _setup_parquet_reader_options( @@ -221,3 +244,336 @@ cpdef read_parquet( c_result = move(cpp_read_parquet(opts)) return TableWithMetadata.from_libcudf(c_result) + + +cdef class ParquetWriterOptions: + + @staticmethod + def builder(SinkInfo sink, Table table): + """ + Create builder to create ParquetWriterOptionsBuilder. 
+
+        Parameters
+        ----------
+        sink : SinkInfo
+            The sink used for writer output
+
+        table : Table
+            Table to be written to output
+
+        Returns
+        -------
+        ParquetWriterOptionsBuilder
+        """
+        cdef ParquetWriterOptionsBuilder bldr = ParquetWriterOptionsBuilder.__new__(
+            ParquetWriterOptionsBuilder
+        )
+        bldr.c_obj = parquet_writer_options.builder(sink.c_obj, table.view())
+        bldr.table_ref = table
+        bldr.sink_ref = sink
+        return bldr
+
+    cpdef void set_partitions(self, list partitions):
+        """
+        Sets partitions.
+
+        Parameters
+        ----------
+        partitions : list[PartitionInfo]
+            Partitions of input table in {start_row, num_rows} pairs.
+
+        Returns
+        -------
+        None
+        """
+        cdef vector[partition_info] c_partions
+        cdef PartitionInfo partition
+
+        c_partions.reserve(len(partitions))
+        for partition in partitions:
+            c_partions.push_back(partition.c_obj)
+
+        self.c_obj.set_partitions(c_partions)
+
+    cpdef void set_column_chunks_file_paths(self, list file_paths):
+        """
+        Sets column chunks file path to be set in the raw output metadata.
+
+        Parameters
+        ----------
+        file_paths : list[str]
+            Vector of strings which indicate file paths.
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_column_chunks_file_paths([fp.encode() for fp in file_paths])
+
+    cpdef void set_row_group_size_bytes(self, size_t size_bytes):
+        """
+        Sets the maximum row group size, in bytes.
+
+        Parameters
+        ----------
+        size_bytes : int
+            Maximum row group size, in bytes to set
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_row_group_size_bytes(size_bytes)
+
+    cpdef void set_row_group_size_rows(self, size_type size_rows):
+        """
+        Sets the maximum row group size, in rows.
+
+        Parameters
+        ----------
+        size_rows : int
+            Maximum row group size, in rows to set
+
+        Returns
+        -------
+        None
+        """
+        self.c_obj.set_row_group_size_rows(size_rows)
+
+    cpdef void set_max_page_size_bytes(self, size_t size_bytes):
+        """
+        Sets the maximum uncompressed page size, in bytes.
+ + Parameters + ---------- + size_bytes : int + Maximum uncompressed page size, in bytes to set + + Returns + ------- + None + """ + self.c_obj.set_max_page_size_bytes(size_bytes) + + cpdef void set_max_page_size_rows(self, size_type size_rows): + """ + Sets the maximum page size, in rows. + + Parameters + ---------- + size_rows : int + Maximum page size, in rows to set. + + Returns + ------- + None + """ + self.c_obj.set_max_page_size_rows(size_rows) + + cpdef void set_max_dictionary_size(self, size_t size_bytes): + """ + Sets the maximum dictionary size, in bytes. + + Parameters + ---------- + size_bytes : int + Sets the maximum dictionary size, in bytes. + + Returns + ------- + None + """ + self.c_obj.set_max_dictionary_size(size_bytes) + + +cdef class ParquetWriterOptionsBuilder: + + cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata): + """ + Sets metadata. + + Parameters + ---------- + metadata : TableInputMetadata + Associated metadata + + Returns + ------- + Self + """ + self.c_obj.metadata(metadata.c_obj) + return self + + cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata): + """ + Sets Key-Value footer metadata. + + Parameters + ---------- + metadata : list[dict[str, str]] + Key-Value footer metadata + + Returns + ------- + Self + """ + self.c_obj.key_value_metadata( + [ + {key.encode(): value.encode() for key, value in mapping.items()} + for mapping in metadata + ] + ) + return self + + cpdef ParquetWriterOptionsBuilder compression(self, compression_type compression): + """ + Sets compression type. + + Parameters + ---------- + compression : CompressionType + The compression type to use + + Returns + ------- + Self + """ + self.c_obj.compression(compression) + return self + + cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf): + """ + Sets the level of statistics. 
+ + Parameters + ---------- + sf : StatisticsFreq + Level of statistics requested in the output file + + Returns + ------- + Self + """ + self.c_obj.stats_level(sf) + return self + + cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled): + """ + Sets whether timestamps are written as int96 or timestamp micros. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable int96 timestamps + + Returns + ------- + Self + """ + self.c_obj.int96_timestamps(enabled) + return self + + cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled): + """ + Set to true to write V2 page headers, otherwise false to write V1 page headers. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable writing of V2 page headers. + + Returns + ------- + Self + """ + self.c_obj.write_v2_headers(enabled) + return self + + cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy_t val): + """ + Sets the policy for dictionary use. + + Parameters + ---------- + val : DictionaryPolicy + Policy for dictionary use. + + Returns + ------- + Self + """ + self.c_obj.dictionary_policy(val) + return self + + cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled): + """ + Set to true if timestamps are to be written as UTC. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable writing of timestamps as UTC. + + Returns + ------- + Self + """ + self.c_obj.utc_timestamps(enabled) + return self + + cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled): + """ + Set to true if arrow schema is to be written. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable writing of arrow schema. + + Returns + ------- + Self + """ + self.c_obj.write_arrow_schema(enabled) + return self + + cpdef ParquetWriterOptions build(self): + """ + Create a ParquetWriterOptions from the set options. 
+ + Returns + ------- + ParquetWriterOptions + """ + cdef ParquetWriterOptions parquet_options = ParquetWriterOptions.__new__( + ParquetWriterOptions + ) + parquet_options.c_obj = move(self.c_obj.build()) + parquet_options.table_ref = self.table_ref + parquet_options.sink_ref = self.sink_ref + return parquet_options + + +cpdef memoryview write_parquet(ParquetWriterOptions options): + """ + Writes a set of columns to parquet format. + + Parameters + ---------- + options : ParquetWriterOptions + Settings for controlling writing behavior + + Returns + ------- + memoryview + A blob that contains the file metadata + (parquet FileMetadata thrift message) if requested in + parquet_writer_options (empty blob otherwise). + """ + cdef parquet_writer_options c_options = options.c_obj + cdef unique_ptr[vector[uint8_t]] c_result + + with nogil: + c_result = cpp_write_parquet(c_options) + + return memoryview(HostBuffer.from_unique_ptr(move(c_result))) diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd index 0ab28cb0973..90b43cf0ff5 100644 --- a/python/pylibcudf/pylibcudf/io/types.pxd +++ b/python/pylibcudf/pylibcudf/io/types.pxd @@ -1,4 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
+from libc.stdint cimport uint8_t, int32_t +from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector from pylibcudf.libcudf.io.data_sink cimport data_sink @@ -18,9 +20,46 @@ from pylibcudf.libcudf.io.types cimport ( table_metadata, table_with_metadata, ) +from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table +cdef class PartitionInfo: + cdef partition_info c_obj + +cdef class ColumnInMetadata: + cdef column_in_metadata c_obj + + @staticmethod + cdef ColumnInMetadata from_metadata(column_in_metadata metadata) + + cpdef ColumnInMetadata set_name(self, str name) + + cpdef ColumnInMetadata set_name(self, str name) + + cpdef ColumnInMetadata set_nullability(self, bool nullable) + + cpdef ColumnInMetadata set_list_column_as_map(self) + + cpdef ColumnInMetadata set_int96_timestamps(self, bool req) + + cpdef ColumnInMetadata set_decimal_precision(self, uint8_t req) + + cpdef ColumnInMetadata child(self, size_type i) + + cpdef ColumnInMetadata set_output_as_binary(self, bool binary) + + cpdef ColumnInMetadata set_type_length(self, int32_t type_length) + + cpdef ColumnInMetadata set_skip_compression(self, bool skip) + + cpdef ColumnInMetadata set_encoding(self, column_encoding encoding) + + cpdef str get_name(self) + +cdef class TableInputMetadata: + cdef table_input_metadata c_obj + cdef class TableWithMetadata: cdef public Table tbl cdef table_metadata metadata diff --git a/python/pylibcudf/pylibcudf/io/types.pyi b/python/pylibcudf/pylibcudf/io/types.pyi index a4f4fc13bdc..04f276cfeee 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyi +++ b/python/pylibcudf/pylibcudf/io/types.pyi @@ -59,6 +59,25 @@ class QuoteStyle(IntEnum): ColumnNameSpec: TypeAlias = tuple[str, list[ColumnNameSpec]] ChildNameSpec: TypeAlias = Mapping[str, ChildNameSpec] +class PartitionInfo: + def __init__(self, start_row: int, num_rows: int): ... + +class TableInputMetadata: + def __init__(self, table: Table): ... 
+ +class ColumnInMetadata: + def set_name(self, name: str) -> ColumnInMetadata: ... + def set_nullability(self, nullable: bool) -> ColumnInMetadata: ... + def set_list_column_as_map(self) -> ColumnInMetadata: ... + def set_int96_timestamps(self, req: bool) -> ColumnInMetadata: ... + def set_decimal_precision(self, precision: int) -> ColumnInMetadata: ... + def child(self, i: int) -> ColumnInMetadata: ... + def set_output_as_binary(self, binary: bool) -> ColumnInMetadata: ... + def set_type_length(self, type_length: int) -> ColumnInMetadata: ... + def set_skip_compression(self, skip: bool) -> ColumnInMetadata: ... + def set_encoding(self, encoding: ColumnEncoding) -> ColumnInMetadata: ... + def get_name(self) -> str: ... + class TableWithMetadata: tbl: Table def __init__( diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 51d5bda75c7..460ab6844c3 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -2,6 +2,8 @@ from cpython.buffer cimport PyBUF_READ from cpython.memoryview cimport PyMemoryView_FromMemory +from libc.stdint cimport uint8_t, int32_t +from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move @@ -10,11 +12,16 @@ from pylibcudf.io.datasource cimport Datasource from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.datasource cimport datasource from pylibcudf.libcudf.io.types cimport ( + column_encoding, + column_in_metadata, column_name_info, host_buffer, + partition_info, source_info, + table_input_metadata, table_with_metadata, ) +from pylibcudf.libcudf.types cimport size_type import codecs import errno @@ -37,13 +44,233 @@ __all__ = [ "CompressionType", "DictionaryPolicy", "JSONRecoveryMode", + "PartitionInfo", "QuoteStyle", "SinkInfo", "SourceInfo", "StatisticsFreq", + "TableInputMetadata", "TableWithMetadata", ] +cdef class PartitionInfo: + """ + Information 
used while writing partitioned datasets. + + Parameters + ---------- + start_row : int + The start row of the partition. + + num_rows : int + The number of rows in the partition. + """ + def __init__(self, size_type start_row, size_type num_rows): + self.c_obj = partition_info(start_row, num_rows) + + +cdef class ColumnInMetadata: + """ + Metadata for a column + """ + + @staticmethod + cdef ColumnInMetadata from_metadata(column_in_metadata metadata): + """ + Construct a ColumnInMetadata. + + Parameters + ---------- + metadata : column_in_metadata + """ + cdef ColumnInMetadata col_metadata = ColumnInMetadata.__new__(ColumnInMetadata) + col_metadata.c_obj = metadata + return col_metadata + + cpdef ColumnInMetadata set_name(self, str name): + """ + Set the name of this column. + + Parameters + ---------- + name : str + Name of the column + + Returns + ------- + Self + """ + self.c_obj.set_name(name.encode()) + return self + + cpdef ColumnInMetadata set_nullability(self, bool nullable): + """ + Set the nullability of this column. + + Parameters + ---------- + nullable : bool + Whether this column is nullable + + Returns + ------- + Self + """ + self.c_obj.set_nullability(nullable) + return self + + cpdef ColumnInMetadata set_list_column_as_map(self): + """ + Specify that this list column should be encoded as a map in the + written file. + + Returns + ------- + Self + """ + self.c_obj.set_list_column_as_map() + return self + + cpdef ColumnInMetadata set_int96_timestamps(self, bool req): + """ + Specifies whether this timestamp column should be encoded using + the deprecated int96. + + Parameters + ---------- + req : bool + True = use int96 physical type. False = use int64 physical type. + + Returns + ------- + Self + """ + self.c_obj.set_int96_timestamps(req) + return self + + cpdef ColumnInMetadata set_decimal_precision(self, uint8_t precision): + """ + Set the decimal precision of this column. + Only valid if this column is a decimal (fixed-point) type. 
+
+        Parameters
+        ----------
+        precision : int
+            The integer precision to set for this decimal column
+
+        Returns
+        -------
+        Self
+        """
+        self.c_obj.set_decimal_precision(precision)
+        return self
+
+    cpdef ColumnInMetadata child(self, size_type i):
+        """
+        Get reference to a child of this column.
+
+        Parameters
+        ----------
+        i : int
+            Index of the child to get.
+
+        Returns
+        -------
+        ColumnInMetadata
+        """
+        return ColumnInMetadata.from_metadata(self.c_obj.child(i))
+
+    cpdef ColumnInMetadata set_output_as_binary(self, bool binary):
+        """
+        Specifies whether this column should be written as binary or string data.
+
+        Parameters
+        ----------
+        binary : bool
+            True = use binary data type. False = use string data type
+
+        Returns
+        -------
+        Self
+        """
+        self.c_obj.set_output_as_binary(binary)
+        return self
+
+    cpdef ColumnInMetadata set_type_length(self, int32_t type_length):
+        """
+        Sets the length of fixed length data.
+
+        Parameters
+        ----------
+        type_length : int
+            Size of the data type in bytes
+
+        Returns
+        -------
+        Self
+        """
+        self.c_obj.set_type_length(type_length)
+        return self
+
+    cpdef ColumnInMetadata set_skip_compression(self, bool skip):
+        """
+        Specifies whether this column should not be compressed
+        regardless of the compression.
+
+        Parameters
+        ----------
+        skip : bool
+            If `true` do not compress this column
+
+        Returns
+        -------
+        Self
+        """
+        self.c_obj.set_skip_compression(skip)
+        return self
+
+    cpdef ColumnInMetadata set_encoding(self, column_encoding encoding):
+        """
+        Sets the encoding to use for this column.
+        See `ColumnEncoding` for the valid encodings.
+
+        Parameters
+        ----------
+        encoding : ColumnEncoding
+            The encoding to use
+
+        Returns
+        -------
+        ColumnInMetadata
+        """
+        self.c_obj.set_encoding(encoding)
+        return self
+
+    cpdef str get_name(self):
+        """
+        Get the name of this column.
+ + Returns + ------- + str + The name of this column + """ + return self.c_obj.get_name().decode() + + +cdef class TableInputMetadata: + """ + Metadata for a table + + Parameters + ---------- + table : Table + The Table to construct metadata for + """ + def __init__(self, Table table): + self.c_obj = table_input_metadata(table.view()) + + cdef class TableWithMetadata: """A container holding a table and its associated metadata (e.g. column names) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd index 110c9d4a0b9..e03fe7e921e 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd @@ -156,7 +156,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: table_input_metadata m ) except +libcudf_exception_handler BuilderT& key_value_metadata( - vector[map[string, string]] kvm + vector[map[string, string]] metadata ) except +libcudf_exception_handler BuilderT& stats_level( statistics_freq sf @@ -189,7 +189,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: size_t val ) except +libcudf_exception_handler BuilderT& write_v2_headers( - bool val + bool enabled ) except +libcudf_exception_handler BuilderT& dictionary_policy( dictionary_policy val @@ -212,7 +212,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: ) except +libcudf_exception_handler cdef unique_ptr[vector[uint8_t]] write_parquet( - parquet_writer_options args + parquet_writer_options options ) except +libcudf_exception_handler cdef cppclass chunked_parquet_writer_options(parquet_writer_options_base): diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index 41298601539..94524acbcc8 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -1,4 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
+import io + import pyarrow as pa import pyarrow.compute as pc import pytest @@ -107,3 +109,70 @@ def test_read_parquet_filters( # ^^^ This one is not tested since it's not in pyarrow/pandas, deprecate? # bool convert_strings_to_categories = False, # bool use_pandas_metadata = True + + +@pytest.mark.parametrize("write_v2_headers", [True, False]) +@pytest.mark.parametrize("utc_timestamps", [True, False]) +@pytest.mark.parametrize("write_arrow_schema", [True, False]) +@pytest.mark.parametrize( + "partitions", + [None, [plc.io.types.PartitionInfo(0, 10)]], +) +@pytest.mark.parametrize("column_chunks_file_paths", [None, ["tmp.parquet"]]) +@pytest.mark.parametrize("row_group_size_bytes", [None, 1024]) +@pytest.mark.parametrize("row_group_size_rows", [None, 1]) +@pytest.mark.parametrize("max_page_size_bytes", [None, 1024]) +@pytest.mark.parametrize("max_page_size_rows", [None, 1]) +@pytest.mark.parametrize("max_dictionary_size", [None, 100]) +def test_write_parquet( + table_data, + write_v2_headers, + utc_timestamps, + write_arrow_schema, + partitions, + column_chunks_file_paths, + row_group_size_bytes, + row_group_size_rows, + max_page_size_bytes, + max_page_size_rows, + max_dictionary_size, +): + _, pa_table = table_data + if len(pa_table) == 0 and partitions is not None: + pytest.skip("https://github.com/rapidsai/cudf/issues/17361") + plc_table = plc.interop.from_arrow(pa_table) + table_meta = plc.io.types.TableInputMetadata(plc_table) + sink = plc.io.SinkInfo([io.BytesIO()]) + user_data = [{"foo": "{'bar': 'baz'}"}] + compression = plc.io.types.CompressionType.SNAPPY + stats_level = plc.io.types.StatisticsFreq.STATISTICS_COLUMN + dictionary_policy = plc.io.types.DictionaryPolicy.ADAPTIVE + options = ( + plc.io.parquet.ParquetWriterOptions.builder(sink, plc_table) + .metadata(table_meta) + .key_value_metadata(user_data) + .compression(compression) + .stats_level(stats_level) + .write_v2_headers(write_v2_headers) + .dictionary_policy(dictionary_policy) + 
.utc_timestamps(utc_timestamps) + .write_arrow_schema(write_arrow_schema) + .build() + ) + if partitions is not None: + options.set_partitions(partitions) + if column_chunks_file_paths is not None: + options.set_column_chunks_file_paths(column_chunks_file_paths) + if row_group_size_bytes is not None: + options.set_row_group_size_bytes(row_group_size_bytes) + if row_group_size_rows is not None: + options.set_row_group_size_rows(row_group_size_rows) + if max_page_size_bytes is not None: + options.set_max_page_size_bytes(max_page_size_bytes) + if max_page_size_rows is not None: + options.set_max_page_size_rows(max_page_size_rows) + if max_dictionary_size is not None: + options.set_max_dictionary_size(max_dictionary_size) + + result = plc.io.parquet.write_parquet(options) + assert isinstance(result, memoryview)