From 6c595402dd9a2ded1abdf7fb557c403fc3f5b8ae Mon Sep 17 00:00:00 2001 From: Matthew Owen Date: Tue, 23 Jul 2024 15:04:41 -0700 Subject: [PATCH 1/3] adding in translation logic for datetime list Signed-off-by: Matthew Owen --- python/ray/data/_internal/numpy_support.py | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/python/ray/data/_internal/numpy_support.py b/python/ray/data/_internal/numpy_support.py index fbb54de7555b..4e8f2abc78ae 100644 --- a/python/ray/data/_internal/numpy_support.py +++ b/python/ray/data/_internal/numpy_support.py @@ -1,4 +1,5 @@ import collections +from datetime import datetime from typing import Any, Dict, List, Union import numpy as np @@ -38,6 +39,28 @@ def validate_numpy_batch(batch: Union[Dict[str, np.ndarray], Dict[str, list]]) - f"a numpy batch to a block, got: {type(batch)} " f"({_truncated_repr(batch)})" ) + +def _detect_highest_datetime_precision_dtype(datetime_list: List[datetime]) -> str: + highest_precision = 'datetime64[D]' # Start with day precision + + for dt in datetime_list: + if dt.microsecond != 0: + highest_precision = 'datetime64[ns]' + break + elif dt.second != 0: + highest_precision = 'datetime64[s]' + elif dt.minute != 0: + highest_precision = 'datetime64[m]' + elif dt.hour != 0: + highest_precision = 'datetime64[h]' + + return highest_precision + +def _convert_datetime_list_to_array(datetime_list: List[datetime]) -> np.ndarray: + # Detect highest precision + dtype_with_precision = _detect_highest_datetime_precision_dtype(datetime_list) + + return np.array([np.datetime64(dt) for dt in datetime_list], dtype=dtype_with_precision) def convert_udf_returns_to_numpy(udf_return_col: Any) -> Any: @@ -64,6 +87,9 @@ def convert_udf_returns_to_numpy(udf_return_col: Any) -> Any: udf_return_col = np.expand_dims(udf_return_col[0], axis=0) return udf_return_col + if all(isinstance(elem, datetime) for elem in udf_return_col): + return _convert_datetime_list_to_array(udf_return_col) + # Try to convert list values into an numpy array via # np.array(), so users don't need to manually cast. # NOTE: we don't cast generic iterables, since types like From f9e9b442c2666f7d4b4713b59547a2c365e58a16 Mon Sep 17 00:00:00 2001 From: Matthew Owen Date: Thu, 25 Jul 2024 12:21:54 -0700 Subject: [PATCH 2/3] add tests, debug and tweak code Signed-off-by: Matthew Owen --- python/ray/data/_internal/numpy_support.py | 24 +++++++--------- python/ray/data/tests/test_numpy_support.py | 32 +++++++++++++++++++++ 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/python/ray/data/_internal/numpy_support.py b/python/ray/data/_internal/numpy_support.py index 4e8f2abc78ae..e61f77577dac 100644 --- a/python/ray/data/_internal/numpy_support.py +++ b/python/ray/data/_internal/numpy_support.py @@ -40,27 +40,25 @@ def validate_numpy_batch(batch: Union[Dict[str, np.ndarray], Dict[str, list]]) - f"({_truncated_repr(batch)})" ) -def _detect_highest_datetime_precision_dtype(datetime_list: List[datetime]) -> str: - highest_precision = 'datetime64[D]' # Start with day precision +def _detect_highest_datetime_precision(datetime_list: List[datetime]) -> str: + highest_precision = 'D' for dt in datetime_list: - if dt.microsecond != 0: - highest_precision = 'datetime64[ns]' + if dt.microsecond != 0 and dt.microsecond % 1000 != 0: + highest_precision = 'us' break - elif dt.second != 0: - highest_precision = 'datetime64[s]' - elif dt.minute != 0: - highest_precision = 'datetime64[m]' - elif dt.hour != 0: - highest_precision = 'datetime64[h]' + elif dt.microsecond != 0 and dt.microsecond % 1000 == 0: + highest_precision = 'ms' + elif dt.hour != 0 or dt.minute != 0 or dt.second != 0: + # pyarrow does not support h or m, use s for those cases too + highest_precision = 's' return highest_precision def _convert_datetime_list_to_array(datetime_list: List[datetime]) -> np.ndarray: - # Detect highest precision - dtype_with_precision = _detect_highest_datetime_precision_dtype(datetime_list) + precision = _detect_highest_datetime_precision(datetime_list) - return np.array([np.datetime64(dt) for dt in datetime_list], dtype=dtype_with_precision) + return np.array([np.datetime64(dt, precision) for dt in datetime_list], dtype=f"datetime64[{precision}]") def convert_udf_returns_to_numpy(udf_return_col: Any) -> Any: diff --git a/python/ray/data/tests/test_numpy_support.py b/python/ray/data/tests/test_numpy_support.py index 3bf29f60e05b..7f7cec997642 100644 --- a/python/ray/data/tests/test_numpy_support.py +++ b/python/ray/data/tests/test_numpy_support.py @@ -6,6 +6,7 @@ from ray.air.util.tensor_extensions.utils import create_ragged_ndarray from ray.data.tests.conftest import * # noqa from ray.tests.conftest import * # noqa +from datetime import datetime class UserObj: @@ -45,6 +46,37 @@ def test_list_of_objects(ray_start_regular_shared): output = do_map_batches(data) assert_structure_equals(output, np.array([1, 2, 3, UserObj()])) +DATETIME_DAY_PRECISION = datetime(year=2024, month=1, day=1) +DATETIME_HOUR_PRECISION = datetime(year=2024, month=1, day=1, hour=1) +DATETIME_MIN_PRECISION = datetime(year=2024, month=1, day=1, minute=1) +DATETIME_SEC_PRECISION = datetime(year=2024, month=1, day=1, second=1) +DATETIME_MILLISEC_PRECISION = datetime(year=2024, month=1, day=1, microsecond=1000) +DATETIME_MICROSEC_PRECISION = datetime(year=2024, month=1, day=1, microsecond=1) + +DATETIME64_DAY_PRECISION = np.datetime64("2024-01-01") +DATETIME64_HOUR_PRECISION = np.datetime64("2024-01-01T01:00", 's') +DATETIME64_MIN_PRECISION = np.datetime64("2024-01-01T00:01", 's') +DATETIME64_SEC_PRECISION = np.datetime64("2024-01-01T00:00:01") +DATETIME64_MILLISEC_PRECISION = np.datetime64("2024-01-01T00:00:00.001") +DATETIME64_MICROSEC_PRECISION = np.datetime64("2024-01-01T00:00:00.000001") + +@pytest.mark.parametrize( + "data,expected_output", + [ + ([DATETIME_DAY_PRECISION], np.array([DATETIME64_DAY_PRECISION])), + ([DATETIME_HOUR_PRECISION], np.array([DATETIME64_HOUR_PRECISION])), + ([DATETIME_MIN_PRECISION], np.array([DATETIME64_MIN_PRECISION])), + ([DATETIME_SEC_PRECISION], np.array([DATETIME64_SEC_PRECISION])), + ([DATETIME_MILLISEC_PRECISION], np.array([DATETIME64_MILLISEC_PRECISION])), + ([DATETIME_MICROSEC_PRECISION], np.array([DATETIME64_MICROSEC_PRECISION])), + ([DATETIME_MICROSEC_PRECISION, DATETIME_MILLISEC_PRECISION], np.array([DATETIME64_MICROSEC_PRECISION, DATETIME_MILLISEC_PRECISION], dtype="datetime64[us]")), + ([DATETIME_SEC_PRECISION, DATETIME_MILLISEC_PRECISION], np.array([DATETIME64_SEC_PRECISION, DATETIME_MILLISEC_PRECISION], dtype="datetime64[ms]")), + ] +) +def test_list_of_datetimes(data, expected_output, ray_start_regular_shared): + output = do_map_batches(data) + assert_structure_equals(output, expected_output) + def test_array_like(ray_start_regular_shared): data = torch.Tensor([1, 2, 3]) From a7c6b4d922ce43a99383cd9c62b2a8fdb9fa5af7 Mon Sep 17 00:00:00 2001 From: Matthew Owen Date: Thu, 25 Jul 2024 12:55:16 -0700 Subject: [PATCH 3/3] robocop Signed-off-by: Matthew Owen --- python/ray/data/_internal/numpy_support.py | 23 +++++++++++------- python/ray/data/tests/test_numpy_support.py | 27 ++++++++++++++++----- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/python/ray/data/_internal/numpy_support.py b/python/ray/data/_internal/numpy_support.py index e61f77577dac..9e6a7c305dfb 100644 --- a/python/ray/data/_internal/numpy_support.py +++ b/python/ray/data/_internal/numpy_support.py @@ -39,26 +39,31 @@ def validate_numpy_batch(batch: Union[Dict[str, np.ndarray], Dict[str, list]]) - f"a numpy batch to a block, got: {type(batch)} " f"({_truncated_repr(batch)})" ) - + + def _detect_highest_datetime_precision(datetime_list: List[datetime]) -> str: - highest_precision = 'D' - + highest_precision = "D" + for dt in datetime_list: if dt.microsecond != 0 and dt.microsecond % 1000 != 0: - highest_precision = 'us' + highest_precision = "us" break elif dt.microsecond != 0 and dt.microsecond % 1000 == 0: - highest_precision = 'ms' + highest_precision = "ms" elif dt.hour != 0 or dt.minute != 0 or dt.second != 0: # pyarrow does not support h or m, use s for those cases too - highest_precision = 's' - + highest_precision = "s" + return highest_precision + def _convert_datetime_list_to_array(datetime_list: List[datetime]) -> np.ndarray: precision = _detect_highest_datetime_precision(datetime_list) - - return np.array([np.datetime64(dt, precision) for dt in datetime_list], dtype=f"datetime64[{precision}]") + + return np.array( + [np.datetime64(dt, precision) for dt in datetime_list], + dtype=f"datetime64[{precision}]", + ) def convert_udf_returns_to_numpy(udf_return_col: Any) -> Any: diff --git a/python/ray/data/tests/test_numpy_support.py b/python/ray/data/tests/test_numpy_support.py index 7f7cec997642..c14038918c0a 100644 --- a/python/ray/data/tests/test_numpy_support.py +++ b/python/ray/data/tests/test_numpy_support.py @@ -1,3 +1,5 @@ +from datetime import datetime + import numpy as np import pytest import torch @@ -6,7 +8,6 @@ from ray.air.util.tensor_extensions.utils import create_ragged_ndarray from ray.data.tests.conftest import * # noqa from ray.tests.conftest import * # noqa -from datetime import datetime class UserObj: @@ -46,6 +47,7 @@ def test_list_of_objects(ray_start_regular_shared): output = do_map_batches(data) assert_structure_equals(output, np.array([1, 2, 3, UserObj()])) + DATETIME_DAY_PRECISION = datetime(year=2024, month=1, day=1) DATETIME_HOUR_PRECISION = datetime(year=2024, month=1, day=1, hour=1) DATETIME_MIN_PRECISION = datetime(year=2024, month=1, day=1, minute=1) @@ -54,12 +56,13 @@ def test_list_of_objects(ray_start_regular_shared): DATETIME_MICROSEC_PRECISION = datetime(year=2024, month=1, day=1, microsecond=1) DATETIME64_DAY_PRECISION = np.datetime64("2024-01-01") -DATETIME64_HOUR_PRECISION = np.datetime64("2024-01-01T01:00", 's') -DATETIME64_MIN_PRECISION = np.datetime64("2024-01-01T00:01", 's') +DATETIME64_HOUR_PRECISION = np.datetime64("2024-01-01T01:00", "s") +DATETIME64_MIN_PRECISION = np.datetime64("2024-01-01T00:01", "s") DATETIME64_SEC_PRECISION = np.datetime64("2024-01-01T00:00:01") DATETIME64_MILLISEC_PRECISION = np.datetime64("2024-01-01T00:00:00.001") DATETIME64_MICROSEC_PRECISION = np.datetime64("2024-01-01T00:00:00.000001") + @pytest.mark.parametrize( "data,expected_output", [ @@ -69,9 +72,21 @@ def test_list_of_objects(ray_start_regular_shared): ([DATETIME_SEC_PRECISION], np.array([DATETIME64_SEC_PRECISION])), ([DATETIME_MILLISEC_PRECISION], np.array([DATETIME64_MILLISEC_PRECISION])), ([DATETIME_MICROSEC_PRECISION], np.array([DATETIME64_MICROSEC_PRECISION])), - ([DATETIME_MICROSEC_PRECISION, DATETIME_MILLISEC_PRECISION], np.array([DATETIME64_MICROSEC_PRECISION, DATETIME_MILLISEC_PRECISION], dtype="datetime64[us]")), - ([DATETIME_SEC_PRECISION, DATETIME_MILLISEC_PRECISION], np.array([DATETIME64_SEC_PRECISION, DATETIME_MILLISEC_PRECISION], dtype="datetime64[ms]")), - ] + ( + [DATETIME_MICROSEC_PRECISION, DATETIME_MILLISEC_PRECISION], + np.array( + [DATETIME64_MICROSEC_PRECISION, DATETIME_MILLISEC_PRECISION], + dtype="datetime64[us]", + ), + ), + ( + [DATETIME_SEC_PRECISION, DATETIME_MILLISEC_PRECISION], + np.array( + [DATETIME64_SEC_PRECISION, DATETIME_MILLISEC_PRECISION], + dtype="datetime64[ms]", + ), + ), + ], ) def test_list_of_datetimes(data, expected_output, ray_start_regular_shared): output = do_map_batches(data)