From 921872d06d78d6420d7d740760016e8d76a52634 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 13 Dec 2022 12:01:28 -0800 Subject: [PATCH 01/13] initial progress Signed-off-by: Scott Lee --- .../ray/air/util/tensor_extensions/arrow.py | 55 ++++++++++--------- .../_internal/arrow_ops/transform_pyarrow.py | 46 +++++++++++----- python/ray/data/_internal/plan.py | 14 ++++- python/ray/data/tests/test_stats.py | 10 ++++ 4 files changed, 83 insertions(+), 42 deletions(-) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index 91b5daecbefe..a225ee57d174 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -132,6 +132,30 @@ def __str__(self) -> str: def __repr__(self) -> str: return str(self) + @classmethod + def _need_variable_shaped_tensor_array( + cls, array_types: Sequence[Union["ArrowTensorType", "ArrowVariableShapedTensorType"]], + ) -> bool: + """ + Whether the provided list of tensor types need a variable-shaped + representation when concatenating or chunking. + + If one or more of the tensor types in `array_types` are variable-shaped + and/or any of the tensor arrays have a different shape than the others, a variable-shaped + tensor array representation will be required and this method will return True. + """ + needs_variable_shaped = False + shape = None + for arr_type in array_types: + if isinstance(arr_type, ArrowVariableShapedTensorType) or ( + shape is not None and arr_type.shape != shape + ): + needs_variable_shaped = True + break + if shape is None: + shape = arr_type.shape + return needs_variable_shaped + if _arrow_extension_scalars_are_subclassable(): # TODO(Clark): Remove this version guard once we only support Arrow 9.0.0+. @@ -410,7 +434,8 @@ def _concat_same_type( of the tensor arrays have a different shape than the others, a variable-shaped tensor array will be returned. """ - if cls._need_variable_shaped_tensor_array(to_concat): + to_concat_types = [arr.type for arr in to_concat] + if ArrowTensorType._need_variable_shaped_tensor_array(to_concat_types): # Need variable-shaped tensor array. # TODO(Clark): Eliminate this NumPy roundtrip by directly constructing the # underlying storage array buffers (NumPy roundtrip will not be zero-copy @@ -432,7 +457,8 @@ def _chunk_tensor_arrays( """ Create a ChunkedArray from multiple tensor arrays. """ - if cls._need_variable_shaped_tensor_array(arrs): + arrs_types = [arr.type for arr in arrs] + if ArrowTensorType._need_variable_shaped_tensor_array(arrs_types): new_arrs = [] for a in arrs: if isinstance(a.type, ArrowTensorType): @@ -442,31 +468,6 @@ def _chunk_tensor_arrays( arrs = new_arrs return pa.chunked_array(arrs) - @classmethod - def _need_variable_shaped_tensor_array( - cls, arrs: Sequence[Union["ArrowTensorArray", "ArrowVariableShapedTensorArray"]] - ) -> bool: - """ - Whether the provided tensor arrays need a variable-shaped representation when - concatenating or chunking. - - If one or more of the tensor arrays in arrs are variable-shaped and/or any of - the tensor arrays have a different shape than the others, a variable-shaped - tensor array representation will be required and this method will return True. - """ - needs_variable_shaped = False - shape = None - for a in arrs: - a_type = a.type - if isinstance(a_type, ArrowVariableShapedTensorType) or ( - shape is not None and a_type.shape != shape - ): - needs_variable_shaped = True - break - if shape is None: - shape = a_type.shape - return needs_variable_shaped - def to_variable_shaped_tensor_array(self) -> "ArrowVariableShapedTensorArray": """ Convert this tensor array to a variable-shaped tensor array. diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index 2bb61e496090..b2fb6867dfbb 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -1,4 +1,6 @@ -from typing import TYPE_CHECKING, List, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + +from ray.air.util.tensor_extensions.arrow import ArrowTensorArray, ArrowTensorType, ArrowVariableShapedTensorType try: import pyarrow @@ -45,6 +47,23 @@ def take_table( return table +def unify_schemas( + schemas: List["pyarrow.Schema"], +) -> "pyarrow.Schema": + # schemas_to_unify = [] + for col_idx in range(len(schemas[0].types)): + column_types = [s.types[col_idx] for s in schemas] + if ArrowTensorType._need_variable_shaped_tensor_array(column_types): + dtype = column_types[col_idx].storage_type + new_type = ArrowVariableShapedTensorType(dtype=dtype, ndim = len(column_types)) + for schema in schemas: + var_shaped_col = schema.field(col_idx).with_type(new_type) + schema = schema.set(col_idx, var_shaped_col) + + # Let Arrow unify the schema of non-tensor extension type columns. + return pyarrow.unify_schemas(schemas) + + def _concatenate_chunked_arrays(arrs: "pyarrow.ChunkedArray") -> "pyarrow.ChunkedArray": """ Concatenate provided chunked arrays into a single chunked array. @@ -121,19 +140,20 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": else: col = _concatenate_chunked_arrays(col_chunked_arrays) cols.append(col) - # Unify schemas. - schemas = [] - for block in blocks: - schema = block.schema - # If concatenating uniform tensor columns results in a variable-shaped - # tensor columns, override the column type for all blocks. - if schema_tensor_field_overrides: - for idx, field in schema_tensor_field_overrides.items(): - schema = schema.set(idx, field) - schemas.append(schema) - # Let Arrow unify the schema of non-tensor extension type columns. - schema = pyarrow.unify_schemas(schemas) + # # Unify schemas. + # schemas = [] + # for block in blocks: + # schema = block.schema + # # If concatenating uniform tensor columns results in a variable-shaped + # # tensor columns, override the column type for all blocks. + # if schema_tensor_field_overrides: + # for idx, field in schema_tensor_field_overrides.items(): + # schema = schema.set(idx, field) + # schemas.append(schema) + # # Let Arrow unify the schema of non-tensor extension type columns. + # schema = pyarrow.unify_schemas(schemas) # Build the concatenated table. + schema = unify_schemas([b.schema for b in blocks]) table = pyarrow.Table.from_arrays(cols, schema=schema) # Validate table schema (this is a cheap check by default). table.validate() diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 8a2648a93712..f731f19499c0 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -16,6 +16,7 @@ ) import ray +from ray.data._internal.arrow_ops.transform_pyarrow import unify_schemas from ray.data._internal.block_list import BlockList from ray.data._internal.compute import ( UDF, @@ -261,17 +262,26 @@ def schema( metadata = blocks.get_metadata(fetch_if_missing=False) # Some blocks could be empty, in which case we cannot get their schema. # TODO(ekl) validate schema is the same across different blocks. + + # TODO(scott): apply schema unification here? instead of + # kicking out upon first block with valid schema + schemas_to_unify = [] for m in metadata: if m.schema is not None and (m.num_rows is None or m.num_rows > 0): - return m.schema + schemas_to_unify.append(m.schema) + if schemas_to_unify: + return unify_schemas(schemas_to_unify) if not fetch_if_missing: return None # Synchronously fetch the schema. # For lazy block lists, this launches read tasks and fetches block metadata # until we find valid block schema. + schemas_to_unify = [] for _, m in blocks.iter_blocks_with_metadata(): if m.schema is not None and (m.num_rows is None or m.num_rows > 0): - return m.schema + schemas_to_unify.append(m.schema) + if schemas_to_unify: + return unify_schemas(schemas_to_unify) return None def meta_count(self) -> Optional[int]: diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 1af089a25c75..636193e27117 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -3,6 +3,7 @@ import pytest import ray +from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorArray, ArrowVariableShapedTensorType from ray.data._internal.dataset_logger import DatasetLogger from ray.data.context import DatasetContext from ray.tests.conftest import * # noqa @@ -418,6 +419,15 @@ def consume(split): """ ) +def test_tensor(ray_start_regular_shared): + import numpy as np + ds = ray.data.from_items([{"spam": np.zeros((32, 32, 3))}, {"spam": np.zeros((64, 64, 3))}]) + print(ds.schema().types) + print(ds.schema().types[0].storage_type) + new_type = ds.schema().types[0].storage_type + assert ds.schema().types == ArrowVariableShapedTensorType(dtype=new_type, ndim=3) + + if __name__ == "__main__": import sys From d7a84728f6bee680f020418cffa268947ac58be3 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 13 Dec 2022 13:04:05 -0800 Subject: [PATCH 02/13] more scratch work Signed-off-by: Scott Lee --- .../data/_internal/arrow_ops/transform_pyarrow.py | 11 +++++++---- python/ray/data/tests/test_stats.py | 14 +++++++++----- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index b2fb6867dfbb..1c75ad8ab09a 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -50,18 +50,21 @@ def take_table( def unify_schemas( schemas: List["pyarrow.Schema"], ) -> "pyarrow.Schema": - # schemas_to_unify = [] + schemas_to_unify = [] for col_idx in range(len(schemas[0].types)): column_types = [s.types[col_idx] for s in schemas] if ArrowTensorType._need_variable_shaped_tensor_array(column_types): - dtype = column_types[col_idx].storage_type - new_type = ArrowVariableShapedTensorType(dtype=dtype, ndim = len(column_types)) + new_type = ArrowVariableShapedTensorType( + dtype=column_types[0].storage_type.value_type, + ndim =len(column_types[0].shape), + ) for schema in schemas: var_shaped_col = schema.field(col_idx).with_type(new_type) schema = schema.set(col_idx, var_shaped_col) + schemas_to_unify.append(schema) # Let Arrow unify the schema of non-tensor extension type columns. - return pyarrow.unify_schemas(schemas) + return pyarrow.unify_schemas(schemas_to_unify) def _concatenate_chunked_arrays(arrs: "pyarrow.ChunkedArray") -> "pyarrow.ChunkedArray": diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 636193e27117..95db5a9339e4 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -419,13 +419,17 @@ def consume(split): """ ) -def test_tensor(ray_start_regular_shared): +def test_ragged_tensors(ray_start_regular_shared): import numpy as np - ds = ray.data.from_items([{"spam": np.zeros((32, 32, 3))}, {"spam": np.zeros((64, 64, 3))}]) + ds = ray.data.from_items([ + {"spam": np.zeros((32, 32, 3))}, + {"spam": np.zeros((64, 64, 3))}, + ]) print(ds.schema().types) - print(ds.schema().types[0].storage_type) - new_type = ds.schema().types[0].storage_type - assert ds.schema().types == ArrowVariableShapedTensorType(dtype=new_type, ndim=3) + new_type = ds.schema().types[0].storage_type.value_type + assert ds.schema().types == [ + ArrowVariableShapedTensorType(dtype=new_type, ndim=3), + ] From e5094ef1a9e8f02cc7ab98f23896f0da5a88a16d Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 13 Dec 2022 16:00:40 -0800 Subject: [PATCH 03/13] add scalar_type property in ArrowTensorType to simplify unify_schemas() logic Signed-off-by: Scott Lee --- .../ray/air/util/tensor_extensions/arrow.py | 11 +++++ .../_internal/arrow_ops/transform_pyarrow.py | 46 +++++++++++-------- python/ray/data/tests/test_dataset.py | 13 ++++++ python/ray/data/tests/test_stats.py | 13 ------ 4 files changed, 51 insertions(+), 32 deletions(-) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index a225ee57d174..73851d52fb16 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -79,6 +79,11 @@ def shape(self): """ return self._shape + @property + def scalar_type(self): + """Returns the type of the underlying tensor elements.""" + return self.storage_type.value_type + def to_pandas_dtype(self): """ Convert Arrow extension type to corresponding Pandas dtype. @@ -530,6 +535,12 @@ def ndim(self) -> int: """Return the number of dimensions in the tensor elements.""" return self._ndim + @property + def scalar_type(self): + """Returns the type of the underlying tensor elements.""" + data_field_index = self.storage_type.get_field_index("data") + return self.storage_type[data_field_index].type.value_type + def __reduce__(self): return ( ArrowVariableShapedTensorType, diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index 1c75ad8ab09a..ef2496be9cde 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -51,18 +51,26 @@ def unify_schemas( schemas: List["pyarrow.Schema"], ) -> "pyarrow.Schema": schemas_to_unify = [] - for col_idx in range(len(schemas[0].types)): - column_types = [s.types[col_idx] for s in schemas] - if ArrowTensorType._need_variable_shaped_tensor_array(column_types): - new_type = ArrowVariableShapedTensorType( - dtype=column_types[0].storage_type.value_type, - ndim =len(column_types[0].shape), - ) + schema_tensor_field_overrides = {} + + if type(schemas[0]) == pyarrow.Schema: + if any(isinstance(type_, pyarrow.ExtensionType) for type_ in schemas[0].types): + for col_idx in range(len(schemas[0].types)): + column_types = [s.types[col_idx] for s in schemas] + if ArrowTensorType._need_variable_shaped_tensor_array(column_types): + new_type = ArrowVariableShapedTensorType( + dtype=column_types[0].scalar_type, + ndim =len(column_types[0].shape), + ) + schema_tensor_field_overrides[col_idx] = new_type for schema in schemas: - var_shaped_col = schema.field(col_idx).with_type(new_type) - schema = schema.set(col_idx, var_shaped_col) - schemas_to_unify.append(schema) - + for col_idx, col_new_type in schema_tensor_field_overrides.items(): + var_shaped_col = schema.field(col_idx).with_type(col_new_type) + schema = schema.set(col_idx, var_shaped_col) + schemas_to_unify.append(schema) + else: + schemas_to_unify = schemas + print("===> schemas_to_unify:", schemas_to_unify) # Let Arrow unify the schema of non-tensor extension type columns. return pyarrow.unify_schemas(schemas_to_unify) @@ -116,7 +124,7 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": if any(isinstance(type_, pyarrow.ExtensionType) for type_ in schema.types): # Custom handling for extension array columns. cols = [] - schema_tensor_field_overrides = {} + # schema_tensor_field_overrides = {} for col_name in schema.names: col_chunked_arrays = [] for block in blocks: @@ -133,13 +141,13 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": col = ArrowTensorArray._chunk_tensor_arrays( [chunk for ca in col_chunked_arrays for chunk in ca.chunks] ) - if schema.field(col_name).type != col.type: - # Ensure that the field's type is properly updated in the schema if - # a collection of homogeneous-shaped columns resulted in a - # variable-shaped tensor column once concatenated. - new_field = schema.field(col_name).with_type(col.type) - field_idx = schema.get_field_index(col_name) - schema_tensor_field_overrides[field_idx] = new_field + # if schema.field(col_name).type != col.type: + # # Ensure that the field's type is properly updated in the schema if + # # a collection of homogeneous-shaped columns resulted in a + # # variable-shaped tensor column once concatenated. + # new_field = schema.field(col_name).with_type(col.type) + # field_idx = schema.get_field_index(col_name) + # schema_tensor_field_overrides[field_idx] = new_field else: col = _concatenate_chunked_arrays(col_chunked_arrays) cols.append(col) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index b67315e6da1e..7f361aad06e1 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -13,6 +13,7 @@ import ray from ray._private.test_utils import wait_for_condition +from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType from ray.data._internal.stats import _StatsActor from ray.data._internal.arrow_block import ArrowRow from ray.data._internal.block_builder import BlockBuilder @@ -5363,6 +5364,18 @@ def test_dataset_schema_after_read_stats(ray_start_cluster): assert schema == ds.schema() +def test_ragged_tensors(ray_start_regular_shared): + import numpy as np + ds = ray.data.from_items([ + {"spam": np.zeros((32, 32, 3))}, + {"spam": np.zeros((64, 64, 3))}, + ]) + new_type = ds.schema().types[0].scalar_type + assert ds.schema().types == [ + ArrowVariableShapedTensorType(dtype=new_type, ndim=3), + ] + + if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 95db5a9339e4..c484d1488350 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -419,19 +419,6 @@ def consume(split): """ ) -def test_ragged_tensors(ray_start_regular_shared): - import numpy as np - ds = ray.data.from_items([ - {"spam": np.zeros((32, 32, 3))}, - {"spam": np.zeros((64, 64, 3))}, - ]) - print(ds.schema().types) - new_type = ds.schema().types[0].storage_type.value_type - assert ds.schema().types == [ - ArrowVariableShapedTensorType(dtype=new_type, ndim=3), - ] - - if __name__ == "__main__": import sys From 9a962c8a772b0a55b47fde675f33b24b70cd6c54 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 13 Dec 2022 16:10:55 -0800 Subject: [PATCH 04/13] clean up and format Signed-off-by: Scott Lee --- .../ray/air/util/tensor_extensions/arrow.py | 22 +++++++++---- .../_internal/arrow_ops/transform_pyarrow.py | 32 +++++-------------- python/ray/data/_internal/plan.py | 2 -- python/ray/data/tests/test_dataset.py | 11 ++++--- python/ray/data/tests/test_stats.py | 1 - 5 files changed, 31 insertions(+), 37 deletions(-) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index 73851d52fb16..3fcce63edc19 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -139,15 +139,25 @@ def __repr__(self) -> str: @classmethod def _need_variable_shaped_tensor_array( - cls, array_types: Sequence[Union["ArrowTensorType", "ArrowVariableShapedTensorType"]], + cls, + array_types: Sequence[ + Union["ArrowTensorType", "ArrowVariableShapedTensorType"] + ], ) -> bool: """ - Whether the provided list of tensor types need a variable-shaped - representation when concatenating or chunking. + Whether the provided list of tensor types need a variable-shaped + representation (i.e. `ArrowVariableShapedTensorType`) when concatenating + or chunking. If one or more of the tensor types in `array_types` are + variable-shaped and/or any of the tensor arrays have a different shape + than the others, a variable-shaped tensor array representation will be + required and this method will return True. + + Args: + array_types: List of tensor types to check if a variable-shaped + representation is required for concatenation - If one or more of the tensor types in `array_types` are variable-shaped - and/or any of the tensor arrays have a different shape than the others, a variable-shaped - tensor array representation will be required and this method will return True. + Returns: True if concatenating arrays with types `array_types` requires + a variable-shaped representation """ needs_variable_shaped = False shape = None diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index ef2496be9cde..83247a5e2043 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -1,6 +1,9 @@ -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, List, Union -from ray.air.util.tensor_extensions.arrow import ArrowTensorArray, ArrowTensorType, ArrowVariableShapedTensorType +from ray.air.util.tensor_extensions.arrow import ( + ArrowTensorType, + ArrowVariableShapedTensorType, +) try: import pyarrow @@ -60,7 +63,7 @@ def unify_schemas( if ArrowTensorType._need_variable_shaped_tensor_array(column_types): new_type = ArrowVariableShapedTensorType( dtype=column_types[0].scalar_type, - ndim =len(column_types[0].shape), + ndim=len(column_types[0].shape), ) schema_tensor_field_overrides[col_idx] = new_type for schema in schemas: @@ -124,7 +127,6 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": if any(isinstance(type_, pyarrow.ExtensionType) for type_ in schema.types): # Custom handling for extension array columns. cols = [] - # schema_tensor_field_overrides = {} for col_name in schema.names: col_chunked_arrays = [] for block in blocks: @@ -141,30 +143,12 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": col = ArrowTensorArray._chunk_tensor_arrays( [chunk for ca in col_chunked_arrays for chunk in ca.chunks] ) - # if schema.field(col_name).type != col.type: - # # Ensure that the field's type is properly updated in the schema if - # # a collection of homogeneous-shaped columns resulted in a - # # variable-shaped tensor column once concatenated. - # new_field = schema.field(col_name).with_type(col.type) - # field_idx = schema.get_field_index(col_name) - # schema_tensor_field_overrides[field_idx] = new_field else: col = _concatenate_chunked_arrays(col_chunked_arrays) cols.append(col) - # # Unify schemas. - # schemas = [] - # for block in blocks: - # schema = block.schema - # # If concatenating uniform tensor columns results in a variable-shaped - # # tensor columns, override the column type for all blocks. - # if schema_tensor_field_overrides: - # for idx, field in schema_tensor_field_overrides.items(): - # schema = schema.set(idx, field) - # schemas.append(schema) - # # Let Arrow unify the schema of non-tensor extension type columns. - # schema = pyarrow.unify_schemas(schemas) - # Build the concatenated table. + # Unify schemas. schema = unify_schemas([b.schema for b in blocks]) + # Build the concatenated table. table = pyarrow.Table.from_arrays(cols, schema=schema) # Validate table schema (this is a cheap check by default). table.validate() diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index f731f19499c0..f8d09233e483 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -263,8 +263,6 @@ def schema( # Some blocks could be empty, in which case we cannot get their schema. # TODO(ekl) validate schema is the same across different blocks. - # TODO(scott): apply schema unification here? instead of - # kicking out upon first block with valid schema schemas_to_unify = [] for m in metadata: if m.schema is not None and (m.num_rows is None or m.num_rows > 0): diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 7f361aad06e1..fd8676c92bc6 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5366,10 +5366,13 @@ def test_dataset_schema_after_read_stats(ray_start_cluster): def test_ragged_tensors(ray_start_regular_shared): import numpy as np - ds = ray.data.from_items([ - {"spam": np.zeros((32, 32, 3))}, - {"spam": np.zeros((64, 64, 3))}, - ]) + + ds = ray.data.from_items( + [ + {"spam": np.zeros((32, 32, 3))}, + {"spam": np.zeros((64, 64, 3))}, + ] + ) new_type = ds.schema().types[0].scalar_type assert ds.schema().types == [ ArrowVariableShapedTensorType(dtype=new_type, ndim=3), diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index c484d1488350..1af089a25c75 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -3,7 +3,6 @@ import pytest import ray -from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorArray, ArrowVariableShapedTensorType from ray.data._internal.dataset_logger import DatasetLogger from ray.data.context import DatasetContext from ray.tests.conftest import * # noqa From 2530d6a85d773fc6448f84c88532f16a1bccb3f1 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 14 Dec 2022 10:47:10 -0800 Subject: [PATCH 05/13] add check for same type on simple blocks, lazy block list support Signed-off-by: Scott Lee --- .../ray/air/util/tensor_extensions/arrow.py | 5 ++- .../_internal/arrow_ops/transform_pyarrow.py | 37 ++++++++++--------- python/ray/data/_internal/plan.py | 25 ++++++++++--- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index 3fcce63edc19..eea6650b0965 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -145,7 +145,7 @@ def _need_variable_shaped_tensor_array( ], ) -> bool: """ - Whether the provided list of tensor types need a variable-shaped + Whether the provided list of tensor types needs a variable-shaped representation (i.e. `ArrowVariableShapedTensorType`) when concatenating or chunking. If one or more of the tensor types in `array_types` are variable-shaped and/or any of the tensor arrays have a different shape @@ -156,7 +156,8 @@ def _need_variable_shaped_tensor_array( array_types: List of tensor types to check if a variable-shaped representation is required for concatenation - Returns: True if concatenating arrays with types `array_types` requires + Returns: + True if concatenating arrays with types `array_types` requires a variable-shaped representation """ needs_variable_shaped = False diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index 83247a5e2043..32b8a38b65fa 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -56,24 +56,25 @@ def unify_schemas( schemas_to_unify = [] schema_tensor_field_overrides = {} - if type(schemas[0]) == pyarrow.Schema: - if any(isinstance(type_, pyarrow.ExtensionType) for type_ in schemas[0].types): - for col_idx in range(len(schemas[0].types)): - column_types = [s.types[col_idx] for s in schemas] - if ArrowTensorType._need_variable_shaped_tensor_array(column_types): - new_type = ArrowVariableShapedTensorType( - dtype=column_types[0].scalar_type, - ndim=len(column_types[0].shape), - ) - schema_tensor_field_overrides[col_idx] = new_type - for schema in schemas: - for col_idx, col_new_type in schema_tensor_field_overrides.items(): - var_shaped_col = schema.field(col_idx).with_type(col_new_type) - schema = schema.set(col_idx, var_shaped_col) - schemas_to_unify.append(schema) - else: - schemas_to_unify = schemas - print("===> schemas_to_unify:", schemas_to_unify) + if any(isinstance(type_, pyarrow.ExtensionType) for type_ in schemas[0].types): + # If we have PyArrow extension types that may potentially be variable shaped, + # examine the first schema to gather the columns that need type conversions. + for col_idx in range(len(schemas[0].types)): + column_types = [s.types[col_idx] for s in schemas] + if ArrowTensorType._need_variable_shaped_tensor_array(column_types): + new_type = ArrowVariableShapedTensorType( + dtype=column_types[0].scalar_type, + ndim=len(column_types[0].shape), + ) + schema_tensor_field_overrides[col_idx] = new_type + # Go through all schemas and update the types of columns from the above loop. + for schema in schemas: + for col_idx, col_new_type in schema_tensor_field_overrides.items(): + var_shaped_col = schema.field(col_idx).with_type(col_new_type) + schema = schema.set(col_idx, var_shaped_col) + schemas_to_unify.append(schema) + else: + schemas_to_unify = schemas # Let Arrow unify the schema of non-tensor extension type columns. return pyarrow.unify_schemas(schemas_to_unify) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index f8d09233e483..c6dc64616c83 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -263,23 +263,38 @@ def schema( # Some blocks could be empty, in which case we cannot get their schema. # TODO(ekl) validate schema is the same across different blocks. + # First check if there are blocks with computed schemas, then unify + # valid schemas from all such blocks. schemas_to_unify = [] for m in metadata: if m.schema is not None and (m.num_rows is None or m.num_rows > 0): schemas_to_unify.append(m.schema) if schemas_to_unify: - return unify_schemas(schemas_to_unify) + # Check valid PyArrow installation before attempting schema unification + try: + import pyarrow as pa + except ImportError: + pa = None + # If the result contains PyArrow schemas, unify them + if pa is not None and isinstance(schemas_to_unify[0], pa.Schema): + return unify_schemas(schemas_to_unify) + # Otherwise, if the resulting schemas are simple types (e.g. int), + # validate that all blocks have the same type before returning. + if len(set(schemas_to_unify)) == 1: + return schemas_to_unify[0] + raise Exception( + f"Found blocks with different types in schemas: {schemas_to_unify}", + ) if not fetch_if_missing: return None # Synchronously fetch the schema. # For lazy block lists, this launches read tasks and fetches block metadata - # until we find valid block schema. + # until we find the first valid block schema. This is to minimize new + # computations when fetching the schema. schemas_to_unify = [] for _, m in blocks.iter_blocks_with_metadata(): if m.schema is not None and (m.num_rows is None or m.num_rows > 0): - schemas_to_unify.append(m.schema) - if schemas_to_unify: - return unify_schemas(schemas_to_unify) + return m.schema return None def meta_count(self) -> Optional[int]: From 2068d12deea021a43c5a2d4584a07802cbb00c84 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 14 Dec 2022 12:18:37 -0800 Subject: [PATCH 06/13] format Signed-off-by: Scott Lee --- python/ray/air/util/tensor_extensions/arrow.py | 2 +- .../data/_internal/arrow_ops/transform_pyarrow.py | 2 +- python/ray/data/_internal/plan.py | 14 +++++++++----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index eea6650b0965..0120ff0be332 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -156,7 +156,7 @@ def _need_variable_shaped_tensor_array( array_types: List of tensor types to check if a variable-shaped representation is required for concatenation - Returns: + Returns: True if concatenating arrays with types `array_types` requires a variable-shaped representation """ diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index 32b8a38b65fa..227777d154cf 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -67,7 +67,7 @@ def unify_schemas( ndim=len(column_types[0].shape), ) schema_tensor_field_overrides[col_idx] = new_type - # Go through all schemas and update the types of columns from the above loop. + # Go through all schemas and update the types of columns from the above loop. for schema in schemas: for col_idx, col_new_type in schema_tensor_field_overrides.items(): var_shaped_col = schema.field(col_idx).with_type(col_new_type) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index c6dc64616c83..bfa30729f29b 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -280,11 +280,15 @@ def schema( return unify_schemas(schemas_to_unify) # Otherwise, if the resulting schemas are simple types (e.g. int), # validate that all blocks have the same type before returning. - if len(set(schemas_to_unify)) == 1: - return schemas_to_unify[0] - raise Exception( - f"Found blocks with different types in schemas: {schemas_to_unify}", - ) + first_schema = schemas_to_unify[0] + for s in schemas_to_unify: + if s != first_schema: + raise Exception( + "Found blocks with different types in schemas: {}".format( + schemas_to_unify, + ) + ) + return first_schema if not fetch_if_missing: return None # Synchronously fetch the schema. From ce4336717d18a529d392b6952f9d81f791717853 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Thu, 15 Dec 2022 16:09:37 -0800 Subject: [PATCH 07/13] improved typechecking on unify_schemas Signed-off-by: Scott Lee --- .../ray/air/util/tensor_extensions/arrow.py | 21 ++++++----- .../_internal/arrow_ops/transform_pyarrow.py | 36 +++++++++++++++---- python/ray/data/_internal/plan.py | 12 ++----- python/ray/data/tests/test_dataset.py | 4 +-- 4 files changed, 46 insertions(+), 27 deletions(-) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index 0120ff0be332..ef040127d1ae 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -160,17 +160,20 @@ def _need_variable_shaped_tensor_array( True if concatenating arrays with types `array_types` requires a variable-shaped representation """ - needs_variable_shaped = False shape = None for arr_type in array_types: - if isinstance(arr_type, ArrowVariableShapedTensorType) or ( - shape is not None and arr_type.shape != shape - ): - needs_variable_shaped = True - break - if shape is None: - shape = arr_type.shape - return needs_variable_shaped + # If at least one of the arrays is variable-shaped, we can immediately + # short-circuit since we require a variable-shaped representation. + if isinstance(arr_type, ArrowVariableShapedTensorType): + return True + # For PyArrow extension types, we need variable-shaped representation + # if all the shapes do not match. + if isinstance(arr_type, pa.ExtensionType): + if shape is not None and arr_type.shape != shape: + return True + if shape is None: + shape = arr_type.shape + return False if _arrow_extension_scalars_are_subclassable(): diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index 227777d154cf..95d21996f2be 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -62,10 +62,18 @@ def unify_schemas( for col_idx in range(len(schemas[0].types)): column_types = [s.types[col_idx] for s in schemas] if ArrowTensorType._need_variable_shaped_tensor_array(column_types): - new_type = ArrowVariableShapedTensorType( - dtype=column_types[0].scalar_type, - ndim=len(column_types[0].shape), - ) + if isinstance(column_types[0], ArrowVariableShapedTensorType): + new_type = column_types[0] + elif isinstance(column_types[0], ArrowTensorType): + new_type = ArrowVariableShapedTensorType( + dtype=column_types[0].scalar_type, + ndim=len(column_types[0].shape), + ) + else: + raise Exception( + "Detected need for variable shaped tensor representation, " + + f"but schema is not ArrayTensorType: {column_types[0]}" + ) schema_tensor_field_overrides[col_idx] = new_type # Go through all schemas and update the types of columns from the above loop. for schema in schemas: @@ -147,8 +155,24 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": else: col = _concatenate_chunked_arrays(col_chunked_arrays) cols.append(col) - # Unify schemas. - schema = unify_schemas([b.schema for b in blocks]) + + # If the result contains PyArrow schemas, unify them + schemas_to_unify = [b.schema for b in blocks] + if pyarrow is not None and isinstance(schemas_to_unify[0], pyarrow.Schema): + schema = unify_schemas(schemas_to_unify) + else: + # Otherwise, if the resulting schemas are simple types (e.g. int), + # check that all blocks with valid schemas have the same type. + schema = schemas_to_unify[0] + if schema is not None: + NoneType = type(None) + for s in schemas_to_unify: + if s is not NoneType and s != schema: + raise Exception( + "Found blocks with different types in schemas: {}".format( + schemas_to_unify, + ) + ) # Build the concatenated table. table = pyarrow.Table.from_arrays(cols, schema=schema) # Validate table schema (this is a cheap check by default). diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index bfa30729f29b..02f32c4311ba 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -279,16 +279,8 @@ def schema( if pa is not None and isinstance(schemas_to_unify[0], pa.Schema): return unify_schemas(schemas_to_unify) # Otherwise, if the resulting schemas are simple types (e.g. int), - # validate that all blocks have the same type before returning. - first_schema = schemas_to_unify[0] - for s in schemas_to_unify: - if s != first_schema: - raise Exception( - "Found blocks with different types in schemas: {}".format( - schemas_to_unify, - ) - ) - return first_schema + # return the first schema. + return schemas_to_unify[0] if not fetch_if_missing: return None # Synchronously fetch the schema. diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 694a835b5a87..bbef793ba83c 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5397,8 +5397,8 @@ def test_ragged_tensors(ray_start_regular_shared): ds = ray.data.from_items( [ - {"spam": np.zeros((32, 32, 3))}, - {"spam": np.zeros((64, 64, 3))}, + {"spam": np.zeros((32, 32, 5))}, + {"spam": np.zeros((64, 64, 5))}, ] ) new_type = ds.schema().types[0].scalar_type From c5f25612df91d23c12ab954933c472392d72dc8b Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Thu, 15 Dec 2022 17:07:45 -0800 Subject: [PATCH 08/13] check all blocks for potential pyarrow schema Signed-off-by: Scott Lee --- python/ray/data/_internal/arrow_ops/transform_pyarrow.py | 4 +++- python/ray/data/_internal/plan.py | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index 95d21996f2be..ce3226691d58 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -158,7 +158,9 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": # If the result contains PyArrow schemas, unify them schemas_to_unify = [b.schema for b in blocks] - if pyarrow is not None and isinstance(schemas_to_unify[0], pyarrow.Schema): + if pyarrow is not None and any( + isinstance(s, pyarrow.Schema) for s in schemas_to_unify + ): schema = unify_schemas(schemas_to_unify) else: # Otherwise, if the resulting schemas are simple types (e.g. int), diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 02f32c4311ba..ae27c68640fd 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -276,7 +276,9 @@ def schema( except ImportError: pa = None # If the result contains PyArrow schemas, unify them - if pa is not None and isinstance(schemas_to_unify[0], pa.Schema): + if pa is not None and any( + isinstance(s, pa.Schema) for s in schemas_to_unify + ): return unify_schemas(schemas_to_unify) # Otherwise, if the resulting schemas are simple types (e.g. int), # return the first schema. @@ -287,7 +289,6 @@ def schema( # For lazy block lists, this launches read tasks and fetches block metadata # until we find the first valid block schema. This is to minimize new # computations when fetching the schema. - schemas_to_unify = [] for _, m in blocks.iter_blocks_with_metadata(): if m.schema is not None and (m.num_rows is None or m.num_rows > 0): return m.schema From cadbf24d86c2b34130cfde16040ef3b3c46c802e Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Mon, 19 Dec 2022 22:20:13 -0800 Subject: [PATCH 09/13] comments Signed-off-by: Scott Lee --- .../ray/air/util/tensor_extensions/arrow.py | 18 ++++++---- .../_internal/arrow_ops/transform_pyarrow.py | 36 ++++++++++--------- python/ray/data/_internal/plan.py | 2 +- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index ef040127d1ae..7412e2d30c23 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -166,13 +166,17 @@ def _need_variable_shaped_tensor_array( # short-circuit since we require a variable-shaped representation. if isinstance(arr_type, ArrowVariableShapedTensorType): return True - # For PyArrow extension types, we need variable-shaped representation - # if all the shapes do not match. - if isinstance(arr_type, pa.ExtensionType): - if shape is not None and arr_type.shape != shape: - return True - if shape is None: - shape = arr_type.shape + if not isinstance(arr_type, ArrowTensorType): + raise ValueError( + "All provided array types must be an instance of either " + "ArrowTensorType or ArrowVariableShapedTensorType, but " + f"got {arr_type}" + ) + # We need variable-shaped representation if any of the tensor arrays have + # different shapes. + if shape is not None and arr_type.shape != shape: + return True + shape = arr_type.shape return False diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index ce3226691d58..807eb4f029d8 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -57,22 +57,26 @@ def unify_schemas( schema_tensor_field_overrides = {} if any(isinstance(type_, pyarrow.ExtensionType) for type_ in schemas[0].types): - # If we have PyArrow extension types that may potentially be variable shaped, + # If we have pyarrow extension types that may potentially be variable shaped, # examine the first schema to gather the columns that need type conversions. for col_idx in range(len(schemas[0].types)): - column_types = [s.types[col_idx] for s in schemas] - if ArrowTensorType._need_variable_shaped_tensor_array(column_types): - if isinstance(column_types[0], ArrowVariableShapedTensorType): - new_type = column_types[0] - elif isinstance(column_types[0], ArrowTensorType): + tensor_array_types = [] + for s in schemas: + col_type = s.types[col_idx] + if isinstance(col_type, pyarrow.ExtensionType): + tensor_array_types.append(col_type) + if ArrowTensorType._need_variable_shaped_tensor_array(tensor_array_types): + if isinstance(tensor_array_types[0], ArrowVariableShapedTensorType): + new_type = tensor_array_types[0] + elif isinstance(tensor_array_types[0], ArrowTensorType): new_type = ArrowVariableShapedTensorType( - dtype=column_types[0].scalar_type, - ndim=len(column_types[0].shape), + dtype=tensor_array_types[0].scalar_type, + ndim=len(tensor_array_types[0].shape), ) else: - raise Exception( + raise ValueError( "Detected need for variable shaped tensor representation, " - + f"but schema is not ArrayTensorType: {column_types[0]}" + f"but schema is not ArrayTensorType: {tensor_array_types[0]}" ) schema_tensor_field_overrides[col_idx] = new_type # Go through all schemas and update the types of columns from the above loop. @@ -156,7 +160,7 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": col = _concatenate_chunked_arrays(col_chunked_arrays) cols.append(col) - # If the result contains PyArrow schemas, unify them + # If the result contains pyarrow schemas, unify them schemas_to_unify = [b.schema for b in blocks] if pyarrow is not None and any( isinstance(s, pyarrow.Schema) for s in schemas_to_unify @@ -167,13 +171,11 @@ def concat(blocks: List["pyarrow.Table"]) -> "pyarrow.Table": # check that all blocks with valid schemas have the same type. schema = schemas_to_unify[0] if schema is not None: - NoneType = type(None) for s in schemas_to_unify: - if s is not NoneType and s != schema: - raise Exception( - "Found blocks with different types in schemas: {}".format( - schemas_to_unify, - ) + if s is not None and s != schema: + raise ValueError( + "Found blocks with different types " + f"in schemas: {schemas_to_unify}" ) # Build the concatenated table. table = pyarrow.Table.from_arrays(cols, schema=schema) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index ae27c68640fd..592abf3486bc 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -270,7 +270,7 @@ def schema( if m.schema is not None and (m.num_rows is None or m.num_rows > 0): schemas_to_unify.append(m.schema) if schemas_to_unify: - # Check valid PyArrow installation before attempting schema unification + # Check valid pyarrow installation before attempting schema unification try: import pyarrow as pa except ImportError: From fb93cad506cbd84e829e62e76c523cd07bdf681f Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 20 Dec 2022 11:36:57 -0800 Subject: [PATCH 10/13] additional unit tests Signed-off-by: Scott Lee --- .../ray/data/tests/test_transform_pyarrow.py | 82 ++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_transform_pyarrow.py b/python/ray/data/tests/test_transform_pyarrow.py index 99c47c04bbc8..e3aa6c536cce 100644 --- a/python/ray/data/tests/test_transform_pyarrow.py +++ b/python/ray/data/tests/test_transform_pyarrow.py @@ -7,7 +7,7 @@ ArrowTensorType, ArrowVariableShapedTensorType, ) -from ray.data._internal.arrow_ops.transform_pyarrow import concat +from ray.data._internal.arrow_ops.transform_pyarrow import concat, unify_schemas def test_arrow_concat_empty(): @@ -176,6 +176,86 @@ def test_arrow_concat_tensor_extension_uniform_but_different(): # fails for this case. +def test_unify_schemas(): + # Unifying a schema with the same schema as itself + tensor_arr_1 = pa.schema([("tensor_arr", ArrowTensorType((3, 5), pa.int32()))]) + assert unify_schemas([tensor_arr_1, tensor_arr_1]) == tensor_arr_1 + + # Single columns with different shapes + tensor_arr_2 = pa.schema([("tensor_arr", ArrowTensorType((2, 1), pa.int32()))]) + contains_diff_shaped = [tensor_arr_1, tensor_arr_2] + assert unify_schemas(contains_diff_shaped) == pa.schema([ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ]) + + # Single columns with same shapes + tensor_arr_3 = pa.schema([("tensor_arr", ArrowTensorType((3, 5), pa.int32()))]) + contains_diff_types = [tensor_arr_1, tensor_arr_3] + assert unify_schemas(contains_diff_types) == pa.schema([ + ("tensor_arr", ArrowTensorType((3, 5), pa.int32())), + ]) + + # Single columns with a variable shaped tensor, same ndim + var_tensor_arr = pa.schema([ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ]) + contains_var_shaped = [tensor_arr_1, var_tensor_arr] + assert unify_schemas(contains_var_shaped) == pa.schema([ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ]) + + # Single columns with a variable shaped tensor, different ndim + var_tensor_arr_1d = pa.schema([ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 1)), + ]) + var_tensor_arr_3d = pa.schema([ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 3)), + ]) + contains_1d2d = [tensor_arr_1, var_tensor_arr_1d] + assert unify_schemas(contains_1d2d) == pa.schema([ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ]) + contains_2d3d = [tensor_arr_1, var_tensor_arr_3d] + assert unify_schemas(contains_2d3d) == pa.schema([ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 3)), + ]) + + # Multi-column schemas + multicol_schema_1 = pa.schema([ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ]) + multicol_schema_2 = pa.schema([ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), + ("col_var_tensor", ArrowTensorType((9, 4, 1, 0, 5), pa.int16())), + ]) + assert unify_schemas([multicol_schema_1, multicol_schema_2]) == pa.schema([ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ]) + + multicol_schema_3 = pa.schema([ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ]) + assert unify_schemas([multicol_schema_1, multicol_schema_3]) == pa.schema([ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ]) + + # Unifying >2 schemas together + assert unify_schemas([multicol_schema_1, multicol_schema_2, multicol_schema_3]) == pa.schema([ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ]) + + if __name__ == "__main__": import sys From 669fcd3445dd007d7b33725905b0e0e7eb573e20 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 20 Dec 2022 11:43:54 -0800 Subject: [PATCH 11/13] comments, format, clean up Signed-off-by: Scott Lee --- .../_internal/arrow_ops/transform_pyarrow.py | 2 + python/ray/data/tests/test_dataset.py | 2 + .../ray/data/tests/test_transform_pyarrow.py | 138 +++++++++++------- 3 files changed, 88 insertions(+), 54 deletions(-) diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index 807eb4f029d8..dfc9e09b0c8e 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -53,6 +53,8 @@ def take_table( def unify_schemas( schemas: List["pyarrow.Schema"], ) -> "pyarrow.Schema": + """Version of `pyarrow.unify_schemas()` which also handles checks for + variable-shaped tensors in the given schemas.""" schemas_to_unify = [] schema_tensor_field_overrides = {} diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index bbef793ba83c..461efd0b9682 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5393,6 +5393,8 @@ def test_dataset_schema_after_read_stats(ray_start_cluster): def test_ragged_tensors(ray_start_regular_shared): + """Test Arrow type promotion between ArrowTensorType and + ArrowVariableShapedTensorType when a column contains ragged tensors.""" import numpy as np ds = ray.data.from_items( diff --git a/python/ray/data/tests/test_transform_pyarrow.py b/python/ray/data/tests/test_transform_pyarrow.py index e3aa6c536cce..68b5a91594a0 100644 --- a/python/ray/data/tests/test_transform_pyarrow.py +++ b/python/ray/data/tests/test_transform_pyarrow.py @@ -184,76 +184,106 @@ def test_unify_schemas(): # Single columns with different shapes tensor_arr_2 = pa.schema([("tensor_arr", ArrowTensorType((2, 1), pa.int32()))]) contains_diff_shaped = [tensor_arr_1, tensor_arr_2] - assert unify_schemas(contains_diff_shaped) == pa.schema([ - ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), - ]) + assert unify_schemas(contains_diff_shaped) == pa.schema( + [ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ] + ) # Single columns with same shapes tensor_arr_3 = pa.schema([("tensor_arr", ArrowTensorType((3, 5), pa.int32()))]) contains_diff_types = [tensor_arr_1, tensor_arr_3] - assert unify_schemas(contains_diff_types) == pa.schema([ - ("tensor_arr", ArrowTensorType((3, 5), pa.int32())), - ]) + assert unify_schemas(contains_diff_types) == pa.schema( + [ + ("tensor_arr", ArrowTensorType((3, 5), pa.int32())), + ] + ) # Single columns with a variable shaped tensor, same ndim - var_tensor_arr = pa.schema([ - ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), - ]) + var_tensor_arr = pa.schema( + [ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ] + ) contains_var_shaped = [tensor_arr_1, var_tensor_arr] - assert unify_schemas(contains_var_shaped) == pa.schema([ - ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), - ]) + assert unify_schemas(contains_var_shaped) == pa.schema( + [ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ] + ) # Single columns with a variable shaped tensor, different ndim - var_tensor_arr_1d = pa.schema([ - ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 1)), - ]) - var_tensor_arr_3d = pa.schema([ - ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 3)), - ]) + var_tensor_arr_1d = pa.schema( + [ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 1)), + ] + ) + var_tensor_arr_3d = pa.schema( + [ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 3)), + ] + ) contains_1d2d = [tensor_arr_1, var_tensor_arr_1d] - assert unify_schemas(contains_1d2d) == pa.schema([ - ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), - ]) + assert unify_schemas(contains_1d2d) == pa.schema( + [ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 2)), + ] + ) contains_2d3d = [tensor_arr_1, var_tensor_arr_3d] - assert unify_schemas(contains_2d3d) == pa.schema([ - ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 3)), - ]) + assert unify_schemas(contains_2d3d) == pa.schema( + [ + ("tensor_arr", ArrowVariableShapedTensorType(pa.int32(), 3)), + ] + ) # Multi-column schemas - multicol_schema_1 = pa.schema([ - ("col_int", pa.int32()), - ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), - ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), - ]) - multicol_schema_2 = pa.schema([ - ("col_int", pa.int32()), - ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), - ("col_var_tensor", ArrowTensorType((9, 4, 1, 0, 5), pa.int16())), - ]) - assert unify_schemas([multicol_schema_1, multicol_schema_2]) == pa.schema([ - ("col_int", pa.int32()), - ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), - ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), - ]) + multicol_schema_1 = pa.schema( + [ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ] + ) + multicol_schema_2 = pa.schema( + [ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), + ("col_var_tensor", ArrowTensorType((9, 4, 1, 0, 5), pa.int16())), + ] + ) + assert unify_schemas([multicol_schema_1, multicol_schema_2]) == pa.schema( + [ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowTensorType((4, 2), pa.int32())), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ] + ) - multicol_schema_3 = pa.schema([ - ("col_int", pa.int32()), - ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), - ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), - ]) - assert unify_schemas([multicol_schema_1, multicol_schema_3]) == pa.schema([ - ("col_int", pa.int32()), - ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), - ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), - ]) + multicol_schema_3 = pa.schema( + [ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ] + ) + assert unify_schemas([multicol_schema_1, multicol_schema_3]) == pa.schema( + [ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ] + ) # Unifying >2 schemas together - assert unify_schemas([multicol_schema_1, multicol_schema_2, multicol_schema_3]) == pa.schema([ - ("col_int", pa.int32()), - ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), - ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), - ]) + assert unify_schemas( + [multicol_schema_1, multicol_schema_2, multicol_schema_3] + ) == pa.schema( + [ + ("col_int", pa.int32()), + ("col_fixed_tensor", ArrowVariableShapedTensorType(pa.int32(), 3)), + ("col_var_tensor", ArrowVariableShapedTensorType(pa.int16(), 5)), + ] + ) if __name__ == "__main__": From 6efd9141fb2b6e77bed719f6a1cb0be646c4fafd Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 20 Dec 2022 12:57:02 -0800 Subject: [PATCH 12/13] final comments and format Signed-off-by: Scott Lee --- .../ray/data/_internal/arrow_ops/transform_pyarrow.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index dfc9e09b0c8e..d73db2c7ecea 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -62,11 +62,11 @@ def unify_schemas( # If we have pyarrow extension types that may potentially be variable shaped, # examine the first schema to gather the columns that need type conversions. for col_idx in range(len(schemas[0].types)): - tensor_array_types = [] - for s in schemas: - col_type = s.types[col_idx] - if isinstance(col_type, pyarrow.ExtensionType): - tensor_array_types.append(col_type) + tensor_array_types = [ + s.types[col_idx] + for s in schemas + if isinstance(s.types[col_idx], pyarrow.ExtensionType) + ] if ArrowTensorType._need_variable_shaped_tensor_array(tensor_array_types): if isinstance(tensor_array_types[0], ArrowVariableShapedTensorType): new_type = tensor_array_types[0] From 55704e69028626fc54e518c732ccb5fc130dbdda Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 3 Jan 2023 12:41:31 -0800 Subject: [PATCH 13/13] defer pyarrow import to unify_schemas func Signed-off-by: Scott Lee --- .../ray/data/_internal/arrow_ops/transform_pyarrow.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py index d73db2c7ecea..8a168e04cef1 100644 --- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py +++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py @@ -1,10 +1,5 @@ from typing import TYPE_CHECKING, List, Union -from ray.air.util.tensor_extensions.arrow import ( - ArrowTensorType, - ArrowVariableShapedTensorType, -) - try: import pyarrow except ImportError: @@ -55,6 +50,11 @@ def unify_schemas( ) -> "pyarrow.Schema": """Version of `pyarrow.unify_schemas()` which also handles checks for variable-shaped tensors in the given schemas.""" + from ray.air.util.tensor_extensions.arrow import ( + ArrowTensorType, + ArrowVariableShapedTensorType, + ) + schemas_to_unify = [] schema_tensor_field_overrides = {}