From ff0d51f787b99020e910307f473056c4779b7a47 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:19:56 -0800 Subject: [PATCH 01/23] Add writer, supporting objects, and tests; compilation passes --- python/pylibcudf/pylibcudf/io/parquet.pxd | 75 +++- python/pylibcudf/pylibcudf/io/parquet.pyx | 384 +++++++++++++++++- python/pylibcudf/pylibcudf/io/types.pxd | 41 ++ python/pylibcudf/pylibcudf/io/types.pyx | 232 +++++++++++ .../pylibcudf/libcudf/io/parquet.pxd | 20 +- .../pylibcudf/tests/io/test_parquet.py | 90 ++++ 6 files changed, 827 insertions(+), 15 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 9c476030ded..165eaa49270 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -1,14 +1,26 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from libc.stdint cimport int64_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.memory cimport unique_ptr +from libcpp.vector cimport vector from pylibcudf.expressions cimport Expression -from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.io.types cimport ( + compression_type, + dictionary_policy, + statistics_freq, + SinkInfo, + SourceInfo, + TableInputMetadata, + TableWithMetadata, +) from pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_reader as cpp_chunked_parquet_reader, + parquet_writer_options, + parquet_writer_options_builder, ) from pylibcudf.libcudf.types cimport size_type +from pylibcudf.table cimport Table from pylibcudf.types cimport DataType @@ -33,3 +45,62 @@ cpdef read_parquet( # ReaderColumnSchema reader_column_schema = *, # DataType timestamp_type = * ) + +cdef class ParquetWriterOptions: + cdef parquet_writer_options options + + @staticmethod + cdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table) + + cpdef void set_partitions(self, list partitions) + + cpdef void set_column_chunks_file_paths(self, list file_paths) + + cpdef void set_row_group_size_bytes(self, int size_bytes) + + cpdef void set_row_group_size_rows(self, int size_rows) + + cpdef void set_max_page_size_bytes(self, int size_bytes) + + cpdef void set_max_page_size_rows(self, int size_rows) + + cpdef void set_max_dictionary_size(self, int size_rows) + +cdef class ParquetWriterOptionsBuilder: + cdef parquet_writer_options_builder builder + + cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata) + + cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata) + + cpdef ParquetWriterOptionsBuilder compression(self, compression_type compression) + + cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf) + + cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled) + + cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled) + + cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy val) + + cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled) + + cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled) + + cpdef ParquetWriterOptions build(self) + + +cdef class BufferArrayFromVector: + cdef Py_ssize_t length + cdef unique_ptr[vector[uint8_t]] in_vec + + # these two things declare part of the buffer interface + cdef Py_ssize_t shape[1] + cdef Py_ssize_t strides[1] + + @staticmethod + cdef BufferArrayFromVector from_unique_ptr( + unique_ptr[vector[uint8_t]] in_vec + ) + +cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 981ca7b8159..19ffedfd5fa 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -1,20 +1,36 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from cython.operator cimport dereference -from libc.stdint cimport int64_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool +from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector from pylibcudf.expressions cimport Expression -from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.io.types cimport ( + SinkInfo, + SourceInfo, + PartitionInfo, + TableInputMetadata, + TableWithMetadata +) from pylibcudf.libcudf.expressions cimport expression from pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_reader as cpp_chunked_parquet_reader, parquet_reader_options, read_parquet as cpp_read_parquet, + write_parquet as cpp_write_parquet, + parquet_writer_options, +) +from pylibcudf.libcudf.io.types cimport ( + compression_type, + dictionary_policy as dictionary_policy_t, + partition_info, + statistics_freq, + table_with_metadata, ) -from pylibcudf.libcudf.io.types cimport table_with_metadata from pylibcudf.libcudf.types cimport size_type +from pylibcudf.table cimport Table cdef parquet_reader_options _setup_parquet_reader_options( @@ -217,3 +233,365 @@ cpdef read_parquet( c_result = move(cpp_read_parquet(opts)) return TableWithMetadata.from_libcudf(c_result) + + +cdef class ParquetWriterOptions: + + @staticmethod + cdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table): + """ + Create builder to create ParquetWriterOptionsBuilder. + + Parameters + ---------- + sink : SinkInfo + The sink used for writer output + + table : Table + Table to be written to output + + Returns + ------- + ParquetWriterOptionsBuilder + """ + cdef ParquetWriterOptionsBuilder bldr = ParquetWriterOptionsBuilder.__new__( + ParquetWriterOptionsBuilder + ) + bldr.builder = parquet_writer_options.builder(sink.c_obj, table.view()) + return bldr + + cpdef void set_partitions(self, list partitions): + """ + Sets partitions. + + Parameters + ---------- + partitions : list[Partitions] + Partitions of input table in {start_row, num_rows} pairs. + + Returns + ------- + None + """ + cdef vector[partition_info] c_partions + cdef PartitionInfo partition + + c_partions.reserve(len(partitions)) + for partition in partitions: + c_partions.push_back(partition.c_obj) + + self.options.set_partitions(c_partions) + + cpdef void set_column_chunks_file_paths(self, list file_paths): + """ + Sets column chunks file path to be set in the raw output metadata. + + Parameters + ---------- + file_paths : list[str] + Vector of Strings which indicates file path. + + Returns + ------- + None + """ + self.options.set_column_chunks_file_paths([fp.encode() for fp in file_paths]) + + cpdef void set_row_group_size_bytes(self, int size_bytes): + """ + Sets the maximum row group size, in bytes. + + Parameters + ---------- + size_bytes : int + Maximum row group size, in bytes to set + + Returns + ------- + None + """ + self.options.set_row_group_size_bytes(size_bytes) + + cpdef void set_row_group_size_rows(self, int size_rows): + """ + Sets the maximum row group size, in rows. + + Parameters + ---------- + size_rows : int + Maximum row group size, in rows to set + + Returns + ------- + None + """ + self.options.set_row_group_size_rows(size_rows) + + cpdef void set_max_page_size_bytes(self, int size_bytes): + """ + Sets the maximum uncompressed page size, in bytes. + + Parameters + ---------- + size_bytes : int + Maximum uncompressed page size, in bytes to set + + Returns + ------- + None + """ + self.options.set_max_page_size_bytes(size_bytes) + + cpdef void set_max_page_size_rows(self, int size_rows): + """ + Sets the maximum page size, in rows. + + Parameters + ---------- + size_rows : int + Maximum page size, in rows to set. + + Returns + ------- + None + """ + self.options.set_max_page_size_rows(size_rows) + + cpdef void set_max_dictionary_size(self, int size_rows): + """ + Sets the maximum dictionary size, in bytes. + + Parameters + ---------- + size_rows : int + Sets the maximum dictionary size, in bytes.. + + Returns + ------- + None + """ + self.options.set_max_dictionary_size(size_rows) + + +cdef class ParquetWriterOptionsBuilder: + + cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata): + """ + Sets metadata. + + Parameters + ---------- + metadata : TableInputMetadata + Associated metadata + + Returns + ------- + Self + """ + self.builder.metadata(metadata.c_obj) + return self + + cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata): + """ + Sets Key-Value footer metadata. + + Parameters + ---------- + metadata : list[dict[str, str]] + Key-Value footer metadata + + Returns + ------- + Self + """ + self.builder.key_value_metadata( + [ + {key.encode(): value.encode() for key, value in mapping.items()} + for mapping in metadata + ] + ) + return self + + cpdef ParquetWriterOptionsBuilder compression(self, compression_type compression): + """ + Sets Key-Value footer metadata. + + Parameters + ---------- + compression : CompressionType + The compression type to use + + Returns + ------- + Self + """ + self.builder.compression(compression) + return self + + cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf): + """ + Sets the level of statistics. + + Parameters + ---------- + sf : StatisticsFreq + Level of statistics requested in the output file + + Returns + ------- + Self + """ + self.builder.stats_level(sf) + return self + + cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled): + """ + Sets whether int96 timestamps are written or not. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable int96 timestamps + + Returns + ------- + Self + """ + self.builder.int96_timestamps(enabled) + return self + + cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled): + """ + Set to true if V2 page headers are to be written. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable writing of V2 page headers. + + Returns + ------- + Self + """ + self.builder.write_v2_headers(enabled) + return self + + cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy_t val): + """ + Sets the policy for dictionary use. + + Parameters + ---------- + val : DictionaryPolicy + Policy for dictionary use. + + Returns + ------- + Self + """ + self.builder.dictionary_policy(val) + return self + + cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled): + """ + Set to true if timestamps are to be written as UTC. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable writing of timestamps as UTC. + + Returns + ------- + Self + """ + self.builder.utc_timestamps(enabled) + return self + + cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled): + """ + Set to true if arrow schema is to be written. + + Parameters + ---------- + enabled : bool + Boolean value to enable/disable writing of arrow schema. + + Returns + ------- + Self + """ + self.builder.write_arrow_schema(enabled) + return self + + cpdef ParquetWriterOptions build(self): + """ + Options member once it's built + + Returns + ------- + ParquetWriterOptions + """ + cdef ParquetWriterOptions parquet_options = ParquetWriterOptions.__new__( + ParquetWriterOptions + ) + parquet_options.options = move(self.builder.build()) + return parquet_options + + +cdef class BufferArrayFromVector: + @staticmethod + cdef BufferArrayFromVector from_unique_ptr( + unique_ptr[vector[uint8_t]] in_vec + ): + cdef BufferArrayFromVector buf = BufferArrayFromVector() + buf.in_vec = move(in_vec) + buf.length = dereference(buf.in_vec).size() + return buf + + def __getbuffer__(self, Py_buffer *buffer, int flags): + cdef Py_ssize_t itemsize = sizeof(uint8_t) + + self.shape[0] = self.length + self.strides[0] = 1 + + buffer.buf = dereference(self.in_vec).data() + + buffer.format = NULL # byte + buffer.internal = NULL + buffer.itemsize = itemsize + buffer.len = self.length * itemsize # product(shape) * itemsize + buffer.ndim = 1 + buffer.obj = self + buffer.readonly = 0 + buffer.shape = self.shape + buffer.strides = self.strides + buffer.suboffsets = NULL + + def __releasebuffer__(self, Py_buffer *buffer): + pass + + +cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options): + """ + Writes a set of columns to parquet format. + + Parameters + ---------- + options : ParquetWriterOptions + Settings for controlling writing behavior + + Returns + ------- + BufferArrayFromVector + A blob that contains the file metadata + (parquet FileMetadata thrift message) if requested in + parquet_writer_options (empty blob otherwise). + """ + cdef parquet_writer_options c_options = options.options + cdef unique_ptr[vector[uint8_t]] c_result + + with nogil: + c_result = cpp_write_parquet(c_options) + + return BufferArrayFromVector.from_unique_ptr(move(c_result)) diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd index 0ab28cb0973..50862ef69e4 100644 --- a/python/pylibcudf/pylibcudf/io/types.pxd +++ b/python/pylibcudf/pylibcudf/io/types.pxd @@ -1,4 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector from pylibcudf.libcudf.io.data_sink cimport data_sink @@ -21,6 +22,46 @@ from pylibcudf.libcudf.io.types cimport ( from pylibcudf.table cimport Table +cdef class PartitionInfo: + cdef partition_info c_obj + + @staticmethod + cdef PartitionInfo from_start_and_num(int start_row, int num_rows) + +cdef class ColumnInMetadata: + cdef column_in_metadata c_obj + + @staticmethod + cdef ColumnInMetadata from_metadata(column_in_metadata metadata) + + cpdef ColumnInMetadata set_name(self, str name) + + cpdef ColumnInMetadata set_name(self, str name) + + cpdef ColumnInMetadata set_nullability(self, bool nullable) + + cpdef ColumnInMetadata set_list_column_as_map(self) + + cpdef ColumnInMetadata set_int96_timestamps(self, bool req) + + cpdef ColumnInMetadata set_decimal_precision(self, int req) + + cpdef ColumnInMetadata child(self, int i) + + cpdef ColumnInMetadata set_output_as_binary(self, bool binary) + + cpdef ColumnInMetadata set_type_length(self, int type_length) + + cpdef ColumnInMetadata set_skip_compression(self, bool skip) + + cpdef ColumnInMetadata set_encoding(self, column_encoding encoding) + + cpdef str get_name(self) + +cdef class TableInputMetadata: + cdef public Table table + cdef table_input_metadata c_obj + cdef class TableWithMetadata: cdef public Table tbl cdef table_metadata metadata diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 967d05e7057..5f57a856a10 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -2,6 +2,7 @@ from cpython.buffer cimport PyBUF_READ from cpython.memoryview cimport PyMemoryView_FromMemory +from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move @@ -10,9 +11,13 @@ from pylibcudf.io.datasource cimport Datasource from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.datasource cimport datasource from pylibcudf.libcudf.io.types cimport ( + column_encoding, + column_in_metadata, column_name_info, host_buffer, + partition_info, source_info, + table_input_metadata, table_with_metadata, ) @@ -30,6 +35,233 @@ from pylibcudf.libcudf.io.types import ( statistics_freq as StatisticsFreq, # no-cython-lint ) +cdef class PartitionInfo: + """ + Information used while writing partitioned datasets. + """ + @staticmethod + cdef PartitionInfo from_start_and_num(int start_row, int num_rows): + """ + Construct a PartitionInfo. + + Parameters + ---------- + start_row : int + The start row of the partition. + + num_rows : int + The number of rows in the partition. + """ + cdef PartitionInfo parition_info = PartitionInfo.__new__(PartitionInfo) + parition_info.c_obj = partition_info(start_row, num_rows) + return parition_info + + +cdef class ColumnInMetadata: + """ + Metadata for a column + """ + + @staticmethod + cdef ColumnInMetadata from_metadata(column_in_metadata metadata): + """ + Construct a ColumnInMetadata. + + Parameters + ---------- + metadata : column_in_metadata + """ + cdef ColumnInMetadata col_metadata = ColumnInMetadata.__new__(ColumnInMetadata) + col_metadata.c_obj = metadata + return col_metadata + + cpdef ColumnInMetadata set_name(self, str name): + """ + Set the name of this column. + + Parameters + ---------- + name : str + Name of the column + + Returns + ------- + Self + """ + self.c_obj.set_name(name.encode()) + return self + + cpdef ColumnInMetadata set_nullability(self, bool nullable): + """ + Set the nullability of this column. + + Parameters + ---------- + nullable : bool + Whether this column is nullable + + Returns + ------- + Self + """ + self.c_obj.set_nullability(nullable) + return self + + cpdef ColumnInMetadata set_list_column_as_map(self): + """ + Specify that this list column should be encoded as a map in the + written file. + + Returns + ------- + Self + """ + self.c_obj.set_list_column_as_map() + return self + + cpdef ColumnInMetadata set_int96_timestamps(self, bool req): + """ + Specifies whether this timestamp column should be encoded using + the deprecated int96. + + Parameters + ---------- + req : bool + True = use int96 physical type. False = use int64 physical type. + + Returns + ------- + Self + """ + self.c_obj.set_int96_timestamps(req) + return self + + cpdef ColumnInMetadata set_decimal_precision(self, int precision): + """ + Set the decimal precision of this column. + Only valid if this column is a decimal (fixed-point) type. + + Parameters + ---------- + precision : int + The integer precision to set for this decimal column + + Returns + ------- + Self + """ + self.c_obj.set_decimal_precision(precision) + return self + + cpdef ColumnInMetadata child(self, int i): + """ + Get reference to a child of this column. + + Parameters + ---------- + i : int + Index of the child to get. + + Returns + ------- + ColumnInMetadata + """ + return ColumnInMetadata.from_metadata(self.c_obj.child(i)) + + cpdef ColumnInMetadata set_output_as_binary(self, bool binary): + """ + Specifies whether this column should be written as binary or string data. + + Parameters + ---------- + binary : bool + True = use binary data type. False = use string data type + + Returns + ------- + Self + """ + self.c_obj.set_output_as_binary(binary) + return self + + cpdef ColumnInMetadata set_type_length(self, int type_length): + """ + Sets the length of fixed length data. + + Parameters + ---------- + type_length : int + Size of the data type in bytes + + Returns + ------- + Self + """ + self.c_obj.set_type_length(type_length) + return self + + cpdef ColumnInMetadata set_skip_compression(self, bool skip): + """ + Specifies whether this column should not be compressed + regardless of the compression. + + Parameters + ---------- + skip : bool + If `true` do not compress this column + + Returns + ------- + Self + """ + self.c_obj.set_skip_compression(skip) + return self + + cpdef ColumnInMetadata set_encoding(self, column_encoding encoding): + """ + Specifies whether this column should not be compressed + regardless of the compression. + + Parameters + ---------- + encoding : ColumnEncoding + The encoding to use + + Returns + ------- + ColumnInMetadata + """ + self.c_obj.set_encoding(encoding) + return self + + cpdef str get_name(self): + """ + Get the name of this column. + + Returns + ------- + str + The name of this column + """ + return self.c_obj.get_name().decode() + + +cdef class TableInputMetadata: + """ + Metadata for a table + + Parameters + ---------- + table : Table + The Table to construct metadata for + """ + def __init__(self, Table table): + self.c_obj = table_input_metadata(table.view()) + self.column_metadata = [ + ColumnInMetadata.from_metadata(metadata) + for metadata in self.c_obj.column_metadata + ] + cdef class TableWithMetadata: """A container holding a table and its associated metadata diff --git a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd index de6a6c1e82d..557995d9db0 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd @@ -116,11 +116,11 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_utc_timestamps( bool enabled ) except + - void set_row_group_size_bytes(size_t val) except + - void set_row_group_size_rows(size_type val) except + - void set_max_page_size_bytes(size_t val) except + - void set_max_page_size_rows(size_type val) except + - void set_max_dictionary_size(size_t val) except + + void set_row_group_size_bytes(size_t size_bytes) except + + void set_row_group_size_rows(size_type size_rows) except + + void set_max_page_size_bytes(size_t size_bytes) except + + void set_max_page_size_rows(size_type size_rows) except + + void set_max_dictionary_size(size_t size_bytes) except + void enable_write_v2_headers(bool val) except + void enable_write_arrow_schema(bool val) except + void set_dictionary_policy(dictionary_policy policy) except + @@ -133,7 +133,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: vector[partition_info] partitions ) except + void set_column_chunks_file_paths( - vector[string] column_chunks_file_paths + vector[string] file_paths ) except + @staticmethod @@ -146,10 +146,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options_builder_base() except + BuilderT& metadata( - table_input_metadata m + table_input_metadata metadata ) except + BuilderT& key_value_metadata( - vector[map[string, string]] kvm + vector[map[string, string]] metadata ) except + BuilderT& stats_level( statistics_freq sf @@ -182,7 +182,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: size_t val ) except + BuilderT& write_v2_headers( - bool val + bool enabled ) except + BuilderT& dictionary_policy( dictionary_policy val @@ -205,7 +205,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: ) except + cdef unique_ptr[vector[uint8_t]] write_parquet( - parquet_writer_options args + parquet_writer_options options ) except + cdef cppclass chunked_parquet_writer_options(parquet_writer_options_base): diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index 41298601539..cc73c464d78 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -1,4 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +import io + import pyarrow as pa import pyarrow.compute as pc import pytest @@ -107,3 +109,91 @@ def test_read_parquet_filters( # ^^^ This one is not tested since it's not in pyarrow/pandas, deprecate? # bool convert_strings_to_categories = False, # bool use_pandas_metadata = True + + +@pytest.mark.parametrize( + "compression", + [ + plc.io.types.CompressionType.NONE, + plc.io.types.CompressionType.GZIP, + ], +) +@pytest.mark.parametrize( + "stats_level", + [ + plc.io.types.StatisticsFreq.STATISTICS_NONE, + plc.io.types.StatisticsFreq.STATISTICS_COLUMN, + ], +) +@pytest.mark.parametrize("int96_timestamps", [True, False]) +@pytest.mark.parametrize("write_v2_headers", [True, False]) +@pytest.mark.parametrize( + "dictionary_policy", + [ + plc.io.types.DictionaryPolicy.ADAPTIVE, + plc.io.types.DictionaryPolicy.NEVER, + ], +) +@pytest.mark.parametrize("utc_timestamps", [True, False]) +@pytest.mark.parametrize("write_arrow_schema", [True, False]) +@pytest.mark.parametrize( + "partitions", + [None, [plc.io.types.PartitionInfo.from_start_and_num(0, 10)]], +) +@pytest.mark.parametrize("column_chunks_file_paths", [None, ["tmp.parquet"]]) +@pytest.mark.parametrize("row_group_size_bytes", [None, 100]) +@pytest.mark.parametrize("row_group_size_rows", [None, 1]) +@pytest.mark.parametrize("max_page_size_bytes", [None, 100]) +@pytest.mark.parametrize("max_page_size_rows", [None, 1]) +@pytest.mark.parametrize("max_dictionary_size", [None, 100]) +def test_write_parquet( + table_data, + compression, + stats_level, + int96_timestamps, + write_v2_headers, + dictionary_policy, + utc_timestamps, + write_arrow_schema, + partitions, + column_chunks_file_paths, + row_group_size_bytes, + row_group_size_rows, + max_page_size_bytes, + max_page_size_rows, + max_dictionary_size, +): + plc_table, _ = table_data + table_meta = plc.io.types.TableInputMetadata(plc_table) + sink = plc.io.SinkInfo([io.BytesIO()]) + user_data = [{"foo": "{'bar': 'baz'}"}] + options = ( + plc.io.parquet.ParquetWriterOptions.builder(sink, plc_table) + .metadata(table_meta) + .key_value_metadata(user_data) + .compression(compression) + .stats_level(stats_level) + .int96_timestamps(int96_timestamps) + .write_v2_headers(write_v2_headers) + .dictionary_policy(dictionary_policy) + .utc_timestamps(utc_timestamps) + .write_arrow_schema(write_arrow_schema) + .build() + ) + if partitions is not None: + options.set_partitions(partitions) + if column_chunks_file_paths is not None: + options.set_column_chunks_file_paths(column_chunks_file_paths) + if row_group_size_bytes is not None: + options.set_row_group_size_bytes(row_group_size_bytes) + if row_group_size_rows is not None: + options.set_row_group_size_rows(row_group_size_rows) + if max_page_size_bytes is not None: + options.set_max_page_size_bytes(max_page_size_bytes) + if max_page_size_rows is not None: + options.set_max_page_size_rows(max_page_size_rows) + if max_dictionary_size is not None: + options.set_max_dictionary_size(max_dictionary_size) + + result = plc.io.parquet.write_parquet(options) + assert isinstance(result, plc.io.parquet.BufferArrayFromVector) From bb2c258b57d6a930569616448e5efdb651be5e95 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 6 Nov 2024 18:12:36 -0800 Subject: [PATCH 02/23] Add fix test, add python method for construction --- python/pylibcudf/pylibcudf/io/types.pyx | 5 +++++ python/pylibcudf/pylibcudf/tests/io/test_parquet.py | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 5f57a856a10..26319330be4 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -39,6 +39,11 @@ cdef class PartitionInfo: """ Information used while writing partitioned datasets. """ + + @staticmethod + def from_start_and_rows(int start_row, int num_rows): + return PartitionInfo.from_start_and_num(start_row, num_rows) + @staticmethod cdef PartitionInfo from_start_and_num(int start_row, int num_rows): """ diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index cc73c464d78..ef4986496e2 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -138,7 +138,7 @@ def test_read_parquet_filters( @pytest.mark.parametrize("write_arrow_schema", [True, False]) @pytest.mark.parametrize( "partitions", - [None, [plc.io.types.PartitionInfo.from_start_and_num(0, 10)]], + [None, [plc.io.types.PartitionInfo.from_start_and_rows(0, 10)]], ) @pytest.mark.parametrize("column_chunks_file_paths", [None, ["tmp.parquet"]]) @pytest.mark.parametrize("row_group_size_bytes", [None, 100]) @@ -163,7 +163,8 @@ def test_write_parquet( max_page_size_rows, max_dictionary_size, ): - plc_table, _ = table_data + _, pa_table = table_data + plc_table = plc.interop.from_arrow(pa_table) table_meta = plc.io.types.TableInputMetadata(plc_table) sink = plc.io.SinkInfo([io.BytesIO()]) user_data = [{"foo": "{'bar': 'baz'}"}] From 097decb103fa77340c4ddd41b49a1f8e8577a104 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 14 Nov 2024 16:22:35 -0800 Subject: [PATCH 03/23] Use HostBuffer, rename to c_obj --- python/pylibcudf/pylibcudf/io/parquet.pxd | 21 +---- python/pylibcudf/pylibcudf/io/parquet.pyx | 90 +++++++------------ python/pylibcudf/pylibcudf/io/types.pyx | 29 ++---- .../pylibcudf/tests/io/test_parquet.py | 2 +- 4 files changed, 46 insertions(+), 96 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 165eaa49270..cf73c89156f 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -4,6 +4,7 @@ from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector +from pylibcudf.contiguous_split cimport HostBuffer from pylibcudf.expressions cimport Expression from pylibcudf.io.types cimport ( compression_type, @@ -47,7 +48,7 @@ cpdef read_parquet( ) cdef class ParquetWriterOptions: - cdef parquet_writer_options options + cdef parquet_writer_options c_obj @staticmethod cdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table) @@ -67,7 +68,7 @@ cdef class ParquetWriterOptions: cpdef void set_max_dictionary_size(self, int size_rows) cdef class ParquetWriterOptionsBuilder: - cdef parquet_writer_options_builder builder + cdef parquet_writer_options_builder c_obj cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata) @@ -89,18 +90,4 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptions build(self) - -cdef class BufferArrayFromVector: - cdef Py_ssize_t length - cdef unique_ptr[vector[uint8_t]] in_vec - - # these two things declare part of the buffer interface - cdef Py_ssize_t shape[1] - cdef Py_ssize_t strides[1] - - @staticmethod - cdef BufferArrayFromVector from_unique_ptr( - unique_ptr[vector[uint8_t]] in_vec - ) - -cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options) +cpdef HostBuffer write_parquet(ParquetWriterOptions options) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 9e150969c04..5ac250181b5 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -6,6 +6,7 @@ from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector +from pylibcudf.contiguous_split cimport HostBuffer from pylibcudf.expressions cimport Expression from pylibcudf.io.types cimport ( SinkInfo, @@ -32,7 +33,13 @@ from pylibcudf.libcudf.io.types cimport ( from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table -__all__ = ["ChunkedParquetReader", "read_parquet"] +__all__ = [ + "ChunkedParquetReader", + "ParquetWriterOptions", + "ParquetWriterOptionsBuilder", + "read_parquet", + "write_parquet" +] cdef parquet_reader_options _setup_parquet_reader_options( @@ -261,7 +268,7 @@ cdef class ParquetWriterOptions: cdef ParquetWriterOptionsBuilder bldr = ParquetWriterOptionsBuilder.__new__( ParquetWriterOptionsBuilder ) - bldr.builder = parquet_writer_options.builder(sink.c_obj, table.view()) + bldr.c_obj = parquet_writer_options.builder(sink.c_obj, table.view()) return bldr cpdef void set_partitions(self, list partitions): @@ -280,11 +287,11 @@ cdef class ParquetWriterOptions: cdef vector[partition_info] c_partions cdef PartitionInfo partition - c_partions.reserve(len(partitions)) + c_obj.reserve(len(partitions)) for partition in partitions: - c_partions.push_back(partition.c_obj) + c_obj.push_back(partition.c_obj) - self.options.set_partitions(c_partions) + self.c_obj.set_partitions(c_partions) cpdef void set_column_chunks_file_paths(self, list file_paths): """ @@ -299,7 +306,7 @@ cdef class ParquetWriterOptions: ------- None """ - self.options.set_column_chunks_file_paths([fp.encode() for fp in file_paths]) + self.c_obj.set_column_chunks_file_paths([fp.encode() for fp in file_paths]) cpdef void set_row_group_size_bytes(self, int size_bytes): """ @@ -314,7 +321,7 @@ cdef class ParquetWriterOptions: ------- None """ - self.options.set_row_group_size_bytes(size_bytes) + self.c_obj.set_row_group_size_bytes(size_bytes) cpdef void set_row_group_size_rows(self, int size_rows): """ @@ -329,7 +336,7 @@ cdef class ParquetWriterOptions: ------- None """ - self.options.set_row_group_size_rows(size_rows) + self.c_obj.set_row_group_size_rows(size_rows) cpdef void set_max_page_size_bytes(self, int size_bytes): """ @@ -344,7 +351,7 @@ cdef class ParquetWriterOptions: ------- None """ - self.options.set_max_page_size_bytes(size_bytes) + self.c_obj.set_max_page_size_bytes(size_bytes) cpdef void set_max_page_size_rows(self, int size_rows): """ @@ -359,7 +366,7 @@ cdef class ParquetWriterOptions: ------- None """ - self.options.set_max_page_size_rows(size_rows) + self.c_obj.set_max_page_size_rows(size_rows) cpdef void set_max_dictionary_size(self, int size_rows): """ @@ -374,7 +381,7 @@ cdef class ParquetWriterOptions: ------- None """ - self.options.set_max_dictionary_size(size_rows) + self.c_obj.set_max_dictionary_size(size_rows) cdef class ParquetWriterOptionsBuilder: @@ -392,7 +399,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.metadata(metadata.c_obj) + self.c_obj.metadata(metadata.c_obj) return self cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata): @@ -408,7 +415,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.key_value_metadata( + self.c_obj.key_value_metadata( [ {key.encode(): value.encode() for key, value in mapping.items()} for mapping in metadata @@ -429,7 +436,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.compression(compression) + self.c_obj.compression(compression) return self cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf): @@ -445,7 +452,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.stats_level(sf) + self.c_obj.stats_level(sf) return self cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled): @@ -461,7 +468,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.int96_timestamps(enabled) + self.c_obj.int96_timestamps(enabled) return self cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled): @@ -477,7 +484,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.write_v2_headers(enabled) + self.c_obj.write_v2_headers(enabled) return self cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy_t val): @@ -493,7 +500,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.dictionary_policy(val) + self.c_obj.dictionary_policy(val) return self cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled): @@ -509,7 +516,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.utc_timestamps(enabled) + self.c_obj.utc_timestamps(enabled) return self cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled): @@ -525,7 +532,7 @@ cdef class ParquetWriterOptionsBuilder: ------- Self """ - self.builder.write_arrow_schema(enabled) + self.c_obj.write_arrow_schema(enabled) return self cpdef ParquetWriterOptions build(self): @@ -539,44 +546,11 @@ cdef class ParquetWriterOptionsBuilder: cdef ParquetWriterOptions parquet_options = ParquetWriterOptions.__new__( ParquetWriterOptions ) - parquet_options.options = move(self.builder.build()) + parquet_options.c_obj = move(self.c_obj.build()) return parquet_options -cdef class BufferArrayFromVector: - @staticmethod - cdef BufferArrayFromVector from_unique_ptr( - unique_ptr[vector[uint8_t]] in_vec - ): - cdef BufferArrayFromVector buf = BufferArrayFromVector() - buf.in_vec = move(in_vec) - buf.length = dereference(buf.in_vec).size() - return buf - - def __getbuffer__(self, Py_buffer *buffer, int flags): - cdef Py_ssize_t itemsize = sizeof(uint8_t) - - self.shape[0] = self.length - self.strides[0] = 1 - - buffer.buf = dereference(self.in_vec).data() - - buffer.format = NULL # byte - buffer.internal = NULL - buffer.itemsize = itemsize - buffer.len = self.length * itemsize # product(shape) * itemsize - buffer.ndim = 1 - buffer.obj = self - buffer.readonly = 0 - buffer.shape = self.shape - buffer.strides = self.strides - buffer.suboffsets = NULL - - def __releasebuffer__(self, Py_buffer *buffer): - pass - - -cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options): +cpdef HostBuffer write_parquet(ParquetWriterOptions options): """ Writes a set of columns to parquet format. @@ -587,15 +561,15 @@ cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options): Returns ------- - BufferArrayFromVector + pylibcudf.contiguous_split.HostBuffer A blob that contains the file metadata (parquet FileMetadata thrift message) if requested in parquet_writer_options (empty blob otherwise). """ - cdef parquet_writer_options c_options = options.options + cdef parquet_writer_options c_options = options.c_obj cdef unique_ptr[vector[uint8_t]] c_result with nogil: c_result = cpp_write_parquet(c_options) - return BufferArrayFromVector.from_unique_ptr(move(c_result)) + return HostBuffer.from_unique_ptr(move(c_result)) diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index f0fc1d98318..20961fb97a8 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -52,28 +52,17 @@ __all__ = [ cdef class PartitionInfo: """ Information used while writing partitioned datasets. - """ - - @staticmethod - def from_start_and_rows(int start_row, int num_rows): - return PartitionInfo.from_start_and_num(start_row, num_rows) - - @staticmethod - cdef PartitionInfo from_start_and_num(int start_row, int num_rows): - """ - Construct a PartitionInfo. - Parameters - ---------- - start_row : int - The start row of the partition. + Parameters + ---------- + start_row : int + The start row of the partition. - num_rows : int - The number of rows in the partition. - """ - cdef PartitionInfo parition_info = PartitionInfo.__new__(PartitionInfo) - parition_info.c_obj = partition_info(start_row, num_rows) - return parition_info + num_rows : int + The number of rows in the partition. + """ + def __init__(int start_row, int num_rows): + self.c_obj = partition_info(start_row, num_rows) cdef class ColumnInMetadata: diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index ef4986496e2..1dcbd4b731b 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -138,7 +138,7 @@ def test_read_parquet_filters( @pytest.mark.parametrize("write_arrow_schema", [True, False]) @pytest.mark.parametrize( "partitions", - [None, [plc.io.types.PartitionInfo.from_start_and_rows(0, 10)]], + [None, [plc.io.types.PartitionInfo(0, 10)]], ) @pytest.mark.parametrize("column_chunks_file_paths", [None, ["tmp.parquet"]]) @pytest.mark.parametrize("row_group_size_bytes", [None, 100]) From 3133ee1967afc7af5221aab0439a49cbc172685f Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:23:04 -0800 Subject: [PATCH 04/23] fix up tests --- python/pylibcudf/pylibcudf/contiguous_split.pxd | 13 +++++++++++++ python/pylibcudf/pylibcudf/contiguous_split.pyx | 4 ---- python/pylibcudf/pylibcudf/io/parquet.pxd | 3 --- python/pylibcudf/pylibcudf/io/parquet.pyx | 6 +++--- python/pylibcudf/pylibcudf/io/types.pxd | 4 ---- python/pylibcudf/pylibcudf/io/types.pyx | 6 +----- python/pylibcudf/pylibcudf/tests/io/test_parquet.py | 5 +---- 7 files changed, 18 insertions(+), 23 deletions(-) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pxd b/python/pylibcudf/pylibcudf/contiguous_split.pxd index 2a10cb5b3d5..8100d010fa3 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pxd +++ b/python/pylibcudf/pylibcudf/contiguous_split.pxd @@ -1,12 +1,25 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from libc.stdint cimport uint8_t from libcpp.memory cimport unique_ptr +from libcpp.vector cimport vector from pylibcudf.libcudf.contiguous_split cimport packed_columns from .gpumemoryview cimport gpumemoryview from .table cimport Table +cdef class HostBuffer: + cdef unique_ptr[vector[uint8_t]] c_obj + cdef size_t nbytes + cdef Py_ssize_t[1] shape + cdef Py_ssize_t[1] strides + + @staticmethod + cdef HostBuffer from_unique_ptr( + unique_ptr[vector[uint8_t]] vec + ) + cdef class PackedColumns: cdef unique_ptr[packed_columns] c_obj diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 94873e079c9..c4fddc26a37 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -29,10 +29,6 @@ __all__ = [ cdef class HostBuffer: """Owning host buffer that implements the buffer protocol""" - cdef unique_ptr[vector[uint8_t]] c_obj - cdef size_t nbytes - cdef Py_ssize_t[1] shape - cdef Py_ssize_t[1] strides @staticmethod cdef HostBuffer from_unique_ptr( diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index cf73c89156f..00d957cfacf 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -50,9 +50,6 @@ cpdef read_parquet( cdef class ParquetWriterOptions: cdef parquet_writer_options c_obj - @staticmethod - cdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table) - cpdef void set_partitions(self, list partitions) cpdef void set_column_chunks_file_paths(self, list file_paths) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 5ac250181b5..470c776e510 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -249,7 +249,7 @@ cpdef read_parquet( cdef class ParquetWriterOptions: @staticmethod - cdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table): + def builder(SinkInfo sink, Table table): """ Create builder to create ParquetWriterOptionsBuilder. @@ -287,9 +287,9 @@ cdef class ParquetWriterOptions: cdef vector[partition_info] c_partions cdef PartitionInfo partition - c_obj.reserve(len(partitions)) + c_partions.reserve(len(partitions)) for partition in partitions: - c_obj.push_back(partition.c_obj) + c_partions.push_back(partition.c_obj) self.c_obj.set_partitions(c_partions) diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd index 50862ef69e4..7340697e1e7 100644 --- a/python/pylibcudf/pylibcudf/io/types.pxd +++ b/python/pylibcudf/pylibcudf/io/types.pxd @@ -25,9 +25,6 @@ from pylibcudf.table cimport Table cdef class PartitionInfo: cdef partition_info c_obj - @staticmethod - cdef PartitionInfo from_start_and_num(int start_row, int num_rows) - cdef class ColumnInMetadata: cdef column_in_metadata c_obj @@ -59,7 +56,6 @@ cdef class ColumnInMetadata: cpdef str get_name(self) cdef class TableInputMetadata: - cdef public Table table cdef table_input_metadata c_obj cdef class TableWithMetadata: diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 20961fb97a8..1ccb6145aff 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -61,7 +61,7 @@ cdef class PartitionInfo: num_rows : int The number of rows in the partition. """ - def __init__(int start_row, int num_rows): + def __init__(self, int start_row, int num_rows): self.c_obj = partition_info(start_row, num_rows) @@ -265,10 +265,6 @@ cdef class TableInputMetadata: """ def __init__(self, Table table): self.c_obj = table_input_metadata(table.view()) - self.column_metadata = [ - ColumnInMetadata.from_metadata(metadata) - for metadata in self.c_obj.column_metadata - ] cdef class TableWithMetadata: diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index 1dcbd4b731b..c49de1cf618 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -125,7 +125,6 @@ def test_read_parquet_filters( plc.io.types.StatisticsFreq.STATISTICS_COLUMN, ], ) -@pytest.mark.parametrize("int96_timestamps", [True, False]) @pytest.mark.parametrize("write_v2_headers", [True, False]) @pytest.mark.parametrize( "dictionary_policy", @@ -150,7 +149,6 @@ def test_write_parquet( table_data, compression, stats_level, - int96_timestamps, write_v2_headers, dictionary_policy, utc_timestamps, @@ -174,7 +172,6 @@ def test_write_parquet( .key_value_metadata(user_data) .compression(compression) .stats_level(stats_level) - .int96_timestamps(int96_timestamps) .write_v2_headers(write_v2_headers) .dictionary_policy(dictionary_policy) .utc_timestamps(utc_timestamps) @@ -197,4 +194,4 @@ def test_write_parquet( options.set_max_dictionary_size(max_dictionary_size) result = plc.io.parquet.write_parquet(options) - assert isinstance(result, plc.io.parquet.BufferArrayFromVector) + assert isinstance(result, plc.contiguous_split.HostBuffer) From 14c45018cc980b4d5b66b68c64e68f8054a5974e Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Fri, 15 Nov 2024 12:07:05 -0800 Subject: [PATCH 05/23] keep table and sink references alive --- python/pylibcudf/pylibcudf/io/parquet.pxd | 4 ++++ python/pylibcudf/pylibcudf/io/parquet.pyx | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 00d957cfacf..d2f907031cb 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -49,6 +49,8 @@ cpdef read_parquet( cdef class ParquetWriterOptions: cdef parquet_writer_options c_obj + cdef Table table_ref + cdef SinkInfo sink_ref cpdef void set_partitions(self, list partitions) @@ -66,6 +68,8 @@ cdef class ParquetWriterOptions: cdef class ParquetWriterOptionsBuilder: cdef parquet_writer_options_builder c_obj + cdef Table table_ref + cdef SinkInfo sink_ref cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 470c776e510..79a85f2853c 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -269,6 +269,8 @@ cdef class ParquetWriterOptions: ParquetWriterOptionsBuilder ) bldr.c_obj = parquet_writer_options.builder(sink.c_obj, table.view()) + bldr.table_ref = table + bldr.sink_ref = sink return bldr cpdef void set_partitions(self, list partitions): @@ -547,6 +549,8 @@ cdef class ParquetWriterOptionsBuilder: ParquetWriterOptions ) parquet_options.c_obj = move(self.c_obj.build()) + parquet_options.table_ref = self.table_ref + parquet_options.sink_ref = self.sink_ref return parquet_options From 46cbb46a1c6513316ee9db57e9964c67da460c8a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Fri, 15 Nov 2024 12:08:47 -0800 Subject: [PATCH 06/23] Return memoryview --- python/pylibcudf/pylibcudf/contiguous_split.pxd | 13 ------------- python/pylibcudf/pylibcudf/contiguous_split.pyx | 4 ++++ python/pylibcudf/pylibcudf/io/parquet.pxd | 3 +-- python/pylibcudf/pylibcudf/io/parquet.pyx | 6 +++--- 4 files changed, 8 insertions(+), 18 deletions(-) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pxd b/python/pylibcudf/pylibcudf/contiguous_split.pxd index 8100d010fa3..2a10cb5b3d5 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pxd +++ b/python/pylibcudf/pylibcudf/contiguous_split.pxd @@ -1,25 +1,12 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from libc.stdint cimport uint8_t from libcpp.memory cimport unique_ptr -from libcpp.vector cimport vector from pylibcudf.libcudf.contiguous_split cimport packed_columns from .gpumemoryview cimport gpumemoryview from .table cimport Table -cdef class HostBuffer: - cdef unique_ptr[vector[uint8_t]] c_obj - cdef size_t nbytes - cdef Py_ssize_t[1] shape - cdef Py_ssize_t[1] strides - - @staticmethod - cdef HostBuffer from_unique_ptr( - unique_ptr[vector[uint8_t]] vec - ) - cdef class PackedColumns: cdef unique_ptr[packed_columns] c_obj diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index c4fddc26a37..94873e079c9 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -29,6 +29,10 @@ __all__ = [ cdef class HostBuffer: """Owning host buffer that implements the buffer protocol""" + cdef unique_ptr[vector[uint8_t]] c_obj + cdef size_t nbytes + cdef Py_ssize_t[1] shape + cdef Py_ssize_t[1] strides @staticmethod cdef HostBuffer from_unique_ptr( diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index d2f907031cb..3f95e90e204 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -4,7 +4,6 @@ from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector -from pylibcudf.contiguous_split cimport HostBuffer from pylibcudf.expressions cimport Expression from pylibcudf.io.types cimport ( compression_type, @@ -91,4 +90,4 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptions build(self) -cpdef HostBuffer write_parquet(ParquetWriterOptions options) +cpdef memoryview write_parquet(ParquetWriterOptions options) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 79a85f2853c..7218eb4ab71 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -554,7 +554,7 @@ cdef class ParquetWriterOptionsBuilder: return parquet_options -cpdef HostBuffer write_parquet(ParquetWriterOptions options): +cpdef memoryview write_parquet(ParquetWriterOptions options): """ Writes a set of columns to parquet format. @@ -565,7 +565,7 @@ cpdef HostBuffer write_parquet(ParquetWriterOptions options): Returns ------- - pylibcudf.contiguous_split.HostBuffer + memoryview A blob that contains the file metadata (parquet FileMetadata thrift message) if requested in parquet_writer_options (empty blob otherwise). @@ -576,4 +576,4 @@ cpdef HostBuffer write_parquet(ParquetWriterOptions options): with nogil: c_result = cpp_write_parquet(c_options) - return HostBuffer.from_unique_ptr(move(c_result)) + return memoryview(HostBuffer.from_unique_ptr(move(c_result))) From 46db84e131abd40eeca3a377324886ea7b52772a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Fri, 15 Nov 2024 12:09:21 -0800 Subject: [PATCH 07/23] Adjust test too --- python/pylibcudf/pylibcudf/tests/io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index c49de1cf618..a2d5ac63d05 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -194,4 +194,4 @@ def test_write_parquet( options.set_max_dictionary_size(max_dictionary_size) result = plc.io.parquet.write_parquet(options) - assert isinstance(result, plc.contiguous_split.HostBuffer) + assert isinstance(result, memoryview) From efe24d465613085de6b95d64bb8e808861b44779 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Fri, 15 Nov 2024 12:14:02 -0800 Subject: [PATCH 08/23] Add back contiguous split changes --- python/pylibcudf/pylibcudf/contiguous_split.pxd | 13 +++++++++++++ python/pylibcudf/pylibcudf/contiguous_split.pyx | 5 ----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pxd b/python/pylibcudf/pylibcudf/contiguous_split.pxd index 2a10cb5b3d5..8100d010fa3 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pxd +++ b/python/pylibcudf/pylibcudf/contiguous_split.pxd @@ -1,12 +1,25 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from libc.stdint cimport uint8_t from libcpp.memory cimport unique_ptr +from libcpp.vector cimport vector from pylibcudf.libcudf.contiguous_split cimport packed_columns from .gpumemoryview cimport gpumemoryview from .table cimport Table +cdef class HostBuffer: + cdef unique_ptr[vector[uint8_t]] c_obj + cdef size_t nbytes + cdef Py_ssize_t[1] shape + cdef Py_ssize_t[1] strides + + @staticmethod + cdef HostBuffer from_unique_ptr( + unique_ptr[vector[uint8_t]] vec + ) + cdef class PackedColumns: cdef unique_ptr[packed_columns] c_obj diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 94873e079c9..8341cc68fe9 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -29,11 +29,6 @@ __all__ = [ cdef class HostBuffer: """Owning host buffer that implements the buffer protocol""" - cdef unique_ptr[vector[uint8_t]] c_obj - cdef size_t nbytes - cdef Py_ssize_t[1] shape - cdef Py_ssize_t[1] strides - @staticmethod cdef HostBuffer from_unique_ptr( unique_ptr[vector[uint8_t]] vec From a0fdcfa71465af44cc8670c5513d389851500f72 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell <lmitchell@nvidia.com> Date: Tue, 19 Nov 2024 12:20:55 +0000 Subject: [PATCH 09/23] Allow construction of HostBuffer from nullptr Now that we use HostBuffer to take ownership of metadata from write_parquet, we must handle the case of a null pointer as input. --- python/pylibcudf/pylibcudf/contiguous_split.pyx | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 8341cc68fe9..13ceb994024 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -33,9 +33,10 @@ cdef class HostBuffer: cdef HostBuffer from_unique_ptr( unique_ptr[vector[uint8_t]] vec ): - cdef HostBuffer out = HostBuffer() + cdef HostBuffer out = HostBuffer.__new__(HostBuffer) + # Allow construction from nullptr + out.nbytes = 0 if vec.get() == NULL else dereference(vec).size() out.c_obj = move(vec) - out.nbytes = dereference(out.c_obj).size() out.shape[0] = out.nbytes out.strides[0] = 1 return out @@ -43,7 +44,8 @@ cdef class HostBuffer: __hash__ = None def __getbuffer__(self, Py_buffer *buffer, int flags): - buffer.buf = dereference(self.c_obj).data() + # Empty vec produces empty buffer + buffer.buf = NULL if self.nbytes == 0 else dereference(self.c_obj).data() buffer.format = NULL # byte buffer.internal = NULL buffer.itemsize = 1 From 4d802f27bbd2593da8477f67bca6430b90d19072 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell <lmitchell@nvidia.com> Date: Tue, 19 Nov 2024 12:22:20 +0000 Subject: [PATCH 10/23] Parquet writing does not support gzip compression --- python/pylibcudf/pylibcudf/tests/io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index a2d5ac63d05..2fa0bff83db 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -115,7 +115,7 @@ def test_read_parquet_filters( "compression", [ plc.io.types.CompressionType.NONE, - plc.io.types.CompressionType.GZIP, + plc.io.types.CompressionType.SNAPPY, ], ) @pytest.mark.parametrize( From 91e847e81a844c5f32e5310cb2b8886a06b8da5f Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell <lmitchell@nvidia.com> Date: Tue, 19 Nov 2024 14:55:18 +0000 Subject: [PATCH 11/23] Use valid values for row_group/max_page_size_bytes --- python/pylibcudf/pylibcudf/tests/io/test_parquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index 2fa0bff83db..875b6cef264 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -140,9 +140,9 @@ def test_read_parquet_filters( [None, [plc.io.types.PartitionInfo(0, 10)]], ) @pytest.mark.parametrize("column_chunks_file_paths", [None, ["tmp.parquet"]]) -@pytest.mark.parametrize("row_group_size_bytes", [None, 100]) +@pytest.mark.parametrize("row_group_size_bytes", [None, 1024]) @pytest.mark.parametrize("row_group_size_rows", [None, 1]) -@pytest.mark.parametrize("max_page_size_bytes", [None, 100]) +@pytest.mark.parametrize("max_page_size_bytes", [None, 1024]) @pytest.mark.parametrize("max_page_size_rows", [None, 1]) @pytest.mark.parametrize("max_dictionary_size", [None, 100]) def test_write_parquet( From f2a905eb82bde1d6fb27301cafed7635cff5ec15 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell <lmitchell@nvidia.com> Date: Tue, 19 Nov 2024 14:55:58 +0000 Subject: [PATCH 12/23] Skip zero-sized table and non-None partition info Seems to induce a bug in libcudf for now. --- python/pylibcudf/pylibcudf/tests/io/test_parquet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index 875b6cef264..0b368428f7b 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -162,6 +162,8 @@ def test_write_parquet( max_dictionary_size, ): _, pa_table = table_data + if len(pa_table) == 0 and partitions is not None: + pytest.skip("https://github.com/rapidsai/cudf/issues/17361") plc_table = plc.interop.from_arrow(pa_table) table_meta = plc.io.types.TableInputMetadata(plc_table) sink = plc.io.SinkInfo([io.BytesIO()]) From 010c1da8a2a3314b3700c3a314a1aeac6c84136c Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell <lmitchell@nvidia.com> Date: Tue, 19 Nov 2024 15:29:25 +0000 Subject: [PATCH 13/23] Add type stub information --- python/pylibcudf/pylibcudf/io/parquet.pyi | 46 ++++++++++++++++++++++- python/pylibcudf/pylibcudf/io/types.pyi | 6 +++ python/pylibcudf/pylibcudf/io/types.pyx | 2 + 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index bcf1d1cce09..fa34f2b938a 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -1,7 +1,20 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from collections.abc import Mapping +from typing import Self + from pylibcudf.expressions import Expression -from pylibcudf.io.types import SourceInfo, TableWithMetadata +from pylibcudf.io.types import ( + CompressionType, + DictionaryPolicy, + PartitionInfo, + SinkInfo, + SourceInfo, + StatisticsFreq, + TableInputMetadata, + TableWithMetadata, +) +from pylibcudf.table import Table class ChunkedParquetReader: def __init__( @@ -34,3 +47,34 @@ def read_parquet( # reader_column_schema: ReaderColumnSchema = *, # timestamp_type: DataType = * ) -> TableWithMetadata: ... + +class ParquetWriterOptions: + def __init__(self): ... + @staticmethod + def builder( + sink: SinkInfo, table: Table + ) -> ParquetWriterOptionsBuilder: ... + def set_partitions(self, partitions: list[PartitionInfo]) -> None: ... + def set_column_chunks_file_paths(self, file_paths: list[str]) -> None: ... + def set_row_group_size_bytes(self, size_bytes: int) -> None: ... + def set_row_group_size_rows(self, size_rows: int) -> None: ... + def set_max_page_size_bytes(self, size_bytes: int) -> None: ... + def set_max_page_size_rows(self, size_rows: int) -> None: ... + def set_max_dictionary_size(self, size_rows: int) -> None: ... + +class ParquetWriterOptionsBuilder: + def __init__(self): ... + def metadata(self, metadata: TableInputMetadata) -> Self: ... + def key_value_metadata( + self, metadata: list[Mapping[str, str]] + ) -> Self: ... + def compression(self, compression: CompressionType) -> Self: ... + def stats_level(self, sf: StatisticsFreq) -> Self: ... + def int96_timestamps(self, enabled: bool) -> Self: ... + def write_v2_headers(self, enabled: bool) -> Self: ... + def dictionary_policy(self, val: DictionaryPolicy) -> Self: ... + def utc_timestamps(self, enabled: bool) -> Self: ... + def write_arrow_schema(self, enabled: bool) -> Self: ... + def build(self) -> ParquetWriterOptions: ... + +def write_parquet(options: ParquetWriterOptions) -> memoryview: ... diff --git a/python/pylibcudf/pylibcudf/io/types.pyi b/python/pylibcudf/pylibcudf/io/types.pyi index a4f4fc13bdc..ebaaf054e66 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyi +++ b/python/pylibcudf/pylibcudf/io/types.pyi @@ -59,6 +59,12 @@ class QuoteStyle(IntEnum): ColumnNameSpec: TypeAlias = tuple[str, list[ColumnNameSpec]] ChildNameSpec: TypeAlias = Mapping[str, ChildNameSpec] +class PartitionInfo: + def __init__(self, start_row: int, num_rows: int): ... + +class TableInputMetadata: + def __init__(self, table: Table): ... + class TableWithMetadata: tbl: Table def __init__( diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 1ccb6145aff..80542059aed 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -42,10 +42,12 @@ __all__ = [ "CompressionType", "DictionaryPolicy", "JSONRecoveryMode", + "PartitionInfo", "QuoteStyle", "SinkInfo", "SourceInfo", "StatisticsFreq", + "TableInputMetadata", "TableWithMetadata", ] From 81c9839f1e741a617c58dbfa9679e73199ad6519 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Tue, 19 Nov 2024 18:11:24 -0800 Subject: [PATCH 14/23] Address reviews --- python/pylibcudf/pylibcudf/io/parquet.pxd | 10 +++++----- python/pylibcudf/pylibcudf/io/parquet.pyx | 12 ++++++------ python/pylibcudf/pylibcudf/io/types.pxd | 8 +++++--- python/pylibcudf/pylibcudf/io/types.pyi | 13 +++++++++++++ python/pylibcudf/pylibcudf/io/types.pyx | 10 ++++++---- 5 files changed, 35 insertions(+), 18 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 3f95e90e204..e19d61e4d1d 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -55,15 +55,15 @@ cdef class ParquetWriterOptions: cpdef void set_column_chunks_file_paths(self, list file_paths) - cpdef void set_row_group_size_bytes(self, int size_bytes) + cpdef void set_row_group_size_bytes(self, size_t size_bytes) - cpdef void set_row_group_size_rows(self, int size_rows) + cpdef void set_row_group_size_rows(self, size_type size_rows) - cpdef void set_max_page_size_bytes(self, int size_bytes) + cpdef void set_max_page_size_bytes(self, size_t size_bytes) - cpdef void set_max_page_size_rows(self, int size_rows) + cpdef void set_max_page_size_rows(self, size_type size_rows) - cpdef void set_max_dictionary_size(self, int size_rows) + cpdef void set_max_dictionary_size(self, size_t size_rows) cdef class ParquetWriterOptionsBuilder: cdef parquet_writer_options_builder c_obj diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 7218eb4ab71..18ab3429431 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -310,7 +310,7 @@ cdef class ParquetWriterOptions: """ self.c_obj.set_column_chunks_file_paths([fp.encode() for fp in file_paths]) - cpdef void set_row_group_size_bytes(self, int size_bytes): + cpdef void set_row_group_size_bytes(self, size_t size_bytes): """ Sets the maximum row group size, in bytes. @@ -325,7 +325,7 @@ cdef class ParquetWriterOptions: """ self.c_obj.set_row_group_size_bytes(size_bytes) - cpdef void set_row_group_size_rows(self, int size_rows): + cpdef void set_row_group_size_rows(self, size_type size_rows): """ Sets the maximum row group size, in rows. @@ -340,7 +340,7 @@ cdef class ParquetWriterOptions: """ self.c_obj.set_row_group_size_rows(size_rows) - cpdef void set_max_page_size_bytes(self, int size_bytes): + cpdef void set_max_page_size_bytes(self, size_t size_bytes): """ Sets the maximum uncompressed page size, in bytes. @@ -355,7 +355,7 @@ cdef class ParquetWriterOptions: """ self.c_obj.set_max_page_size_bytes(size_bytes) - cpdef void set_max_page_size_rows(self, int size_rows): + cpdef void set_max_page_size_rows(self, size_type size_rows): """ Sets the maximum page size, in rows. @@ -370,7 +370,7 @@ cdef class ParquetWriterOptions: """ self.c_obj.set_max_page_size_rows(size_rows) - cpdef void set_max_dictionary_size(self, int size_rows): + cpdef void set_max_dictionary_size(self, size_t size_rows): """ Sets the maximum dictionary size, in bytes. @@ -539,7 +539,7 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptions build(self): """ - Options member once it's built + Create a ParquetWriterOptions from the set options. Returns ------- diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd index 7340697e1e7..90b43cf0ff5 100644 --- a/python/pylibcudf/pylibcudf/io/types.pxd +++ b/python/pylibcudf/pylibcudf/io/types.pxd @@ -1,4 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +from libc.stdint cimport uint8_t, int32_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector @@ -19,6 +20,7 @@ from pylibcudf.libcudf.io.types cimport ( table_metadata, table_with_metadata, ) +from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table @@ -41,13 +43,13 @@ cdef class ColumnInMetadata: cpdef ColumnInMetadata set_int96_timestamps(self, bool req) - cpdef ColumnInMetadata set_decimal_precision(self, int req) + cpdef ColumnInMetadata set_decimal_precision(self, uint8_t req) - cpdef ColumnInMetadata child(self, int i) + cpdef ColumnInMetadata child(self, size_type i) cpdef ColumnInMetadata set_output_as_binary(self, bool binary) - cpdef ColumnInMetadata set_type_length(self, int type_length) + cpdef ColumnInMetadata set_type_length(self, int32_t type_length) cpdef ColumnInMetadata set_skip_compression(self, bool skip) diff --git a/python/pylibcudf/pylibcudf/io/types.pyi b/python/pylibcudf/pylibcudf/io/types.pyi index ebaaf054e66..04f276cfeee 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyi +++ b/python/pylibcudf/pylibcudf/io/types.pyi @@ -65,6 +65,19 @@ class PartitionInfo: class TableInputMetadata: def __init__(self, table: Table): ... +class ColumnInMetadata: + def set_name(self, name: str) -> ColumnInMetadata: ... + def set_nullability(self, nullable: bool) -> ColumnInMetadata: ... + def set_list_column_as_map(self) -> ColumnInMetadata: ... + def set_int96_timestamps(self, req: bool) -> ColumnInMetadata: ... + def set_decimal_precision(self, precision: int) -> ColumnInMetadata: ... + def child(self, i: int) -> ColumnInMetadata: ... + def set_output_as_binary(self, binary: bool) -> ColumnInMetadata: ... + def set_type_length(self, type_length: int) -> ColumnInMetadata: ... + def set_skip_compression(self, skip: bool) -> ColumnInMetadata: ... + def set_encoding(self, encoding: ColumnEncoding) -> ColumnInMetadata: ... + def get_name(self) -> str: ... + class TableWithMetadata: tbl: Table def __init__( diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 80542059aed..f4bff285d68 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -2,6 +2,7 @@ from cpython.buffer cimport PyBUF_READ from cpython.memoryview cimport PyMemoryView_FromMemory +from libc.stdint cimport uint8_t, int32_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.string cimport string @@ -20,6 +21,7 @@ from pylibcudf.libcudf.io.types cimport ( table_input_metadata, table_with_metadata, ) +from pylibcudf.libcudf.types cimport size_type import codecs import errno @@ -63,7 +65,7 @@ cdef class PartitionInfo: num_rows : int The number of rows in the partition. """ - def __init__(self, int start_row, int num_rows): + def __init__(self, size_type start_row, size_type num_rows): self.c_obj = partition_info(start_row, num_rows) @@ -146,7 +148,7 @@ cdef class ColumnInMetadata: self.c_obj.set_int96_timestamps(req) return self - cpdef ColumnInMetadata set_decimal_precision(self, int precision): + cpdef ColumnInMetadata set_decimal_precision(self, uint8_t precision): """ Set the decimal precision of this column. Only valid if this column is a decimal (fixed-point) type. @@ -163,7 +165,7 @@ cdef class ColumnInMetadata: self.c_obj.set_decimal_precision(precision) return self - cpdef ColumnInMetadata child(self, int i): + cpdef ColumnInMetadata child(self, size_type i): """ Get reference to a child of this column. @@ -194,7 +196,7 @@ cdef class ColumnInMetadata: self.c_obj.set_output_as_binary(binary) return self - cpdef ColumnInMetadata set_type_length(self, int type_length): + cpdef ColumnInMetadata set_type_length(self, int32_t type_length): """ Sets the length of fixed length data. From 85a550506399f9cba9ca38a1f79b280e6a6e4435 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:06:05 -0800 Subject: [PATCH 15/23] Update python/pylibcudf/pylibcudf/io/parquet.pxd Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index e19d61e4d1d..1a61c20d783 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -63,7 +63,7 @@ cdef class ParquetWriterOptions: cpdef void set_max_page_size_rows(self, size_type size_rows) - cpdef void set_max_dictionary_size(self, size_t size_rows) + cpdef void set_max_dictionary_size(self, size_t size_bytes) cdef class ParquetWriterOptionsBuilder: cdef parquet_writer_options_builder c_obj From 2f65032f5aab9613b7a6db301400a95eed415387 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:06:14 -0800 Subject: [PATCH 16/23] Update python/pylibcudf/pylibcudf/io/parquet.pyi Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index fa34f2b938a..eb2ca68109b 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -60,7 +60,7 @@ class ParquetWriterOptions: def set_row_group_size_rows(self, size_rows: int) -> None: ... def set_max_page_size_bytes(self, size_bytes: int) -> None: ... def set_max_page_size_rows(self, size_rows: int) -> None: ... - def set_max_dictionary_size(self, size_rows: int) -> None: ... + def set_max_dictionary_size(self, size_bytes: int) -> None: ... class ParquetWriterOptionsBuilder: def __init__(self): ... From 44a7bba94904a3c6fddc77a11a0dbd8f9e404b0a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:06:22 -0800 Subject: [PATCH 17/23] Update python/pylibcudf/pylibcudf/io/parquet.pyx Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 18ab3429431..3dde6486c1b 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -376,7 +376,7 @@ cdef class ParquetWriterOptions: Parameters ---------- - size_rows : int + size_bytes : int Sets the maximum dictionary size, in bytes.. Returns From 1564fae8aa775137b964008e481f636e87719259 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:06:35 -0800 Subject: [PATCH 18/23] Update python/pylibcudf/pylibcudf/io/parquet.pyx Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 3dde6486c1b..e955207f43f 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -302,7 +302,7 @@ cdef class ParquetWriterOptions: Parameters ---------- file_paths : list[str] - Vector of Strings which indicates file path. + Vector of strings which indicate file paths. Returns ------- From 91ed0383f775e3518c71b73b838fc6c5874e7c96 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:06:47 -0800 Subject: [PATCH 19/23] Update python/pylibcudf/pylibcudf/io/parquet.pyx Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index e955207f43f..87cb18e0e3a 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -370,7 +370,7 @@ cdef class ParquetWriterOptions: """ self.c_obj.set_max_page_size_rows(size_rows) - cpdef void set_max_dictionary_size(self, size_t size_rows): + cpdef void set_max_dictionary_size(self, size_t size_bytes): """ Sets the maximum dictionary size, in bytes. From 2faa151ecaf410fb04dcbd816f81e8eb63cca293 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:07:01 -0800 Subject: [PATCH 20/23] Update python/pylibcudf/pylibcudf/io/parquet.pyx Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 87cb18e0e3a..894c04c5ad0 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -427,7 +427,7 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptionsBuilder compression(self, compression_type compression): """ - Sets Key-Value footer metadata. + Sets compression type. Parameters ---------- From ed096f364bfb58f920e9579a02b6a86552007104 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:07:15 -0800 Subject: [PATCH 21/23] Update python/pylibcudf/pylibcudf/io/parquet.pyx Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 894c04c5ad0..9cd3a20da66 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -377,7 +377,7 @@ cdef class ParquetWriterOptions: Parameters ---------- size_bytes : int - Sets the maximum dictionary size, in bytes.. + Sets the maximum dictionary size, in bytes. Returns ------- From 66f555f0fa6f3b3fd17b31609a3be48a9d3c7dc9 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 12:07:27 -0800 Subject: [PATCH 22/23] Update python/pylibcudf/pylibcudf/io/parquet.pyx Co-authored-by: Bradley Dice <bdice@bradleydice.com> --- python/pylibcudf/pylibcudf/io/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 9cd3a20da66..3d5461f19d3 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -383,7 +383,7 @@ cdef class ParquetWriterOptions: ------- None """ - self.c_obj.set_max_dictionary_size(size_rows) + self.c_obj.set_max_dictionary_size(size_bytes) cdef class ParquetWriterOptionsBuilder: From 0df14b184e23dff818ccbb2f5e9a594524a936cb Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:05:59 -0800 Subject: [PATCH 23/23] address docstring review, reduce test parameterization --- python/pylibcudf/pylibcudf/io/parquet.pyx | 4 +-- .../pylibcudf/tests/io/test_parquet.py | 27 +++---------------- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 3d5461f19d3..b95b1f39de1 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -459,7 +459,7 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled): """ - Sets whether int96 timestamps are written or not. + Sets whether timestamps are written as int96 or timestamp micros. Parameters ---------- @@ -475,7 +475,7 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled): """ - Set to true if V2 page headers are to be written. + Set to true to write V2 page headers, otherwise false to write V1 page headers. Parameters ---------- diff --git a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py index 0b368428f7b..94524acbcc8 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_parquet.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_parquet.py @@ -111,28 +111,7 @@ def test_read_parquet_filters( # bool use_pandas_metadata = True -@pytest.mark.parametrize( - "compression", - [ - plc.io.types.CompressionType.NONE, - plc.io.types.CompressionType.SNAPPY, - ], -) -@pytest.mark.parametrize( - "stats_level", - [ - plc.io.types.StatisticsFreq.STATISTICS_NONE, - plc.io.types.StatisticsFreq.STATISTICS_COLUMN, - ], -) @pytest.mark.parametrize("write_v2_headers", [True, False]) -@pytest.mark.parametrize( - "dictionary_policy", - [ - plc.io.types.DictionaryPolicy.ADAPTIVE, - plc.io.types.DictionaryPolicy.NEVER, - ], -) @pytest.mark.parametrize("utc_timestamps", [True, False]) @pytest.mark.parametrize("write_arrow_schema", [True, False]) @pytest.mark.parametrize( @@ -147,10 +126,7 @@ def test_read_parquet_filters( @pytest.mark.parametrize("max_dictionary_size", [None, 100]) def test_write_parquet( table_data, - compression, - stats_level, write_v2_headers, - dictionary_policy, utc_timestamps, write_arrow_schema, partitions, @@ -168,6 +144,9 @@ def test_write_parquet( table_meta = plc.io.types.TableInputMetadata(plc_table) sink = plc.io.SinkInfo([io.BytesIO()]) user_data = [{"foo": "{'bar': 'baz'}"}] + compression = plc.io.types.CompressionType.SNAPPY + stats_level = plc.io.types.StatisticsFreq.STATISTICS_COLUMN + dictionary_policy = plc.io.types.DictionaryPolicy.ADAPTIVE options = ( plc.io.parquet.ParquetWriterOptions.builder(sink, plc_table) .metadata(table_meta)