Skip to content

Commit

Permalink
Added tests for issue #786.
Browse files Browse the repository at this point in the history
  • Loading branch information
larsgeorge-db committed Jan 19, 2024
1 parent 0948141 commit 94a143c
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 5 deletions.
9 changes: 6 additions & 3 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,15 +869,16 @@ def _get_ext_hms_conf_from_policy(cluster_policy):
@staticmethod
def _readable_timedelta(epoch):
when = datetime.fromtimestamp(epoch)
duration = datetime.now() - when
duration = datetime.now(when.tzinfo) - when
data = {}
data["days"], remaining = divmod(duration.total_seconds(), 86_400)
data["hours"], remaining = divmod(remaining, 3_600)
data["minutes"], data["seconds"] = divmod(remaining, 60)

time_parts = ((name, round(value)) for name, value in data.items())
time_parts = [f"{value} {name[:-1] if value == 1 else name}" for name, value in time_parts if value > 0]
time_parts.append("ago")
if len(time_parts) > 0:
time_parts.append("ago")
if time_parts:
return " ".join(time_parts)
else:
Expand All @@ -903,7 +904,9 @@ def latest_job_status(self) -> list[dict]:
{
"step": step,
"state": "UNKNOWN" if not (job_runs and job_state) else job_state,
"started": "<never run>" if not job_runs else self._readable_timedelta(start_time),
"started": "<never run>"
if not (job_runs and start_time)
else self._readable_timedelta(start_time),
}
)
except InvalidParameterValue as e:
Expand Down
145 changes: 143 additions & 2 deletions tests/unit/test_install.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import io
from datetime import timedelta
from datetime import datetime
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, create_autospec, patch
Expand All @@ -23,7 +23,12 @@
GlobalInitScriptDetailsWithContent,
Policy,
)
from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState
from databricks.sdk.service.jobs import (
BaseRun,
RunLifeCycleState,
RunResultState,
RunState,
)
from databricks.sdk.service.sql import (
Dashboard,
DataSource,
Expand Down Expand Up @@ -1200,3 +1205,139 @@ def test_repair_run_result_state(ws, caplog):
ws.jobs.list_runs.repair_run = None
install.repair_run("assessment")
assert "Please try after sometime" in caplog.text


@pytest.mark.parametrize(
"state,expected",
[
(
RunState(
result_state=None,
life_cycle_state=RunLifeCycleState.RUNNING,
),
"RUNNING",
),
(
RunState(
result_state=RunResultState.SUCCESS,
life_cycle_state=RunLifeCycleState.TERMINATED,
),
"SUCCESS",
),
(
RunState(
result_state=RunResultState.FAILED,
life_cycle_state=RunLifeCycleState.TERMINATED,
),
"FAILED",
),
(
RunState(
result_state=None,
life_cycle_state=None,
),
"UNKNOWN",
),
],
)
def test_latest_job_status_states(ws, state, expected):
base = [
BaseRun(
job_id=123,
run_name="assessment",
state=state,
start_time=1704114000000,
)
]
install = WorkspaceInstaller(ws)
install._state.jobs = {"assessment": "123"}
ws.jobs.list_runs.return_value = base
status = install.latest_job_status()
assert len(status) == 1
assert status[0]["state"] == expected


@patch(f"{databricks.labs.ucx.install.__name__}.datetime", wraps=datetime)
@pytest.mark.parametrize(
"start_time,expected",
[
(1704114000000, "1 hour ago"), # 2024-01-01 14:00:00
(1704117600000, "less than 1 second ago"), # 2024-01-01 15:00:00
(None, "<never run>"),
],
)
def test_latest_job_status_success_with_time(mock_datetime, ws, start_time, expected):
base = [
BaseRun(
job_id=123,
run_name="assessment",
state=RunState(
result_state=RunResultState.SUCCESS,
life_cycle_state=RunLifeCycleState.TERMINATED,
),
start_time=start_time,
)
]
install = WorkspaceInstaller(ws)
install._state.jobs = {"assessment": "123"}
ws.jobs.list_runs.return_value = base
faked_now = datetime(2024, 1, 1, 15, 0, 0)
mock_datetime.now.return_value = faked_now
status = install.latest_job_status()
assert status[0]["started"] == expected


def test_latest_job_status_list(ws):
runs = [
[
BaseRun(
job_id=1,
run_name="job1",
state=RunState(
result_state=None,
life_cycle_state=RunLifeCycleState.RUNNING,
),
start_time=1705577671907,
)
],
[
BaseRun(
job_id=2,
run_name="job2",
state=RunState(
result_state=RunResultState.SUCCESS,
life_cycle_state=RunLifeCycleState.TERMINATED,
),
start_time=1705577671907,
)
],
[], # the last job has no runs
]
install = WorkspaceInstaller(ws)
install._state.jobs = {"job1": "1", "job2": "2", "job3": "3"}
ws.jobs.list_runs.side_effect = iter(runs)
status = install.latest_job_status()
assert len(status) == 3
assert status[0]["step"] == "job1"
assert status[0]["state"] == "RUNNING"
assert status[1]["step"] == "job2"
assert status[1]["state"] == "SUCCESS"
assert status[2]["step"] == "job3"
assert status[2]["state"] == "UNKNOWN"


def test_latest_job_status_no_job_run(ws):
install = WorkspaceInstaller(ws)
install._state.jobs = {"assessment": "123"}
ws.jobs.list_runs.return_value = ""
status = install.latest_job_status()
assert len(status) == 1
assert status[0]["step"] == "assessment"


def test_latest_job_status_exception(ws):
install = WorkspaceInstaller(ws)
install._state.jobs = {"assessment": "123"}
ws.jobs.list_runs.side_effect = InvalidParameterValue("Workflow does not exists")
status = install.latest_job_status()
assert len(status) == 0

0 comments on commit 94a143c

Please sign in to comment.