@staticmethod
def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series:
    """
    Parse a telemetry timestamp column into UTC datetimes and validate it.

    The column is converted with ``pandas.to_datetime(..., utc=True)`` and
    then checked twice: no value may be missing after the conversion, and
    every timestamp must be strictly later than ``LocalEnv._MIN_TS``.

    Parameters
    ----------
    datetime_col : pandas.Series
        The raw column holding the timestamps to convert.

    Returns
    -------
    pandas.Series
        The converted column, as produced by ``pandas.to_datetime``
        with ``utc=True``.

    Raises
    ------
    ValueError
        If any converted value is missing (NaT), or if any timestamp is
        less than or equal to ``LocalEnv._MIN_TS``.
        NOTE(review): ``pandas.to_datetime`` is called with its default
        error handling, so it presumably raises on unparsable input by
        itself before these checks run - confirm against the pandas docs.
    """
    parsed_col = pandas.to_datetime(datetime_col, utc=True)

    # Guard 1: values that did not produce a timestamp show up as NaT.
    missing_mask = parsed_col.isna()
    if missing_mask.any():
        raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}")

    # Guard 2: anything at or before the cut-off date is treated as invalid.
    too_early_mask = parsed_col <= LocalEnv._MIN_TS
    if too_early_mask.any():
        raise ValueError(f"Invalid date range in the telemetry data: {datetime_col}")

    return parsed_col
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: # Assume no header - this is ok for telemetry data. data = pandas.read_csv( fname, index_col=False, names=expected_col_names) - date_col = pandas.to_datetime(data.iloc[:, 0], utc=True) - if date_col.isna().any(): - raise ValueError(f"Invalid date format in the telemetry data: {date_col}") - if date_col.le(self._MIN_TS).any(): - raise ValueError(f"Invalid date range in the telemetry data: {date_col}") - data.iloc[:, 0] = date_col + data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0]) except FileNotFoundError as ex: _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)