Skip to content

Commit

Permalink
consolidate logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bpkroth committed Feb 20, 2024
1 parent 8a141a6 commit 381963a
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions mlos_bench/mlos_bench/environments/local/local_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,37 @@ def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
data.rename(str.rstrip, axis='columns', inplace=True)
return data

# Lower bound (exclusive) for telemetry timestamps: any parsed timestamp
# that is less than or equal to this value is rejected as out of range.
# The date is only a rough approximation of when this feature was introduced.
_MIN_TS = datetime(2024, 1, 1, tzinfo=pytz.UTC)

@staticmethod
def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series:
    """
    Parse a telemetry timestamp column into UTC datetimes and validate it.

    Parameters
    ----------
    datetime_col : pandas.Series
        The raw column holding the timestamps to convert.

    Returns
    -------
    pandas.Series
        The column converted with ``pandas.to_datetime(..., utc=True)``.

    Raises
    ------
    ValueError
        If any converted entry is missing (NaT/NaN), or if any converted
        timestamp is less than or equal to ``LocalEnv._MIN_TS``.
    """
    parsed_col = pandas.to_datetime(datetime_col, utc=True)

    # Missing values after conversion mean the input had unusable entries.
    missing_mask = parsed_col.isna()
    if missing_mask.any():
        raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}")

    # Every timestamp must be strictly later than the minimum allowed one.
    too_early_mask = parsed_col.le(LocalEnv._MIN_TS)
    if too_early_mask.any():
        raise ValueError(f"Invalid date range in the telemetry data: {datetime_col}")

    return parsed_col

def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:

(status, timestamp, _) = super().status()
Expand All @@ -235,10 +264,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:

data = self._normalize_columns(
pandas.read_csv(fname, index_col=False))
date_col = pandas.to_datetime(data.iloc[:, 0], utc=True)
if date_col.le(self._MIN_TS).any():
raise ValueError(f"Invalid date range in the telemetry data: {date_col}")
data.iloc[:, 0] = date_col
data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0])

expected_col_names = ["timestamp", "metric", "value"]
if len(data.columns) != len(expected_col_names):
Expand All @@ -248,12 +274,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
# Assume no header - this is ok for telemetry data.
data = pandas.read_csv(
fname, index_col=False, names=expected_col_names)
date_col = pandas.to_datetime(data.iloc[:, 0], utc=True)
if date_col.isna().any():
raise ValueError(f"Invalid date format in the telemetry data: {date_col}")
if date_col.le(self._MIN_TS).any():
raise ValueError(f"Invalid date range in the telemetry data: {date_col}")
data.iloc[:, 0] = date_col
data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0])

except FileNotFoundError as ex:
_LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
Expand Down

0 comments on commit 381963a

Please sign in to comment.