diff --git a/mlos_bench/mlos_bench/environments/local/local_env.py b/mlos_bench/mlos_bench/environments/local/local_env.py
index 41a003f0bf..7ccf187a8c 100644
--- a/mlos_bench/mlos_bench/environments/local/local_env.py
+++ b/mlos_bench/mlos_bench/environments/local/local_env.py
@@ -18,6 +18,7 @@
 from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Type, Union
 from typing_extensions import Literal
 
+import pytz
 import pandas
 
 from mlos_bench.environments.status import Status
@@ -215,6 +216,37 @@ def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
         data.rename(str.rstrip, axis='columns', inplace=True)
         return data
 
+    # All timestamps in the telemetry data must be greater than this date
+    # (a very rough approximation for the start of this feature).
+    _MIN_TS = datetime(2024, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
+
+    @staticmethod
+    def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series:
+        """
+        Attempt to convert a column to a datetime format.
+
+        Parameters
+        ----------
+        datetime_col : pandas.Series
+            The column to convert.
+
+        Returns
+        -------
+        pandas.Series
+            The converted datetime column.
+
+        Raises
+        ------
+        ValueError
+            On missing/unparseable values or timestamps not later than `_MIN_TS`.
+        """
+        new_datetime_col = pandas.to_datetime(datetime_col, utc=True)
+        if new_datetime_col.isna().any():
+            raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}")
+        if new_datetime_col.le(LocalEnv._MIN_TS).any():
+            raise ValueError(f"Invalid date range in the telemetry data: {datetime_col}")
+        return new_datetime_col
+
     def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
 
         (status, timestamp, _) = super().status()
@@ -229,8 +261,10 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
             # TODO: Use the timestamp of the CSV file as our status timestamp?
 
             # FIXME: We should not be assuming that the only output file type is a CSV.
+
             data = self._normalize_columns(
-                pandas.read_csv(fname, index_col=False, parse_dates=[0]))
+                pandas.read_csv(fname, index_col=False))
+            data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0])
 
             expected_col_names = ["timestamp", "metric", "value"]
             if len(data.columns) != len(expected_col_names):
@@ -239,7 +273,8 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
             if list(data.columns) != expected_col_names:
                 # Assume no header - this is ok for telemetry data.
                 data = pandas.read_csv(
-                    fname, index_col=False, parse_dates=[0], names=expected_col_names)
+                    fname, index_col=False, names=expected_col_names)
+                data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0])
 
         except FileNotFoundError as ex:
             _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
diff --git a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py
index 74a1edac8c..8e13715b66 100644
--- a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py
+++ b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py
@@ -7,6 +7,7 @@
 """
 import sys
 from datetime import datetime, timedelta
+import pytz
 
 from mlos_bench.tunables.tunable_groups import TunableGroups
 from mlos_bench.tests.environments import check_env_success
@@ -22,12 +23,12 @@ def test_composite_env(tunable_groups: TunableGroups) -> None:
     can be used in the shell_envs by its children.
     See Also: http://github.com/microsoft/MLOS/issues/501
     """
-    ts1 = datetime.utcnow()
+    ts1 = datetime.now(pytz.UTC)
     ts1 -= timedelta(microseconds=ts1.microsecond)  # Round to a second
     ts2 = ts1 + timedelta(minutes=2)
 
-    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S")
-    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S")
+    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z")
+    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z")
 
     (var_prefix, var_suffix) = ("%", "%") if sys.platform == 'win32' else ("$", "")
 
diff --git a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py
index a53b7ae54e..63f366b39d 100644
--- a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py
+++ b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py
@@ -6,6 +6,9 @@
 Unit tests for telemetry and status of LocalEnv benchmark environment.
 """
 from datetime import datetime, timedelta
+import pytz
+
+import pytest
 
 from mlos_bench.tunables.tunable_groups import TunableGroups
 from mlos_bench.tests.environments import check_env_success, check_env_fail_telemetry
@@ -16,12 +19,12 @@ def test_local_env_telemetry(tunable_groups: TunableGroups) -> None:
     """
     Produce benchmark and telemetry data in a local script and read it.
     """
-    ts1 = datetime.utcnow()
+    ts1 = datetime.now(pytz.UTC)
     ts1 -= timedelta(microseconds=ts1.microsecond)  # Round to a second
     ts2 = ts1 + timedelta(minutes=1)
 
-    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S")
-    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S")
+    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z")
+    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z")
 
     local_env = create_local_env(tunable_groups, {
         "run": [
@@ -60,12 +63,12 @@ def test_local_env_telemetry_no_header(tunable_groups: TunableGroups) -> None:
     """
     Read the telemetry data with no header.
     """
-    ts1 = datetime.utcnow()
+    ts1 = datetime.now(pytz.UTC)
     ts1 -= timedelta(microseconds=ts1.microsecond)  # Round to a second
     ts2 = ts1 + timedelta(minutes=1)
 
-    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S")
-    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S")
+    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z")
+    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z")
 
     local_env = create_local_env(tunable_groups, {
         "run": [
@@ -89,16 +92,17 @@ def test_local_env_telemetry_no_header(tunable_groups: TunableGroups) -> None:
     )
 
 
+@pytest.mark.filterwarnings("ignore:.*(Could not infer format, so each element will be parsed individually, falling back to `dateutil`).*:UserWarning::0")  # pylint: disable=line-too-long # noqa
 def test_local_env_telemetry_wrong_header(tunable_groups: TunableGroups) -> None:
     """
     Read the telemetry data with incorrect header.
     """
-    ts1 = datetime.utcnow()
+    ts1 = datetime.now(pytz.UTC)
     ts1 -= timedelta(microseconds=ts1.microsecond)  # Round to a second
     ts2 = ts1 + timedelta(minutes=1)
 
-    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S")
-    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S")
+    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z")
+    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z")
 
     local_env = create_local_env(tunable_groups, {
         "run": [
@@ -119,12 +123,12 @@ def test_local_env_telemetry_invalid(tunable_groups: TunableGroups) -> None:
     """
     Fail when the telemetry data has wrong format.
     """
-    ts1 = datetime.utcnow()
+    ts1 = datetime.now(pytz.UTC)
     ts1 -= timedelta(microseconds=ts1.microsecond)  # Round to a second
     ts2 = ts1 + timedelta(minutes=1)
 
-    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S")
-    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S")
+    time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z")
+    time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z")
 
     local_env = create_local_env(tunable_groups, {
         "run": [