From 90869922aaef17d7eb1ceff3183b311aa4538218 Mon Sep 17 00:00:00 2001 From: Sergiy Matusevych Date: Fri, 15 Mar 2024 17:15:22 -0700 Subject: [PATCH 01/21] do not use deprecated datetime.utcnow() and .utcfromtimestamp() functions --- .../environments/base_environment.py | 4 +-- .../environments/remote/remote_env.py | 4 +-- .../mlos_bench/schedulers/base_scheduler.py | 4 +-- .../mlos_bench/schedulers/sync_scheduler.py | 4 +-- .../services/remote/azure/azure_auth.py | 8 +++--- .../mlos_bench/storage/sql/experiment.py | 4 +-- .../local/composite_local_env_test.py | 4 +-- .../local/local_env_telemetry_test.py | 10 +++---- .../mlos_bench/tests/storage/exp_load_test.py | 28 +++++++++---------- .../mlos_bench/tests/storage/sql/fixtures.py | 4 +-- .../tests/storage/trial_config_test.py | 8 +++--- .../tests/storage/trial_schedule_test.py | 4 +-- .../tests/storage/trial_telemetry_test.py | 8 +++--- 13 files changed, 47 insertions(+), 47 deletions(-) diff --git a/mlos_bench/mlos_bench/environments/base_environment.py b/mlos_bench/mlos_bench/environments/base_environment.py index 38b7bd1142..ff191e97ba 100644 --- a/mlos_bench/mlos_bench/environments/base_environment.py +++ b/mlos_bench/mlos_bench/environments/base_environment.py @@ -9,7 +9,7 @@ import abc import json import logging -from datetime import datetime +from datetime import datetime, UTC from types import TracebackType from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, TYPE_CHECKING, Union from typing_extensions import Literal @@ -411,7 +411,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: """ # Make sure we create a context before invoking setup/run/status/teardown assert self._in_context - timestamp = datetime.utcnow() + timestamp = datetime.now(UTC) if self._is_ready: return (Status.READY, timestamp, []) _LOG.warning("Environment not ready: %s", self) diff --git a/mlos_bench/mlos_bench/environments/remote/remote_env.py 
b/mlos_bench/mlos_bench/environments/remote/remote_env.py index a3675d4e7a..89df3cade1 100644 --- a/mlos_bench/mlos_bench/environments/remote/remote_env.py +++ b/mlos_bench/mlos_bench/environments/remote/remote_env.py @@ -9,7 +9,7 @@ """ import logging -from datetime import datetime +from datetime import datetime, UTC from typing import Dict, Iterable, Optional, Tuple from mlos_bench.environments.status import Status @@ -174,5 +174,5 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, datetime, Optiona (status, output) = self._remote_exec_service.get_remote_exec_results(output) _LOG.debug("Status: %s :: %s", status, output) # FIXME: get the timestamp from the remote environment! - timestamp = datetime.utcnow() + timestamp = datetime.now(UTC) return (status, timestamp, output) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index 0cd30c0896..9f1a64d1e1 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -8,7 +8,7 @@ import json import logging -from datetime import datetime +from datetime import datetime, UTC from abc import ABCMeta, abstractmethod from types import TracebackType @@ -231,7 +231,7 @@ def _run_schedule(self, running: bool = False) -> None: Scheduler part of the loop. Check for pending trials in the queue and run them. 
""" assert self.experiment is not None - for trial in self.experiment.pending_trials(datetime.utcnow(), running=running): + for trial in self.experiment.pending_trials(datetime.now(UTC), running=running): self.run_trial(trial) @abstractmethod diff --git a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py index baa058bd57..67151f0d43 100644 --- a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py @@ -7,7 +7,7 @@ """ import logging -from datetime import datetime +from datetime import datetime, UTC from mlos_bench.environments.status import Status from mlos_bench.schedulers.base_scheduler import Scheduler @@ -49,7 +49,7 @@ def run_trial(self, trial: Storage.Trial) -> None: _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables) # FIXME: Use the actual timestamp from the environment. _LOG.info("QUEUE: Update trial results: %s :: %s", trial, Status.FAILED) - trial.update(Status.FAILED, datetime.utcnow()) + trial.update(Status.FAILED, datetime.now(UTC)) return (status, timestamp, results) = self.environment.run() # Block and wait for the final result. diff --git a/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py b/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py index 3b031c0adb..32d4ffe23c 100644 --- a/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py +++ b/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py @@ -6,9 +6,9 @@ A collection Service functions for managing VMs on Azure. 
""" -import datetime import logging from base64 import b64decode +from datetime import datetime, UTC from typing import Any, Callable, Dict, List, Optional, Union import azure.identity as azure_id @@ -60,7 +60,7 @@ def __init__(self, self._req_interval = float(self.config.get("tokenRequestInterval", self._REQ_INTERVAL)) self._access_token = "RENEW *NOW*" - self._token_expiration_ts = datetime.datetime.utcnow() # Typically, some future timestamp. + self._token_expiration_ts = datetime.now(UTC) # Typically, some future timestamp. # Login as ourselves self._cred: Union[azure_id.AzureCliCredential, azure_id.CertificateCredential] @@ -113,12 +113,12 @@ def get_access_token(self) -> str: if "spClientId" in self.config: self._init_sp() - ts_diff = (self._token_expiration_ts - datetime.datetime.utcnow()).total_seconds() + ts_diff = (self._token_expiration_ts - datetime.now(UTC)).total_seconds() _LOG.debug("Time to renew the token: %.2f sec.", ts_diff) if ts_diff < self._req_interval: _LOG.debug("Request new accessToken") res = self._cred.get_token("https://management.azure.com/.default") - self._token_expiration_ts = datetime.datetime.utcfromtimestamp(res.expires_on) + self._token_expiration_ts = datetime.fromtimestamp(res.expires_on, tz=UTC) self._access_token = res.token _LOG.info("Got new accessToken. 
Expiration time: %s", self._token_expiration_ts) return self._access_token diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index 25998364d2..b1244b285d 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -8,7 +8,7 @@ import logging import hashlib -from datetime import datetime +from datetime import datetime, UTC from typing import Optional, Tuple, List, Dict, Iterator, Any from sqlalchemy import Engine, Connection, Table, column, func @@ -246,7 +246,7 @@ def new_trial(self, tunables: TunableGroups, ts_start: Optional[datetime] = None exp_id=self._experiment_id, trial_id=self._trial_id, config_id=config_id, - ts_start=ts_start or datetime.utcnow(), + ts_start=ts_start or datetime.now(UTC), status='PENDING', )) diff --git a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py index 8e13715b66..71126673bf 100644 --- a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py +++ b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py @@ -6,7 +6,7 @@ Unit tests for the composition of several LocalEnv benchmark environments. """ import sys -from datetime import datetime, timedelta +from datetime import datetime, timedelta, UTC import pytz from mlos_bench.tunables.tunable_groups import TunableGroups @@ -23,7 +23,7 @@ def test_composite_env(tunable_groups: TunableGroups) -> None: can be used in the shell_envs by its children. 
See Also: http://github.com/microsoft/MLOS/issues/501 """ - ts1 = datetime.utcnow().astimezone(pytz.UTC) + ts1 = datetime.now(UTC).astimezone(pytz.UTC) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=2) diff --git a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py index 63f366b39d..284af92227 100644 --- a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py @@ -5,7 +5,7 @@ """ Unit tests for telemetry and status of LocalEnv benchmark environment. """ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, UTC import pytz import pytest @@ -19,7 +19,7 @@ def test_local_env_telemetry(tunable_groups: TunableGroups) -> None: """ Produce benchmark and telemetry data in a local script and read it. """ - ts1 = datetime.utcnow().astimezone(pytz.UTC) + ts1 = datetime.now(UTC).astimezone(pytz.UTC) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) @@ -63,7 +63,7 @@ def test_local_env_telemetry_no_header(tunable_groups: TunableGroups) -> None: """ Read the telemetry data with no header. """ - ts1 = datetime.utcnow().astimezone(pytz.UTC) + ts1 = datetime.now(UTC).astimezone(pytz.UTC) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) @@ -97,7 +97,7 @@ def test_local_env_telemetry_wrong_header(tunable_groups: TunableGroups) -> None """ Read the telemetry data with incorrect header. 
""" - ts1 = datetime.utcnow().astimezone(pytz.UTC) + ts1 = datetime.now(UTC).astimezone(pytz.UTC) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) @@ -123,7 +123,7 @@ def test_local_env_telemetry_invalid(tunable_groups: TunableGroups) -> None: """ Fail when the telemetry data has wrong format. """ - ts1 = datetime.utcnow().astimezone(pytz.UTC) + ts1 = datetime.now(UTC).astimezone(pytz.UTC) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) diff --git a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py index 3067156a4f..207691b7d8 100644 --- a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py +++ b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py @@ -5,7 +5,7 @@ """ Unit tests for the storage subsystem. """ -from datetime import datetime +from datetime import datetime, UTC import pytest @@ -29,7 +29,7 @@ def test_exp_pending_empty(exp_storage: Storage.Experiment) -> None: """ Try to retrieve pending experiments from the empty storage. """ - trials = list(exp_storage.pending_trials(datetime.utcnow(), running=True)) + trials = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) assert not trials @@ -39,7 +39,7 @@ def test_exp_trial_pending(exp_storage: Storage.Experiment, Start a trial and check that it is pending. 
""" trial = exp_storage.new_trial(tunable_groups) - (pending,) = list(exp_storage.pending_trials(datetime.utcnow(), running=True)) + (pending,) = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) assert pending.trial_id == trial.trial_id assert pending.tunables == tunable_groups @@ -58,7 +58,7 @@ def test_exp_trial_pending_many(exp_storage: Storage.Experiment, } pending_ids = { pending.trial_id - for pending in exp_storage.pending_trials(datetime.utcnow(), running=True) + for pending in exp_storage.pending_trials(datetime.now(UTC), running=True) } assert len(pending_ids) == 3 assert trial_ids == pending_ids @@ -70,8 +70,8 @@ def test_exp_trial_pending_fail(exp_storage: Storage.Experiment, Start a trial, fail it, and and check that it is NOT pending. """ trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.FAILED, datetime.utcnow()) - trials = list(exp_storage.pending_trials(datetime.utcnow(), running=True)) + trial.update(Status.FAILED, datetime.now(UTC)) + trials = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) assert not trials @@ -81,8 +81,8 @@ def test_exp_trial_success(exp_storage: Storage.Experiment, Start a trial, finish it successfully, and and check that it is NOT pending. """ trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.SUCCEEDED, datetime.utcnow(), 99.9) - trials = list(exp_storage.pending_trials(datetime.utcnow(), running=True)) + trial.update(Status.SUCCEEDED, datetime.now(UTC), 99.9) + trials = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) assert not trials @@ -92,7 +92,7 @@ def test_exp_trial_update_categ(exp_storage: Storage.Experiment, Update the trial with multiple metrics, some of which are categorical. 
""" trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.SUCCEEDED, datetime.utcnow(), {"score": 99.9, "benchmark": "test"}) + trial.update(Status.SUCCEEDED, datetime.now(UTC), {"score": 99.9, "benchmark": "test"}) assert exp_storage.load() == ( [trial.trial_id], [{ @@ -112,9 +112,9 @@ def test_exp_trial_update_twice(exp_storage: Storage.Experiment, Update the trial status twice and receive an error. """ trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.FAILED, datetime.utcnow()) + trial.update(Status.FAILED, datetime.now(UTC)) with pytest.raises(RuntimeError): - trial.update(Status.SUCCEEDED, datetime.utcnow(), 99.9) + trial.update(Status.SUCCEEDED, datetime.now(UTC), 99.9) def test_exp_trial_pending_3(exp_storage: Storage.Experiment, @@ -129,10 +129,10 @@ def test_exp_trial_pending_3(exp_storage: Storage.Experiment, trial_succ = exp_storage.new_trial(tunable_groups) trial_pend = exp_storage.new_trial(tunable_groups) - trial_fail.update(Status.FAILED, datetime.utcnow()) - trial_succ.update(Status.SUCCEEDED, datetime.utcnow(), score) + trial_fail.update(Status.FAILED, datetime.now(UTC)) + trial_succ.update(Status.SUCCEEDED, datetime.now(UTC), score) - (pending,) = list(exp_storage.pending_trials(datetime.utcnow(), running=True)) + (pending,) = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) assert pending.trial_id == trial_pend.trial_id (trial_ids, configs, scores, status) = exp_storage.load() diff --git a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py index c5c808876f..03cc2bf780 100644 --- a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py +++ b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py @@ -6,7 +6,7 @@ Test fixtures for mlos_bench storage. 
""" -from datetime import datetime +from datetime import datetime, UTC from random import random, seed as rand_seed from typing import Generator, Optional @@ -150,7 +150,7 @@ def _dummy_run_exp(exp: SqlStorage.Experiment, tunable_name: Optional[str]) -> S tunable_value_norm = base_score * (tunable_value - tunable_min) / tunable_range else: tunable_value_norm = 0 - timestamp = datetime.utcnow() + timestamp = datetime.now(UTC) trial.update_telemetry(status=Status.RUNNING, timestamp=timestamp, metrics=[ (timestamp, "some-metric", tunable_value_norm + random() / 100), ]) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_config_test.py b/mlos_bench/mlos_bench/tests/storage/trial_config_test.py index 563ade7106..e89f5b1919 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_config_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_config_test.py @@ -5,7 +5,7 @@ """ Unit tests for saving and retrieving additional parameters of pending trials. """ -from datetime import datetime +from datetime import datetime, UTC from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups @@ -18,7 +18,7 @@ def test_exp_trial_pending(exp_storage: Storage.Experiment, """ config = {"location": "westus2", "num_repeats": 100} trial = exp_storage.new_trial(tunable_groups, config=config) - (pending,) = list(exp_storage.pending_trials(datetime.utcnow(), running=True)) + (pending,) = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) assert pending.trial_id == trial.trial_id assert pending.tunables == tunable_groups assert pending.config() == { @@ -57,7 +57,7 @@ def test_exp_trial_configs(exp_storage: Storage.Experiment, pending_ids = [ pending.tunable_config_id - for pending in exp_storage.pending_trials(datetime.utcnow(), running=True) + for pending in exp_storage.pending_trials(datetime.now(UTC), running=True) ] assert len(pending_ids) == 6 assert len(set(pending_ids)) == 2 @@ -71,7 +71,7 @@ def 
test_exp_trial_no_config(exp_no_tunables_storage: Storage.Experiment) -> Non empty_config: dict = {} tunable_groups = TunableGroups(config=empty_config) trial = exp_no_tunables_storage.new_trial(tunable_groups, config=empty_config) - (pending,) = exp_no_tunables_storage.pending_trials(datetime.utcnow(), running=True) + (pending,) = exp_no_tunables_storage.pending_trials(datetime.now(UTC), running=True) assert pending.trial_id == trial.trial_id assert pending.tunables == tunable_groups assert pending.config() == { diff --git a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py index 3a582d559d..f7e7bc8115 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py @@ -5,7 +5,7 @@ """ Unit tests for scheduling trials for some future time. """ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, UTC from typing import Iterator, Set @@ -26,7 +26,7 @@ def test_schedule_trial(exp_storage: Storage.Experiment, """ Schedule several trials for future execution and retrieve them later at certain timestamps. """ - timestamp = datetime.utcnow() + timestamp = datetime.now(UTC) timedelta_1min = timedelta(minutes=1) timedelta_1hr = timedelta(hours=1) config = {"location": "westus2", "num_repeats": 10} diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py index 967bdb8d31..090b946243 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py @@ -5,7 +5,7 @@ """ Unit tests for saving and restoring the telemetry data. 
""" -from datetime import datetime, timedelta +from datetime import datetime, timedelta, UTC from typing import Any, List, Optional, Tuple import pytest @@ -28,7 +28,7 @@ def telemetry_data() -> List[Tuple[datetime, str, Any]]: List[Tuple[datetime, str, str]] A list of (timestamp, metric_id, metric_value) """ - timestamp1 = datetime.utcnow() + timestamp1 = datetime.now(UTC) timestamp2 = timestamp1 + timedelta(seconds=1) return sorted([ (timestamp1, "cpu_load", 10.1), @@ -58,7 +58,7 @@ def test_update_telemetry(storage: Storage, trial = exp_storage.new_trial(tunable_groups) assert exp_storage.load_telemetry(trial.trial_id) == [] - trial.update_telemetry(Status.RUNNING, datetime.utcnow(), telemetry_data) + trial.update_telemetry(Status.RUNNING, datetime.now(UTC), telemetry_data) assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_str(telemetry_data) # Also check that the TrialData telemetry looks right. @@ -73,7 +73,7 @@ def test_update_telemetry_twice(exp_storage: Storage.Experiment, Make sure update_telemetry() call is idempotent. 
""" trial = exp_storage.new_trial(tunable_groups) - timestamp = datetime.utcnow() + timestamp = datetime.now(UTC) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) From 6b0f4116c2fef4b9c7154554705e404c2880ad11 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 17:02:32 +0000 Subject: [PATCH 02/21] wip: canonicalize storage and return of data in utc format --- .../mlos_bench/storage/base_trial_data.py | 4 +- mlos_bench/mlos_bench/storage/sql/common.py | 25 ++++++++--- .../mlos_bench/storage/sql/experiment.py | 11 +++-- mlos_bench/mlos_bench/storage/sql/schema.py | 4 +- mlos_bench/mlos_bench/storage/sql/trial.py | 8 ++++ .../mlos_bench/storage/sql/trial_data.py | 7 ++- mlos_bench/mlos_bench/storage/util.py | 45 ++++++++++++++++++- .../mlos_bench/tests/storage/sql/fixtures.py | 1 + .../tests/storage/trial_telemetry_test.py | 4 +- 9 files changed, 94 insertions(+), 15 deletions(-) diff --git a/mlos_bench/mlos_bench/storage/base_trial_data.py b/mlos_bench/mlos_bench/storage/base_trial_data.py index d9aecd7b54..c886589fae 100644 --- a/mlos_bench/mlos_bench/storage/base_trial_data.py +++ b/mlos_bench/mlos_bench/storage/base_trial_data.py @@ -6,7 +6,7 @@ Base interface for accessing the stored benchmark trial data. 
""" from abc import ABCMeta, abstractmethod -from datetime import datetime +from datetime import datetime, UTC from typing import Any, Dict, Optional, TYPE_CHECKING import pandas @@ -38,6 +38,8 @@ def __init__(self, *, self._experiment_id = experiment_id self._trial_id = trial_id self._tunable_config_id = tunable_config_id + assert ts_start.tzinfo == UTC, "ts_start must be in UTC" + assert ts_end is None or ts_end.tzinfo == UTC, "ts_end must be in UTC if not None" self._ts_start = ts_start self._ts_end = ts_end self._status = status diff --git a/mlos_bench/mlos_bench/storage/sql/common.py b/mlos_bench/mlos_bench/storage/sql/common.py index ce08e839b3..8ecff95b45 100644 --- a/mlos_bench/mlos_bench/storage/sql/common.py +++ b/mlos_bench/mlos_bench/storage/sql/common.py @@ -14,6 +14,7 @@ from mlos_bench.storage.base_experiment_data import ExperimentData from mlos_bench.storage.base_trial_data import TrialData from mlos_bench.storage.sql.schema import DbSchema +from mlos_bench.storage.util import utcify_timestamp, utcify_nullable_timestamp def get_trials( @@ -48,8 +49,8 @@ def get_trials( experiment_id=experiment_id, trial_id=trial.trial_id, config_id=trial.config_id, - ts_start=trial.ts_start, - ts_end=trial.ts_end, + ts_start=utcify_timestamp(trial.ts_start, origin="utc"), + ts_end=utcify_nullable_timestamp(trial.ts_end, origin="utc"), status=Status[trial.status], ) for trial in trials.fetchall() @@ -107,9 +108,23 @@ def get_results_df( ) cur_trials = conn.execute(cur_trials_stmt) trials_df = pandas.DataFrame( - [(row.trial_id, row.ts_start, row.ts_end, row.config_id, row.tunable_config_trial_group_id, row.status) - for row in cur_trials.fetchall()], - columns=['trial_id', 'ts_start', 'ts_end', 'tunable_config_id', 'tunable_config_trial_group_id', 'status']) + [( + row.trial_id, + utcify_timestamp(row.ts_start, origin="utc"), + utcify_nullable_timestamp(row.ts_end, origin="utc"), + row.config_id, + row.tunable_config_trial_group_id, + row.status, + ) for row in 
cur_trials.fetchall()], + columns=[ + 'trial_id', + 'ts_start', + 'ts_end', + 'tunable_config_id', + 'tunable_config_trial_group_id', + 'status', + ] + ) # Get each trial's config in wide format. configs_stmt = schema.trial.select().with_only_columns( diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index b1244b285d..4122e65383 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -18,6 +18,7 @@ from mlos_bench.storage.base_storage import Storage from mlos_bench.storage.sql.schema import DbSchema from mlos_bench.storage.sql.trial import Trial +from mlos_bench.storage.util import utcify_timestamp from mlos_bench.util import nullable _LOG = logging.getLogger(__name__) @@ -120,7 +121,9 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]: self._schema.trial_telemetry.c.metric_id, ) ) - return [(row.ts, row.metric_id, row.metric_value) + # Not all storage backends store the original zone info. + # We try to ensure data is entered in UTC and augment it on return again here. 
+ return [(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()] def load(self, @@ -184,6 +187,7 @@ def _save_params(conn: Connection, table: Table, def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]: _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) + timestamp = timestamp.astimezone(UTC) if timestamp.tzinfo else timestamp.replace(tzinfo=UTC) if running: pending_status = ['PENDING', 'READY', 'RUNNING'] else: @@ -238,7 +242,8 @@ def _get_config_id(self, conn: Connection, tunables: TunableGroups) -> int: def new_trial(self, tunables: TunableGroups, ts_start: Optional[datetime] = None, config: Optional[Dict[str, Any]] = None) -> Storage.Trial: - _LOG.debug("Create trial: %s:%d", self._experiment_id, self._trial_id) + ts_start = utcify_timestamp(ts_start or datetime.now(UTC), origin="local") + _LOG.debug("Create trial: %s:%d @ %s", self._experiment_id, self._trial_id, ts_start) with self._engine.begin() as conn: try: config_id = self._get_config_id(conn, tunables) @@ -246,7 +251,7 @@ def new_trial(self, tunables: TunableGroups, ts_start: Optional[datetime] = None exp_id=self._experiment_id, trial_id=self._trial_id, config_id=config_id, - ts_start=ts_start or datetime.now(UTC), + ts_start=ts_start, status='PENDING', )) diff --git a/mlos_bench/mlos_bench/storage/sql/schema.py b/mlos_bench/mlos_bench/storage/sql/schema.py index 9fc801b3eb..c59adc1c67 100644 --- a/mlos_bench/mlos_bench/storage/sql/schema.py +++ b/mlos_bench/mlos_bench/storage/sql/schema.py @@ -155,7 +155,7 @@ def __init__(self, engine: Engine): self._meta, Column("exp_id", String(self._ID_LEN), nullable=False), Column("trial_id", Integer, nullable=False), - Column("ts", DateTime, nullable=False, default="now"), + Column("ts", DateTime(timezone=True), nullable=False, default="now"), Column("status", String(self._STATUS_LEN), nullable=False), UniqueConstraint("exp_id", "trial_id", 
"ts"), @@ -181,7 +181,7 @@ def __init__(self, engine: Engine): self._meta, Column("exp_id", String(self._ID_LEN), nullable=False), Column("trial_id", Integer, nullable=False), - Column("ts", DateTime, nullable=False, default="now"), + Column("ts", DateTime(timezone=True), nullable=False, default="now"), Column("metric_id", String(self._ID_LEN), nullable=False), Column("metric_value", String(self._METRIC_VALUE_LEN)), diff --git a/mlos_bench/mlos_bench/storage/sql/trial.py b/mlos_bench/mlos_bench/storage/sql/trial.py index 74c80e158c..1fd38684e8 100644 --- a/mlos_bench/mlos_bench/storage/sql/trial.py +++ b/mlos_bench/mlos_bench/storage/sql/trial.py @@ -17,6 +17,7 @@ from mlos_bench.tunables.tunable_groups import TunableGroups from mlos_bench.storage.base_storage import Storage from mlos_bench.storage.sql.schema import DbSchema +from mlos_bench.storage.util import utcify_timestamp from mlos_bench.util import nullable _LOG = logging.getLogger(__name__) @@ -47,6 +48,8 @@ def __init__(self, *, def update(self, status: Status, timestamp: datetime, metrics: Optional[Union[Dict[str, Any], float]] = None ) -> Optional[Dict[str, Any]]: + # Make sure to convert the timestamp to UTC before storing it in the database. + timestamp = utcify_timestamp(timestamp, origin="local") metrics = super().update(status, timestamp, metrics) with self._engine.begin() as conn: self._update_status(conn, status, timestamp) @@ -106,6 +109,9 @@ def update(self, status: Status, timestamp: datetime, def update_telemetry(self, status: Status, timestamp: datetime, metrics: List[Tuple[datetime, str, Any]]) -> None: super().update_telemetry(status, timestamp, metrics) + # Make sure to convert the timestamp to UTC before storing it in the database. 
+ timestamp = utcify_timestamp(timestamp, origin="local") + metrics = [(utcify_timestamp(ts, origin="local"), key, val) for (ts, key, val) in metrics] # NOTE: Not every SQLAlchemy dialect supports `Insert.on_conflict_do_nothing()` # and we need to keep `.update_telemetry()` idempotent; hence a loop instead of # a bulk upsert. @@ -130,6 +136,8 @@ def _update_status(self, conn: Connection, status: Status, timestamp: datetime) Insert a new status record into the database. This call is idempotent. """ + # Make sure to convert the timestamp to UTC before storing it in the database. + timestamp = utcify_timestamp(timestamp, origin="local") try: conn.execute(self._schema.trial_status.insert().values( exp_id=self._experiment_id, diff --git a/mlos_bench/mlos_bench/storage/sql/trial_data.py b/mlos_bench/mlos_bench/storage/sql/trial_data.py index e59664272e..3a86ba2bf7 100644 --- a/mlos_bench/mlos_bench/storage/sql/trial_data.py +++ b/mlos_bench/mlos_bench/storage/sql/trial_data.py @@ -5,7 +5,7 @@ """ An interface to access the benchmark trial data stored in SQL DB. """ -from datetime import datetime +from datetime import datetime, UTC from typing import Optional, TYPE_CHECKING import pandas @@ -16,6 +16,7 @@ from mlos_bench.environments.status import Status from mlos_bench.storage.sql.schema import DbSchema from mlos_bench.storage.sql.tunable_config_data import TunableConfigSqlData +from mlos_bench.storage.util import utcify_timestamp if TYPE_CHECKING: from mlos_bench.storage.base_tunable_config_trial_group_data import TunableConfigTrialGroupData @@ -100,8 +101,10 @@ def telemetry_df(self) -> pandas.DataFrame: self._schema.trial_telemetry.c.metric_id, ) ) + # Not all storage backends store the original zone info. + # We try to ensure data is entered in UTC and augment it on return again here. 
return pandas.DataFrame( - [(row.ts, row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()], + [(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()], columns=['ts', 'metric', 'value']) @property diff --git a/mlos_bench/mlos_bench/storage/util.py b/mlos_bench/mlos_bench/storage/util.py index a4610da8de..5e99fc9ba7 100644 --- a/mlos_bench/mlos_bench/storage/util.py +++ b/mlos_bench/mlos_bench/storage/util.py @@ -6,7 +6,8 @@ Utility functions for the storage subsystem. """ -from typing import Dict, Optional +from datetime import datetime, UTC +from typing import Dict, Literal, Optional import pandas @@ -14,6 +15,48 @@ from mlos_bench.util import try_parse_val +def utcify_timestamp(timestamp: datetime, *, origin: Literal["utc", "local"]) -> datetime: + """ + Augment a timestamp with zoneinfo if missing and convert it to UTC. + + Parameters + ---------- + timestamp : datetime + A timestamp to convert to UTC. + Note: The original datetime may or may not have tzinfo associated with it. + + origin : Literal["utc", "local"] + Whether the source timestamp is considered to be in UTC or local time. + In the case of loading data from storage, where we intentionally convert all + timestamps to UTC, this can help us retrieve the original timezone when the + storage backend doesn't explicitly store it. + Returns + ------- + datetime + A datetime with zoneinfo in UTC. + """ + if timestamp.tzinfo is not None or origin == "local": + # A timestamp with no zoneinfo is interpreted as "local" time + # (e.g., according to the TZ environment variable). + # That could be UTC or some other timezone, but either way we convert it to + # be explicitly UTC with zone info. + return timestamp.astimezone(UTC) + elif origin == "utc": + # If the timestamp is already in UTC, we just add the zoneinfo without conversion.
+ # Converting with astimezone() when the local time is *not* UTC would cause + # a timestamp conversion which we don't want. + return timestamp.replace(tzinfo=UTC) + else: + raise ValueError(f"Invalid origin: {origin}") + + +def utcify_nullable_timestamp(timestamp: Optional[datetime], *, origin: Literal["utc", "local"]) -> Optional[datetime]: + """ + A nullable version of utcify_timestamp. + """ + return utcify_timestamp(timestamp, origin=origin) if timestamp is not None else None + + def kv_df_to_dict(dataframe: pandas.DataFrame) -> Dict[str, Optional[TunableValue]]: """ Utility function to convert certain flat key-value dataframe formats used by the diff --git a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py index 03cc2bf780..09366d83a4 100644 --- a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py +++ b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py @@ -34,6 +34,7 @@ def storage() -> SqlStorage: config={ "drivername": "sqlite", "database": ":memory:", + # "database": "mlos_bench.pytest.db", } ) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py index 090b946243..6bc124f6a7 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py @@ -63,7 +63,9 @@ def test_update_telemetry(storage: Storage, # Also check that the TrialData telemetry looks right. 
trial_data = storage.experiments[exp_storage.experiment_id].trials[trial.trial_id] - assert _telemetry_str([tuple(r) for r in trial_data.telemetry_df.to_numpy()]) == _telemetry_str(telemetry_data) + trial_telemetry_df = trial_data.telemetry_df + trial_telemetry_data = [tuple(r) for r in trial_telemetry_df.to_numpy()] + assert _telemetry_str(trial_telemetry_data) == _telemetry_str(telemetry_data) def test_update_telemetry_twice(exp_storage: Storage.Experiment, From 94b44af63c801232c24716db6fa8c6258c5a654b Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:11:13 +0000 Subject: [PATCH 03/21] add more tests --- .../tests/storage/trial_telemetry_test.py | 57 ++++++++++++++++--- .../storage/trial_telemetry_test_alt_tz.py | 36 ++++++++++++ 2 files changed, 84 insertions(+), 9 deletions(-) create mode 100644 mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py index 6bc124f6a7..c8efb8b6eb 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py @@ -5,10 +5,12 @@ """ Unit tests for saving and restoring the telemetry data. """ -from datetime import datetime, timedelta, UTC +from datetime import datetime, timedelta, tzinfo, UTC from typing import Any, List, Optional, Tuple +from zoneinfo import ZoneInfo import pytest +from pytest_lazy_fixtures.lazy_fixture import lf as lazy_fixture from mlos_bench.environments.status import Status from mlos_bench.tunables.tunable_groups import TunableGroups @@ -18,8 +20,7 @@ # pylint: disable=redefined-outer-name -@pytest.fixture -def telemetry_data() -> List[Tuple[datetime, str, Any]]: +def zoned_telemetry_data(zone_info: Optional[tzinfo]) -> List[Tuple[datetime, str, Any]]: """ Mock telemetry data for the trial. 
@@ -28,7 +29,7 @@ def telemetry_data() -> List[Tuple[datetime, str, Any]]: List[Tuple[datetime, str, str]] A list of (timestamp, metric_id, metric_value) """ - timestamp1 = datetime.now(UTC) + timestamp1 = datetime.now(zone_info) timestamp2 = timestamp1 + timedelta(seconds=1) return sorted([ (timestamp1, "cpu_load", 10.1), @@ -40,25 +41,56 @@ def telemetry_data() -> List[Tuple[datetime, str, Any]]: ]) +@pytest.fixture +def telemetry_data_implicit_local() -> List[Tuple[datetime, str, Any]]: + """Telemetry data with implicit (i.e., missing) local timezone info.""" + return zoned_telemetry_data(zone_info=None) + + +@pytest.fixture +def telemetry_data_utc() -> List[Tuple[datetime, str, Any]]: + """Telemetry data with explicit UTC timezone info.""" + return zoned_telemetry_data(zone_info=UTC) + + +@pytest.fixture +def telemetry_data_explicit() -> List[Tuple[datetime, str, Any]]: + """Telemetry data with explicit UTC timezone info.""" + zone_info = ZoneInfo("America/Chicago") + assert zone_info.utcoffset(datetime.now(UTC)) != timedelta(hours=0) + return zoned_telemetry_data(zone_info) + + +ZONE_INFO: List[Optional[tzinfo]] = [UTC, ZoneInfo("America/Chicago"), None] + + def _telemetry_str(data: List[Tuple[datetime, str, Any]] ) -> List[Tuple[datetime, str, Optional[str]]]: """ Convert telemetry values to strings. """ - return [(ts, key, nullable(str, val)) for (ts, key, val) in data] + # All retrieved timestamps should have been converted to UTC. 
+ return [(ts.astimezone(UTC), key, nullable(str, val)) for (ts, key, val) in data] +@pytest.mark.parametrize(("telemetry_data"), [ + (lazy_fixture("telemetry_data_implicit_local")), + (lazy_fixture("telemetry_data_utc")), + (lazy_fixture("telemetry_data_explicit")), +]) +@pytest.mark.parametrize(("origin_zone_info"), ZONE_INFO) def test_update_telemetry(storage: Storage, exp_storage: Storage.Experiment, tunable_groups: TunableGroups, - telemetry_data: List[Tuple[datetime, str, Any]]) -> None: + telemetry_data: List[Tuple[datetime, str, Any]], + origin_zone_info: Optional[tzinfo]) -> None: """ Make sure update_telemetry() and load_telemetry() methods work. """ trial = exp_storage.new_trial(tunable_groups) assert exp_storage.load_telemetry(trial.trial_id) == [] - trial.update_telemetry(Status.RUNNING, datetime.now(UTC), telemetry_data) + trial.update_telemetry(Status.RUNNING, datetime.now(origin_zone_info), telemetry_data) assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_str(telemetry_data) # Also check that the TrialData telemetry looks right. @@ -68,14 +100,21 @@ def test_update_telemetry(storage: Storage, assert _telemetry_str(trial_telemetry_data) == _telemetry_str(telemetry_data) +@pytest.mark.parametrize(("telemetry_data"), [ + (lazy_fixture("telemetry_data_implicit_local")), + (lazy_fixture("telemetry_data_utc")), + (lazy_fixture("telemetry_data_explicit")), +]) +@pytest.mark.parametrize(("origin_zone_info"), ZONE_INFO) def test_update_telemetry_twice(exp_storage: Storage.Experiment, tunable_groups: TunableGroups, - telemetry_data: List[Tuple[datetime, str, Any]]) -> None: + telemetry_data: List[Tuple[datetime, str, Any]], + origin_zone_info: Optional[tzinfo]) -> None: """ Make sure update_telemetry() call is idempotent. 
""" trial = exp_storage.new_trial(tunable_groups) - timestamp = datetime.now(UTC) + timestamp = datetime.now(origin_zone_info) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py new file mode 100644 index 0000000000..3a83fbd9e0 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py @@ -0,0 +1,36 @@ +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +""" +Unit tests for saving and restoring the telemetry data when host timezone is in a different timezone. +""" + +from subprocess import run +import os +import sys +from typing import Optional + +import pytest + +# pylint: disable=redefined-outer-name + + +@pytest.mark.skipif(sys.platform == 'win32', reason="sh-like shell only") +@pytest.mark.parametrize(("tz_name"), [None, "America/Chicago", "America/Los_Angeles", "UTC"]) +def test_trial_telemetry_alt_tz(tz_name: Optional[str]) -> None: + """ + Run the trial telemetry tests under alternative default (un-named) TZ info. 
+ """ + env = os.environ.copy() + if tz_name is None: + env.pop("TZ", None) + else: + env["TZ"] = tz_name + cmd = run( + [sys.executable, "-m", "pytest", "-n0", f"{os.path.dirname(__file__)}/trial_telemetry_test.py"], + # , "-k", "implicit_local"], + env=env, + check=True, + ) + assert cmd.returncode == 0 From 274a0208f892d5df507ef641be5d7b781b17f105 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:20:53 +0000 Subject: [PATCH 04/21] reorg --- .../tests/storage/trial_telemetry_test.py | 44 +++++-------------- 1 file changed, 10 insertions(+), 34 deletions(-) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py index c8efb8b6eb..3c7cbd18ac 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py @@ -10,7 +10,6 @@ from zoneinfo import ZoneInfo import pytest -from pytest_lazy_fixtures.lazy_fixture import lf as lazy_fixture from mlos_bench.environments.status import Status from mlos_bench.tunables.tunable_groups import TunableGroups @@ -41,27 +40,14 @@ def zoned_telemetry_data(zone_info: Optional[tzinfo]) -> List[Tuple[datetime, st ]) -@pytest.fixture -def telemetry_data_implicit_local() -> List[Tuple[datetime, str, Any]]: - """Telemetry data with implicit (i.e., missing) local timezone info.""" - return zoned_telemetry_data(zone_info=None) - - -@pytest.fixture -def telemetry_data_utc() -> List[Tuple[datetime, str, Any]]: - """Telemetry data with explicit UTC timezone info.""" - return zoned_telemetry_data(zone_info=UTC) - - -@pytest.fixture -def telemetry_data_explicit() -> List[Tuple[datetime, str, Any]]: - """Telemetry data with explicit UTC timezone info.""" - zone_info = ZoneInfo("America/Chicago") - assert zone_info.utcoffset(datetime.now(UTC)) != timedelta(hours=0) - return zoned_telemetry_data(zone_info) - - -ZONE_INFO: List[Optional[tzinfo]] = [UTC, ZoneInfo("America/Chicago"), 
None] +ZONE_INFO: List[Optional[tzinfo]] = [ + # Explicit time zones. + UTC, + ZoneInfo("America/Chicago"), + ZoneInfo("America/Los_Angeles"), + # Implicit local time zone. + None, +] def _telemetry_str(data: List[Tuple[datetime, str, Any]] @@ -73,20 +59,15 @@ def _telemetry_str(data: List[Tuple[datetime, str, Any]] return [(ts.astimezone(UTC), key, nullable(str, val)) for (ts, key, val) in data] -@pytest.mark.parametrize(("telemetry_data"), [ - (lazy_fixture("telemetry_data_implicit_local")), - (lazy_fixture("telemetry_data_utc")), - (lazy_fixture("telemetry_data_explicit")), -]) @pytest.mark.parametrize(("origin_zone_info"), ZONE_INFO) def test_update_telemetry(storage: Storage, exp_storage: Storage.Experiment, tunable_groups: TunableGroups, - telemetry_data: List[Tuple[datetime, str, Any]], origin_zone_info: Optional[tzinfo]) -> None: """ Make sure update_telemetry() and load_telemetry() methods work. """ + telemetry_data = zoned_telemetry_data(origin_zone_info) trial = exp_storage.new_trial(tunable_groups) assert exp_storage.load_telemetry(trial.trial_id) == [] @@ -100,19 +81,14 @@ def test_update_telemetry(storage: Storage, assert _telemetry_str(trial_telemetry_data) == _telemetry_str(telemetry_data) -@pytest.mark.parametrize(("telemetry_data"), [ - (lazy_fixture("telemetry_data_implicit_local")), - (lazy_fixture("telemetry_data_utc")), - (lazy_fixture("telemetry_data_explicit")), -]) @pytest.mark.parametrize(("origin_zone_info"), ZONE_INFO) def test_update_telemetry_twice(exp_storage: Storage.Experiment, tunable_groups: TunableGroups, - telemetry_data: List[Tuple[datetime, str, Any]], origin_zone_info: Optional[tzinfo]) -> None: """ Make sure update_telemetry() call is idempotent. 
""" + telemetry_data = zoned_telemetry_data(origin_zone_info) trial = exp_storage.new_trial(tunable_groups) timestamp = datetime.now(origin_zone_info) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) From 5c34993f940ce57dcd50481f798494a830ebe30e Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:33:45 +0000 Subject: [PATCH 05/21] tweak --- mlos_bench/mlos_bench/storage/sql/experiment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index 4122e65383..1aa3f21e9c 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -186,8 +186,8 @@ def _save_params(conn: Connection, table: Table, ]) def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]: + timestamp = utcify_timestamp(timestamp, origin="local") _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) - timestamp = timestamp.astimezone(UTC) if timestamp.tzinfo else timestamp.replace(tzinfo=UTC) if running: pending_status = ['PENDING', 'READY', 'RUNNING'] else: From 87f6d3c8e55e184ee0556454927f111897e7d6c7 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:35:18 +0000 Subject: [PATCH 06/21] lint --- mlos_bench/mlos_bench/storage/sql/trial_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/storage/sql/trial_data.py b/mlos_bench/mlos_bench/storage/sql/trial_data.py index 3a86ba2bf7..71df2b3a45 100644 --- a/mlos_bench/mlos_bench/storage/sql/trial_data.py +++ b/mlos_bench/mlos_bench/storage/sql/trial_data.py @@ -5,7 +5,7 @@ """ An interface to access the benchmark trial data stored in SQL DB. 
""" -from datetime import datetime, UTC +from datetime import datetime from typing import Optional, TYPE_CHECKING import pandas From fbbe96898e8af2146a49a83c1c7dcb88deca269b Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:37:08 +0000 Subject: [PATCH 07/21] cleanup --- .../mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py | 1 - 1 file changed, 1 deletion(-) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py index 3a83fbd9e0..9ed3dcc9bd 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py @@ -29,7 +29,6 @@ def test_trial_telemetry_alt_tz(tz_name: Optional[str]) -> None: env["TZ"] = tz_name cmd = run( [sys.executable, "-m", "pytest", "-n0", f"{os.path.dirname(__file__)}/trial_telemetry_test.py"], - # , "-k", "implicit_local"], env=env, check=True, ) From 00bbb8d6f8072ca4ba4dae396bde17ae531f24ea Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:37:26 +0000 Subject: [PATCH 08/21] cleanup --- .../mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py index 9ed3dcc9bd..ee89d494d7 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py @@ -13,8 +13,6 @@ import pytest -# pylint: disable=redefined-outer-name - @pytest.mark.skipif(sys.platform == 'win32', reason="sh-like shell only") @pytest.mark.parametrize(("tz_name"), [None, "America/Chicago", "America/Los_Angeles", "UTC"]) From d230dade843880442cba69a13ee2ffefb444c463 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:38:03 +0000 Subject: [PATCH 09/21] pydocstyle --- 
mlos_bench/mlos_bench/storage/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mlos_bench/mlos_bench/storage/util.py b/mlos_bench/mlos_bench/storage/util.py index 5e99fc9ba7..9a7848c5da 100644 --- a/mlos_bench/mlos_bench/storage/util.py +++ b/mlos_bench/mlos_bench/storage/util.py @@ -30,6 +30,7 @@ def utcify_timestamp(timestamp: datetime, *, origin: Literal["utc", "local"]) -> In the case of loading data from storage, where we intentionally convert all timestamps to UTC, this can help us retrieve the original timezone when the storage backend doesn't explicitly store it. + Returns ------- datetime From b5479e74be00db350a972c761df95353321888fa Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 18:39:07 +0000 Subject: [PATCH 10/21] docs --- mlos_bench/mlos_bench/storage/util.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mlos_bench/mlos_bench/storage/util.py b/mlos_bench/mlos_bench/storage/util.py index 9a7848c5da..3deb52c23e 100644 --- a/mlos_bench/mlos_bench/storage/util.py +++ b/mlos_bench/mlos_bench/storage/util.py @@ -30,6 +30,8 @@ def utcify_timestamp(timestamp: datetime, *, origin: Literal["utc", "local"]) -> In the case of loading data from storage, where we intentionally convert all timestamps to UTC, this can help us retrieve the original timezone when the storage backend doesn't explicitly store it. + In the case of receiving data from a client or other source, this can help us + convert the timestamp to UTC if it's not already. 
Returns ------- From 6dd7e5107f62c09e70c20a5d153cb69cb764d65e Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 22:18:24 +0000 Subject: [PATCH 11/21] convert for better python vers support plus tests --- .../environments/base_environment.py | 4 +- .../environments/remote/remote_env.py | 4 +- .../mlos_bench/schedulers/base_scheduler.py | 4 +- .../mlos_bench/schedulers/sync_scheduler.py | 4 +- .../services/remote/azure/azure_auth.py | 4 +- .../mlos_bench/storage/base_trial_data.py | 3 +- .../mlos_bench/storage/sql/experiment.py | 4 +- mlos_bench/mlos_bench/storage/util.py | 3 +- mlos_bench/mlos_bench/tests/__init__.py | 15 ++++- .../local/composite_local_env_test.py | 6 +- .../local/local_env_telemetry_test.py | 67 ++++++++++++------- .../mlos_bench/tests/storage/exp_load_test.py | 4 +- .../mlos_bench/tests/storage/sql/fixtures.py | 4 +- .../tests/storage/trial_config_test.py | 4 +- .../tests/storage/trial_schedule_test.py | 4 +- .../tests/storage/trial_telemetry_test.py | 17 ++--- 16 files changed, 97 insertions(+), 54 deletions(-) diff --git a/mlos_bench/mlos_bench/environments/base_environment.py b/mlos_bench/mlos_bench/environments/base_environment.py index ff191e97ba..508d78589b 100644 --- a/mlos_bench/mlos_bench/environments/base_environment.py +++ b/mlos_bench/mlos_bench/environments/base_environment.py @@ -9,11 +9,13 @@ import abc import json import logging -from datetime import datetime, UTC +from datetime import datetime from types import TracebackType from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, TYPE_CHECKING, Union from typing_extensions import Literal +from pytz import UTC + from mlos_bench.config.schemas import ConfigSchema from mlos_bench.dict_templater import DictTemplater from mlos_bench.environments.status import Status diff --git a/mlos_bench/mlos_bench/environments/remote/remote_env.py b/mlos_bench/mlos_bench/environments/remote/remote_env.py index 89df3cade1..0320b02769 100644 --- 
a/mlos_bench/mlos_bench/environments/remote/remote_env.py +++ b/mlos_bench/mlos_bench/environments/remote/remote_env.py @@ -9,9 +9,11 @@ """ import logging -from datetime import datetime, UTC +from datetime import datetime from typing import Dict, Iterable, Optional, Tuple +from pytz import UTC + from mlos_bench.environments.status import Status from mlos_bench.environments.script_env import ScriptEnv from mlos_bench.services.base_service import Service diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index 9f1a64d1e1..f623161327 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -8,13 +8,15 @@ import json import logging -from datetime import datetime, UTC +from datetime import datetime from abc import ABCMeta, abstractmethod from types import TracebackType from typing import Any, Dict, Optional, Tuple, Type from typing_extensions import Literal +from pytz import UTC + from mlos_bench.environments.base_environment import Environment from mlos_bench.optimizers.base_optimizer import Optimizer from mlos_bench.storage.base_storage import Storage diff --git a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py index 67151f0d43..7aa263ce37 100644 --- a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py @@ -7,7 +7,9 @@ """ import logging -from datetime import datetime, UTC +from datetime import datetime + +from pytz import UTC from mlos_bench.environments.status import Status from mlos_bench.schedulers.base_scheduler import Scheduler diff --git a/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py b/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py index 32d4ffe23c..b1e484c009 100644 --- a/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py +++ b/mlos_bench/mlos_bench/services/remote/azure/azure_auth.py @@ 
-8,9 +8,11 @@ import logging from base64 import b64decode -from datetime import datetime, UTC +from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Union +from pytz import UTC + import azure.identity as azure_id from azure.keyvault.secrets import SecretClient diff --git a/mlos_bench/mlos_bench/storage/base_trial_data.py b/mlos_bench/mlos_bench/storage/base_trial_data.py index c886589fae..93d1b62c9b 100644 --- a/mlos_bench/mlos_bench/storage/base_trial_data.py +++ b/mlos_bench/mlos_bench/storage/base_trial_data.py @@ -6,10 +6,11 @@ Base interface for accessing the stored benchmark trial data. """ from abc import ABCMeta, abstractmethod -from datetime import datetime, UTC +from datetime import datetime from typing import Any, Dict, Optional, TYPE_CHECKING import pandas +from pytz import UTC from mlos_bench.environments.status import Status from mlos_bench.tunables.tunable import TunableValue diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index 1aa3f21e9c..575358744a 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -8,9 +8,11 @@ import logging import hashlib -from datetime import datetime, UTC +from datetime import datetime from typing import Optional, Tuple, List, Dict, Iterator, Any +from pytz import UTC + from sqlalchemy import Engine, Connection, Table, column, func from mlos_bench.environments.status import Status diff --git a/mlos_bench/mlos_bench/storage/util.py b/mlos_bench/mlos_bench/storage/util.py index 3deb52c23e..a19d118b1c 100644 --- a/mlos_bench/mlos_bench/storage/util.py +++ b/mlos_bench/mlos_bench/storage/util.py @@ -6,9 +6,10 @@ Utility functions for the storage subsystem. 
""" -from datetime import datetime, UTC +from datetime import datetime from typing import Dict, Literal, Optional +from pytz import UTC import pandas from mlos_bench.tunables.tunable import TunableValue, TunableValueTypeTuple diff --git a/mlos_bench/mlos_bench/tests/__init__.py b/mlos_bench/mlos_bench/tests/__init__.py index a6c3994ff5..8e22c66ac8 100644 --- a/mlos_bench/mlos_bench/tests/__init__.py +++ b/mlos_bench/mlos_bench/tests/__init__.py @@ -6,21 +6,32 @@ Tests for mlos_bench. Used to make mypy happy about multiple conftest.py modules. """ - +from datetime import tzinfo from logging import debug, warning from subprocess import run -from typing import Optional +from typing import List, Optional import filecmp import os import socket import shutil +import pytz import pytest from mlos_bench.util import get_class_from_name +ZONE_INFO: List[Optional[tzinfo]] = [ + # Explicit time zones. + pytz.UTC, + pytz.timezone("America/Chicago"), + pytz.timezone("America/Los_Angeles"), + # Implicit local time zone. + None, +] + + # A decorator for tests that require docker. # Use with @requires_docker above a test_...() function. DOCKER = shutil.which('docker') diff --git a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py index 71126673bf..a85705473b 100644 --- a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py +++ b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py @@ -6,8 +6,8 @@ Unit tests for the composition of several LocalEnv benchmark environments. 
""" import sys -from datetime import datetime, timedelta, UTC -import pytz +from datetime import datetime, timedelta +from pytz import UTC from mlos_bench.tunables.tunable_groups import TunableGroups from mlos_bench.tests.environments import check_env_success @@ -23,7 +23,7 @@ def test_composite_env(tunable_groups: TunableGroups) -> None: can be used in the shell_envs by its children. See Also: http://github.com/microsoft/MLOS/issues/501 """ - ts1 = datetime.now(UTC).astimezone(pytz.UTC) + ts1 = datetime.now(UTC) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=2) diff --git a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py index 284af92227..d8285432f6 100644 --- a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py @@ -5,26 +5,37 @@ """ Unit tests for telemetry and status of LocalEnv benchmark environment. """ -from datetime import datetime, timedelta, UTC -import pytz +from datetime import datetime, timedelta, tzinfo +from typing import Optional + +from pytz import UTC import pytest from mlos_bench.tunables.tunable_groups import TunableGroups +from mlos_bench.tests import ZONE_INFO from mlos_bench.tests.environments import check_env_success, check_env_fail_telemetry from mlos_bench.tests.environments.local import create_local_env -def test_local_env_telemetry(tunable_groups: TunableGroups) -> None: +def _format_str(zone_info: Optional[tzinfo]) -> str: + if zone_info is not None: + return "%Y-%m-%d %H:%M:%S %z" + return "%Y-%m-%d %H:%M:%S" + + +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) +def test_local_env_telemetry(tunable_groups: TunableGroups, zone_info: Optional[tzinfo]) -> None: """ Produce benchmark and telemetry data in a local script and read it. 
""" - ts1 = datetime.now(UTC).astimezone(pytz.UTC) + ts1 = datetime.now(zone_info) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) - time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z") - time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z") + format_str = _format_str(zone_info) + time_str1 = ts1.strftime(format_str) + time_str2 = ts2.strftime(format_str) local_env = create_local_env(tunable_groups, { "run": [ @@ -51,24 +62,26 @@ def test_local_env_telemetry(tunable_groups: TunableGroups) -> None: "score": 0.95, }, expected_telemetry=[ - (ts1, "cpu_load", 0.65), - (ts1, "mem_usage", 10240.0), - (ts2, "cpu_load", 0.8), - (ts2, "mem_usage", 20480.0), + (ts1.astimezone(UTC), "cpu_load", 0.65), + (ts1.astimezone(UTC), "mem_usage", 10240.0), + (ts2.astimezone(UTC), "cpu_load", 0.8), + (ts2.astimezone(UTC), "mem_usage", 20480.0), ], ) -def test_local_env_telemetry_no_header(tunable_groups: TunableGroups) -> None: +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) +def test_local_env_telemetry_no_header(tunable_groups: TunableGroups, zone_info: Optional[tzinfo]) -> None: """ Read the telemetry data with no header. 
""" - ts1 = datetime.now(UTC).astimezone(pytz.UTC) + ts1 = datetime.now(zone_info) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) - time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z") - time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z") + format_str = _format_str(zone_info) + time_str1 = ts1.strftime(format_str) + time_str2 = ts2.strftime(format_str) local_env = create_local_env(tunable_groups, { "run": [ @@ -84,25 +97,27 @@ def test_local_env_telemetry_no_header(tunable_groups: TunableGroups) -> None: local_env, tunable_groups, expected_results={}, expected_telemetry=[ - (ts1, "cpu_load", 0.65), - (ts1, "mem_usage", 10240.0), - (ts2, "cpu_load", 0.8), - (ts2, "mem_usage", 20480.0), + (ts1.astimezone(UTC), "cpu_load", 0.65), + (ts1.astimezone(UTC), "mem_usage", 10240.0), + (ts2.astimezone(UTC), "cpu_load", 0.8), + (ts2.astimezone(UTC), "mem_usage", 20480.0), ], ) @pytest.mark.filterwarnings("ignore:.*(Could not infer format, so each element will be parsed individually, falling back to `dateutil`).*:UserWarning::0") # pylint: disable=line-too-long # noqa -def test_local_env_telemetry_wrong_header(tunable_groups: TunableGroups) -> None: +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) +def test_local_env_telemetry_wrong_header(tunable_groups: TunableGroups, zone_info: Optional[tzinfo]) -> None: """ Read the telemetry data with incorrect header. 
""" - ts1 = datetime.now(UTC).astimezone(pytz.UTC) + ts1 = datetime.now(zone_info) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) - time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z") - time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z") + format_str = _format_str(zone_info) + time_str1 = ts1.strftime(format_str) + time_str2 = ts2.strftime(format_str) local_env = create_local_env(tunable_groups, { "run": [ @@ -123,12 +138,14 @@ def test_local_env_telemetry_invalid(tunable_groups: TunableGroups) -> None: """ Fail when the telemetry data has wrong format. """ - ts1 = datetime.now(UTC).astimezone(pytz.UTC) + zone_info = UTC + ts1 = datetime.now(zone_info) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=1) - time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z") - time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z") + format_str = _format_str(zone_info) + time_str1 = ts1.strftime(format_str) + time_str2 = ts2.strftime(format_str) local_env = create_local_env(tunable_groups, { "run": [ diff --git a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py index 207691b7d8..3a945db16f 100644 --- a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py +++ b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py @@ -5,7 +5,9 @@ """ Unit tests for the storage subsystem. """ -from datetime import datetime, UTC +from datetime import datetime + +from pytz import UTC import pytest diff --git a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py index 09366d83a4..710c50fa52 100644 --- a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py +++ b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py @@ -6,10 +6,12 @@ Test fixtures for mlos_bench storage. 
""" -from datetime import datetime, UTC +from datetime import datetime from random import random, seed as rand_seed from typing import Generator, Optional +from pytz import UTC + import pytest from mlos_bench.environments.status import Status diff --git a/mlos_bench/mlos_bench/tests/storage/trial_config_test.py b/mlos_bench/mlos_bench/tests/storage/trial_config_test.py index e89f5b1919..ba965ed3c6 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_config_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_config_test.py @@ -5,7 +5,9 @@ """ Unit tests for saving and retrieving additional parameters of pending trials. """ -from datetime import datetime, UTC +from datetime import datetime + +from pytz import UTC from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups diff --git a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py index f7e7bc8115..21f857ae45 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py @@ -5,10 +5,12 @@ """ Unit tests for scheduling trials for some future time. """ -from datetime import datetime, timedelta, UTC +from datetime import datetime, timedelta from typing import Iterator, Set +from pytz import UTC + from mlos_bench.environments.status import Status from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py index 3c7cbd18ac..deea02128f 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py @@ -5,9 +5,10 @@ """ Unit tests for saving and restoring the telemetry data. 
""" -from datetime import datetime, timedelta, tzinfo, UTC +from datetime import datetime, timedelta, tzinfo from typing import Any, List, Optional, Tuple -from zoneinfo import ZoneInfo + +from pytz import UTC import pytest @@ -16,6 +17,8 @@ from mlos_bench.storage.base_storage import Storage from mlos_bench.util import nullable +from mlos_bench.tests import ZONE_INFO + # pylint: disable=redefined-outer-name @@ -40,16 +43,6 @@ def zoned_telemetry_data(zone_info: Optional[tzinfo]) -> List[Tuple[datetime, st ]) -ZONE_INFO: List[Optional[tzinfo]] = [ - # Explicit time zones. - UTC, - ZoneInfo("America/Chicago"), - ZoneInfo("America/Los_Angeles"), - # Implicit local time zone. - None, -] - - def _telemetry_str(data: List[Tuple[datetime, str, Any]] ) -> List[Tuple[datetime, str, Optional[str]]]: """ From a9b7c231a2ffd42f16ca172d42260e65a556de1e Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 22:24:53 +0000 Subject: [PATCH 12/21] more tests --- .../mlos_bench/tests/storage/exp_load_test.py | 52 ++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py index 3a945db16f..bdab29a22e 100644 --- a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py +++ b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py @@ -5,7 +5,8 @@ """ Unit tests for the storage subsystem. 
""" -from datetime import datetime +from datetime import datetime, tzinfo +from typing import Optional from pytz import UTC @@ -14,6 +15,7 @@ from mlos_bench.environments.status import Status from mlos_bench.tunables.tunable_groups import TunableGroups from mlos_bench.storage.base_storage import Storage +from mlos_bench.tests import ZONE_INFO def test_exp_load_empty(exp_storage: Storage.Experiment) -> None: @@ -35,19 +37,23 @@ def test_exp_pending_empty(exp_storage: Storage.Experiment) -> None: assert not trials +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_exp_trial_pending(exp_storage: Storage.Experiment, - tunable_groups: TunableGroups) -> None: + tunable_groups: TunableGroups, + zone_info: Optional[tzinfo]) -> None: """ Start a trial and check that it is pending. """ trial = exp_storage.new_trial(tunable_groups) - (pending,) = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) + (pending,) = list(exp_storage.pending_trials(datetime.now(zone_info), running=True)) assert pending.trial_id == trial.trial_id assert pending.tunables == tunable_groups +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_exp_trial_pending_many(exp_storage: Storage.Experiment, - tunable_groups: TunableGroups) -> None: + tunable_groups: TunableGroups, + zone_info: Optional[tzinfo]) -> None: """ Start THREE trials and check that both are pending. 
""" @@ -60,41 +66,47 @@ def test_exp_trial_pending_many(exp_storage: Storage.Experiment, } pending_ids = { pending.trial_id - for pending in exp_storage.pending_trials(datetime.now(UTC), running=True) + for pending in exp_storage.pending_trials(datetime.now(zone_info), running=True) } assert len(pending_ids) == 3 assert trial_ids == pending_ids +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_exp_trial_pending_fail(exp_storage: Storage.Experiment, - tunable_groups: TunableGroups) -> None: + tunable_groups: TunableGroups, + zone_info: Optional[tzinfo]) -> None: """ Start a trial, fail it, and and check that it is NOT pending. """ trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.FAILED, datetime.now(UTC)) - trials = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) + trial.update(Status.FAILED, datetime.now(zone_info)) + trials = list(exp_storage.pending_trials(datetime.now(zone_info), running=True)) assert not trials +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_exp_trial_success(exp_storage: Storage.Experiment, - tunable_groups: TunableGroups) -> None: + tunable_groups: TunableGroups, + zone_info: Optional[tzinfo]) -> None: """ Start a trial, finish it successfully, and and check that it is NOT pending. """ trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.SUCCEEDED, datetime.now(UTC), 99.9) - trials = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) + trial.update(Status.SUCCEEDED, datetime.now(zone_info), 99.9) + trials = list(exp_storage.pending_trials(datetime.now(zone_info), running=True)) assert not trials +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_exp_trial_update_categ(exp_storage: Storage.Experiment, - tunable_groups: TunableGroups) -> None: + tunable_groups: TunableGroups, + zone_info: Optional[tzinfo]) -> None: """ Update the trial with multiple metrics, some of which are categorical. 
""" trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.SUCCEEDED, datetime.now(UTC), {"score": 99.9, "benchmark": "test"}) + trial.update(Status.SUCCEEDED, datetime.now(zone_info), {"score": 99.9, "benchmark": "test"}) assert exp_storage.load() == ( [trial.trial_id], [{ @@ -108,19 +120,23 @@ def test_exp_trial_update_categ(exp_storage: Storage.Experiment, ) +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_exp_trial_update_twice(exp_storage: Storage.Experiment, - tunable_groups: TunableGroups) -> None: + tunable_groups: TunableGroups, + zone_info: Optional[tzinfo]) -> None: """ Update the trial status twice and receive an error. """ trial = exp_storage.new_trial(tunable_groups) - trial.update(Status.FAILED, datetime.now(UTC)) + trial.update(Status.FAILED, datetime.now(zone_info)) with pytest.raises(RuntimeError): trial.update(Status.SUCCEEDED, datetime.now(UTC), 99.9) +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_exp_trial_pending_3(exp_storage: Storage.Experiment, - tunable_groups: TunableGroups) -> None: + tunable_groups: TunableGroups, + zone_info: Optional[tzinfo]) -> None: """ Start THREE trials, let one succeed, another one fail and keep one not updated. Check that one is still pending another one can be loaded into the optimizer. 
@@ -131,8 +147,8 @@ def test_exp_trial_pending_3(exp_storage: Storage.Experiment, trial_succ = exp_storage.new_trial(tunable_groups) trial_pend = exp_storage.new_trial(tunable_groups) - trial_fail.update(Status.FAILED, datetime.now(UTC)) - trial_succ.update(Status.SUCCEEDED, datetime.now(UTC), score) + trial_fail.update(Status.FAILED, datetime.now(zone_info)) + trial_succ.update(Status.SUCCEEDED, datetime.now(zone_info), score) (pending,) = list(exp_storage.pending_trials(datetime.now(UTC), running=True)) assert pending.trial_id == trial_pend.trial_id From e653535b5479e7886e601e53b5f7b004cba6d94c Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 22:38:59 +0000 Subject: [PATCH 13/21] expanded tests wip --- .../local/composite_local_env_test.py | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py index a85705473b..4815e1c50d 100644 --- a/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py +++ b/mlos_bench/mlos_bench/tests/environments/local/composite_local_env_test.py @@ -6,15 +6,27 @@ Unit tests for the composition of several LocalEnv benchmark environments. 
""" import sys -from datetime import datetime, timedelta +from datetime import datetime, timedelta, tzinfo +from typing import Optional + from pytz import UTC +import pytest from mlos_bench.tunables.tunable_groups import TunableGroups from mlos_bench.tests.environments import check_env_success from mlos_bench.tests.environments.local import create_composite_local_env +from mlos_bench.tests import ZONE_INFO + + +def _format_str(zone_info: Optional[tzinfo]) -> str: + if zone_info is not None: + return "%Y-%m-%d %H:%M:%S %z" + return "%Y-%m-%d %H:%M:%S" -def test_composite_env(tunable_groups: TunableGroups) -> None: +# FIXME: This fails with zone_info = None when run with `TZ="America/Chicago pytest -n0 ...` +@pytest.mark.parametrize(("zone_info"), ZONE_INFO) +def test_composite_env(tunable_groups: TunableGroups, zone_info: Optional[tzinfo]) -> None: """ Produce benchmark and telemetry data in TWO local environments and combine the results. @@ -23,12 +35,13 @@ def test_composite_env(tunable_groups: TunableGroups) -> None: can be used in the shell_envs by its children. 
See Also: http://github.com/microsoft/MLOS/issues/501 """ - ts1 = datetime.now(UTC) + ts1 = datetime.now(zone_info) ts1 -= timedelta(microseconds=ts1.microsecond) # Round to a second ts2 = ts1 + timedelta(minutes=2) - time_str1 = ts1.strftime("%Y-%m-%d %H:%M:%S %z") - time_str2 = ts2.strftime("%Y-%m-%d %H:%M:%S %z") + format_str = _format_str(zone_info) + time_str1 = ts1.strftime(format_str) + time_str2 = ts2.strftime(format_str) (var_prefix, var_suffix) = ("%", "%") if sys.platform == 'win32' else ("$", "") @@ -108,9 +121,9 @@ def test_composite_env(tunable_groups: TunableGroups) -> None: "writes": 1111.0, }, expected_telemetry=[ - (ts1, "cpu_load", 0.64), - (ts1, "mem_usage", 5120.0), - (ts2, "cpu_load", 0.79), - (ts2, "mem_usage", 40960.0), + (ts1.astimezone(UTC), "cpu_load", 0.64), + (ts1.astimezone(UTC), "mem_usage", 5120.0), + (ts2.astimezone(UTC), "cpu_load", 0.79), + (ts2.astimezone(UTC), "mem_usage", 40960.0), ], ) From 7f370f4d31f4134b91654fc660513b96d69e227f Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 22:39:07 +0000 Subject: [PATCH 14/21] FIXME comments --- mlos_bench/mlos_bench/environments/local/local_env.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mlos_bench/mlos_bench/environments/local/local_env.py b/mlos_bench/mlos_bench/environments/local/local_env.py index 7ccf187a8c..442127c9eb 100644 --- a/mlos_bench/mlos_bench/environments/local/local_env.py +++ b/mlos_bench/mlos_bench/environments/local/local_env.py @@ -240,6 +240,7 @@ def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series: ValueError On parse errors. """ + # FIXME: We should probably not be assuming results arrived in UTC timezone. 
new_datetime_col = pandas.to_datetime(datetime_col, utc=True) if new_datetime_col.isna().any(): raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}") From ef2d4173a637f51c062fd1fa218c86cf46a23d55 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 18 Mar 2024 22:40:05 +0000 Subject: [PATCH 15/21] fixme comments --- .../tests/environments/local/local_env_telemetry_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py index d8285432f6..ba104da542 100644 --- a/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/environments/local/local_env_telemetry_test.py @@ -24,6 +24,7 @@ def _format_str(zone_info: Optional[tzinfo]) -> str: return "%Y-%m-%d %H:%M:%S" +# FIXME: This fails with zone_info = None when run with `TZ="America/Chicago pytest -n0 ...` @pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_local_env_telemetry(tunable_groups: TunableGroups, zone_info: Optional[tzinfo]) -> None: """ @@ -70,6 +71,7 @@ def test_local_env_telemetry(tunable_groups: TunableGroups, zone_info: Optional[ ) +# FIXME: This fails with zone_info = None when run with `TZ="America/Chicago pytest -n0 ...` @pytest.mark.parametrize(("zone_info"), ZONE_INFO) def test_local_env_telemetry_no_header(tunable_groups: TunableGroups, zone_info: Optional[tzinfo]) -> None: """ From 2137cdc323354162abde1b8741842ecf3ad32b86 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Tue, 19 Mar 2024 16:15:50 +0000 Subject: [PATCH 16/21] reorg for increased test coverage --- mlos_bench/mlos_bench/tests/__init__.py | 12 +++-- .../storage/trial_telemetry_test_alt_tz.py | 33 ------------- .../mlos_bench/tests/test_with_alt_tz.py | 49 +++++++++++++++++++ 3 files changed, 57 insertions(+), 37 deletions(-) delete mode 100644 
mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py create mode 100644 mlos_bench/mlos_bench/tests/test_with_alt_tz.py diff --git a/mlos_bench/mlos_bench/tests/__init__.py b/mlos_bench/mlos_bench/tests/__init__.py index 8e22c66ac8..7d94c1be5e 100644 --- a/mlos_bench/mlos_bench/tests/__init__.py +++ b/mlos_bench/mlos_bench/tests/__init__.py @@ -22,14 +22,18 @@ from mlos_bench.util import get_class_from_name -ZONE_INFO: List[Optional[tzinfo]] = [ +ZONE_NAMES = [ # Explicit time zones. - pytz.UTC, - pytz.timezone("America/Chicago"), - pytz.timezone("America/Los_Angeles"), + "UTC", + "America/Chicago", + "America/Los_Angeles", # Implicit local time zone. None, ] +ZONE_INFO: List[Optional[tzinfo]] = [ + None if zone_name is None else pytz.timezone(zone_name) + for zone_name in ZONE_NAMES +] # A decorator for tests that require docker. diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py deleted file mode 100644 index ee89d494d7..0000000000 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test_alt_tz.py +++ /dev/null @@ -1,33 +0,0 @@ -# -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. -# -""" -Unit tests for saving and restoring the telemetry data when host timezone is in a different timezone. -""" - -from subprocess import run -import os -import sys -from typing import Optional - -import pytest - - -@pytest.mark.skipif(sys.platform == 'win32', reason="sh-like shell only") -@pytest.mark.parametrize(("tz_name"), [None, "America/Chicago", "America/Los_Angeles", "UTC"]) -def test_trial_telemetry_alt_tz(tz_name: Optional[str]) -> None: - """ - Run the trial telemetry tests under alternative default (un-named) TZ info. 
- """ - env = os.environ.copy() - if tz_name is None: - env.pop("TZ", None) - else: - env["TZ"] = tz_name - cmd = run( - [sys.executable, "-m", "pytest", "-n0", f"{os.path.dirname(__file__)}/trial_telemetry_test.py"], - env=env, - check=True, - ) - assert cmd.returncode == 0 diff --git a/mlos_bench/mlos_bench/tests/test_with_alt_tz.py b/mlos_bench/mlos_bench/tests/test_with_alt_tz.py new file mode 100644 index 0000000000..cd279f4e5d --- /dev/null +++ b/mlos_bench/mlos_bench/tests/test_with_alt_tz.py @@ -0,0 +1,49 @@ +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +""" +Tests various other test scenarios with alternative default (un-named) TZ info. +""" + +from subprocess import run +import os +import sys +from typing import Optional + +import pytest + +from mlos_bench.tests import ZONE_NAMES + + +DIRNAME = os.path.dirname(__file__) +TZ_TEST_FILES = [ + DIRNAME + "/environments/local/composite_local_env_test.py", + DIRNAME + "/environments/local/local_env_telemetry_test.py", + DIRNAME + "/storage/exp_load_test.py", + DIRNAME + "/storage/trial_telemetry_test.py", +] + + +@pytest.mark.skipif(sys.platform == 'win32', reason="TZ environment variable is a UNIXism") +@pytest.mark.parametrize(("tz_name"), ZONE_NAMES) +@pytest.mark.parametrize(("test_file"), TZ_TEST_FILES) +def test_trial_telemetry_alt_tz(tz_name: Optional[str], test_file: str) -> None: + """ + Run the tests under alternative default (un-named) TZ info. 
+ """ + env = os.environ.copy() + if tz_name is None: + env.pop("TZ", None) + else: + env["TZ"] = tz_name + cmd = run( + [sys.executable, "-m", "pytest", "-n0", test_file], + env=env, + capture_output=True, + check=False, + ) + if cmd.returncode != 0: + print(cmd.stdout.decode()) + print(cmd.stderr.decode()) + raise AssertionError(f"Test(s) failed: # TZ='{tz_name}' '{sys.executable}' -m pytest -n0 '{test_file}'") From 62a4ff0b5d9979b568c670dc5aaeb1148b9fa194 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Tue, 19 Mar 2024 17:14:11 +0000 Subject: [PATCH 17/21] fixup telemetry parsing logic --- mlos_bench/mlos_bench/environments/local/local_env.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mlos_bench/mlos_bench/environments/local/local_env.py b/mlos_bench/mlos_bench/environments/local/local_env.py index 442127c9eb..0f5aead5a8 100644 --- a/mlos_bench/mlos_bench/environments/local/local_env.py +++ b/mlos_bench/mlos_bench/environments/local/local_env.py @@ -240,8 +240,14 @@ def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series: ValueError On parse errors. """ - # FIXME: We should probably not be assuming results arrived in UTC timezone. - new_datetime_col = pandas.to_datetime(datetime_col, utc=True) + new_datetime_col = pandas.to_datetime(datetime_col, utc=False) + # If timezone data is missing, assume the local timezone. + if new_datetime_col.dt.tz is None: + local_tzinfo = datetime.now().astimezone().tzinfo + new_datetime_col = new_datetime_col.dt.tz_localize(local_tzinfo) + assert new_datetime_col.dt.tz is not None + # And convert it to UTC. 
+ new_datetime_col = new_datetime_col.dt.tz_convert('UTC') if new_datetime_col.isna().any(): raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}") if new_datetime_col.le(LocalEnv._MIN_TS).any(): From e38097f172819ef7e9cc2417f70f5229f9c816a5 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Tue, 19 Mar 2024 15:19:17 -0500 Subject: [PATCH 18/21] Update mlos_bench/mlos_bench/tests/__init__.py Co-authored-by: Sergiy Matusevych --- mlos_bench/mlos_bench/tests/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/tests/__init__.py b/mlos_bench/mlos_bench/tests/__init__.py index 7d94c1be5e..a05a61c7c2 100644 --- a/mlos_bench/mlos_bench/tests/__init__.py +++ b/mlos_bench/mlos_bench/tests/__init__.py @@ -31,7 +31,7 @@ None, ] ZONE_INFO: List[Optional[tzinfo]] = [ - None if zone_name is None else pytz.timezone(zone_name) + nullable(pytz.timezone, zone_name) for zone_name in ZONE_NAMES ] From c2f033c6243555c3854a824e9e81cd4818777726 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Tue, 19 Mar 2024 15:19:44 -0500 Subject: [PATCH 19/21] Update mlos_bench/mlos_bench/tests/__init__.py Co-authored-by: Sergiy Matusevych --- mlos_bench/mlos_bench/tests/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/tests/__init__.py b/mlos_bench/mlos_bench/tests/__init__.py index a05a61c7c2..d1c1781ada 100644 --- a/mlos_bench/mlos_bench/tests/__init__.py +++ b/mlos_bench/mlos_bench/tests/__init__.py @@ -19,7 +19,7 @@ import pytz import pytest -from mlos_bench.util import get_class_from_name +from mlos_bench.util import get_class_from_name, nullable ZONE_NAMES = [ From e7e8683cace82b3a0f751d8b1bbef1ab25d7540f Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Tue, 19 Mar 2024 20:36:21 +0000 Subject: [PATCH 20/21] reorg --- .../environments/local/local_env.py | 45 +-------- mlos_bench/mlos_bench/storage/sql/common.py | 2 +- .../mlos_bench/storage/sql/experiment.py | 3 +- 
mlos_bench/mlos_bench/storage/sql/trial.py | 3 +- .../mlos_bench/storage/sql/trial_data.py | 2 +- mlos_bench/mlos_bench/storage/util.py | 49 +-------- mlos_bench/mlos_bench/util.py | 99 ++++++++++++++++++- 7 files changed, 106 insertions(+), 97 deletions(-) diff --git a/mlos_bench/mlos_bench/environments/local/local_env.py b/mlos_bench/mlos_bench/environments/local/local_env.py index 0f5aead5a8..668965ff63 100644 --- a/mlos_bench/mlos_bench/environments/local/local_env.py +++ b/mlos_bench/mlos_bench/environments/local/local_env.py @@ -18,7 +18,6 @@ from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Type, Union from typing_extensions import Literal -import pytz import pandas from mlos_bench.environments.status import Status @@ -28,7 +27,7 @@ from mlos_bench.services.types.local_exec_type import SupportsLocalExec from mlos_bench.tunables.tunable import TunableValue from mlos_bench.tunables.tunable_groups import TunableGroups -from mlos_bench.util import path_join +from mlos_bench.util import datetime_parser, path_join _LOG = logging.getLogger(__name__) @@ -216,44 +215,6 @@ def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame: data.rename(str.rstrip, axis='columns', inplace=True) return data - # All timestamps in the telemetry data must be greater than this date - # (a very rough approximation for the start of this feature). - _MIN_TS = datetime(2024, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) - - @staticmethod - def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series: - """ - Attempt to convert a column to a datetime format. - - Parameters - ---------- - datetime_col : pandas.Series - The column to convert. - - Returns - ------- - pandas.Series - The converted datetime column. - - Raises - ------ - ValueError - On parse errors. - """ - new_datetime_col = pandas.to_datetime(datetime_col, utc=False) - # If timezone data is missing, assume the local timezone. 
- if new_datetime_col.dt.tz is None: - local_tzinfo = datetime.now().astimezone().tzinfo - new_datetime_col = new_datetime_col.dt.tz_localize(local_tzinfo) - assert new_datetime_col.dt.tz is not None - # And convert it to UTC. - new_datetime_col = new_datetime_col.dt.tz_convert('UTC') - if new_datetime_col.isna().any(): - raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}") - if new_datetime_col.le(LocalEnv._MIN_TS).any(): - raise ValueError(f"Invalid date range in the telemetry data: {datetime_col}") - return new_datetime_col - def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: (status, timestamp, _) = super().status() @@ -271,7 +232,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: data = self._normalize_columns( pandas.read_csv(fname, index_col=False)) - data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0]) + data.iloc[:, 0] = datetime_parser(data.iloc[:, 0]) expected_col_names = ["timestamp", "metric", "value"] if len(data.columns) != len(expected_col_names): @@ -281,7 +242,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: # Assume no header - this is ok for telemetry data. 
data = pandas.read_csv( fname, index_col=False, names=expected_col_names) - data.iloc[:, 0] = self._datetime_parser(data.iloc[:, 0]) + data.iloc[:, 0] = datetime_parser(data.iloc[:, 0]) except FileNotFoundError as ex: _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex) diff --git a/mlos_bench/mlos_bench/storage/sql/common.py b/mlos_bench/mlos_bench/storage/sql/common.py index 8ecff95b45..a3895f065a 100644 --- a/mlos_bench/mlos_bench/storage/sql/common.py +++ b/mlos_bench/mlos_bench/storage/sql/common.py @@ -14,7 +14,7 @@ from mlos_bench.storage.base_experiment_data import ExperimentData from mlos_bench.storage.base_trial_data import TrialData from mlos_bench.storage.sql.schema import DbSchema -from mlos_bench.storage.util import utcify_timestamp, utcify_nullable_timestamp +from mlos_bench.util import utcify_timestamp, utcify_nullable_timestamp def get_trials( diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index 575358744a..f5737b40ed 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -20,8 +20,7 @@ from mlos_bench.storage.base_storage import Storage from mlos_bench.storage.sql.schema import DbSchema from mlos_bench.storage.sql.trial import Trial -from mlos_bench.storage.util import utcify_timestamp -from mlos_bench.util import nullable +from mlos_bench.util import nullable, utcify_timestamp _LOG = logging.getLogger(__name__) diff --git a/mlos_bench/mlos_bench/storage/sql/trial.py b/mlos_bench/mlos_bench/storage/sql/trial.py index 1fd38684e8..6cc93f1b57 100644 --- a/mlos_bench/mlos_bench/storage/sql/trial.py +++ b/mlos_bench/mlos_bench/storage/sql/trial.py @@ -17,8 +17,7 @@ from mlos_bench.tunables.tunable_groups import TunableGroups from mlos_bench.storage.base_storage import Storage from mlos_bench.storage.sql.schema import DbSchema -from mlos_bench.storage.util import utcify_timestamp -from 
mlos_bench.util import nullable +from mlos_bench.util import nullable, utcify_timestamp _LOG = logging.getLogger(__name__) diff --git a/mlos_bench/mlos_bench/storage/sql/trial_data.py b/mlos_bench/mlos_bench/storage/sql/trial_data.py index 71df2b3a45..7353e96e79 100644 --- a/mlos_bench/mlos_bench/storage/sql/trial_data.py +++ b/mlos_bench/mlos_bench/storage/sql/trial_data.py @@ -16,7 +16,7 @@ from mlos_bench.environments.status import Status from mlos_bench.storage.sql.schema import DbSchema from mlos_bench.storage.sql.tunable_config_data import TunableConfigSqlData -from mlos_bench.storage.util import utcify_timestamp +from mlos_bench.util import utcify_timestamp if TYPE_CHECKING: from mlos_bench.storage.base_tunable_config_trial_group_data import TunableConfigTrialGroupData diff --git a/mlos_bench/mlos_bench/storage/util.py b/mlos_bench/mlos_bench/storage/util.py index a19d118b1c..a4610da8de 100644 --- a/mlos_bench/mlos_bench/storage/util.py +++ b/mlos_bench/mlos_bench/storage/util.py @@ -6,61 +6,14 @@ Utility functions for the storage subsystem. """ -from datetime import datetime -from typing import Dict, Literal, Optional +from typing import Dict, Optional -from pytz import UTC import pandas from mlos_bench.tunables.tunable import TunableValue, TunableValueTypeTuple from mlos_bench.util import try_parse_val -def utcify_timestamp(timestamp: datetime, *, origin: Literal["utc", "local"]) -> datetime: - """ - Augment a timestamp with zoneinfo if missing and convert it to UTC. - - Parameters - ---------- - timestamp : datetime - A timestamp to convert to UTC. - Note: The original datetime may or may not have tzinfo associated with it. - - origin : Literal["utc", "local"] - Whether the source timestamp is considered to be in UTC or local time. - In the case of loading data from storage, where we intentionally convert all - timestamps to UTC, this can help us retrieve the original timezone when the - storage backend doesn't explicitly store it. 
- In the case of receiving data from a client or other source, this can help us - convert the timestamp to UTC if it's not already. - - Returns - ------- - datetime - A datetime with zoneinfo in UTC. - """ - if timestamp.tzinfo is not None or origin == "local": - # A timestamp with no zoneinfo is interpretted as "local" time - # (e.g., according to the TZ environment variable). - # That could be UTC or some other timezone, but either way we convert it to - # be explicitly UTC with zone info. - return timestamp.astimezone(UTC) - elif origin == "utc": - # If the timestamp is already in UTC, we just add the zoneinfo without conversion. - # Converting with astimezone() when the local time is *not* UTC would cause - # a timestamp conversion which we don't want. - return timestamp.replace(tzinfo=UTC) - else: - raise ValueError(f"Invalid origin: {origin}") - - -def utcify_nullable_timestamp(timestamp: Optional[datetime], *, origin: Literal["utc", "local"]) -> Optional[datetime]: - """ - A nullable version of utcify_timestamp. - """ - return utcify_timestamp(timestamp, origin=origin) if timestamp is not None else None - - def kv_df_to_dict(dataframe: pandas.DataFrame) -> Dict[str, Optional[TunableValue]]: """ Utility function to convert certain flat key-value dataframe formats used by the diff --git a/mlos_bench/mlos_bench/util.py b/mlos_bench/mlos_bench/util.py index c9133700f2..9b912d6df2 100644 --- a/mlos_bench/mlos_bench/util.py +++ b/mlos_bench/mlos_bench/util.py @@ -8,6 +8,7 @@ # NOTE: This has to be placed in the top-level mlos_bench package to avoid circular imports. 
+from datetime import datetime import os import json import logging @@ -15,10 +16,14 @@ import subprocess from typing import ( - Any, Callable, Dict, Iterable, Mapping, Optional, + Any, Callable, Dict, Iterable, Literal, Mapping, Optional, Tuple, Type, TypeVar, TYPE_CHECKING, Union, ) +import pandas +import pytz + + _LOG = logging.getLogger(__name__) if TYPE_CHECKING: @@ -302,3 +307,95 @@ def nullable(func: Callable, value: Optional[Any]) -> Optional[Any]: The result of the function application or None if the value is None. """ return None if value is None else func(value) + + +def utcify_timestamp(timestamp: datetime, *, origin: Literal["utc", "local"]) -> datetime: + """ + Augment a timestamp with zoneinfo if missing and convert it to UTC. + + Parameters + ---------- + timestamp : datetime + A timestamp to convert to UTC. + Note: The original datetime may or may not have tzinfo associated with it. + + origin : Literal["utc", "local"] + Whether the source timestamp is considered to be in UTC or local time. + In the case of loading data from storage, where we intentionally convert all + timestamps to UTC, this can help us retrieve the original timezone when the + storage backend doesn't explicitly store it. + In the case of receiving data from a client or other source, this can help us + convert the timestamp to UTC if it's not already. + + Returns + ------- + datetime + A datetime with zoneinfo in UTC. + """ + if timestamp.tzinfo is not None or origin == "local": + # A timestamp with no zoneinfo is interpretted as "local" time + # (e.g., according to the TZ environment variable). + # That could be UTC or some other timezone, but either way we convert it to + # be explicitly UTC with zone info. + return timestamp.astimezone(pytz.UTC) + elif origin == "utc": + # If the timestamp is already in UTC, we just add the zoneinfo without conversion. + # Converting with astimezone() when the local time is *not* UTC would cause + # a timestamp conversion which we don't want. 
+ return timestamp.replace(tzinfo=pytz.UTC) + else: + raise ValueError(f"Invalid origin: {origin}") + + +def utcify_nullable_timestamp(timestamp: Optional[datetime], *, origin: Literal["utc", "local"]) -> Optional[datetime]: + """ + A nullable version of utcify_timestamp. + """ + return utcify_timestamp(timestamp, origin=origin) if timestamp is not None else None + + +# All timestamps in the telemetry data must be greater than this date +# (a very rough approximation for the start of this feature). +_MIN_TS = datetime(2024, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + + +def datetime_parser(datetime_col: pandas.Series, origin: Literal["utc", "local"]) -> pandas.Series: + """ + Attempt to convert a pandas column to a datetime format. + + Parameters + ---------- + datetime_col : pandas.Series + The column to convert. + + origin : Literal["utc", "local"] + Whether to interpret naive timestamps as originating from UTC or local time. + + Returns + ------- + pandas.Series + The converted datetime column. + + Raises + ------ + ValueError + On parse errors. + """ + new_datetime_col = pandas.to_datetime(datetime_col, utc=False) + # If timezone data is missing, assume the local timezone. + if new_datetime_col.dt.tz is None: + if origin == "local": + tzinfo = datetime.now().astimezone().tzinfo + elif origin == "utc": + tzinfo = pytz.UTC + else: + raise ValueError(f"Invalid timezone origin: {origin}") + new_datetime_col = new_datetime_col.dt.tz_localize(tzinfo) + assert new_datetime_col.dt.tz is not None + # And convert it to UTC. 
+ new_datetime_col = new_datetime_col.dt.tz_convert('UTC') + if new_datetime_col.isna().any(): + raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}") + if new_datetime_col.le(_MIN_TS).any(): + raise ValueError(f"Invalid date range in the telemetry data: {datetime_col}") + return new_datetime_col From e6667bf5c1848c3436639f99d007cfb80d214d21 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Tue, 19 Mar 2024 20:39:28 +0000 Subject: [PATCH 21/21] tweaks --- mlos_bench/mlos_bench/environments/local/local_env.py | 4 ++-- mlos_bench/mlos_bench/util.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mlos_bench/mlos_bench/environments/local/local_env.py b/mlos_bench/mlos_bench/environments/local/local_env.py index 668965ff63..01f0337c1f 100644 --- a/mlos_bench/mlos_bench/environments/local/local_env.py +++ b/mlos_bench/mlos_bench/environments/local/local_env.py @@ -232,7 +232,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: data = self._normalize_columns( pandas.read_csv(fname, index_col=False)) - data.iloc[:, 0] = datetime_parser(data.iloc[:, 0]) + data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local") expected_col_names = ["timestamp", "metric", "value"] if len(data.columns) != len(expected_col_names): @@ -242,7 +242,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]: # Assume no header - this is ok for telemetry data. 
data = pandas.read_csv( fname, index_col=False, names=expected_col_names) - data.iloc[:, 0] = datetime_parser(data.iloc[:, 0]) + data.iloc[:, 0] = datetime_parser(data.iloc[:, 0], origin="local") except FileNotFoundError as ex: _LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex) diff --git a/mlos_bench/mlos_bench/util.py b/mlos_bench/mlos_bench/util.py index 9b912d6df2..da9cea554a 100644 --- a/mlos_bench/mlos_bench/util.py +++ b/mlos_bench/mlos_bench/util.py @@ -359,7 +359,7 @@ def utcify_nullable_timestamp(timestamp: Optional[datetime], *, origin: Literal[ _MIN_TS = datetime(2024, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) -def datetime_parser(datetime_col: pandas.Series, origin: Literal["utc", "local"]) -> pandas.Series: +def datetime_parser(datetime_col: pandas.Series, *, origin: Literal["utc", "local"]) -> pandas.Series: """ Attempt to convert a pandas column to a datetime format. @@ -382,7 +382,7 @@ def datetime_parser(datetime_col: pandas.Series, origin: Literal["utc", "local"] On parse errors. """ new_datetime_col = pandas.to_datetime(datetime_col, utc=False) - # If timezone data is missing, assume the local timezone. + # If timezone data is missing, assume the provided origin timezone. if new_datetime_col.dt.tz is None: if origin == "local": tzinfo = datetime.now().astimezone().tzinfo @@ -395,7 +395,7 @@ def datetime_parser(datetime_col: pandas.Series, origin: Literal["utc", "local"] # And convert it to UTC. new_datetime_col = new_datetime_col.dt.tz_convert('UTC') if new_datetime_col.isna().any(): - raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}") + raise ValueError(f"Invalid date format in the data: {datetime_col}") if new_datetime_col.le(_MIN_TS).any(): - raise ValueError(f"Invalid date range in the telemetry data: {datetime_col}") + raise ValueError(f"Invalid date range in the data: {datetime_col}") return new_datetime_col