From 3203f0cced9081961f8b1f6c1bdea3743bf0a122 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Thu, 26 Jan 2023 14:36:45 -0800 Subject: [PATCH 01/16] Initial commit Signed-off-by: Balaji Veeramani --- doc/source/data/api/data_representations.rst | 3 +- python/ray/air/tests/test_tensor_extension.py | 15 +++++++ .../ray/air/util/tensor_extensions/pandas.py | 8 ++-- .../ray/air/util/tensor_extensions/utils.py | 43 ++++++++++++++++--- python/ray/data/_internal/simple_block.py | 4 +- python/ray/data/dataset.py | 4 +- .../ray/data/extensions/tensor_extension.py | 3 ++ python/ray/data/preprocessors/torch.py | 4 +- python/ray/data/read_api.py | 4 +- 9 files changed, 68 insertions(+), 20 deletions(-) diff --git a/doc/source/data/api/data_representations.rst b/doc/source/data/api/data_representations.rst index aacc09b963ab..0b86e393d64c 100644 --- a/doc/source/data/api/data_representations.rst +++ b/doc/source/data/api/data_representations.rst @@ -23,7 +23,7 @@ Batch API .. autosummary:: :toctree: doc/ - + block.DataBatch Row API @@ -42,6 +42,7 @@ Tensor Column Extension API .. autosummary:: :toctree: doc/ + extensions.tensor_extension.create_possibly_ragged_ndarray extensions.tensor_extension.TensorDtype extensions.tensor_extension.TensorArray extensions.tensor_extension.ArrowTensorType diff --git a/python/ray/air/tests/test_tensor_extension.py b/python/ray/air/tests/test_tensor_extension.py index 811116a82105..3a9c0c27c07b 100644 --- a/python/ray/air/tests/test_tensor_extension.py +++ b/python/ray/air/tests/test_tensor_extension.py @@ -13,9 +13,24 @@ ArrowVariableShapedTensorType, ) from ray.air.util.tensor_extensions.pandas import TensorArray, TensorDtype +from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray from ray._private.utils import _get_pyarrow_version +@pytest.mark.parametrize( + "values", + [ + [np.zeros((3, 1)), np.zeros((3, 2))], + [np.zeros((3,))], + ], +) +def test_create_possibly_ragged_ndarray(values): + ragged_array = create_possibly_ragged_ndarray(values) + + assert [array.shape for array in ragged_array] == [array.shape for array in values] + assert [array.dtype for array in ragged_array] == [array.dtype for array in values] + + def test_tensor_array_validation(): # Test unknown input type raises TypeError. with pytest.raises(TypeError): diff --git a/python/ray/air/util/tensor_extensions/pandas.py b/python/ray/air/util/tensor_extensions/pandas.py index b10a25c50726..aa7259f4ca02 100644 --- a/python/ray/air/util/tensor_extensions/pandas.py +++ b/python/ray/air/util/tensor_extensions/pandas.py @@ -45,7 +45,7 @@ from pandas.io.formats.format import ExtensionArrayFormatter from ray.air.util.tensor_extensions.utils import ( - _create_possibly_ragged_ndarray, + create_possibly_ragged_ndarray, _is_ndarray_variable_shaped_tensor, ) from ray.util.annotations import PublicAPI @@ -731,13 +731,13 @@ def __init__( # Try to convert some well-known objects to ndarrays before handing off to # ndarray handling logic. if isinstance(values, ABCSeries): - values = _create_possibly_ragged_ndarray(values) + values = create_possibly_ragged_ndarray(values) elif isinstance(values, Sequence): values = [ np.asarray(v) if isinstance(v, TensorArrayElement) else v for v in values ] - values = _create_possibly_ragged_ndarray(values) + values = create_possibly_ragged_ndarray(values) elif isinstance(values, TensorArrayElement): values = np.array([np.asarray(values)], copy=False) @@ -754,7 +754,7 @@ def __init__( values = [np.asarray(v) for v in values] # Try to convert ndarrays of ndarrays/TensorArrayElements with an # opaque object type to a properly typed ndarray of ndarrays. - values = _create_possibly_ragged_ndarray(values) + values = create_possibly_ragged_ndarray(values) else: raise TypeError( "Expected a well-typed ndarray or an object-typed ndarray of " diff --git a/python/ray/air/util/tensor_extensions/utils.py b/python/ray/air/util/tensor_extensions/utils.py index fe7ad97d16b4..94244f1645ad 100644 --- a/python/ray/air/util/tensor_extensions/utils.py +++ b/python/ray/air/util/tensor_extensions/utils.py @@ -3,6 +3,8 @@ import numpy as np +from ray.util import PublicAPI + if TYPE_CHECKING: from pandas.core.dtypes.generic import ABCSeries @@ -28,15 +30,42 @@ def _is_ndarray_variable_shaped_tensor(arr: np.ndarray) -> bool: return True -def _create_possibly_ragged_ndarray( +@PublicAPI(stability="beta") +def create_possibly_ragged_ndarray( values: Union[np.ndarray, "ABCSeries", Sequence[Any]] ) -> np.ndarray: - """ - Create a possibly ragged ndarray. - Using the np.array() constructor will fail to construct a ragged ndarray that has a - uniform first dimension (e.g. uniform channel dimension in imagery). This function - catches this failure and tries a create-and-fill method to construct the ragged - ndarray. + """Create a possibly ragged ndarray. + + If you're working with variable-length arrays like images, use this function to + create ragged arrays instead of ``np.array``. + + .. note:: + ``np.array`` fails to construct ragged arrays if the input arrays have a uniform + first dimension: + + >>> values = [np.zeros((3, 1)), np.zeros((3, 2))] + >>> np.array(values, dtype=object) + Traceback (most recent call last): + ... + ValueError: could not broadcast input array from shape (3,1) into shape (3,) + >>> create_possibly_ragged_ndarray(values) + array([array([[0.], + [0.], + [0.]]), array([[0., 0.], + [0., 0.], + [0., 0.]])], dtype=object) + + Or if you're creating a ragged array from a single array: + + >>> values = [np.zeros((3, 1))] + >>> np.array(values, dtype=object)[0].dtype + dtype('O') + >>> create_possibly_ragged_ndarray(values)[0].dtype + dtype('float64') + + ``create_possibly_ragged_ndarray`` avoids the limitations of ``np.array`` by + creating an empty array and filling it with pointers to the variable-length + arrays. """ try: with warnings.catch_warnings(): diff --git a/python/ray/data/_internal/simple_block.py b/python/ray/data/_internal/simple_block.py index 587f1efb4718..87cac3e689ff 100644 --- a/python/ray/data/_internal/simple_block.py +++ b/python/ray/data/_internal/simple_block.py @@ -5,7 +5,7 @@ import numpy as np -from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray if TYPE_CHECKING: import pandas @@ -105,7 +105,7 @@ def to_numpy( if not isinstance(columns, list): columns = [columns] return BlockAccessor.for_block(self.select(columns)).to_numpy() - return _create_possibly_ragged_ndarray(self._items) + return create_possibly_ragged_ndarray(self._items) def to_arrow(self) -> "pyarrow.Table": import pyarrow diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 7ab51669992b..26a391ab1ea1 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -25,7 +25,7 @@ import numpy as np import ray -from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray import ray.cloudpickle as pickle from ray._private.usage import usage_lib from ray.air.constants import TENSOR_COLUMN_NAME @@ -1136,7 +1136,7 @@ def process_batch(batch): if isinstance(batch, pd.DataFrame): return batch.sample(frac=fraction) if isinstance(batch, np.ndarray): - return _create_possibly_ragged_ndarray( + return create_possibly_ragged_ndarray( [row for row in batch if random.random() <= fraction] ) raise ValueError(f"Unsupported batch type: {type(batch)}") diff --git a/python/ray/data/extensions/tensor_extension.py b/python/ray/data/extensions/tensor_extension.py index 3541e41e6f1c..6ebcc21be5ee 100644 --- a/python/ray/data/extensions/tensor_extension.py +++ b/python/ray/data/extensions/tensor_extension.py @@ -10,3 +10,6 @@ ArrowVariableShapedTensorType, ArrowVariableShapedTensorArray, ) +from ray.air.util.tensor_extensions.utils import ( # noqa: F401 + create_possibly_ragged_ndarray, +) diff --git a/python/ray/data/preprocessors/torch.py b/python/ray/data/preprocessors/torch.py index 2f27ba178e38..ecfc1ab09577 100644 --- a/python/ray/data/preprocessors/torch.py +++ b/python/ray/data/preprocessors/torch.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING, Callable, Dict, List, Union import numpy as np -from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray from ray.data.preprocessor import Preprocessor from ray.util.annotations import PublicAPI @@ -86,7 +86,7 @@ def _transform_numpy( def transform(batch: np.ndarray) -> np.ndarray: if self._batched: return self._fn(batch).numpy() - return _create_possibly_ragged_ndarray( + return create_possibly_ragged_ndarray( [self._fn(array).numpy() for array in batch], ) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 5008743bd117..e62682ea9e33 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -5,7 +5,7 @@ import numpy as np import ray -from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray from ray.data._internal.arrow_block import ArrowRow from ray.data._internal.block_list import BlockList from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder @@ -1617,7 +1617,7 @@ def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table": # NOTE(Clark): We use NumPy to consolidate these potentially # non-contiguous buffers, and to do buffer bookkeeping in # general. - np_col = _create_possibly_ragged_ndarray( + np_col = create_possibly_ragged_ndarray( [ np.ndarray(shape, buffer=buf.as_buffer(), dtype=dtype) for buf in block.column(tensor_col_name) From 28fa37c65179cd5f7ded165663fae75f0b863e7c Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 30 Jan 2023 14:01:41 -0800 Subject: [PATCH 02/16] Address review comments Signed-off-by: Balaji Veeramani --- python/ray/air/tests/test_tensor_extension.py | 6 +-- .../ray/air/util/tensor_extensions/utils.py | 43 +++++++++++-------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/python/ray/air/tests/test_tensor_extension.py b/python/ray/air/tests/test_tensor_extension.py index 3a9c0c27c07b..1b51bc73c89c 100644 --- a/python/ray/air/tests/test_tensor_extension.py +++ b/python/ray/air/tests/test_tensor_extension.py @@ -26,9 +26,9 @@ ) def test_create_possibly_ragged_ndarray(values): ragged_array = create_possibly_ragged_ndarray(values) - - assert [array.shape for array in ragged_array] == [array.shape for array in values] - assert [array.dtype for array in ragged_array] == [array.dtype for array in values] + assert len(ragged_array) == len(values) + for actual_array, expected_array in zip(ragged_array, values): + np.testing.assert_array_equal(actual_array, expected_array) def test_tensor_array_validation(): diff --git a/python/ray/air/util/tensor_extensions/utils.py b/python/ray/air/util/tensor_extensions/utils.py index 94244f1645ad..6bf570fdf6a2 100644 --- a/python/ray/air/util/tensor_extensions/utils.py +++ b/python/ray/air/util/tensor_extensions/utils.py @@ -43,30 +43,39 @@ def create_possibly_ragged_ndarray( ``np.array`` fails to construct ragged arrays if the input arrays have a uniform first dimension: - >>> values = [np.zeros((3, 1)), np.zeros((3, 2))] - >>> np.array(values, dtype=object) - Traceback (most recent call last): - ... - ValueError: could not broadcast input array from shape (3,1) into shape (3,) - >>> create_possibly_ragged_ndarray(values) - array([array([[0.], - [0.], - [0.]]), array([[0., 0.], - [0., 0.], - [0., 0.]])], dtype=object) + .. testsetup:: + + import numpy as np + from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray + + .. doctest:: + + >>> values = [np.zeros((3, 1)), np.zeros((3, 2))] + >>> np.array(values, dtype=object) + Traceback (most recent call last): + ... + ValueError: could not broadcast input array from shape (3,1) into shape (3,) + >>> create_possibly_ragged_ndarray(values) + array([array([[0.], + [0.], + [0.]]), array([[0., 0.], + [0., 0.], + [0., 0.]])], dtype=object) Or if you're creating a ragged array from a single array: - >>> values = [np.zeros((3, 1))] - >>> np.array(values, dtype=object)[0].dtype - dtype('O') - >>> create_possibly_ragged_ndarray(values)[0].dtype - dtype('float64') + .. doctest:: + + >>> values = [np.zeros((3, 1))] + >>> np.array(values, dtype=object)[0].dtype + dtype('O') + >>> create_possibly_ragged_ndarray(values)[0].dtype + dtype('float64') ``create_possibly_ragged_ndarray`` avoids the limitations of ``np.array`` by creating an empty array and filling it with pointers to the variable-length arrays. - """ + """ # noqa: E501 try: with warnings.catch_warnings(): # For NumPy < 1.24, constructing a ragged ndarray directly via From 2f5fd21b2397d8435b3e4c3523fffff99456a2c9 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 30 Jan 2023 16:09:32 -0800 Subject: [PATCH 03/16] Update utils.py Signed-off-by: Balaji Veeramani --- python/ray/air/util/tensor_extensions/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/air/util/tensor_extensions/utils.py b/python/ray/air/util/tensor_extensions/utils.py index 6bf570fdf6a2..9ad78a73db62 100644 --- a/python/ray/air/util/tensor_extensions/utils.py +++ b/python/ray/air/util/tensor_extensions/utils.py @@ -30,7 +30,7 @@ def _is_ndarray_variable_shaped_tensor(arr: np.ndarray) -> bool: return True -@PublicAPI(stability="beta") +@PublicAPI(stability="alpha") def create_possibly_ragged_ndarray( values: Union[np.ndarray, "ABCSeries", Sequence[Any]] ) -> np.ndarray: From 10d78cf4d4460a4956c7b55253dafc768205f13c Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 31 Jan 2023 14:47:26 -0800 Subject: [PATCH 04/16] Address review comments Signed-off-by: Balaji Veeramani --- doc/source/data/api/data_representations.rst | 2 +- python/ray/air/tests/test_tensor_extension.py | 6 +- .../ray/air/util/tensor_extensions/arrow.py | 4 +- .../ray/air/util/tensor_extensions/pandas.py | 8 +- .../ray/air/util/tensor_extensions/utils.py | 94 +++++++++---------- python/ray/data/_internal/simple_block.py | 4 +- python/ray/data/dataset.py | 4 +- .../ray/data/extensions/tensor_extension.py | 3 +- python/ray/data/preprocessors/torch.py | 4 +- python/ray/data/read_api.py | 4 +- 10 files changed, 64 insertions(+), 69 deletions(-) diff --git a/doc/source/data/api/data_representations.rst b/doc/source/data/api/data_representations.rst index 0b86e393d64c..8cedc04578af 100644 --- a/doc/source/data/api/data_representations.rst +++ b/doc/source/data/api/data_representations.rst @@ -42,7 +42,7 @@ Tensor Column Extension API .. autosummary:: :toctree: doc/ - extensions.tensor_extension.create_possibly_ragged_ndarray + extensions.tensor_extension.create_ragged_ndarray extensions.tensor_extension.TensorDtype extensions.tensor_extension.TensorArray extensions.tensor_extension.ArrowTensorType diff --git a/python/ray/air/tests/test_tensor_extension.py b/python/ray/air/tests/test_tensor_extension.py index adf4ba3fceed..e4b82afc9947 100644 --- a/python/ray/air/tests/test_tensor_extension.py +++ b/python/ray/air/tests/test_tensor_extension.py @@ -13,7 +13,7 @@ ArrowVariableShapedTensorType, ) from ray.air.util.tensor_extensions.pandas import TensorArray, TensorDtype -from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import create_ragged_ndarray from ray._private.utils import _get_pyarrow_version @@ -24,8 +24,8 @@ [np.zeros((3,))], ], ) -def test_create_possibly_ragged_ndarray(values): - ragged_array = create_possibly_ragged_ndarray(values) +def test_create_ragged_ndarray(values): + ragged_array = create_ragged_ndarray(values) assert len(ragged_array) == len(values) for actual_array, expected_array in zip(ragged_array, values): np.testing.assert_array_equal(actual_array, expected_array) diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index b6d2681de031..06097242737f 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -8,7 +8,7 @@ from ray.air.util.tensor_extensions.utils import ( _is_ndarray_variable_shaped_tensor, - _create_strict_ragged_ndarray, + create_ragged_ndarray, ) from ray._private.utils import _get_pyarrow_version from ray.util.annotations import PublicAPI @@ -779,7 +779,7 @@ def _to_numpy(self, index: Optional[int] = None, zero_copy_only: bool = False): arrs = [self._to_numpy(i, zero_copy_only) for i in range(len(self))] # Return ragged NumPy ndarray in the ndarray of ndarray pointers # representation. - return _create_strict_ragged_ndarray(arrs) + return create_ragged_ndarray(arrs) data = self.storage.field("data") shapes = self.storage.field("shape") value_type = data.type.value_type diff --git a/python/ray/air/util/tensor_extensions/pandas.py b/python/ray/air/util/tensor_extensions/pandas.py index aa7259f4ca02..b10a25c50726 100644 --- a/python/ray/air/util/tensor_extensions/pandas.py +++ b/python/ray/air/util/tensor_extensions/pandas.py @@ -45,7 +45,7 @@ from pandas.io.formats.format import ExtensionArrayFormatter from ray.air.util.tensor_extensions.utils import ( - create_possibly_ragged_ndarray, + _create_possibly_ragged_ndarray, _is_ndarray_variable_shaped_tensor, ) from ray.util.annotations import PublicAPI @@ -731,13 +731,13 @@ def __init__( # Try to convert some well-known objects to ndarrays before handing off to # ndarray handling logic. if isinstance(values, ABCSeries): - values = create_possibly_ragged_ndarray(values) + values = _create_possibly_ragged_ndarray(values) elif isinstance(values, Sequence): values = [ np.asarray(v) if isinstance(v, TensorArrayElement) else v for v in values ] - values = create_possibly_ragged_ndarray(values) + values = _create_possibly_ragged_ndarray(values) elif isinstance(values, TensorArrayElement): values = np.array([np.asarray(values)], copy=False) @@ -754,7 +754,7 @@ def __init__( values = [np.asarray(v) for v in values] # Try to convert ndarrays of ndarrays/TensorArrayElements with an # opaque object type to a properly typed ndarray of ndarrays. - values = create_possibly_ragged_ndarray(values) + values = _create_possibly_ragged_ndarray(values) else: raise TypeError( "Expected a well-typed ndarray or an object-typed ndarray of " diff --git a/python/ray/air/util/tensor_extensions/utils.py b/python/ray/air/util/tensor_extensions/utils.py index 9ad78a73db62..410f4c3a2a44 100644 --- a/python/ray/air/util/tensor_extensions/utils.py +++ b/python/ray/air/util/tensor_extensions/utils.py @@ -30,11 +30,47 @@ def _is_ndarray_variable_shaped_tensor(arr: np.ndarray) -> bool: return True -@PublicAPI(stability="alpha") -def create_possibly_ragged_ndarray( +def _create_possibly_ragged_ndarray( values: Union[np.ndarray, "ABCSeries", Sequence[Any]] ) -> np.ndarray: - """Create a possibly ragged ndarray. + """ + Create a possibly ragged ndarray. + Using the np.array() constructor will fail to construct a ragged ndarray that has a + uniform first dimension (e.g. uniform channel dimension in imagery). This function + catches this failure and tries a create-and-fill method to construct the ragged + ndarray. + """ + try: + with warnings.catch_warnings(): + # For NumPy < 1.24, constructing a ragged ndarray directly via + # `np.array(...)` without the `dtype=object` parameter will raise a + # VisibleDeprecationWarning which we suppress. + # More details: https://stackoverflow.com/q/63097829 + warnings.simplefilter("ignore", category=np.VisibleDeprecationWarning) + return np.array(values, copy=False) + except ValueError as e: + # Constructing a ragged ndarray directly via `np.array(...)` + # without the `dtype=object` parameter will raise a ValueError. + # For NumPy < 1.24, the message is of the form: + # "could not broadcast input array from shape..." + # For NumPy >= 1.24, the message is of the form: + # "The requested array has an inhomogeneous shape..." + # More details: https://github.com/numpy/numpy/pull/22004 + error_str = str(e) + if ( + "could not broadcast input array from shape" in error_str + or "The requested array has an inhomogeneous shape" in error_str + ): + # Fall back to strictly creating a ragged ndarray. + return create_ragged_ndarray(values) + else: + # Re-raise original error if the failure wasn't a broadcast error. + raise e from None + + +@PublicAPI(stability="alpha") +def create_ragged_ndarray(values: Sequence[np.ndarray]) -> np.ndarray: + """Create an array that contains arrays of different length If you're working with variable-length arrays like images, use this function to create ragged arrays instead of ``np.array``. @@ -46,7 +82,7 @@ def create_possibly_ragged_ndarray( .. testsetup:: import numpy as np - from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray + from ray.air.util.tensor_extensions.utils import create_ragged_ndarray .. doctest:: @@ -55,7 +91,7 @@ def create_possibly_ragged_ndarray( Traceback (most recent call last): ... ValueError: could not broadcast input array from shape (3,1) into shape (3,) - >>> create_possibly_ragged_ndarray(values) + >>> create_ragged_ndarray(values) array([array([[0.], [0.], [0.]]), array([[0., 0.], @@ -69,54 +105,12 @@ def create_possibly_ragged_ndarray( >>> values = [np.zeros((3, 1))] >>> np.array(values, dtype=object)[0].dtype dtype('O') - >>> create_possibly_ragged_ndarray(values)[0].dtype + >>> create_ragged_ndarray(values)[0].dtype dtype('float64') - ``create_possibly_ragged_ndarray`` avoids the limitations of ``np.array`` by - creating an empty array and filling it with pointers to the variable-length - arrays. + ``create_ragged_ndarray`` avoids the limitations of ``np.array`` by creating an + empty array and filling it with pointers to the variable-length arrays. """ # noqa: E501 - try: - with warnings.catch_warnings(): - # For NumPy < 1.24, constructing a ragged ndarray directly via - # `np.array(...)` without the `dtype=object` parameter will raise a - # VisibleDeprecationWarning which we suppress. - # More details: https://stackoverflow.com/q/63097829 - warnings.simplefilter("ignore", category=np.VisibleDeprecationWarning) - return np.array(values, copy=False) - except ValueError as e: - # Constructing a ragged ndarray directly via `np.array(...)` - # without the `dtype=object` parameter will raise a ValueError. - # For NumPy < 1.24, the message is of the form: - # "could not broadcast input array from shape..." - # For NumPy >= 1.24, the message is of the form: - # "The requested array has an inhomogeneous shape..." - # More details: https://github.com/numpy/numpy/pull/22004 - error_str = str(e) - if ( - "could not broadcast input array from shape" in error_str - or "The requested array has an inhomogeneous shape" in error_str - ): - # Fall back to strictly creating a ragged ndarray. - return _create_strict_ragged_ndarray(values) - else: - # Re-raise original error if the failure wasn't a broadcast error. - raise e from None - - -def _create_strict_ragged_ndarray(values: Any) -> np.ndarray: - """Create a ragged ndarray; the representation will be ragged (1D array of - subndarray pointers) even if it's possible to represent it as a non-ragged ndarray. - """ - # Use the create-empty-and-fill method. This avoids the following pitfalls of the - # np.array constructor - np.array(values, dtype=object): - # 1. It will fail to construct an ndarray if the first element dimension is - # uniform, e.g. for imagery whose first element dimension is the channel. - # 2. It will construct the wrong representation for a single-row column (i.e. unit - # outer dimension). Namely, it will consolidate it into a single multi-dimensional - # ndarray rather than a 1D array of subndarray pointers, resulting in the single - # row not being well-typed (having object dtype). - # Create an empty object-dtyped 1D array. arr = np.empty(len(values), dtype=object) # Try to fill the 1D array of pointers with the (ragged) tensors. diff --git a/python/ray/data/_internal/simple_block.py b/python/ray/data/_internal/simple_block.py index 87cac3e689ff..587f1efb4718 100644 --- a/python/ray/data/_internal/simple_block.py +++ b/python/ray/data/_internal/simple_block.py @@ -5,7 +5,7 @@ import numpy as np -from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray if TYPE_CHECKING: import pandas @@ -105,7 +105,7 @@ def to_numpy( if not isinstance(columns, list): columns = [columns] return BlockAccessor.for_block(self.select(columns)).to_numpy() - return create_possibly_ragged_ndarray(self._items) + return _create_possibly_ragged_ndarray(self._items) def to_arrow(self) -> "pyarrow.Table": import pyarrow diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 518e73935e86..4839cdf4fcd1 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -25,7 +25,7 @@ import numpy as np import ray -from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray import ray.cloudpickle as pickle from ray._private.usage import usage_lib from ray.air.constants import TENSOR_COLUMN_NAME @@ -1095,7 +1095,7 @@ def process_batch(batch): if isinstance(batch, pd.DataFrame): return batch.sample(frac=fraction) if isinstance(batch, np.ndarray): - return create_possibly_ragged_ndarray( + return _create_possibly_ragged_ndarray( [row for row in batch if random.random() <= fraction] ) raise ValueError(f"Unsupported batch type: {type(batch)}") diff --git a/python/ray/data/extensions/tensor_extension.py b/python/ray/data/extensions/tensor_extension.py index 6ebcc21be5ee..e439ec11bc5f 100644 --- a/python/ray/data/extensions/tensor_extension.py +++ b/python/ray/data/extensions/tensor_extension.py @@ -11,5 +11,6 @@ ArrowVariableShapedTensorArray, ) from ray.air.util.tensor_extensions.utils import ( # noqa: F401 - create_possibly_ragged_ndarray, + _create_possibly_ragged_ndarray, + create_ragged_ndarray, ) diff --git a/python/ray/data/preprocessors/torch.py b/python/ray/data/preprocessors/torch.py index ecfc1ab09577..2f27ba178e38 100644 --- a/python/ray/data/preprocessors/torch.py +++ b/python/ray/data/preprocessors/torch.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING, Callable, Dict, List, Union import numpy as np -from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray from ray.data.preprocessor import Preprocessor from ray.util.annotations import PublicAPI @@ -86,7 +86,7 @@ def _transform_numpy( def transform(batch: np.ndarray) -> np.ndarray: if self._batched: return self._fn(batch).numpy() - return create_possibly_ragged_ndarray( + return _create_possibly_ragged_ndarray( [self._fn(array).numpy() for array in batch], ) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index e62682ea9e33..5008743bd117 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -5,7 +5,7 @@ import numpy as np import ray -from ray.air.util.tensor_extensions.utils import create_possibly_ragged_ndarray +from ray.air.util.tensor_extensions.utils import _create_possibly_ragged_ndarray from ray.data._internal.arrow_block import ArrowRow from ray.data._internal.block_list import BlockList from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder @@ -1617,7 +1617,7 @@ def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table": # NOTE(Clark): We use NumPy to consolidate these potentially # non-contiguous buffers, and to do buffer bookkeeping in # general. - np_col = create_possibly_ragged_ndarray( + np_col = _create_possibly_ragged_ndarray( [ np.ndarray(shape, buffer=buf.as_buffer(), dtype=dtype) for buf in block.column(tensor_col_name) From 0d331c5bf58db8549a4d28b0d4d69343db3f2870 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 31 Jan 2023 14:49:42 -0800 Subject: [PATCH 05/16] Update tensor_extension.py Signed-off-by: Balaji Veeramani --- python/ray/data/extensions/tensor_extension.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/ray/data/extensions/tensor_extension.py b/python/ray/data/extensions/tensor_extension.py index e439ec11bc5f..5f7ad379d8a3 100644 --- a/python/ray/data/extensions/tensor_extension.py +++ b/python/ray/data/extensions/tensor_extension.py @@ -10,7 +10,5 @@ ArrowVariableShapedTensorType, ArrowVariableShapedTensorArray, ) -from ray.air.util.tensor_extensions.utils import ( # noqa: F401 - _create_possibly_ragged_ndarray, - create_ragged_ndarray, +from ray.air.util.tensor_extensions.utils import create_ragged_ndarray # noqa: F401 ) From 328b3f8f627516d38b8104222888dc67a8f375f2 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 31 Jan 2023 14:50:05 -0800 Subject: [PATCH 06/16] Update tensor_extension.py Signed-off-by: Balaji Veeramani --- python/ray/data/extensions/tensor_extension.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/data/extensions/tensor_extension.py b/python/ray/data/extensions/tensor_extension.py index 5f7ad379d8a3..2734de875718 100644 --- a/python/ray/data/extensions/tensor_extension.py +++ b/python/ray/data/extensions/tensor_extension.py @@ -11,4 +11,3 @@ ArrowVariableShapedTensorArray, ) from ray.air.util.tensor_extensions.utils import create_ragged_ndarray # noqa: F401 -) From 9855e9430e4a7c9dcae628de1b9323e3aabfaa88 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Thu, 2 Feb 2023 15:30:55 -0800 Subject: [PATCH 07/16] Initial commit Signed-off-by: Balaji Veeramani --- python/ray/train/_internal/dl_predictor.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/ray/train/_internal/dl_predictor.py b/python/ray/train/_internal/dl_predictor.py index e097fe1297bc..d80396c14387 100644 --- a/python/ray/train/_internal/dl_predictor.py +++ b/python/ray/train/_internal/dl_predictor.py @@ -1,13 +1,13 @@ import abc -from typing import Dict, TypeVar, Union +from typing import Dict, Optional, TypeVar, Union import numpy as np import pandas as pd from ray.air.util.data_batch_conversion import ( BatchFormat, - convert_pandas_to_batch_type, convert_batch_type_to_pandas, + convert_pandas_to_batch_type, ) from ray.train.predictor import Predictor from ray.util.annotations import DeveloperAPI @@ -72,7 +72,9 @@ def preferred_batch_format(cls) -> BatchFormat: return BatchFormat.NUMPY def _predict_pandas( - self, data: pd.DataFrame, dtype: Union[TensorDtype, Dict[str, TensorDtype]] + self, + data: pd.DataFrame, + dtype: Optional[Union[TensorDtype, Dict[str, TensorDtype]]], ) -> pd.DataFrame: numpy_input = convert_pandas_to_batch_type( data, @@ -85,7 +87,7 @@ def _predict_pandas( def _predict_numpy( self, data: Union[np.ndarray, Dict[str, np.ndarray]], - dtype: Union[TensorDtype, Dict[str, TensorDtype]], + dtype: Optional[Union[TensorDtype, Dict[str, TensorDtype]]], ) -> Union[np.ndarray, Dict[str, np.ndarray]]: # Single column selection return numpy array so preprocessors can be # reused in both training and prediction From 22b3aa97a3934dab1a865f7e2865afb0ba1f7c56 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Thu, 2 Feb 2023 17:18:02 -0800 Subject: [PATCH 08/16] Add `TorchDetectionPredictor` Signed-off-by: Balaji Veeramani --- doc/source/train/api.rst | 14 +- .../tests/test_torch_detection_predictor.py | 59 ++++++++ python/ray/train/torch/__init__.py | 4 +- .../train/torch/torch_detection_predictor.py | 136 ++++++++++++++++++ python/ray/train/torch/torch_predictor.py | 21 +-- 5 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 python/ray/train/tests/test_torch_detection_predictor.py create mode 100644 python/ray/train/torch/torch_detection_predictor.py diff --git a/doc/source/train/api.rst b/doc/source/train/api.rst index 61ea64fefa31..de1e986f0e00 100644 --- a/doc/source/train/api.rst +++ b/doc/source/train/api.rst @@ -89,11 +89,21 @@ PyTorch ``TorchPredictor`` ****************** -.. automodule:: ray.train.torch +.. autoclass:: ray.train.torch.TorchPredictor :members: - :exclude-members: TorchTrainer :show-inheritance: + .. automethod:: __init__ + +``TorchDetectionPredictor`` +*************************** + +.. autoclass:: ray.train.torch.TorchDetectionPredictor + :members: + :show-inheritance: + + .. automethod:: __init__ + Horovod ~~~~~~~ diff --git a/python/ray/train/tests/test_torch_detection_predictor.py b/python/ray/train/tests/test_torch_detection_predictor.py new file mode 100644 index 000000000000..15cd973a5c3c --- /dev/null +++ b/python/ray/train/tests/test_torch_detection_predictor.py @@ -0,0 +1,59 @@ +import numpy as np +import pytest +from torchvision import models + +import ray +from ray.air.util.tensor_extensions.utils import create_ragged_ndarray +from ray.train.batch_predictor import BatchPredictor +from ray.train.torch import TorchCheckpoint, TorchDetectionPredictor + + +@pytest.fixture(name="predictor") +def predictor_fixture(): + model = models.detection.maskrcnn_resnet50_fpn() + yield TorchDetectionPredictor(model=model) + + +@pytest.mark.parametrize( + "data", + [ + np.zeros((1, 3, 32, 32), dtype=np.float32), + {"image": np.zeros((1, 3, 32, 32), dtype=np.float32)}, + create_ragged_ndarray( + [ + np.zeros((3, 32, 32), dtype=np.float32), + np.zeros((3, 64, 64), dtype=np.float32), + ] + ), + ], +) +def test_predict(predictor, data): + predictions = predictor.predict(data) + + assert all(len(value) == len(data) for value in predictions.values()) + # Boxes should have shape `(# detections, 4)`. + assert all(boxes.ndim == 2 for boxes in predictions["pred_boxes"]) + assert all(boxes.shape[-1] == 4 for boxes in predictions["pred_boxes"]) + # Labels should have shape `(# detections,)`. + assert all(labels.ndim == 1 for labels in predictions["pred_labels"]) + # Scores should have shape `(# detections,)`. + assert all(labels.ndim == 1 for labels in predictions["pred_scores"]) + + +def test_multi_column_batch_raises_value_error(predictor): + data = { + "image": np.zeros((2, 3, 32, 32), dtype=np.float32), + "boxes": np.zeros((2, 0, 4), dtype=np.float32), + "labels": np.zeros((2, 0), dtype=np.int64), + } + with pytest.raises(ValueError): + # `data` should only contain one key. Otherwise, `TorchDetectionPredictor` + # doesn't know which column contains the input images. + predictor.predict(data) + + +def test_invalid_dtype_raises_value_error(predictor): + data = np.zeros((1, 3, 32, 32), dtype=np.float32) + with pytest.raises(ValueError): + # `dtype` should be a single `torch.dtype`. + predictor.predict(data, dtype=np.float32) diff --git a/python/ray/train/torch/__init__.py b/python/ray/train/torch/__init__.py index e041beb51cc4..86b424ca3777 100644 --- a/python/ray/train/torch/__init__.py +++ b/python/ray/train/torch/__init__.py @@ -7,8 +7,9 @@ ) # isort: on -from ray.train.torch.torch_checkpoint import TorchCheckpoint from ray.train.torch.config import TorchConfig +from ray.train.torch.torch_checkpoint import TorchCheckpoint +from ray.train.torch.torch_detection_predictor import TorchDetectionPredictor from ray.train.torch.torch_predictor import TorchPredictor from ray.train.torch.torch_trainer import TorchTrainer from ray.train.torch.train_loop_utils import ( @@ -33,4 +34,5 @@ "backward", "enable_reproducibility", "TorchPredictor", + "TorchDetectionPredictor", ] diff --git a/python/ray/train/torch/torch_detection_predictor.py b/python/ray/train/torch/torch_detection_predictor.py new file mode 100644 index 000000000000..8c85b35115fc --- /dev/null +++ b/python/ray/train/torch/torch_detection_predictor.py @@ -0,0 +1,136 @@ +import collections +from typing import Dict, List, Optional, Union + +import numpy as np +import torch + +from ray.air.util.tensor_extensions.utils import create_ragged_ndarray +from ray.train._internal.dl_predictor import TensorDtype +from ray.train.torch.torch_predictor import TorchPredictor +from ray.util.annotations import PublicAPI + + +@PublicAPI(stability="alpha") +class TorchDetectionPredictor(TorchPredictor): + """A predictor for TorchVision detection models. + + Unlike other Torch models, instance segmentation models return + `List[Dict[str, Tensor]]`. This predictor extends :class:`TorchPredictor` to support + the non-standard outputs. + + To learn more about instance segmentation models, read + `Instance segmentation models `_. + + Example: + + .. testcode:: + + import numpy as np + from torchvision import models + + from ray.train.torch import TorchDetectionPredictor + + model = models.detection.fasterrcnn_resnet50_fpn_v2(pretrained=True) + + predictor = TorchDetectionPredictor(model=model) + predictions = predictor.predict(np.zeros((4, 3, 32, 32), dtype=np.float32)) + + print(predictions.keys()) + + .. testoutput:: + + dict_keys(['pred_boxes', 'pred_labels', 'pred_scores']) + + .. testcode:: + + import ray + from ray.train.batch_predictor import BatchPredictor + from ray.train.torch import TorchCheckpoint + + dataset = ray.data.from_items([{"image": np.zeros((3, 32, 32), dtype=np.float32)}]) + checkpoint = TorchCheckpoint.from_model(model) + predictor = BatchPredictor.from_checkpoint(checkpoint, TorchDetectionPredictor) + predictions = predictor.predict(dataset, feature_columns=["image"]) + + print(predictions.take(1)) + + .. testoutput:: + + [{'pred_boxes': array([], shape=(0, 4), dtype=float32), 'pred_labels': array([], dtype=int64), 'pred_scores': array([], dtype=float32)}] + """ # noqa: E501 + + def _predict_numpy( + self, + data: Union[np.ndarray, Dict[str, np.ndarray]], + dtype: Optional[Union[TensorDtype, Dict[str, TensorDtype]]], + ) -> Dict[str, np.ndarray]: + if isinstance(data, dict) and len(data) != 1: + raise ValueError( + f"""Expected input to contain one key, but got {len(data)} instead. + + If you're using `BatchPredictor`, pass a one-element list to + `feature_columns`. + + --- + predictor = BatchPredictor(checkpoint, TorchDetectionPredictor) + predictor.predict(dataset, feature_columns=["image"]) + --- + """ + ) + + if dtype is not None and not isinstance(dtype, torch.dtype): + raise ValueError( + "Expected `dtype` to be a `torch.dtype`, but got a " + f"{type(dtype).__name__} instead." + ) + + if isinstance(data, dict): + images = next(iter(data.values())) + else: + images = data + + inputs = [ + torch.as_tensor(image, dtype=dtype).to(self.device) for image in images + ] + outputs = self.call_model(inputs) + outputs = convert_outputs_to_ndarray_batch(outputs) + outputs = {"pred_" + key: value for key, value in outputs.items()} + + return outputs + + +def convert_outputs_to_ndarray_batch( + outputs: List[Dict[str, torch.Tensor]], +) -> Dict[str, np.ndarray]: + """Batch detection model outputs. + + TorchVision detection models return `List[Dict[Tensor]]`. Each `Dict` contain + 'boxes', 'labels, and 'scores'. + + >>> import torch + >>> from torchvision import models + >>> model = models.detection.fasterrcnn_resnet50_fpn_v2() + >>> model.eval() # doctest: +ELLIPSIS + FasterRCNN(...) + >>> outputs = model(torch.zeros((2, 3, 32, 32))) + >>> len(outputs) + 2 + >>> outputs[0].keys() + dict_keys(['boxes', 'labels', 'scores']) + + This function batches values and returns a `Dict[str, np.ndarray]`. + + >>> from ray.train.torch.torch_detection_predictor import convert_outputs_to_ndarray_batch + >>> batch = convert_outputs_to_ndarray_batch(outputs) + >>> batch.keys() + dict_keys(['boxes', 'labels', 'scores']) + >>> batch["boxes"].shape + (2,) + """ # noqa: E501 + batch = collections.defaultdict(list) + for output in outputs: + for key, value in output.items(): + batch[key].append(value.cpu().detach().numpy()) + for key, value in batch.items(): + batch[key] = create_ragged_ndarray(value) + return batch diff --git a/python/ray/train/torch/torch_predictor.py b/python/ray/train/torch/torch_predictor.py index f621e28198b3..64b4cf1524ec 100644 --- a/python/ray/train/torch/torch_predictor.py +++ b/python/ray/train/torch/torch_predictor.py @@ -4,12 +4,12 @@ import numpy as np import torch -from ray.util import log_once -from ray.train.predictor import DataBatchType -from ray.air.checkpoint import Checkpoint from ray.air._internal.torch_utils import convert_ndarray_batch_to_torch_tensor_batch -from ray.train.torch.torch_checkpoint import TorchCheckpoint +from ray.air.checkpoint import Checkpoint from ray.train._internal.dl_predictor import DLPredictor +from ray.train.predictor import DataBatchType +from ray.train.torch.torch_checkpoint import TorchCheckpoint +from ray.util import log_once from ray.util.annotations import DeveloperAPI, PublicAPI if TYPE_CHECKING: @@ -39,11 +39,14 @@ def __init__( self.model = model self.model.eval() - # TODO (jiaodong): #26249 Use multiple GPU devices with sharded input - self.use_gpu = use_gpu if use_gpu: - # Ensure input tensor and model live on GPU for GPU inference - self.model.to(torch.device("cuda")) + # TODO (jiaodong): #26249 Use multiple GPU devices with sharded input + self.device = torch.device("cuda") + else: + self.device = torch.device("cpu") + + # Ensure input tensor and model live on the same device + self.model.to(self.device) if ( not use_gpu @@ -231,7 +234,7 @@ def _arrays_to_tensors( return convert_ndarray_batch_to_torch_tensor_batch( numpy_arrays, dtypes=dtypes, - device="cuda" if self.use_gpu else None, + device=self.device, ) def _tensor_to_array(self, tensor: torch.Tensor) -> np.ndarray: From ab67b37f491e58ce6a05d78bfc91fc11af36bd67 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 6 Feb 2023 12:04:41 -0800 Subject: [PATCH 09/16] Fix test Signed-off-by: Balaji Veeramani --- python/ray/air/tests/test_tensor_extension.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/ray/air/tests/test_tensor_extension.py b/python/ray/air/tests/test_tensor_extension.py index 278b6a239fd3..c7022fefdab2 100644 --- a/python/ray/air/tests/test_tensor_extension.py +++ b/python/ray/air/tests/test_tensor_extension.py @@ -15,7 +15,6 @@ from ray.air.util.tensor_extensions.pandas import TensorArray, TensorDtype from ray.air.util.tensor_extensions.utils import create_ragged_ndarray from ray._private.utils import _get_pyarrow_version -from ray.air.util.tensor_extensions.utils import _create_strict_ragged_ndarray @pytest.mark.parametrize( @@ -597,7 +596,7 @@ def test_arrow_tensor_array_slice(test_arr, dtype): for shape in pytest_tensor_array_concat_shapes ] pytest_tensor_array_concat_arrs += [ - _create_strict_ragged_ndarray( + create_ragged_ndarray( [np.arange(4).reshape((2, 2)), np.arange(4, 13).reshape((3, 3))] ) ] From 750cc0b16d7f459063d5c33e56231611f483defc Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 6 Feb 2023 12:09:51 -0800 Subject: [PATCH 10/16] Address review comments Signed-off-by: Balaji Veeramani --- python/ray/train/_internal/dl_predictor.py | 2 +- python/ray/train/tensorflow/tensorflow_predictor.py | 4 ++-- python/ray/train/torch/torch_predictor.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/train/_internal/dl_predictor.py b/python/ray/train/_internal/dl_predictor.py index d80396c14387..65aee3d9f085 100644 --- a/python/ray/train/_internal/dl_predictor.py +++ b/python/ray/train/_internal/dl_predictor.py @@ -21,7 +21,7 @@ class DLPredictor(Predictor): def _arrays_to_tensors( self, numpy_arrays: Union[np.ndarray, Dict[str, np.ndarray]], - dtype: Union[TensorDtype, Dict[str, TensorDtype]], + dtype: Optional[Union[TensorDtype, Dict[str, TensorDtype]]], ) -> Union[TensorType, Dict[str, TensorType]]: """Converts a NumPy ndarray batch to the tensor type for the DL framework. diff --git a/python/ray/train/tensorflow/tensorflow_predictor.py b/python/ray/train/tensorflow/tensorflow_predictor.py index 6faa2642df78..980bb388e884 100644 --- a/python/ray/train/tensorflow/tensorflow_predictor.py +++ b/python/ray/train/tensorflow/tensorflow_predictor.py @@ -225,9 +225,9 @@ def predict( def _arrays_to_tensors( self, numpy_arrays: Union[np.ndarray, Dict[str, np.ndarray]], - dtypes: Union[tf.dtypes.DType, Dict[str, tf.dtypes.DType]], + dtype: Optional[Union[tf.dtypes.DType, Dict[str, tf.dtypes.DType]]], ) -> Union[tf.Tensor, Dict[str, tf.Tensor]]: - return convert_ndarray_batch_to_tf_tensor_batch(numpy_arrays, dtypes=dtypes) + return convert_ndarray_batch_to_tf_tensor_batch(numpy_arrays, dtypes=dtype) def _tensor_to_array(self, tensor: tf.Tensor) -> np.ndarray: if not isinstance(tensor, tf.Tensor): diff --git a/python/ray/train/torch/torch_predictor.py b/python/ray/train/torch/torch_predictor.py index f621e28198b3..eb74d5e808bd 100644 --- a/python/ray/train/torch/torch_predictor.py +++ b/python/ray/train/torch/torch_predictor.py @@ -226,11 +226,11 @@ def forward(self, input_dict: dict): def _arrays_to_tensors( self, numpy_arrays: Union[np.ndarray, Dict[str, np.ndarray]], - dtypes: Union[torch.dtype, Dict[str, torch.dtype]], + dtype: Optional[Union[torch.dtype, Dict[str, torch.dtype]]], ) -> Union[torch.Tensor, Dict[str, torch.Tensor]]: return convert_ndarray_batch_to_torch_tensor_batch( numpy_arrays, - dtypes=dtypes, + dtypes=dtype, device="cuda" if self.use_gpu else None, ) From dd3a9eb16390c2a4b780756c725cf954576b74f6 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 7 Feb 2023 15:19:12 -0800 Subject: [PATCH 11/16] Address review comments Signed-off-by: Balaji Veeramani --- .../tests/test_torch_detection_predictor.py | 53 ++++++++++++++++++- .../train/torch/torch_detection_predictor.py | 8 +-- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/python/ray/train/tests/test_torch_detection_predictor.py b/python/ray/train/tests/test_torch_detection_predictor.py index 15cd973a5c3c..e43499c21f71 100644 --- a/python/ray/train/tests/test_torch_detection_predictor.py +++ b/python/ray/train/tests/test_torch_detection_predictor.py @@ -37,7 +37,58 @@ def test_predict(predictor, data): # Labels should have shape `(# detections,)`. assert all(labels.ndim == 1 for labels in predictions["pred_labels"]) # Scores should have shape `(# detections,)`. - assert all(labels.ndim == 1 for labels in predictions["pred_scores"]) + assert all(scores.ndim == 1 for scores in predictions["pred_scores"]) + + +def test_predict_tensor_dataset(): + model = models.detection.maskrcnn_resnet50_fpn() + checkpoint = TorchCheckpoint.from_model(model) + predictor = BatchPredictor.from_checkpoint(checkpoint, TorchDetectionPredictor) + dataset = ray.data.from_items([np.zeros((3, 32, 32), dtype=np.float32)]) + + predictions = predictor.predict(dataset) + + # Boxes should have shape `(# detections, 4)`. + pred_boxes = [row["pred_boxes"] for row in predictions.take_all()] + assert all(boxes.ndim == 2 for boxes in pred_boxes) + assert all(boxes.shape[-1] == 4 for boxes in pred_boxes) + # Labels should have shape `(# detections,)`. + pred_labels = [row["pred_labels"] for row in predictions.take_all()] + assert all(labels.ndim == 1 for labels in pred_labels) + # Scores should have shape `(# detections,)`. + pred_scores = [row["pred_scores"] for row in predictions.take_all()] + assert all(scores.ndim == 1 for scores in pred_scores) + + +@pytest.mark.parametrize( + "items", + [ + [{"image": np.zeros((3, 32, 32), dtype=np.float32)}], + [ + {"image": np.zeros((3, 32, 32), dtype=np.float32)}, + {"image": np.zeros((3, 64, 64), dtype=np.float32)}, + ], + ], +) +def test_predict_tabular_dataset(items): + model = models.detection.maskrcnn_resnet50_fpn() + checkpoint = TorchCheckpoint.from_model(model) + predictor = BatchPredictor.from_checkpoint(checkpoint, TorchDetectionPredictor) + dataset = ray.data.from_items(items) + + predictions = predictor.predict(dataset) + + assert predictions.count() == len(items) + # Boxes should have shape `(# detections, 4)`. + pred_boxes = [row["pred_boxes"] for row in predictions.take_all()] + assert all(boxes.ndim == 2 for boxes in pred_boxes) + assert all(boxes.shape[-1] == 4 for boxes in pred_boxes) + # Labels should have shape `(# detections,)`. + pred_labels = [row["pred_labels"] for row in predictions.take_all()] + assert all(labels.ndim == 1 for labels in pred_labels) + # Scores should have shape `(# detections,)`. + pred_scores = [row["pred_scores"] for row in predictions.take_all()] + assert all(scores.ndim == 1 for scores in pred_scores) def test_multi_column_batch_raises_value_error(predictor): diff --git a/python/ray/train/torch/torch_detection_predictor.py b/python/ray/train/torch/torch_detection_predictor.py index 8c85b35115fc..fb970cd943e9 100644 --- a/python/ray/train/torch/torch_detection_predictor.py +++ b/python/ray/train/torch/torch_detection_predictor.py @@ -93,13 +93,13 @@ def _predict_numpy( torch.as_tensor(image, dtype=dtype).to(self.device) for image in images ] outputs = self.call_model(inputs) - outputs = convert_outputs_to_ndarray_batch(outputs) + outputs = _convert_outputs_to_ndarray_batch(outputs) outputs = {"pred_" + key: value for key, value in outputs.items()} return outputs -def convert_outputs_to_ndarray_batch( +def _convert_outputs_to_ndarray_batch( outputs: List[Dict[str, torch.Tensor]], ) -> Dict[str, np.ndarray]: """Batch detection model outputs. @@ -120,8 +120,8 @@ def convert_outputs_to_ndarray_batch( This function batches values and returns a `Dict[str, np.ndarray]`. - >>> from ray.train.torch.torch_detection_predictor import convert_outputs_to_ndarray_batch - >>> batch = convert_outputs_to_ndarray_batch(outputs) + >>> from ray.train.torch.torch_detection_predictor import _convert_outputs_to_ndarray_batch + >>> batch = _convert_outputs_to_ndarray_batch(outputs) >>> batch.keys() dict_keys(['boxes', 'labels', 'scores']) >>> batch["boxes"].shape From 6ecf830f29bff8b2497b1e44a31a52d6d37aa802 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 7 Feb 2023 15:51:26 -0800 Subject: [PATCH 12/16] Update torch_detection_predictor.py Signed-off-by: Balaji Veeramani --- python/ray/train/torch/torch_detection_predictor.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/train/torch/torch_detection_predictor.py b/python/ray/train/torch/torch_detection_predictor.py index fb970cd943e9..7de5eee0934a 100644 --- a/python/ray/train/torch/torch_detection_predictor.py +++ b/python/ray/train/torch/torch_detection_predictor.py @@ -43,11 +43,15 @@ class TorchDetectionPredictor(TorchPredictor): .. testcode:: + import numpy as np + from torchvision import models + import ray from ray.train.batch_predictor import BatchPredictor - from ray.train.torch import TorchCheckpoint + from ray.train.torch import TorchCheckpoint, TorchDetectionPredictor dataset = ray.data.from_items([{"image": np.zeros((3, 32, 32), dtype=np.float32)}]) + model = models.detection.fasterrcnn_resnet50_fpn_v2(pretrained=True) checkpoint = TorchCheckpoint.from_model(model) predictor = BatchPredictor.from_checkpoint(checkpoint, TorchDetectionPredictor) predictions = predictor.predict(dataset, feature_columns=["image"]) From 6374151416afbd7db711fae3d92a6ba6b7471633 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 7 Feb 2023 16:49:05 -0800 Subject: [PATCH 13/16] Fix Bazel Signed-off-by: Balaji Veeramani --- python/ray/train/BUILD | 7 +++++++ python/ray/train/tests/test_torch_detection_predictor.py | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD index 6c8a9026ee94..d585efe0facf 100644 --- a/python/ray/train/BUILD +++ b/python/ray/train/BUILD @@ -496,6 +496,13 @@ py_test( deps = [":train_lib", ":conftest"] ) +py_test( + name = "test_torch_detection_predictor", + size = "small", + srcs = ["tests/test_torch_detection_predictor.py"], + tags = ["team:ml", "exclusive", "ray_air", "gpu"], + deps = [":train_lib", ":conftest"] + py_test( name = "test_torch_trainer", size = "medium", diff --git a/python/ray/train/tests/test_torch_detection_predictor.py b/python/ray/train/tests/test_torch_detection_predictor.py index e43499c21f71..e0cb9fb3b5a6 100644 --- a/python/ray/train/tests/test_torch_detection_predictor.py +++ b/python/ray/train/tests/test_torch_detection_predictor.py @@ -108,3 +108,11 @@ def test_invalid_dtype_raises_value_error(predictor): with pytest.raises(ValueError): # `dtype` should be a single `torch.dtype`. predictor.predict(data, dtype=np.float32) + + +if __name__ == "__main__": + import sys + + import pytest + + sys.exit(pytest.main(["-v", "-x", __file__])) From bdf28f958fa3173f9f053d5170ddd3480014afdf Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 7 Feb 2023 17:04:48 -0800 Subject: [PATCH 14/16] Update BUILD Signed-off-by: Balaji Veeramani --- python/ray/train/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD index d585efe0facf..5228f0028038 100644 --- a/python/ray/train/BUILD +++ b/python/ray/train/BUILD @@ -502,6 +502,7 @@ py_test( srcs = ["tests/test_torch_detection_predictor.py"], tags = ["team:ml", "exclusive", "ray_air", "gpu"], deps = [":train_lib", ":conftest"] +) py_test( name = "test_torch_trainer", From 163d2af7d7fd03f7aed9a5d0ab380d53c7741417 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 7 Feb 2023 18:18:01 -0800 Subject: [PATCH 15/16] Update torch_predictor.py Signed-off-by: Balaji Veeramani --- python/ray/train/torch/torch_predictor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/train/torch/torch_predictor.py b/python/ray/train/torch/torch_predictor.py index 2cc9d2231933..bfd96dcb0dd4 100644 --- a/python/ray/train/torch/torch_predictor.py +++ b/python/ray/train/torch/torch_predictor.py @@ -38,6 +38,7 @@ def __init__( ): self.model = model self.model.eval() + self.use_gpu = use_gpu if use_gpu: # TODO (jiaodong): #26249 Use multiple GPU devices with sharded input From cea988f70b15f4346d531ecf9761e8c38a3fd2f4 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 7 Feb 2023 22:03:16 -0800 Subject: [PATCH 16/16] Update torch_predictor.py Signed-off-by: Balaji Veeramani --- python/ray/train/torch/torch_predictor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/train/torch/torch_predictor.py b/python/ray/train/torch/torch_predictor.py index bfd96dcb0dd4..dee3f6d45597 100644 --- a/python/ray/train/torch/torch_predictor.py +++ b/python/ray/train/torch/torch_predictor.py @@ -121,6 +121,8 @@ def call_model( .. testcode:: + from ray.train.torch import TorchPredictor + # List outputs are not supported by default TorchPredictor. # So let's define a custom TorchPredictor and override call_model class MyModel(torch.nn.Module):