diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index 6c9a5a2a..9d0e5eec 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -7,6 +7,7 @@ Release notes
 
 Release 0.12.1 (unreleased)
 ===========================
+- `PR 730 `_ (resolves `PR 744 `_): Fix column type returned by ``make_batch_reader`` when a TransformSpec function sets all column values to ``None``.
 
 Release 0.12.0
@@ -33,6 +34,7 @@ Deprecated features
 
 Release 0.11.5
 ===========================
 - `PR 746 `_: Import ABC from collections.abc for Python 3.10 compatibility.
 - `PR 757 `_: Replace process_iter by pid_exists.
 - `PR 762 `_: PyTorch: improve memory-efficiency in batched non-shuffle buffer.
diff --git a/petastorm/arrow_reader_worker.py b/petastorm/arrow_reader_worker.py
index fedad01d..2b602d99 100644
--- a/petastorm/arrow_reader_worker.py
+++ b/petastorm/arrow_reader_worker.py
@@ -62,8 +62,8 @@ def read_next(self, workers_pool, schema, ngram):
                    column_as_numpy = column_as_pandas
 
                if pa.types.is_string(column.type):
-                    result_dict[column_name] = column_as_numpy.astype(np.unicode_)
-                elif pa.types.is_list(column.type):
+                    result_dict[column_name] = column_as_numpy.astype(np.object_)
+                elif pa.types.is_list(column.type) or pa.types.is_fixed_size_list(column.type):
                    # Assuming all lists are of the same length, hence we can collate them into a matrix
                    list_of_lists = column_as_numpy
                    try:
@@ -190,7 +190,7 @@ def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
        result = self._read_with_shuffle_row_drop(piece, pq_file, column_names_in_schema, shuffle_row_drop_range)
 
        if self._transform_spec:
-            result_as_pandas = result.to_pandas()
+            result_as_pandas = result.to_pandas(date_as_object=False)
            # A user may omit `func` value if they intend just to delete some fields using the TransformSpec
            if self._transform_spec.func:
                transformed_result = self._transform_spec.func(result_as_pandas)
@@ -219,7 +219,10 @@ def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
                    transformed_result[field.name] = transformed_result[field.name] \
                        .map(lambda x, f=field: self._check_shape_and_ravel(x, f))
 
-            result = pa.Table.from_pandas(transformed_result, preserve_index=False)
+            # Explicitly specify the schema: when all values of a column are None, pyarrow
+            # would not be able to properly detect the column type on its own
+            result = pa.Table.from_pandas(transformed_result, preserve_index=False,
+                                          schema=self._transformed_schema.as_pyarrow_schema())
 
        return result
 
diff --git a/petastorm/tests/test_parquet_reader.py b/petastorm/tests/test_parquet_reader.py
index ba15281c..7cea25a3 100644
--- a/petastorm/tests/test_parquet_reader.py
+++ b/petastorm/tests/test_parquet_reader.py
@@ -16,6 +16,7 @@
 
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 import pytest
 from pyarrow import parquet as pq
 
@@ -209,6 +210,65 @@ def preproc_fn1(x):
            (3, 4, 5) == sample['tensor_col_2'].shape[1:]
 
 
+@pytest.mark.parametrize('null_column_dtype', [np.float64, np.unicode_])
+@pytest.mark.parametrize('reader_factory', _D)
+def test_transform_spec_returns_all_none_values(scalar_dataset, null_column_dtype, reader_factory):
+    def fill_id_with_nones(x):
+        return pd.DataFrame({'id': [None] * len(x)})
+
+    edit_fields = [('id', null_column_dtype, (), True)]
+
+    with reader_factory(scalar_dataset.url, schema_fields=["id"],
+                        transform_spec=TransformSpec(fill_id_with_nones, edit_fields=edit_fields)) as reader:
+        sample = next(reader)
+        assert sample.id.dtype.type == null_column_dtype
+
+
+@pytest.mark.parametrize('np_dtype, pa_dtype, null_value',
+                         ((np.float32, pa.float32(), np.nan), (np.object_, pa.string(), None)))
+@pytest.mark.parametrize('reader_factory', _D)
+def test_entire_column_of_typed_nulls(reader_factory, np_dtype, pa_dtype, null_value, tmp_path):
+    path = tmp_path / "dataset"
+    schema = pa.schema([pa.field('all_nulls', pa_dtype)])
+    pq.write_table(pa.Table.from_pydict({"all_nulls": [null_value] * 10}, schema=schema), path)
+
+    with reader_factory("file:///" + str(path)) as reader:
+        sample = next(reader)
+        assert sample.all_nulls.dtype == np_dtype
+        if np_dtype == np.float32:
+            assert np.all(np.isnan(sample.all_nulls))
+        elif np_dtype == np.object_:
+            assert all(v is None for v in sample.all_nulls)
+        else:
+            assert False, "Unexpected np_dtype"
+
+
+@pytest.mark.parametrize('reader_factory', _D)
+def test_column_with_list_of_strings_some_are_null(reader_factory, tmp_path):
+    path = tmp_path / "dataset"
+    schema = pa.schema([pa.field('some_nulls', pa.list_(pa.string(), -1))])
+    pq.write_table(pa.Table.from_pydict({"some_nulls": [['a0', 'a1'], ['b0', None], [None, None]]}, schema=schema),
+                   path)
+
+    with reader_factory("file:///" + str(path)) as reader:
+        sample = next(reader)
+        assert sample.some_nulls.dtype == np.object_
+        np.testing.assert_equal(sample.some_nulls, [['a0', 'a1'], ['b0', None], [None, None]])
+
+
+@pytest.mark.parametrize('reader_factory', _D)
+def test_transform_spec_returns_all_none_values_in_a_list_field(scalar_dataset, reader_factory):
+    def fill_id_with_nones(x):
+        return pd.DataFrame({'int_fixed_size_list': [[None for _ in range(3)]] * len(x)})
+
+    with reader_factory(scalar_dataset.url, schema_fields=["int_fixed_size_list"],
+                        transform_spec=TransformSpec(fill_id_with_nones)) as reader:
+        sample = next(reader)
+        # The type will be float64 because numpy converts an integer array that contains null
+        # values into a float64 array, with nan representing the nulls
+        assert sample.int_fixed_size_list.dtype.type == np.float64
+
+
 @pytest.mark.parametrize('reader_factory', _D)
 @pytest.mark.parametrize('partition_by', [['string'], ['id'], ['string', 'id']])
 def test_string_partition(reader_factory, tmpdir, partition_by):
diff --git a/petastorm/tests/test_unischema.py b/petastorm/tests/test_unischema.py
index f7dcf010..f8ebf689 100644
--- a/petastorm/tests/test_unischema.py
+++ b/petastorm/tests/test_unischema.py
@@ -15,6 +15,7 @@
 from __future__ import division
 
 from decimal import Decimal
+from typing import Callable
 
 import numpy as np
 import pyarrow as pa
@@ -496,3 +497,40 @@ def test_fullmatch():
     assert not _fullmatch('abc', 'xyz')
     assert not _fullmatch('abc', 'abcx')
     assert not _fullmatch('abc', 'xabc')
+
+
+@pytest.mark.parametrize("np_type, pa_is_type_func", [[np.int32, pa.types.is_int32], [np.float32, pa.types.is_float32]])
+def test_as_pyarrow_schema(np_type, pa_is_type_func: Callable):
+    """Try using the 'as_pyarrow_schema' function"""
+    schema = Unischema('TestSchema', [
+        UnischemaField('scalar', np_type, ()),
+        UnischemaField('list_var', np_type, (None,)),
+        UnischemaField('list_fixed', np_type, (42,)),
+        UnischemaField('matrix_var', np_type, (None, 42,)),
+        UnischemaField('matrix_fixed', np_type, (42, 43,)),
+    ])
+
+    pa_schema = schema.as_pyarrow_schema()
+    assert not pa.types.is_list(pa_schema.field("scalar").type)
+    assert pa.types.is_list(pa_schema.field("list_var").type)
+    assert pa.types.is_list(pa_schema.field("matrix_var").type)
+    assert pa.types.is_fixed_size_list(pa_schema.field("list_fixed").type)
+    assert pa.types.is_fixed_size_list(pa_schema.field("matrix_fixed").type)
+
+    assert pa_schema.field("list_fixed").type.list_size == 42
+    assert pa_schema.field("matrix_fixed").type.list_size == 42 * 43
+
+    assert pa_is_type_func(pa_schema.field("scalar").type)
+    assert pa_is_type_func(pa_schema.field("list_var").type.value_type)
+    assert pa_is_type_func(pa_schema.field("list_fixed").type.value_type)
+    assert pa_is_type_func(pa_schema.field("matrix_var").type.value_type)
+    assert pa_is_type_func(pa_schema.field("matrix_fixed").type.value_type)
+
+
+def test_as_pyarrow_schema_decimal_not_supported():
+    schema = Unischema('TestSchema', [
+        UnischemaField('decimal_scalar', Decimal, (), ScalarCodec(IntegerType()), False),
+        UnischemaField('decimal_list', Decimal, (None,), ScalarCodec(IntegerType()), False),
+    ])
+    with pytest.raises(NotImplementedError, match="Can not convert a decimal type"):
+        schema.as_pyarrow_schema()
diff --git a/petastorm/unischema.py b/petastorm/unischema.py
index ba9ef029..9d06e9db 100644
--- a/petastorm/unischema.py
+++ b/petastorm/unischema.py
@@ -16,6 +16,7 @@
 in several different python libraries. Currently supported are pyspark, tensorflow, and numpy.
 """
 import copy
+import decimal
 import re
 import sys
 import warnings
@@ -280,6 +281,11 @@ def as_spark_schema(self):
 
        return sql_types.StructType(schema_entries)
 
+    def as_pyarrow_schema(self) -> pa.Schema:
+        """Converts a Unischema into a pyarrow schema"""
+        fields = [(field.name, _arrow_from_numpy_type(field)) for field in self._fields.values()]
+        return pa.schema(fields)
+
    def make_namedtuple(self, **kargs):
        """Returns schema as a namedtuple type intialized with arguments passed to this method.
 
@@ -341,7 +347,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=True):
                continue
            field_shape = (None,)
            try:
-                np_type = _numpy_and_codec_from_arrow_type(field_type)
+                np_type = _numpy_from_arrow_type(field_type)
            except ValueError:
                if omit_unsupported_fields:
                    warnings.warn('Column %r has an unsupported field %r. Ignoring...'
@@ -464,39 +470,54 @@ def match_unischema_fields(schema, field_regex):
    return []
 
 
-def _numpy_and_codec_from_arrow_type(field_type):
-    from pyarrow import types
-
-    if types.is_int8(field_type):
+def _numpy_from_arrow_type(field_type):
+    if pa.types.is_int8(field_type):
        np_type = np.int8
-    elif types.is_uint8(field_type):
+    elif pa.types.is_uint8(field_type):
        np_type = np.uint8
-    elif types.is_int16(field_type):
+    elif pa.types.is_int16(field_type):
        np_type = np.int16
-    elif types.is_int32(field_type):
+    elif pa.types.is_int32(field_type):
        np_type = np.int32
-    elif types.is_int64(field_type):
+    elif pa.types.is_int64(field_type):
        np_type = np.int64
-    elif types.is_string(field_type):
+    elif pa.types.is_string(field_type):
        np_type = np.unicode_
-    elif types.is_boolean(field_type):
+    elif pa.types.is_boolean(field_type):
        np_type = np.bool_
-    elif types.is_float32(field_type):
+    elif pa.types.is_float32(field_type):
        np_type = np.float32
-    elif types.is_float64(field_type):
+    elif pa.types.is_float64(field_type):
        np_type = np.float64
-    elif types.is_decimal(field_type):
+    elif pa.types.is_decimal(field_type):
        np_type = Decimal
-    elif types.is_binary(field_type):
+    elif pa.types.is_binary(field_type):
        np_type = np.string_
-    elif types.is_fixed_size_binary(field_type):
+    elif pa.types.is_fixed_size_binary(field_type):
        np_type = np.string_
-    elif types.is_date(field_type):
+    elif pa.types.is_date(field_type):
        np_type = np.datetime64
-    elif types.is_timestamp(field_type):
+    elif pa.types.is_timestamp(field_type):
        np_type = np.datetime64
-    elif types.is_list(field_type):
-        np_type = _numpy_and_codec_from_arrow_type(field_type.value_type)
+    elif pa.types.is_list(field_type):
+        np_type = _numpy_from_arrow_type(field_type.value_type)
    else:
        raise ValueError('Cannot auto-create unischema due to unsupported column type {}'.format(field_type))
    return np_type
+
+
+def _arrow_from_numpy_type(field: UnischemaField) -> pa.DataType:
+    if field.numpy_dtype == decimal.Decimal:
+        raise NotImplementedError(
+            "Can not convert a decimal type to pyarrow's pa.decimal128 since "
+            "precision and scale are not stored in unischema.")
+    else:
+        if field.shape != ():
+            int_list_size = -1 if any(d is None for d in field.shape) else np.prod(field.shape)  # type: ignore
+
+            return pa.list_(pa.from_numpy_dtype(field.numpy_dtype), int_list_size)
+        else:
+            if field.numpy_dtype == np.datetime64:
+                return pa.timestamp("ns")
+            else:
+                return pa.from_numpy_dtype(field.numpy_dtype)
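
Usage sketch (not part of the patch): the snippet below illustrates why the `_load_rows` change passes an explicit schema to `pa.Table.from_pandas` and how the new `Unischema.as_pyarrow_schema()` can supply it. The schema name, the 'id' field, and the values are hypothetical and used only for illustration.

import numpy as np
import pandas as pd
import pyarrow as pa

from petastorm.unischema import Unischema, UnischemaField

# Hypothetical unischema with a single scalar float64 column.
schema = Unischema('ExampleSchema', [UnischemaField('id', np.float64, ())])

# A TransformSpec that nulls out a column hands back a frame like this one.
df = pd.DataFrame({'id': [None, None, None]})

# Without a schema hint, pyarrow can only infer the generic null type for an all-None column.
inferred = pa.Table.from_pandas(df, preserve_index=False)
print(inferred.schema.field('id').type)   # null

# With the schema derived from the unischema, the declared float64 type is kept
# and the None values come back as NaN.
explicit = pa.Table.from_pandas(df, preserve_index=False, schema=schema.as_pyarrow_schema())
print(explicit.schema.field('id').type)   # double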