diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst new file mode 100644 index 00000000000..5a2276f8b2d --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/csv.rst @@ -0,0 +1,6 @@ +=== +CSV +=== + +.. automodule:: cudf._lib.pylibcudf.io.csv + :members: diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst index bde6d8094ce..697bce739de 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst @@ -16,4 +16,5 @@ I/O Functions :maxdepth: 1 avro + csv json diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index 9fecff5f5f6..099b61d62ae 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -1,7 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. from libcpp cimport bool -from libcpp.map cimport map from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move @@ -9,8 +8,12 @@ from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.types as libcudf_types from cudf._lib.pylibcudf.io.datasource cimport Datasource, NativeFileDatasource -from cudf._lib.pylibcudf.libcudf.types cimport data_type -from cudf._lib.types cimport dtype_to_data_type +from cudf._lib.types cimport dtype_to_pylibcudf_type + +import errno +import os +from collections import abc +from io import BytesIO, StringIO import numpy as np import pandas as pd @@ -18,65 +21,24 @@ import pandas as pd import cudf from cudf.core.buffer import acquire_spill_lock -from cudf._lib.pylibcudf.libcudf.types cimport size_type - -import errno -import os -from collections import abc -from enum import IntEnum -from io import BytesIO, StringIO - -from libc.stdint cimport int32_t from libcpp cimport bool -from cudf._lib.io.utils cimport make_sink_info, make_source_info +from cudf._lib.io.utils cimport make_sink_info from cudf._lib.pylibcudf.libcudf.io.csv cimport ( - csv_reader_options, csv_writer_options, - read_csv as cpp_read_csv, write_csv as cpp_write_csv, ) from cudf._lib.pylibcudf.libcudf.io.data_sink cimport data_sink -from cudf._lib.pylibcudf.libcudf.io.types cimport ( - compression_type, - quote_style, - sink_info, - source_info, - table_with_metadata, -) +from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type, sink_info from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view -from cudf._lib.utils cimport data_from_unique_ptr, table_view_from_table +from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table from pyarrow.lib import NativeFile +import cudf._lib.pylibcudf as plc from cudf.api.types import is_hashable -ctypedef int32_t underlying_type_t_compression - - -class Compression(IntEnum): - INFER = ( - compression_type.AUTO - ) - SNAPPY = ( - compression_type.SNAPPY - ) - GZIP = ( - compression_type.GZIP - ) - BZ2 = ( - compression_type.BZIP2 - ) - BROTLI = ( - compression_type.BROTLI - ) - ZIP = ( - compression_type.ZIP - ) - XZ = ( - compression_type.XZ - ) - +from cudf._lib.pylibcudf.types cimport DataType CSV_HEX_TYPE_MAP = { "hex": np.dtype("int64"), @@ -84,234 +46,6 @@ CSV_HEX_TYPE_MAP = { "hex32": np.dtype("int32") } -cdef csv_reader_options make_csv_reader_options( - object datasource, - object lineterminator, - object quotechar, - int quoting, - bool doublequote, - object header, - bool mangle_dupe_cols, - object usecols, - object delimiter, - bool delim_whitespace, - bool skipinitialspace, - object names, - object dtype, - int skipfooter, - int skiprows, - bool dayfirst, - object compression, - object thousands, - object decimal, - object true_values, - object false_values, - object nrows, - object byte_range, - bool skip_blank_lines, - object parse_dates, - object comment, - object na_values, - bool keep_default_na, - bool na_filter, - object prefix, - object index_col, -) except *: - cdef source_info c_source_info = make_source_info([datasource]) - cdef compression_type c_compression - cdef vector[string] c_names - cdef size_t c_byte_range_offset = ( - byte_range[0] if byte_range is not None else 0 - ) - cdef size_t c_byte_range_size = ( - byte_range[1] if byte_range is not None else 0 - ) - cdef vector[int] c_use_cols_indexes - cdef vector[string] c_use_cols_names - cdef size_type c_nrows = nrows if nrows is not None else -1 - cdef quote_style c_quoting - cdef vector[string] c_parse_dates_names - cdef vector[int] c_parse_dates_indexes - cdef vector[string] c_hex_col_names - cdef vector[data_type] c_dtypes_list - cdef map[string, data_type] c_dtypes_map - cdef vector[int] c_hex_col_indexes - cdef vector[string] c_true_values - cdef vector[string] c_false_values - cdef vector[string] c_na_values - - # Reader settings - if compression is None: - c_compression = compression_type.NONE - else: - compression = str(compression) - compression = Compression[compression.upper()] - c_compression = ( - compression - ) - - if quoting == 1: - c_quoting = quote_style.ALL - elif quoting == 2: - c_quoting = quote_style.NONNUMERIC - elif quoting == 3: - c_quoting = quote_style.NONE - else: - # Default value - c_quoting = quote_style.MINIMAL - - cdef csv_reader_options csv_reader_options_c = move( - csv_reader_options.builder(c_source_info) - .compression(c_compression) - .mangle_dupe_cols(mangle_dupe_cols) - .byte_range_offset(c_byte_range_offset) - .byte_range_size(c_byte_range_size) - .nrows(c_nrows) - .skiprows(skiprows) - .skipfooter(skipfooter) - .quoting(c_quoting) - .lineterminator(ord(lineterminator)) - .quotechar(ord(quotechar)) - .decimal(ord(decimal)) - .delim_whitespace(delim_whitespace) - .skipinitialspace(skipinitialspace) - .skip_blank_lines(skip_blank_lines) - .doublequote(doublequote) - .keep_default_na(keep_default_na) - .na_filter(na_filter) - .dayfirst(dayfirst) - .build() - ) - - if names is not None: - # explicitly mentioned name, so don't check header - if header is None or header == 'infer': - csv_reader_options_c.set_header(-1) - else: - csv_reader_options_c.set_header(header) - - c_names.reserve(len(names)) - for name in names: - c_names.push_back(str(name).encode()) - csv_reader_options_c.set_names(c_names) - else: - if header is None: - csv_reader_options_c.set_header(-1) - elif header == 'infer': - csv_reader_options_c.set_header(0) - else: - csv_reader_options_c.set_header(header) - - if prefix is not None: - csv_reader_options_c.set_prefix(prefix.encode()) - - if usecols is not None: - all_int = all(isinstance(col, int) for col in usecols) - if all_int: - c_use_cols_indexes.reserve(len(usecols)) - c_use_cols_indexes = usecols - csv_reader_options_c.set_use_cols_indexes(c_use_cols_indexes) - else: - c_use_cols_names.reserve(len(usecols)) - for col_name in usecols: - c_use_cols_names.push_back( - str(col_name).encode() - ) - csv_reader_options_c.set_use_cols_names(c_use_cols_names) - - if delimiter is not None: - csv_reader_options_c.set_delimiter(ord(delimiter)) - - if thousands is not None: - csv_reader_options_c.set_thousands(ord(thousands)) - - if comment is not None: - csv_reader_options_c.set_comment(ord(comment)) - - if parse_dates is not None: - if isinstance(parse_dates, abc.Mapping): - raise NotImplementedError( - "`parse_dates`: dictionaries are unsupported") - if not isinstance(parse_dates, abc.Iterable): - raise NotImplementedError( - "`parse_dates`: an iterable is required") - for col in parse_dates: - if isinstance(col, str): - c_parse_dates_names.push_back(str(col).encode()) - elif isinstance(col, int): - c_parse_dates_indexes.push_back(col) - else: - raise NotImplementedError( - "`parse_dates`: Nesting is unsupported") - csv_reader_options_c.set_parse_dates(c_parse_dates_names) - csv_reader_options_c.set_parse_dates(c_parse_dates_indexes) - - if dtype is not None: - if isinstance(dtype, abc.Mapping): - for k, v in dtype.items(): - col_type = v - if is_hashable(v) and v in CSV_HEX_TYPE_MAP: - col_type = CSV_HEX_TYPE_MAP[v] - c_hex_col_names.push_back(str(k).encode()) - - c_dtypes_map[str(k).encode()] = \ - _get_cudf_data_type_from_dtype( - cudf.dtype(col_type)) - csv_reader_options_c.set_dtypes(c_dtypes_map) - csv_reader_options_c.set_parse_hex(c_hex_col_names) - elif ( - cudf.api.types.is_scalar(dtype) or - isinstance(dtype, ( - np.dtype, pd.api.extensions.ExtensionDtype, type - )) - ): - c_dtypes_list.reserve(1) - if is_hashable(dtype) and dtype in CSV_HEX_TYPE_MAP: - dtype = CSV_HEX_TYPE_MAP[dtype] - c_hex_col_indexes.push_back(0) - - c_dtypes_list.push_back( - _get_cudf_data_type_from_dtype(dtype) - ) - csv_reader_options_c.set_dtypes(c_dtypes_list) - csv_reader_options_c.set_parse_hex(c_hex_col_indexes) - elif isinstance(dtype, abc.Collection): - c_dtypes_list.reserve(len(dtype)) - for index, col_dtype in enumerate(dtype): - if is_hashable(col_dtype) and col_dtype in CSV_HEX_TYPE_MAP: - col_dtype = CSV_HEX_TYPE_MAP[col_dtype] - c_hex_col_indexes.push_back(index) - - c_dtypes_list.push_back( - _get_cudf_data_type_from_dtype(col_dtype) - ) - csv_reader_options_c.set_dtypes(c_dtypes_list) - csv_reader_options_c.set_parse_hex(c_hex_col_indexes) - else: - raise ValueError( - "dtype should be a scalar/str/list-like/dict-like" - ) - - if true_values is not None: - c_true_values.reserve(len(true_values)) - for tv in true_values: - c_true_values.push_back(tv.encode()) - csv_reader_options_c.set_true_values(c_true_values) - - if false_values is not None: - c_false_values.reserve(len(false_values)) - for fv in false_values: - c_false_values.push_back(fv.encode()) - csv_reader_options_c.set_false_values(c_false_values) - - if na_values is not None: - c_na_values.reserve(len(na_values)) - for nv in na_values: - c_na_values.push_back(nv.encode()) - csv_reader_options_c.set_na_values(c_na_values) - - return csv_reader_options_c - def validate_args( object delimiter, @@ -381,7 +115,6 @@ def read_csv( bool na_filter=True, object prefix=None, object index_col=None, - **kwargs, ): """ Cython function to call into libcudf API, see `read_csv`. @@ -413,23 +146,120 @@ def read_csv( if delimiter is None: delimiter = sep - cdef csv_reader_options read_csv_options_c = make_csv_reader_options( - datasource, lineterminator, quotechar, quoting, doublequote, - header, mangle_dupe_cols, usecols, delimiter, delim_whitespace, - skipinitialspace, names, dtype, skipfooter, skiprows, dayfirst, - compression, thousands, decimal, true_values, false_values, nrows, - byte_range, skip_blank_lines, parse_dates, comment, na_values, - keep_default_na, na_filter, prefix, index_col) + delimiter = str(delimiter) + + if byte_range is None: + byte_range = (0, 0) + + if compression is None: + c_compression = compression_type.NONE + else: + compression_map = { + "infer": compression_type.AUTO, + "gzip": compression_type.GZIP, + "bz2": compression_type.BZIP2, + "zip": compression_type.ZIP, + } + c_compression = compression_map[compression] - cdef table_with_metadata c_result - with nogil: - c_result = move(cpp_read_csv(read_csv_options_c)) + # We need this later when setting index cols + orig_header = header + + if names is not None: + # explicitly mentioned name, so don't check header + if header is None or header == 'infer': + header = -1 + else: + header = header + names = list(names) + else: + if header is None: + header = -1 + elif header == 'infer': + header = 0 - meta_names = [info.name.decode() for info in c_result.metadata.schema_info] - df = cudf.DataFrame._from_data(*data_from_unique_ptr( - move(c_result.tbl), - column_names=meta_names - )) + hex_cols = [] + + new_dtypes = [] + if dtype is not None: + if isinstance(dtype, abc.Mapping): + new_dtypes = dict() + for k, v in dtype.items(): + col_type = v + if is_hashable(v) and v in CSV_HEX_TYPE_MAP: + col_type = CSV_HEX_TYPE_MAP[v] + hex_cols.append(str(k)) + + new_dtypes[k] = _get_plc_data_type_from_dtype( + cudf.dtype(col_type) + ) + elif ( + cudf.api.types.is_scalar(dtype) or + isinstance(dtype, ( + np.dtype, pd.api.extensions.ExtensionDtype, type + )) + ): + if is_hashable(dtype) and dtype in CSV_HEX_TYPE_MAP: + dtype = CSV_HEX_TYPE_MAP[dtype] + hex_cols.append(0) + + new_dtypes.append( + _get_plc_data_type_from_dtype(dtype) + ) + elif isinstance(dtype, abc.Collection): + for index, col_dtype in enumerate(dtype): + if is_hashable(col_dtype) and col_dtype in CSV_HEX_TYPE_MAP: + col_dtype = CSV_HEX_TYPE_MAP[col_dtype] + hex_cols.append(index) + + new_dtypes.append( + _get_plc_data_type_from_dtype(col_dtype) + ) + else: + raise ValueError( + "dtype should be a scalar/str/list-like/dict-like" + ) + + lineterminator = str(lineterminator) + + df = cudf.DataFrame._from_data( + *data_from_pylibcudf_io( + plc.io.csv.read_csv( + plc.io.SourceInfo([datasource]), + lineterminator=lineterminator, + quotechar = quotechar, + quoting = quoting, + doublequote = doublequote, + header = header, + mangle_dupe_cols = mangle_dupe_cols, + usecols = usecols, + delimiter = delimiter, + delim_whitespace = delim_whitespace, + skipinitialspace = skipinitialspace, + col_names = names, + dtypes = new_dtypes, + skipfooter = skipfooter, + skiprows = skiprows, + dayfirst = dayfirst, + compression = c_compression, + thousands = thousands, + decimal = decimal, + true_values = true_values, + false_values = false_values, + nrows = nrows if nrows is not None else -1, + byte_range_offset = byte_range[0], + byte_range_size = byte_range[1], + skip_blank_lines = skip_blank_lines, + parse_dates = parse_dates, + parse_hex = hex_cols, + comment = comment, + na_values = na_values, + keep_default_na = keep_default_na, + na_filter = na_filter, + prefix = prefix, + ) + ) + ) if dtype is not None: if isinstance(dtype, abc.Mapping): @@ -459,7 +289,7 @@ def read_csv( index_col_name = df._data.select_by_index(index_col).names[0] df = df.set_index(index_col_name) if isinstance(index_col_name, str) and \ - names is None and header in ("infer",): + names is None and orig_header == "infer": if index_col_name.startswith("Unnamed:"): # TODO: Try to upstream it to libcudf # csv reader in future @@ -550,7 +380,7 @@ def write_csv( ) -cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *: +cdef DataType _get_plc_data_type_from_dtype(object dtype) except *: # TODO: Remove this work-around Dictionary types # in libcudf are fully mapped to categorical columns: # https://github.com/rapidsai/cudf/issues/3960 @@ -561,36 +391,36 @@ cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *: if isinstance(dtype, str): if str(dtype) == "date32": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_DAYS ) elif str(dtype) in ("date", "date64"): - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MILLISECONDS ) elif str(dtype) == "timestamp": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MILLISECONDS ) elif str(dtype) == "timestamp[us]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MICROSECONDS ) elif str(dtype) == "timestamp[s]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_SECONDS ) elif str(dtype) == "timestamp[ms]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_MILLISECONDS ) elif str(dtype) == "timestamp[ns]": - return libcudf_types.data_type( + return DataType( libcudf_types.type_id.TIMESTAMP_NANOSECONDS ) dtype = cudf.dtype(dtype) - return dtype_to_data_type(dtype) + return dtype_to_pylibcudf_type(dtype) def columns_apply_na_rep(column_names, na_rep): diff --git a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt index 084b341ec48..8dd08d11dc8 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources avro.pyx datasource.pyx json.pyx types.pyx) +set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx types.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( @@ -21,7 +21,7 @@ rapids_cython_create_modules( LINKED_LIBRARIES "${linked_libraries}" MODULE_PREFIX pylibcudf_io_ ASSOCIATED_TARGETS cudf ) -set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_datasource pylibcudf_io_json - pylibcudf_io_types +set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_csv pylibcudf_io_datasource + pylibcudf_io_json pylibcudf_io_types ) link_to_pyarrow_headers("${targets_using_arrow_headers}") diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd index ef4c65b277e..5b3272d60e0 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd @@ -1,4 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +# CSV is removed since it is def not cpdef (to force kw-only arguments) from . cimport avro, datasource, json, types from .types cimport SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py index fb4e4c7e4bb..e17deaa4663 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, datasource, json, types +from . import avro, csv, datasource, json, types from .types import SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx new file mode 100644 index 00000000000..e9efb5befee --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/csv.pyx @@ -0,0 +1,264 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp cimport bool +from libcpp.map cimport map +from libcpp.string cimport string +from libcpp.utility cimport move +from libcpp.vector cimport vector + +from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.io.csv cimport ( + csv_reader_options, + read_csv as cpp_read_csv, +) +from cudf._lib.pylibcudf.libcudf.io.types cimport ( + compression_type, + quote_style, + table_with_metadata, +) +from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type +from cudf._lib.pylibcudf.types cimport DataType + + +cdef tuple _process_parse_dates_hex(list cols): + cdef vector[string] str_cols + cdef vector[int] int_cols + for col in cols: + if isinstance(col, str): + str_cols.push_back(col.encode()) + else: + int_cols.push_back(col) + return str_cols, int_cols + +cdef vector[string] _make_str_vector(list vals): + cdef vector[string] res + for val in vals: + res.push_back((val).encode()) + return res + + +def read_csv( + SourceInfo source_info, + *, + compression_type compression = compression_type.AUTO, + size_t byte_range_offset = 0, + size_t byte_range_size = 0, + list col_names = None, + str prefix = "", + bool mangle_dupe_cols = True, + list usecols = None, + size_type nrows = -1, + size_type skiprows = 0, + size_type skipfooter = 0, + size_type header = 0, + str lineterminator = "\n", + str delimiter = None, + str thousands = None, + str decimal = ".", + str comment = None, + bool delim_whitespace = False, + bool skipinitialspace = False, + bool skip_blank_lines = True, + quote_style quoting = quote_style.MINIMAL, + str quotechar = '"', + bool doublequote = True, + list parse_dates = None, + list parse_hex = None, + # Technically this should be dict/list + # but using a fused type prevents using None as default + object dtypes = None, + list true_values = None, + list false_values = None, + list na_values = None, + bool keep_default_na = True, + bool na_filter = True, + bool dayfirst = False, + # Note: These options are supported by the libcudf reader + # but are not exposed here since there is no demand for them + # on the Python side yet. + # bool detect_whitespace_around_quotes = False, + # DataType timestamp_type = DataType(type_id.EMPTY), +): + """Reads a CSV file into a :py:class:`~.types.TableWithMetadata`. + + Parameters + ---------- + source_info : SourceInfo + The SourceInfo to read the CSV file from. + compression : compression_type, default CompressionType.AUTO + The compression format of the CSV source. + byte_range_offset : size_type, default 0 + Number of bytes to skip from source start. + byte_range_size : size_type, default 0 + Number of bytes to read. By default, will read all bytes. + col_names : list, default None + The column names to use. + prefix : string, default '' + The prefix to apply to the column names. + mangle_dupe_cols : bool, default True + If True, rename duplicate column names. + usecols : list, default None + Specify the string column names/integer column indices of columns to be read. + nrows : size_type, default -1 + The number of rows to read. + skiprows : size_type, default 0 + The number of rows to skip from the start before reading + skipfooter : size_type, default 0 + The number of rows to skip from the end + header : size_type, default 0 + The index of the row that will be used for header names. + Pass -1 to use default column names. + lineterminator : str, default '\\n' + The character used to determine the end of a line. + delimiter : str, default "," + The character used to separate fields in a row. + thousands : str, default None + The character used as the thousands separator. + Cannot match delimiter. + decimal : str, default '.' + The character used as the decimal separator. + Cannot match delimiter. + comment : str, default None + The character used to identify the start of a comment line. + (which will be skipped by the reader) + delim_whitespace : bool, default False + If True, treat whitespace as the field delimiter. + skipinitialspace : bool, default False + If True, skip whitespace after the delimiter. + skip_blank_lines : bool, default True + If True, ignore empty lines (otherwise line values are parsed as null). + quoting : QuoteStyle, default QuoteStyle.MINIMAL + The quoting style used in the input CSV data. One of + { QuoteStyle.MINIMAL, QuoteStyle.ALL, QuoteStyle.NONNUMERIC, QuoteStyle.NONE } + quotechar : str, default '"' + The character used to indicate quoting. + doublequote : bool, default True + If True, a quote inside a value is double-quoted. + parse_dates : list, default None + A list of integer column indices/string column names + of columns to read as datetime. + parse_hex : list, default None + A list of integer column indices/string column names + of columns to read as hexadecimal. + dtypes : Union[Dict[str, DataType], List[DataType]], default None + A list of data types or a dictionary mapping column names + to a DataType. + true_values : List[str], default None + A list of additional values to recognize as True. + false_values : List[str], default None + A list of additional values to recognize as False. + na_values : List[str], default None + A list of additional values to recognize as null. + keep_default_na : bool, default True + Whether to keep the built-in default N/A values. + na_filter : bool, default True + Whether to detect missing values. If False, can + improve performance. + dayfirst : bool, default False + If True, interpret dates as being in the DD/MM format. + + Returns + ------- + TableWithMetadata + The Table and its corresponding metadata (column names) that were read in. + """ + cdef vector[string] c_parse_dates_names + cdef vector[int] c_parse_dates_indexes + cdef vector[int] c_parse_hex_names + cdef vector[int] c_parse_hex_indexes + cdef vector[data_type] c_dtypes_list + cdef map[string, data_type] c_dtypes_map + + cdef csv_reader_options options = move( + csv_reader_options.builder(source_info.c_obj) + .compression(compression) + .mangle_dupe_cols(mangle_dupe_cols) + .byte_range_offset(byte_range_offset) + .byte_range_size(byte_range_size) + .nrows(nrows) + .skiprows(skiprows) + .skipfooter(skipfooter) + .quoting(quoting) + .lineterminator(ord(lineterminator)) + .quotechar(ord(quotechar)) + .decimal(ord(decimal)) + .delim_whitespace(delim_whitespace) + .skipinitialspace(skipinitialspace) + .skip_blank_lines(skip_blank_lines) + .doublequote(doublequote) + .keep_default_na(keep_default_na) + .na_filter(na_filter) + .dayfirst(dayfirst) + .build() + ) + + options.set_header(header) + + if col_names is not None: + options.set_names([str(name).encode() for name in col_names]) + + if prefix is not None: + options.set_prefix(prefix.encode()) + + if usecols is not None: + if all([isinstance(col, int) for col in usecols]): + options.set_use_cols_indexes(list(usecols)) + else: + options.set_use_cols_names([str(name).encode() for name in usecols]) + + if delimiter is not None: + options.set_delimiter(ord(delimiter)) + + if thousands is not None: + options.set_thousands(ord(thousands)) + + if comment is not None: + options.set_comment(ord(comment)) + + if parse_dates is not None: + if not all([isinstance(col, (str, int)) for col in parse_dates]): + raise NotImplementedError( + "`parse_dates`: Must pass a list of column names/indices") + + # Set both since users are allowed to mix column names and indices + c_parse_dates_names, c_parse_dates_indexes = \ + _process_parse_dates_hex(parse_dates) + options.set_parse_dates(c_parse_dates_names) + options.set_parse_dates(c_parse_dates_indexes) + + if parse_hex is not None: + if not all([isinstance(col, (str, int)) for col in parse_hex]): + raise NotImplementedError( + "`parse_hex`: Must pass a list of column names/indices") + + # Set both since users are allowed to mix column names and indices + c_parse_hex_names, c_parse_hex_indexes = _process_parse_dates_hex(parse_hex) + options.set_parse_hex(c_parse_hex_names) + options.set_parse_hex(c_parse_hex_indexes) + + if isinstance(dtypes, list): + for dtype in dtypes: + c_dtypes_list.push_back((dtype).c_obj) + options.set_dtypes(c_dtypes_list) + elif isinstance(dtypes, dict): + # dtypes_t is dict + for k, v in dtypes.items(): + c_dtypes_map[str(k).encode()] = (v).c_obj + options.set_dtypes(c_dtypes_map) + elif dtypes is not None: + raise TypeError("dtypes must either by a list/dict") + + if true_values is not None: + options.set_true_values(_make_str_vector(true_values)) + + if false_values is not None: + options.set_false_values(_make_str_vector(false_values)) + + if na_values is not None: + options.set_na_values(_make_str_vector(na_values)) + + cdef table_with_metadata c_result + with nogil: + c_result = move(cpp_read_csv(options)) + + return TableWithMetadata.from_libcudf(c_result) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd index ab223c16a72..0094bf6032c 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pxd @@ -38,6 +38,9 @@ cdef class TableWithMetadata: cdef class SourceInfo: cdef source_info c_obj + # Keep the bytes converted from stringio alive + # (otherwise we end up with a use after free when they get gc'ed) + cdef list byte_sources cdef class SinkInfo: # This vector just exists to keep the unique_ptrs to the sinks alive diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx index df0b729b711..68498ff88f4 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx @@ -178,7 +178,7 @@ cdef class SourceInfo: raise ValueError("All sources must be of the same type!") new_sources.append(buffer.read().encode()) sources = new_sources - + self.byte_sources = sources if isinstance(sources[0], bytes): empty_buffer = True for buffer in sources: diff --git a/python/cudf/cudf/_lib/types.pyx b/python/cudf/cudf/_lib/types.pyx index 895e1afc502..fc672caa574 100644 --- a/python/cudf/cudf/_lib/types.pyx +++ b/python/cudf/cudf/_lib/types.pyx @@ -239,6 +239,9 @@ cdef dtype_from_column_view(column_view cv): ] cdef libcudf_types.data_type dtype_to_data_type(dtype) except *: + # Note: This function is to be phased out in favor of + # dtype_to_pylibcudf_type which will return a pylibcudf + # DataType object cdef libcudf_types.type_id tid if isinstance(dtype, cudf.ListDtype): tid = libcudf_types.type_id.LIST diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index efb192b3251..e029edfa2ed 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -4,6 +4,7 @@ import io import os +import numpy as np import pyarrow as pa import pytest @@ -109,7 +110,10 @@ def _make_fields_nullable(typ): lhs_type = _make_fields_nullable(lhs.type) lhs = rhs.cast(lhs_type) - assert lhs.equals(rhs) + if pa.types.is_floating(lhs.type) and pa.types.is_floating(rhs.type): + np.testing.assert_array_almost_equal(lhs, rhs) + else: + assert lhs.equals(rhs) def assert_table_eq(pa_table: pa.Table, plc_table: plc.Table) -> None: @@ -125,6 +129,8 @@ def assert_table_and_meta_eq( pa_table: pa.Table, plc_table_w_meta: plc.io.types.TableWithMetadata, check_field_nullability=True, + check_types_if_empty=True, + check_names=True, ) -> None: """Verify that the pylibcudf TableWithMetadata and PyArrow table are equal""" @@ -135,11 +141,17 @@ def assert_table_and_meta_eq( plc_shape == pa_table.shape ), f"{plc_shape} is not equal to {pa_table.shape}" + if not check_types_if_empty and plc_table.num_rows() == 0: + return + for plc_col, pa_col in zip(plc_table.columns(), pa_table.columns): assert_column_eq(pa_col, plc_col, check_field_nullability) # Check column name equality - assert plc_table_w_meta.column_names() == pa_table.column_names + if check_names: + assert ( + plc_table_w_meta.column_names() == pa_table.column_names + ), f"{plc_table_w_meta.column_names()} != {pa_table.column_names}" def cudf_raises(expected_exception: BaseException, *args, **kwargs): @@ -174,6 +186,33 @@ def is_nested_list(typ): return nesting_level(typ)[0] > 1 +def _convert_numeric_types_to_floating(pa_table): + """ + Useful little helper for testing the + dtypes option in I/O readers. + + Returns a tuple containing the pylibcudf dtypes + and the new pyarrow schema + """ + dtypes = [] + new_fields = [] + for i in range(len(pa_table.schema)): + field = pa_table.schema.field(i) + child_types = [] + + plc_type = plc.interop.from_arrow(field.type) + if pa.types.is_integer(field.type) or pa.types.is_unsigned_integer( + field.type + ): + plc_type = plc.interop.from_arrow(pa.float64()) + field = field.with_type(pa.float64()) + + dtypes.append((field.name, plc_type, child_types)) + + new_fields.append(field) + return dtypes, new_fields + + def write_source_str(source, input_str): """ Write a string to the source diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index 53e207f29cb..4a7194a6d8d 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -141,6 +141,20 @@ def _generate_nested_data(typ): ), pa_table +@pytest.fixture(params=[(0, 0), ("half", 0), (-1, "half")]) +def nrows_skiprows(table_data, request): + """ + Parametrized nrows fixture that accompanies table_data + """ + _, pa_table = table_data + nrows, skiprows = request.param + if nrows == "half": + nrows = len(pa_table) // 2 + if skiprows == "half": + skiprows = (len(pa_table) - nrows) // 2 + return nrows, skiprows + + @pytest.fixture( params=["a.txt", pathlib.Path("a.txt"), io.BytesIO, io.StringIO], ) diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_csv.py b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py new file mode 100644 index 00000000000..95326a8b681 --- /dev/null +++ b/python/cudf/cudf/pylibcudf_tests/io/test_csv.py @@ -0,0 +1,280 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +import io +import os +from io import StringIO + +import pandas as pd +import pyarrow as pa +import pytest +from utils import ( + _convert_numeric_types_to_floating, + assert_table_and_meta_eq, + make_source, + write_source_str, +) + +import cudf._lib.pylibcudf as plc +from cudf._lib.pylibcudf.io.types import CompressionType + +# Shared kwargs to pass to make_source +_COMMON_CSV_SOURCE_KWARGS = { + "format": "csv", + "index": False, +} + + +@pytest.fixture(scope="module") +def csv_table_data(table_data): + """ + Like the table_data but with nested types dropped + since the CSV reader can't handle that + uint64 is also dropped since it can get confused with int64 + """ + _, pa_table = table_data + pa_table = pa_table.drop_columns( + [ + "col_uint64", + "col_list", + "col_list>", + "col_struct", + "col_struct not null>", + ] + ) + return plc.interop.from_arrow(pa_table), pa_table + + +@pytest.mark.parametrize("delimiter", [",", ";"]) +def test_read_csv_basic( + csv_table_data, + source_or_sink, + text_compression_type, + nrows_skiprows, + delimiter, +): + _, pa_table = csv_table_data + compression_type = text_compression_type + nrows, skiprows = nrows_skiprows + + # can't compress non-binary data with pandas + if isinstance(source_or_sink, io.StringIO): + compression_type = CompressionType.NONE + + source = make_source( + source_or_sink, + pa_table, + compression=compression_type, + sep=delimiter, + **_COMMON_CSV_SOURCE_KWARGS, + ) + + # Rename the table (by reversing the names) to test names argument + pa_table = pa_table.rename_columns(pa_table.column_names[::-1]) + column_names = pa_table.column_names + + # Adapt to nrows/skiprows + pa_table = pa_table.slice( + offset=skiprows, length=nrows if nrows != -1 else None + ) + + res = plc.io.csv.read_csv( + plc.io.SourceInfo([source]), + delimiter=delimiter, + compression=compression_type, + col_names=column_names, + nrows=nrows, + skiprows=skiprows, + ) + + assert_table_and_meta_eq( + pa_table, + res, + check_types_if_empty=False, + check_names=False if skiprows > 0 and column_names is None else True, + ) + + +# Note: make sure chunk size is big enough so that dtype inference +# infers correctly +@pytest.mark.parametrize("chunk_size", [1000, 5999]) +def test_read_csv_byte_range(table_data, chunk_size, tmp_path): + _, pa_table = table_data + if len(pa_table) == 0: + # pandas writes nothing when we have empty table + # and header=None + pytest.skip("Don't test empty table case") + source = f"{tmp_path}/a.csv" + source = make_source( + source, pa_table, header=False, **_COMMON_CSV_SOURCE_KWARGS + ) + file_size = os.stat(source).st_size + tbls_w_meta = [] + for segment in range((file_size + chunk_size - 1) // chunk_size): + tbls_w_meta.append( + plc.io.csv.read_csv( + plc.io.SourceInfo([source]), + byte_range_offset=segment * chunk_size, + byte_range_size=chunk_size, + header=-1, + col_names=pa_table.column_names, + ) + ) + if isinstance(source, io.IOBase): + source.seek(0) + exp = pd.read_csv(source, names=pa_table.column_names, header=None) + tbls = [] + for tbl_w_meta in tbls_w_meta: + if tbl_w_meta.tbl.num_rows() > 0: + tbls.append(plc.interop.to_arrow(tbl_w_meta.tbl)) + full_tbl = pa.concat_tables(tbls) + + full_tbl_plc = plc.io.TableWithMetadata( + plc.interop.from_arrow(full_tbl), + tbls_w_meta[0].column_names(include_children=True), + ) + assert_table_and_meta_eq(pa.Table.from_pandas(exp), full_tbl_plc) + + +@pytest.mark.parametrize("usecols", [None, ["col_int64", "col_bool"], [0, 1]]) +def test_read_csv_dtypes(csv_table_data, source_or_sink, usecols): + # Simple test for dtypes where we read in + # all numeric data as floats + _, pa_table = csv_table_data + + source = make_source( + source_or_sink, + pa_table, + **_COMMON_CSV_SOURCE_KWARGS, + ) + # Adjust table for usecols + if usecols is not None: + pa_table = pa_table.select(usecols) + + dtypes, new_fields = _convert_numeric_types_to_floating(pa_table) + # Extract the dtype out of the (name, type, child_types) tuple + # (read_csv doesn't support this format since it doesn't support nested columns) + dtypes = {name: dtype for name, dtype, _ in dtypes} + + new_schema = pa.schema(new_fields) + + res = plc.io.csv.read_csv( + plc.io.SourceInfo([source]), dtypes=dtypes, usecols=usecols + ) + new_table = pa_table.cast(new_schema) + + assert_table_and_meta_eq(new_table, res) + + +@pytest.mark.parametrize("skip_blanks", [True, False]) +@pytest.mark.parametrize("decimal, quotechar", [(".", "'"), ("_", '"')]) +@pytest.mark.parametrize("lineterminator", ["\n", "\r\n"]) +def test_read_csv_parse_options( + source_or_sink, decimal, quotechar, skip_blanks, lineterminator +): + lines = [ + "# first comment line", + "# third comment line", + "1,2,3,4_4,'z'", + '4,5,6,5_5,""', + "7,8,9,9_87,'123'", + "# last comment line", + "1,1,1,10_11,abc", + ] + buffer = lineterminator.join(lines) + + write_source_str(source_or_sink, buffer) + + plc_table_w_meta = plc.io.csv.read_csv( + plc.io.SourceInfo([source_or_sink]), + comment="#", + decimal=decimal, + skip_blank_lines=skip_blanks, + quotechar=quotechar, + ) + df = pd.read_csv( + StringIO(buffer), + comment="#", + decimal=decimal, + skip_blank_lines=skip_blanks, + quotechar=quotechar, + ) + assert_table_and_meta_eq(pa.Table.from_pandas(df), plc_table_w_meta) + + +@pytest.mark.parametrize("na_filter", [True, False]) +@pytest.mark.parametrize("na_values", [["n/a"], ["NV_NAN"]]) +@pytest.mark.parametrize("keep_default_na", [True, False]) +def test_read_csv_na_values( + source_or_sink, na_filter, na_values, keep_default_na +): + lines = ["a,b,c", "n/a,NaN,NV_NAN", "1.0,2.0,3.0"] + buffer = "\n".join(lines) + + write_source_str(source_or_sink, buffer) + + plc_table_w_meta = plc.io.csv.read_csv( + plc.io.SourceInfo([source_or_sink]), + na_filter=na_filter, + na_values=na_values if na_filter else None, + keep_default_na=keep_default_na, + ) + df = pd.read_csv( + StringIO(buffer), + na_filter=na_filter, + na_values=na_values if na_filter else None, + keep_default_na=keep_default_na, + ) + assert_table_and_meta_eq(pa.Table.from_pandas(df), plc_table_w_meta) + + +@pytest.mark.parametrize("header", [0, 10, -1]) +def test_read_csv_header(csv_table_data, source_or_sink, header): + _, pa_table = csv_table_data + + source = make_source( + source_or_sink, + pa_table, + **_COMMON_CSV_SOURCE_KWARGS, + ) + + plc_table_w_meta = plc.io.csv.read_csv( + plc.io.SourceInfo([source]), header=header + ) + if header > 0: + if header < len(pa_table): + names_row = pa_table.take([header - 1]).to_pylist()[0].values() + pa_table = pa_table.slice(header) + col_names = [str(name) for name in names_row] + pa_table = pa_table.rename_columns(col_names) + else: + pa_table = pa.table([]) + elif header < 0: + # neg header means use user-provided names (in this case nothing) + # (the original column names are now data) + tbl_dict = pa_table.to_pydict() + new_tbl_dict = {} + for i, (name, vals) in enumerate(tbl_dict.items()): + str_vals = [str(val) for val in vals] + new_tbl_dict[str(i)] = [name] + str_vals + pa_table = pa.table(new_tbl_dict) + + assert_table_and_meta_eq( + pa_table, + plc_table_w_meta, + check_types_if_empty=False, + ) + + +# TODO: test these +# str prefix = "", +# bool mangle_dupe_cols = True, +# size_type skipfooter = 0, +# str thousands = None, +# bool delim_whitespace = False, +# bool skipinitialspace = False, +# quote_style quoting = quote_style.MINIMAL, +# bool doublequote = True, +# bool detect_whitespace_around_quotes = False, +# list parse_dates = None, +# list true_values = None, +# list false_values = None, +# bool dayfirst = False,