UTC datetime handling improvements #716

Merged · 23 commits · Mar 19, 2024
Changes from 17 commits
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/environments/base_environment.py
@@ -14,6 +14,8 @@
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, TYPE_CHECKING, Union
from typing_extensions import Literal

from pytz import UTC

from mlos_bench.config.schemas import ConfigSchema
from mlos_bench.dict_templater import DictTemplater
from mlos_bench.environments.status import Status
@@ -411,7 +413,7 @@ def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
"""
# Make sure we create a context before invoking setup/run/status/teardown
assert self._in_context
timestamp = datetime.utcnow()
timestamp = datetime.now(UTC)
if self._is_ready:
return (Status.READY, timestamp, [])
_LOG.warning("Environment not ready: %s", self)
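The core of this change is the move from `datetime.utcnow()`, which returns a *naive* timestamp (no `tzinfo`), to `datetime.now(UTC)`, which returns a timezone-aware one. A minimal sketch of the difference, using pytz's `UTC` as the PR does (variable names are illustrative):

```python
from datetime import datetime

from pytz import UTC

naive = datetime.utcnow()   # correct UTC wall-clock values, but tzinfo is None
aware = datetime.now(UTC)   # same instant, with tzinfo explicitly attached

print(naive.tzinfo)         # None -- downstream code may misread this as local time
print(aware.tzinfo)         # UTC

# Mixing the two is an error, which is one way such bugs eventually surface:
try:
    print(aware - naive)
except TypeError as ex:
    print(f"naive/aware arithmetic fails: {ex}")
```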
1 change: 1 addition & 0 deletions mlos_bench/mlos_bench/environments/local/local_env.py
@@ -240,6 +240,7 @@ def _datetime_parser(datetime_col: pandas.Series) -> pandas.Series:
ValueError
On parse errors.
"""
# FIXME: We should probably not be assuming results arrived in UTC timezone.
new_datetime_col = pandas.to_datetime(datetime_col, utc=True)
if new_datetime_col.isna().any():
raise ValueError(f"Invalid date format in the telemetry data: {datetime_col}")
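`pandas.to_datetime(..., utc=True)` localizes naive values to UTC and converts offset-aware ones, and unparseable entries can show up as `NaT`, which the `isna()` check above turns into a hard error. A small sketch of that behavior (the sample data and the `errors="coerce"` option are illustrative, not from the PR):

```python
import pandas

col = pandas.Series(["2024-03-19 12:00:00", "2024-03-19 13:00:00+02:00"])
parsed = pandas.to_datetime(col, utc=True)
print(parsed.dt.tz)    # UTC: naive inputs are assumed UTC, aware ones are converted

bad = pandas.to_datetime(pandas.Series(["not-a-date"]), errors="coerce", utc=True)
if bad.isna().any():
    raise ValueError(f"Invalid date format in the telemetry data: {bad}")
```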
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/environments/remote/remote_env.py
@@ -12,6 +12,8 @@
from datetime import datetime
from typing import Dict, Iterable, Optional, Tuple

from pytz import UTC

from mlos_bench.environments.status import Status
from mlos_bench.environments.script_env import ScriptEnv
from mlos_bench.services.base_service import Service
@@ -174,5 +176,5 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, datetime, Optiona
(status, output) = self._remote_exec_service.get_remote_exec_results(output)
_LOG.debug("Status: %s :: %s", status, output)
# FIXME: get the timestamp from the remote environment!
timestamp = datetime.utcnow()
timestamp = datetime.now(UTC)
return (status, timestamp, output)
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/schedulers/base_scheduler.py
@@ -15,6 +15,8 @@
from typing import Any, Dict, Optional, Tuple, Type
from typing_extensions import Literal

from pytz import UTC

from mlos_bench.environments.base_environment import Environment
from mlos_bench.optimizers.base_optimizer import Optimizer
from mlos_bench.storage.base_storage import Storage
@@ -231,7 +233,7 @@ def _run_schedule(self, running: bool = False) -> None:
Scheduler part of the loop. Check for pending trials in the queue and run them.
"""
assert self.experiment is not None
for trial in self.experiment.pending_trials(datetime.utcnow(), running=running):
for trial in self.experiment.pending_trials(datetime.now(UTC), running=running):
self.run_trial(trial)

@abstractmethod
4 changes: 3 additions & 1 deletion mlos_bench/mlos_bench/schedulers/sync_scheduler.py
@@ -9,6 +9,8 @@
import logging
from datetime import datetime

from pytz import UTC

from mlos_bench.environments.status import Status
from mlos_bench.schedulers.base_scheduler import Scheduler
from mlos_bench.storage.base_storage import Storage
@@ -49,7 +51,7 @@ def run_trial(self, trial: Storage.Trial) -> None:
_LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables)
# FIXME: Use the actual timestamp from the environment.
_LOG.info("QUEUE: Update trial results: %s :: %s", trial, Status.FAILED)
trial.update(Status.FAILED, datetime.utcnow())
trial.update(Status.FAILED, datetime.now(UTC))
return

(status, timestamp, results) = self.environment.run() # Block and wait for the final result.
10 changes: 6 additions & 4 deletions mlos_bench/mlos_bench/services/remote/azure/azure_auth.py
@@ -6,11 +6,13 @@
A collection of Service functions for managing VMs on Azure.
"""

import datetime
import logging
from base64 import b64decode
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Union

from pytz import UTC

import azure.identity as azure_id
from azure.keyvault.secrets import SecretClient

@@ -60,7 +62,7 @@ def __init__(self,
self._req_interval = float(self.config.get("tokenRequestInterval", self._REQ_INTERVAL))

self._access_token = "RENEW *NOW*"
self._token_expiration_ts = datetime.datetime.utcnow() # Typically, some future timestamp.
self._token_expiration_ts = datetime.now(UTC) # Typically, some future timestamp.

# Login as ourselves
self._cred: Union[azure_id.AzureCliCredential, azure_id.CertificateCredential]
@@ -113,12 +115,12 @@ def get_access_token(self) -> str:
if "spClientId" in self.config:
self._init_sp()

ts_diff = (self._token_expiration_ts - datetime.datetime.utcnow()).total_seconds()
ts_diff = (self._token_expiration_ts - datetime.now(UTC)).total_seconds()
_LOG.debug("Time to renew the token: %.2f sec.", ts_diff)
if ts_diff < self._req_interval:
_LOG.debug("Request new accessToken")
res = self._cred.get_token("https://management.azure.com/.default")
self._token_expiration_ts = datetime.datetime.utcfromtimestamp(res.expires_on)
self._token_expiration_ts = datetime.fromtimestamp(res.expires_on, tz=UTC)
self._access_token = res.token
_LOG.info("Got new accessToken. Expiration time: %s", self._token_expiration_ts)
return self._access_token
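Same fix on the token-expiry path: `datetime.utcfromtimestamp()` returns a naive value, while `datetime.fromtimestamp(..., tz=UTC)` attaches the zone, so the subtraction against `datetime.now(UTC)` in `get_access_token()` stays well-defined. A rough sketch with a made-up expiry epoch:

```python
from datetime import datetime

from pytz import UTC

expires_on = 1710849600   # hypothetical `expires_on` epoch seconds from a token response

expiration = datetime.fromtimestamp(expires_on, tz=UTC)        # aware, in UTC
remaining = (expiration - datetime.now(UTC)).total_seconds()   # no naive/aware mismatch
print(f"token expires at {expiration} ({remaining:.0f}s from now)")
```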
3 changes: 3 additions & 0 deletions mlos_bench/mlos_bench/storage/base_trial_data.py
@@ -10,6 +10,7 @@
from typing import Any, Dict, Optional, TYPE_CHECKING

import pandas
from pytz import UTC

from mlos_bench.environments.status import Status
from mlos_bench.tunables.tunable import TunableValue
@@ -38,6 +39,8 @@ def __init__(self, *,
self._experiment_id = experiment_id
self._trial_id = trial_id
self._tunable_config_id = tunable_config_id
assert ts_start.tzinfo == UTC, "ts_start must be in UTC"
assert ts_end is None or ts_end.tzinfo == UTC, "ts_end must be in UTC if not None"
self._ts_start = ts_start
self._ts_end = ts_end
self._status = status
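These assertions enforce, at the storage-API boundary, that callers have already normalized timestamps rather than letting naive values slip through. A tiny illustration of what they accept and reject (standalone sketch, not part of the PR):

```python
from datetime import datetime

from pytz import UTC

ok = datetime.now(UTC)
assert ok.tzinfo == UTC              # passes: explicit UTC zone info

naive = datetime.utcnow()
try:
    assert naive.tzinfo == UTC, "ts_start must be in UTC"
except AssertionError as ex:
    print(ex)                        # naive timestamps are rejected up front
```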
25 changes: 20 additions & 5 deletions mlos_bench/mlos_bench/storage/sql/common.py
@@ -14,6 +14,7 @@
from mlos_bench.storage.base_experiment_data import ExperimentData
from mlos_bench.storage.base_trial_data import TrialData
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.storage.util import utcify_timestamp, utcify_nullable_timestamp


def get_trials(
@@ -48,8 +49,8 @@ def get_trials(
experiment_id=experiment_id,
trial_id=trial.trial_id,
config_id=trial.config_id,
ts_start=trial.ts_start,
ts_end=trial.ts_end,
ts_start=utcify_timestamp(trial.ts_start, origin="utc"),
ts_end=utcify_nullable_timestamp(trial.ts_end, origin="utc"),
status=Status[trial.status],
)
for trial in trials.fetchall()
@@ -107,9 +108,23 @@ def get_results_df(
)
cur_trials = conn.execute(cur_trials_stmt)
trials_df = pandas.DataFrame(
[(row.trial_id, row.ts_start, row.ts_end, row.config_id, row.tunable_config_trial_group_id, row.status)
for row in cur_trials.fetchall()],
columns=['trial_id', 'ts_start', 'ts_end', 'tunable_config_id', 'tunable_config_trial_group_id', 'status'])
[(
row.trial_id,
utcify_timestamp(row.ts_start, origin="utc"),
utcify_nullable_timestamp(row.ts_end, origin="utc"),
row.config_id,
row.tunable_config_trial_group_id,
row.status,
) for row in cur_trials.fetchall()],
columns=[
'trial_id',
'ts_start',
'ts_end',
'tunable_config_id',
'tunable_config_trial_group_id',
'status',
]
)

# Get each trial's config in wide format.
configs_stmt = schema.trial.select().with_only_columns(
13 changes: 10 additions & 3 deletions mlos_bench/mlos_bench/storage/sql/experiment.py
@@ -11,13 +11,16 @@
from datetime import datetime
from typing import Optional, Tuple, List, Dict, Iterator, Any

from pytz import UTC

from sqlalchemy import Engine, Connection, Table, column, func

from mlos_bench.environments.status import Status
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.storage.base_storage import Storage
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.storage.sql.trial import Trial
from mlos_bench.storage.util import utcify_timestamp
from mlos_bench.util import nullable

_LOG = logging.getLogger(__name__)
@@ -120,7 +123,9 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]:
self._schema.trial_telemetry.c.metric_id,
)
)
return [(row.ts, row.metric_id, row.metric_value)
# Not all storage backends store the original zone info.
# We try to ensure data is entered in UTC and augment it on return again here.
return [(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value)
for row in cur_telemetry.fetchall()]

def load(self,
@@ -183,6 +188,7 @@ def _save_params(conn: Connection, table: Table,
])

def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]:
timestamp = utcify_timestamp(timestamp, origin="local")
_LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp)
if running:
pending_status = ['PENDING', 'READY', 'RUNNING']
@@ -238,15 +244,16 @@ def _get_config_id(self, conn: Connection, tunables: TunableGroups) -> int:

def new_trial(self, tunables: TunableGroups, ts_start: Optional[datetime] = None,
config: Optional[Dict[str, Any]] = None) -> Storage.Trial:
_LOG.debug("Create trial: %s:%d", self._experiment_id, self._trial_id)
ts_start = utcify_timestamp(ts_start or datetime.now(UTC), origin="local")
_LOG.debug("Create trial: %s:%d @ %s", self._experiment_id, self._trial_id, ts_start)
with self._engine.begin() as conn:
try:
config_id = self._get_config_id(conn, tunables)
conn.execute(self._schema.trial.insert().values(
exp_id=self._experiment_id,
trial_id=self._trial_id,
config_id=config_id,
ts_start=ts_start or datetime.utcnow(),
ts_start=ts_start,
status='PENDING',
))

4 changes: 2 additions & 2 deletions mlos_bench/mlos_bench/storage/sql/schema.py
@@ -155,7 +155,7 @@ def __init__(self, engine: Engine):
self._meta,
Column("exp_id", String(self._ID_LEN), nullable=False),
Column("trial_id", Integer, nullable=False),
Column("ts", DateTime, nullable=False, default="now"),
Column("ts", DateTime(timezone=True), nullable=False, default="now"),
Column("status", String(self._STATUS_LEN), nullable=False),

UniqueConstraint("exp_id", "trial_id", "ts"),
@@ -181,7 +181,7 @@ def __init__(self, engine: Engine):
self._meta,
Column("exp_id", String(self._ID_LEN), nullable=False),
Column("trial_id", Integer, nullable=False),
Column("ts", DateTime, nullable=False, default="now"),
Column("ts", DateTime(timezone=True), nullable=False, default="now"),
Column("metric_id", String(self._ID_LEN), nullable=False),
Column("metric_value", String(self._METRIC_VALUE_LEN)),

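`DateTime(timezone=True)` asks the dialect for a timezone-aware column type where one exists (e.g. `TIMESTAMP WITH TIME ZONE` on PostgreSQL). SQLite has no such type and generally hands values back naive, which is why the read paths above still re-attach UTC. A minimal sketch of that behavior (table and column names are made up):

```python
from datetime import datetime

from pytz import UTC
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, create_engine

meta = MetaData()
events = Table(
    "example_events", meta,
    Column("id", Integer, primary_key=True),
    Column("ts", DateTime(timezone=True), nullable=False),
)

engine = create_engine("sqlite://")   # in-memory SQLite
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(events.insert().values(id=1, ts=datetime.now(UTC)))
    row = conn.execute(events.select()).fetchone()
    print(row.ts, row.ts.tzinfo)      # tzinfo is typically None when read back from SQLite
```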
8 changes: 8 additions & 0 deletions mlos_bench/mlos_bench/storage/sql/trial.py
@@ -17,6 +17,7 @@
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.storage.base_storage import Storage
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.storage.util import utcify_timestamp
from mlos_bench.util import nullable

_LOG = logging.getLogger(__name__)
@@ -47,6 +48,8 @@ def __init__(self, *,
def update(self, status: Status, timestamp: datetime,
metrics: Optional[Union[Dict[str, Any], float]] = None
) -> Optional[Dict[str, Any]]:
# Make sure to convert the timestamp to UTC before storing it in the database.
timestamp = utcify_timestamp(timestamp, origin="local")
metrics = super().update(status, timestamp, metrics)
with self._engine.begin() as conn:
self._update_status(conn, status, timestamp)
@@ -106,6 +109,9 @@ def update(self, status: Status, timestamp: datetime,
def update_telemetry(self, status: Status, timestamp: datetime,
metrics: List[Tuple[datetime, str, Any]]) -> None:
super().update_telemetry(status, timestamp, metrics)
# Make sure to convert the timestamp to UTC before storing it in the database.
timestamp = utcify_timestamp(timestamp, origin="local")
metrics = [(utcify_timestamp(ts, origin="local"), key, val) for (ts, key, val) in metrics]
# NOTE: Not every SQLAlchemy dialect supports `Insert.on_conflict_do_nothing()`
# and we need to keep `.update_telemetry()` idempotent; hence a loop instead of
# a bulk upsert.
@@ -130,6 +136,8 @@ def _update_status(self, conn: Connection, status: Status, timestamp: datetime)
Insert a new status record into the database.
This call is idempotent.
"""
# Make sure to convert the timestamp to UTC before storing it in the database.
timestamp = utcify_timestamp(timestamp, origin="local")
try:
conn.execute(self._schema.trial_status.insert().values(
exp_id=self._experiment_id,
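Each telemetry record carries its own timestamp, and those are normalized the same way before insert, so a batch reported with naive local times still lands in the database as UTC. A rough sketch of that per-record conversion (metric names are made up, and it assumes the `utcify_timestamp` helper added in this PR is importable):

```python
from datetime import datetime

from mlos_bench.storage.util import utcify_timestamp

# Hypothetical telemetry batch: (timestamp, metric_id, value) with naive local timestamps.
metrics = [
    (datetime(2024, 3, 19, 10, 0, 0), "cpu_util", 0.42),
    (datetime(2024, 3, 19, 10, 0, 30), "cpu_util", 0.57),
]
metrics = [(utcify_timestamp(ts, origin="local"), key, val) for (ts, key, val) in metrics]
print(metrics[0][0].tzinfo)   # UTC
```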
5 changes: 4 additions & 1 deletion mlos_bench/mlos_bench/storage/sql/trial_data.py
@@ -16,6 +16,7 @@
from mlos_bench.environments.status import Status
from mlos_bench.storage.sql.schema import DbSchema
from mlos_bench.storage.sql.tunable_config_data import TunableConfigSqlData
from mlos_bench.storage.util import utcify_timestamp

if TYPE_CHECKING:
from mlos_bench.storage.base_tunable_config_trial_group_data import TunableConfigTrialGroupData
@@ -100,8 +101,10 @@ def telemetry_df(self) -> pandas.DataFrame:
self._schema.trial_telemetry.c.metric_id,
)
)
# Not all storage backends store the original zone info.
# We try to ensure data is entered in UTC and augment it on return again here.
return pandas.DataFrame(
[(row.ts, row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()],
[(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()],
columns=['ts', 'metric', 'value'])

@property
49 changes: 48 additions & 1 deletion mlos_bench/mlos_bench/storage/util.py
@@ -6,14 +6,61 @@
Utility functions for the storage subsystem.
"""

from typing import Dict, Optional
from datetime import datetime
from typing import Dict, Literal, Optional

from pytz import UTC
import pandas

from mlos_bench.tunables.tunable import TunableValue, TunableValueTypeTuple
from mlos_bench.util import try_parse_val


def utcify_timestamp(timestamp: datetime, *, origin: Literal["utc", "local"]) -> datetime:
"""
Augment a timestamp with zoneinfo if missing and convert it to UTC.

Parameters
----------
timestamp : datetime
A timestamp to convert to UTC.
Note: The original datetime may or may not have tzinfo associated with it.

origin : Literal["utc", "local"]
Whether the source timestamp is considered to be in UTC or local time.
In the case of loading data from storage, where we intentionally convert all
timestamps to UTC, this can help us retrieve the original timezone when the
storage backend doesn't explicitly store it.
In the case of receiving data from a client or other source, this can help us
convert the timestamp to UTC if it's not already.

Returns
-------
datetime
A datetime with zoneinfo in UTC.
"""
if timestamp.tzinfo is not None or origin == "local":
# A timestamp with no zoneinfo is interpreted as "local" time
# (e.g., according to the TZ environment variable).
# That could be UTC or some other timezone, but either way we convert it to
# be explicitly UTC with zone info.
return timestamp.astimezone(UTC)
elif origin == "utc":
# If the timestamp is already in UTC, we just add the zoneinfo without conversion.
# Converting with astimezone() when the local time is *not* UTC would cause
# a timestamp conversion which we don't want.
return timestamp.replace(tzinfo=UTC)
else:
raise ValueError(f"Invalid origin: {origin}")


def utcify_nullable_timestamp(timestamp: Optional[datetime], *, origin: Literal["utc", "local"]) -> Optional[datetime]:
"""
A nullable version of utcify_timestamp.
"""
return utcify_timestamp(timestamp, origin=origin) if timestamp is not None else None


def kv_df_to_dict(dataframe: pandas.DataFrame) -> Dict[str, Optional[TunableValue]]:
"""
Utility function to convert certain flat key-value dataframe formats used by the
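Putting the new helpers to work: `origin="local"` is for timestamps coming from callers (naive values are treated as local wall-clock time and converted), while `origin="utc"` is for values read back from storage backends that dropped the zone info (the zone is re-attached without shifting). A short usage sketch under those assumptions:

```python
from datetime import datetime

from pytz import UTC
from mlos_bench.storage.util import utcify_nullable_timestamp, utcify_timestamp

# Write path: a naive caller-supplied timestamp is assumed to be local time.
print(utcify_timestamp(datetime(2024, 3, 19, 10, 0, 0), origin="local"))

# Read path: a naive value loaded from storage is known to already be UTC.
print(utcify_timestamp(datetime(2024, 3, 19, 17, 0, 0), origin="utc"))
# -> 2024-03-19 17:00:00+00:00 (no shift, just tzinfo attached)

# Already-aware values pass through either way, normalized to UTC.
print(utcify_timestamp(datetime.now(UTC), origin="local"))

# ts_end may be None for an unfinished trial; the nullable variant handles that.
print(utcify_nullable_timestamp(None, origin="utc"))   # None
```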