Fix type of a batch returned by make_batch_reader when TransformSpec's function returns a column with all values being None #750

Open · wants to merge 7 commits into master
2 changes: 2 additions & 0 deletions docs/release-notes.rst
@@ -7,6 +7,7 @@ Release notes

Release 0.12.1 (unreleased)
===========================
- `PR 730 <https://github.com/uber/petastorm/pull/730>`_ (resolves `PR 744 <https://github.com/uber/petastorm/issues/744>`_): Fix column type returned by a ``make_batch_reader`` when TransformSpec function sets all column values to ``None``.


Release 0.12.0
@@ -33,6 +34,7 @@ Deprecated features

Release 0.11.5
===========================
- `PR 746 <https://github.com/uber/petastorm/pull/746>`_: Import ABC from collections.abc for Python 3.10 compatibility.
- `PR 757 <https://github.com/uber/petastorm/pull/757>`_: Replace process_iter by pid_exists.
- `PR 762 <https://github.com/uber/petastorm/pull/762>`_: PyTorch: improve memory-efficiency in batched non-shuffle buffer.
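
For context, a minimal sketch (not part of this PR) of the scenario the release note above describes. The dataset URL is a placeholder and the column name 'id' is only an example; the point is that the dtype declared in edit_fields is honored even when the transform sets every value to None:

import numpy as np
import pandas as pd
from petastorm import make_batch_reader
from petastorm.transform import TransformSpec

def set_id_to_none(df):
    # The transform deliberately blanks out the whole column.
    return pd.DataFrame({'id': [None] * len(df)})

# Declare the intended dtype of the edited column; nullable=True because every value is None.
edit_fields = [('id', np.float64, (), True)]

with make_batch_reader('file:///tmp/scalar_dataset', schema_fields=['id'],
                       transform_spec=TransformSpec(set_id_to_none, edit_fields=edit_fields)) as reader:
    batch = next(reader)
    # With this fix, the batch keeps the declared float64 dtype instead of coming back
    # with an undetectable/null type.
    assert batch.id.dtype.type == np.float64
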
11 changes: 7 additions & 4 deletions petastorm/arrow_reader_worker.py
@@ -62,8 +62,8 @@ def read_next(self, workers_pool, schema, ngram):
column_as_numpy = column_as_pandas

if pa.types.is_string(column.type):
- result_dict[column_name] = column_as_numpy.astype(np.unicode_)
- elif pa.types.is_list(column.type):
+ result_dict[column_name] = column_as_numpy.astype(np.object)
+ elif pa.types.is_list(column.type) or pa.types.is_fixed_size_list(column.type):
# Assuming all lists are of the same length, hence we can collate them into a matrix
list_of_lists = column_as_numpy
try:
@@ -190,7 +190,7 @@ def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
result = self._read_with_shuffle_row_drop(piece, pq_file, column_names_in_schema, shuffle_row_drop_range)

if self._transform_spec:
- result_as_pandas = result.to_pandas()
+ result_as_pandas = result.to_pandas(date_as_object=False)
# A user may omit `func` value if they intend just to delete some fields using the TransformSpec
if self._transform_spec.func:
transformed_result = self._transform_spec.func(result_as_pandas)
@@ -219,7 +219,10 @@ def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
transformed_result[field.name] = transformed_result[field.name] \
.map(lambda x, f=field: self._check_shape_and_ravel(x, f))

- result = pa.Table.from_pandas(transformed_result, preserve_index=False)
+ # Explicitly specify the schema since, when all column values are None, pyarrow
+ # would not be able to properly detect the type on its own
+ result = pa.Table.from_pandas(transformed_result, preserve_index=False,
+                               schema=self._transformed_schema.as_pyarrow_schema())

return result

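
The comment in the hunk above is the core of the fix. A standalone illustration (not part of the PR) of why the explicit schema matters: pa.Table.from_pandas infers the "null" type for an all-None column unless a schema is supplied:

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"id": [None, None, None]})

# Without a schema, pyarrow has nothing to go on and infers the "null" type.
inferred = pa.Table.from_pandas(df, preserve_index=False)
print(inferred.schema.field("id").type)  # null

# With an explicit schema the declared type survives even though every value is None.
explicit = pa.Table.from_pandas(df, preserve_index=False,
                                schema=pa.schema([pa.field("id", pa.float64())]))
print(explicit.schema.field("id").type)  # double
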
60 changes: 60 additions & 0 deletions petastorm/tests/test_parquet_reader.py
@@ -16,6 +16,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa
import pytest
from pyarrow import parquet as pq

@@ -209,6 +210,65 @@ def preproc_fn1(x):
(3, 4, 5) == sample['tensor_col_2'].shape[1:]


@pytest.mark.parametrize('null_column_dtype', [np.float64, np.unicode_])
@pytest.mark.parametrize('reader_factory', _D)
def test_transform_spec_returns_all_none_values(scalar_dataset, null_column_dtype, reader_factory):
Review comment: These test cases assume the transform_spec func is creating the null values. In the more common case, there are missing values in fields unedited by the transform_spec. I believe this solution already addresses both cases, but it would be good to demonstrate this in the tests, either with an additional test case or additional non-edited fields in this test case.

def fill_id_with_nones(x):
return pd.DataFrame({'id': [None] * len(x)})

edit_fields = [('id', null_column_dtype, (), True)]

with reader_factory(scalar_dataset.url, schema_fields=["id"],
transform_spec=TransformSpec(fill_id_with_nones, edit_fields=edit_fields)) as reader:
sample = next(reader)
assert sample.id.dtype.type == null_column_dtype
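
The review comment above also asks for coverage of missing values in fields that the transform does not edit. A possible sketch of such a test follows; it is not part of this PR, and the test name, dataset layout, and helper names are assumptions chosen to mirror the surrounding tests:

@pytest.mark.parametrize('reader_factory', _D)
def test_unedited_field_with_missing_values(reader_factory, tmp_path):
    # Hypothetical dataset: 'value' contains nulls but is never touched by the transform.
    path = tmp_path / "dataset"
    schema = pa.schema([pa.field('id', pa.int64()), pa.field('value', pa.float64())])
    pq.write_table(pa.Table.from_pydict({'id': list(range(10)),
                                         'value': [None if i % 2 else float(i) for i in range(10)]},
                                        schema=schema), path)

    def double_id(x):
        x['id'] = x['id'] * 2
        return x

    with reader_factory("file:///" + str(path), transform_spec=TransformSpec(double_id)) as reader:
        sample = next(reader)
        # Nulls in the unedited 'value' column should surface as NaN in a float64 array.
        assert sample.value.dtype == np.float64
        assert np.isnan(sample.value).any()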


@pytest.mark.parametrize('np_dtype, pa_dtype, null_value',
((np.float32, pa.float32(), np.nan), (np.object_, pa.string(), None)))
@pytest.mark.parametrize('reader_factory', _D)
def test_entire_column_of_typed_nulls(reader_factory, np_dtype, pa_dtype, null_value, tmp_path):
path = tmp_path / "dataset"
schema = pa.schema([pa.field('all_nulls', pa_dtype)])
pq.write_table(pa.Table.from_pydict({"all_nulls": [null_value] * 10}, schema=schema), path)

with reader_factory("file:///" + str(path)) as reader:
sample = next(reader)
assert sample.all_nulls.dtype == np_dtype
if np_dtype == np.float32:
assert np.all(np.isnan(sample.all_nulls))
elif np_dtype == np.object_:
assert all(v is None for v in sample.all_nulls)
else:
assert False, "Unexpected np_dtype"


@pytest.mark.parametrize('reader_factory', _D)
def test_column_with_list_of_strings_some_are_null(reader_factory, tmp_path):
path = tmp_path / "dataset"
schema = pa.schema([pa.field('some_nulls', pa.list_(pa.string(), -1))])
pq.write_table(pa.Table.from_pydict({"some_nulls": [['a0', 'a1'], ['b0', None], [None, None]]}, schema=schema),
path)

with reader_factory("file:///" + str(path)) as reader:
sample = next(reader)
assert sample.some_nulls.dtype == np.object
np.testing.assert_equal(sample.some_nulls, [['a0', 'a1'], ['b0', None], [None, None]])


@pytest.mark.parametrize('reader_factory', _D)
def test_transform_spec_returns_all_none_values_in_a_list_field(scalar_dataset, reader_factory):
def fill_id_with_nones(x):
return pd.DataFrame({'int_fixed_size_list': [[None for _ in range(3)]] * len(x)})

Review comment: We also ran into the NoneType issue with lists of strings. Consider adding string types to the test as well.

Review comment: The NoneType problem also occurs when only some of the values in the list are None, e.g. ['a', 'b', None]. What about a test_transform_spec_returns_some_none_values_in_a_list_field?

with reader_factory(scalar_dataset.url, schema_fields=["int_fixed_size_list"],
transform_spec=TransformSpec(fill_id_with_nones)) as reader:
sample = next(reader)
# The type will be float64 because numpy converts an integer array that has null
# values into a float64 array with NaN for the nulls
assert sample.int_fixed_size_list.dtype.type == np.float64


@pytest.mark.parametrize('reader_factory', _D)
@pytest.mark.parametrize('partition_by', [['string'], ['id'], ['string', 'id']])
def test_string_partition(reader_factory, tmpdir, partition_by):
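
The review comments above also suggest exercising a transform whose list column contains only some None values. A possible sketch of such a test (not part of this PR; the test name and helper are assumptions mirroring the all-None list test above):

@pytest.mark.parametrize('reader_factory', _D)
def test_transform_spec_returns_some_none_values_in_a_list_field(scalar_dataset, reader_factory):
    def fill_with_partial_nones(x):
        # Only the middle element of each list is None.
        return pd.DataFrame({'int_fixed_size_list': [[1, None, 3]] * len(x)})

    with reader_factory(scalar_dataset.url, schema_fields=["int_fixed_size_list"],
                        transform_spec=TransformSpec(fill_with_partial_nones)) as reader:
        sample = next(reader)
        # As in the all-None test, integer lists containing None are promoted to float64 with NaN.
        assert sample.int_fixed_size_list.dtype.type == np.float64
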
38 changes: 38 additions & 0 deletions petastorm/tests/test_unischema.py
@@ -15,6 +15,7 @@
from __future__ import division

from decimal import Decimal
from typing import Callable

import numpy as np
import pyarrow as pa
@@ -496,3 +497,40 @@ def test_fullmatch():
assert not _fullmatch('abc', 'xyz')
assert not _fullmatch('abc', 'abcx')
assert not _fullmatch('abc', 'xabc')


@pytest.mark.parametrize("np_type, pa_is_type_func", [[np.int32, pa.types.is_int32], [np.float32, pa.types.is_float32]])
def test_as_pyarrow_schema(np_type, pa_is_type_func: Callable):
"""Try using the 'as_pyarrow_schema' function"""
schema = Unischema('TestSchema', [
UnischemaField('scalar', np_type, ()),
UnischemaField('list_var', np_type, (None,)),
UnischemaField('list_fixed', np_type, (42,)),
UnischemaField('matrix_var', np_type, (None, 42,)),
UnischemaField('matrix_fixed', np_type, (42, 43,)),
])

pa_schema = schema.as_pyarrow_schema()
assert not pa.types.is_list(pa_schema.field("scalar").type)
assert pa.types.is_list(pa_schema.field("list_var").type)
assert pa.types.is_list(pa_schema.field("matrix_var").type)
assert pa.types.is_fixed_size_list(pa_schema.field("list_fixed").type)
assert pa.types.is_fixed_size_list(pa_schema.field("matrix_fixed").type)

assert pa_schema.field("list_fixed").type.list_size == 42
assert pa_schema.field("matrix_fixed").type.list_size == 42 * 43

assert pa_is_type_func(pa_schema.field("scalar").type)
assert pa_is_type_func(pa_schema.field("list_var").type.value_type)
assert pa_is_type_func(pa_schema.field("list_fixed").type.value_type)
assert pa_is_type_func(pa_schema.field("matrix_var").type.value_type)
assert pa_is_type_func(pa_schema.field("matrix_fixed").type.value_type)


def test_as_pyarrow_schema_decimal_not_supported():
schema = Unischema('TestSchema', [
UnischemaField('decimal_scalar', Decimal, (), ScalarCodec(IntegerType()), False),
UnischemaField('decimal_list', Decimal, (None,), ScalarCodec(IntegerType()), False),
])
with pytest.raises(NotImplementedError, match="Can not convert a decimal type"):
schema.as_pyarrow_schema()
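
For reference, a minimal usage sketch (not part of the PR) of the new Unischema.as_pyarrow_schema() method that these tests cover; the field names are arbitrary examples:

import numpy as np
import pyarrow as pa
from petastorm.unischema import Unischema, UnischemaField

schema = Unischema('ExampleSchema', [
    UnischemaField('id', np.int64, ()),                 # scalar -> plain int64
    UnischemaField('embedding', np.float32, (None,)),   # variable length -> list<float>
    UnischemaField('image', np.uint8, (2, 3)),          # fixed shape -> fixed_size_list of 6 elements
])

pa_schema = schema.as_pyarrow_schema()
assert pa.types.is_int64(pa_schema.field('id').type)
assert pa.types.is_list(pa_schema.field('embedding').type)
assert pa.types.is_fixed_size_list(pa_schema.field('image').type)
assert pa_schema.field('image').type.list_size == 6
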
61 changes: 41 additions & 20 deletions petastorm/unischema.py
@@ -16,6 +16,7 @@
in several different python libraries. Currently supported are pyspark, tensorflow, and numpy.
"""
import copy
import decimal
import re
import sys
import warnings
@@ -280,6 +281,11 @@ def as_spark_schema(self):

return sql_types.StructType(schema_entries)

def as_pyarrow_schema(self) -> pa.Schema:
"""Converts a Unischema into a pyarrow schema"""
fields = [(field.name, _arrow_from_numpy_type(field)) for field in self._fields.values()]
return pa.schema(fields)

def make_namedtuple(self, **kargs):
"""Returns schema as a namedtuple type intialized with arguments passed to this method.

@@ -341,7 +347,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=True):
continue
field_shape = (None,)
try:
- np_type = _numpy_and_codec_from_arrow_type(field_type)
+ np_type = _numpy_from_arrow_type(field_type)
except ValueError:
if omit_unsupported_fields:
warnings.warn('Column %r has an unsupported field %r. Ignoring...'
@@ -464,39 +470,54 @@ def match_unischema_fields(schema, field_regex):
return []


- def _numpy_and_codec_from_arrow_type(field_type):
- from pyarrow import types
-
- if types.is_int8(field_type):
+ def _numpy_from_arrow_type(field_type):
+ if pa.types.is_int8(field_type):
np_type = np.int8
- elif types.is_uint8(field_type):
+ elif pa.types.is_uint8(field_type):
np_type = np.uint8
- elif types.is_int16(field_type):
+ elif pa.types.is_int16(field_type):
np_type = np.int16
- elif types.is_int32(field_type):
+ elif pa.types.is_int32(field_type):
np_type = np.int32
- elif types.is_int64(field_type):
+ elif pa.types.is_int64(field_type):
np_type = np.int64
- elif types.is_string(field_type):
+ elif pa.types.is_string(field_type):
np_type = np.unicode_
- elif types.is_boolean(field_type):
+ elif pa.types.is_boolean(field_type):
np_type = np.bool_
- elif types.is_float32(field_type):
+ elif pa.types.is_float32(field_type):
np_type = np.float32
- elif types.is_float64(field_type):
+ elif pa.types.is_float64(field_type):
np_type = np.float64
- elif types.is_decimal(field_type):
+ elif pa.types.is_decimal(field_type):
np_type = Decimal
- elif types.is_binary(field_type):
+ elif pa.types.is_binary(field_type):
np_type = np.string_
- elif types.is_fixed_size_binary(field_type):
+ elif pa.types.is_fixed_size_binary(field_type):
np_type = np.string_
- elif types.is_date(field_type):
+ elif pa.types.is_date(field_type):
np_type = np.datetime64
- elif types.is_timestamp(field_type):
+ elif pa.types.is_timestamp(field_type):
np_type = np.datetime64
- elif types.is_list(field_type):
- np_type = _numpy_and_codec_from_arrow_type(field_type.value_type)
+ elif pa.types.is_list(field_type):
+ np_type = _numpy_from_arrow_type(field_type.value_type)
else:
raise ValueError('Cannot auto-create unischema due to unsupported column type {}'.format(field_type))
return np_type


def _arrow_from_numpy_type(field: UnischemaField) -> pa.DataType:
if field.numpy_dtype == decimal.Decimal:
raise NotImplementedError(
"Can not convert a decimal type to pyarrow's pa.decimal128 since "
"precision and scale are not stored in unischema.")
else:
if field.shape != ():
int_list_size = -1 if any(d is None for d in field.shape) else np.prod(field.shape) # type: ignore

return pa.list_(pa.from_numpy_dtype(field.numpy_dtype), int_list_size)
else:
if field.numpy_dtype == np.datetime64:
return pa.timestamp("ns")
else:
return pa.from_numpy_dtype(field.numpy_dtype)
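
Two special cases in _arrow_from_numpy_type above are worth calling out; a small sketch (assumption: the module-private helper is imported directly, which the library does not advertise) showing that datetime64 fields map to a nanosecond timestamp and that Decimal fields are rejected:

from decimal import Decimal

import numpy as np
import pyarrow as pa
from petastorm.unischema import UnischemaField, _arrow_from_numpy_type

# datetime64 scalars are mapped explicitly to a nanosecond timestamp.
assert _arrow_from_numpy_type(UnischemaField('when', np.datetime64, ())) == pa.timestamp('ns')

# Decimal fields raise, since unischema does not carry the precision/scale needed for pa.decimal128.
try:
    _arrow_from_numpy_type(UnischemaField('amount', Decimal, ()))
except NotImplementedError as exc:
    print(exc)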