Skip to content

Commit

Permalink
Added tests for issue #786. (#815)
Browse files Browse the repository at this point in the history
  • Loading branch information
larsgeorge-db authored and dmoore247 committed Mar 23, 2024
1 parent a689361 commit 50c14cd
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 26 deletions.
46 changes: 23 additions & 23 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ def _job_clusters(self, names: set[str]):

@staticmethod
def _readable_timedelta(epoch):
when = datetime.fromtimestamp(epoch)
when = datetime.utcfromtimestamp(epoch)
duration = datetime.now() - when
data = {}
data["days"], remaining = divmod(duration.total_seconds(), 86_400)
Expand All @@ -808,41 +808,41 @@ def _readable_timedelta(epoch):

time_parts = ((name, round(value)) for name, value in data.items())
time_parts = [f"{value} {name[:-1] if value == 1 else name}" for name, value in time_parts if value > 0]
time_parts.append("ago")
if len(time_parts) > 0:
time_parts.append("ago")
if time_parts:
return " ".join(time_parts)
return "less than 1 second ago"

def latest_job_status(self) -> list[dict]:
    """Summarize the most recent run of every job recorded in the install state.

    For each ``step -> job_id`` pair in ``self._state.jobs`` the newest run is
    requested with ``self._ws.jobs.list_runs(job_id=..., limit=1)``.

    Returns:
        One dict per step that could be queried, with the keys:

        * ``"step"`` - the step name from the install state.
        * ``"state"`` - the run's result state name if present, otherwise its
          life-cycle state name, otherwise ``"UNKNOWN"``.
        * ``"started"`` - the text produced by ``self._readable_timedelta``
          for the run's start time, or ``"<never run>"`` when there is no
          run or the run carries no start time.

        Steps whose lookup raises ``InvalidParameterValue`` are logged with a
        warning and left out of the result.
    """
    latest_status = []
    for step, job_id in self._state.jobs.items():
        # Query each job exactly once; a failed lookup skips only that step.
        try:
            job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1))
        except InvalidParameterValue as e:
            logger.warning(f"skipping {step}: {e}")
            continue

        job_state = None
        start_time = None
        if job_runs:
            latest_run = job_runs[0]
            state = latest_run.state
            # A finished run reports a result state; an in-flight run only
            # has a life-cycle state.
            if state and state.result_state:
                job_state = state.result_state.name
            elif state and state.life_cycle_state:
                job_state = state.life_cycle_state.name
            if latest_run.start_time:
                # Scale down by 1000 before handing the value to
                # _readable_timedelta (presumably epoch ms -> epoch s).
                start_time = latest_run.start_time / 1000

        latest_status.append(
            {
                "step": step,
                "state": job_state or "UNKNOWN",
                "started": self._readable_timedelta(start_time) if start_time else "<never run>",
            }
        )
    return latest_status

def _step_status(self, job_id, step):
    """Build the status entry for one installed job from its most recent run.

    Args:
        job_id: Identifier of the job; converted with ``int()`` before the
            lookup.
        step: Name of the step, copied into the result unchanged.

    Returns:
        A dict with ``"step"``, ``"state"`` (result state name, else
        life-cycle state name, else ``"UNKNOWN"``) and ``"started"`` (the
        text from ``self._readable_timedelta``, or ``"<never run>"`` when
        there is no run or the run has no start time).
    """
    job_state = None
    start_time = None
    job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1))
    if job_runs:
        latest_run = job_runs[0]
        state = latest_run.state
        if state and state.result_state:
            job_state = state.result_state.name
        elif state and state.life_cycle_state:
            job_state = state.life_cycle_state.name
        if latest_run.start_time:
            start_time = latest_run.start_time / 1000
    return {
        "step": step,
        "state": job_state or "UNKNOWN",
        # A run can exist without a start time; passing None on would make
        # the datetime conversion in _readable_timedelta raise.
        "started": self._readable_timedelta(start_time) if start_time else "<never run>",
    }

def _get_result_state(self, job_id):
job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1))
latest_job_run = job_runs[0]
Expand Down
178 changes: 175 additions & 3 deletions tests/unit/test_install.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from datetime import timedelta
from unittest.mock import MagicMock, create_autospec
from datetime import datetime, timedelta
from unittest.mock import MagicMock, create_autospec, patch

import pytest
from databricks.labs.blueprint.installation import Installation, MockInstallation
Expand All @@ -18,7 +18,12 @@
)
from databricks.sdk.service import compute, iam, jobs, sql
from databricks.sdk.service.compute import CreatePolicyResponse, Policy, State
from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState
from databricks.sdk.service.jobs import (
BaseRun,
RunLifeCycleState,
RunResultState,
RunState,
)
from databricks.sdk.service.sql import (
Dashboard,
DataSource,
Expand Down Expand Up @@ -1045,3 +1050,170 @@ def test_repair_run_result_state(ws, caplog, mock_installation_with_jobs, any_pr

workspace_installation.repair_run("assessment")
assert "Please try after sometime" in caplog.text


@pytest.mark.parametrize(
    "state,expected",
    [
        # No result yet: the life-cycle state is what gets reported.
        (RunState(result_state=None, life_cycle_state=RunLifeCycleState.RUNNING), "RUNNING"),
        # A result state takes precedence over the life-cycle state.
        (RunState(result_state=RunResultState.SUCCESS, life_cycle_state=RunLifeCycleState.TERMINATED), "SUCCESS"),
        (RunState(result_state=RunResultState.FAILED, life_cycle_state=RunLifeCycleState.TERMINATED), "FAILED"),
        # Neither state is available.
        (RunState(result_state=None, life_cycle_state=None), "UNKNOWN"),
    ],
)
def test_latest_job_status_states(ws, mock_installation_with_jobs, any_prompt, state, expected):
    """The reported state is the result state, else the life-cycle state, else ``UNKNOWN``."""
    installer = WorkspaceInstallation(
        WorkspaceConfig(inventory_database='ucx'),
        mock_installation_with_jobs,
        MockBackend(),
        create_autospec(WheelsV2),
        ws,
        any_prompt,
        timedelta(seconds=1),
    )
    # The workspace returns a single run carrying the parametrized state.
    ws.jobs.list_runs.return_value = [
        BaseRun(job_id=123, run_name="assessment", state=state, start_time=1704114000000),
    ]

    statuses = installer.latest_job_status()

    assert len(statuses) == 1
    assert statuses[0]["state"] == expected


@patch(f"{databricks.labs.ucx.install.__name__}.datetime", wraps=datetime)
@pytest.mark.parametrize(
    "start_time,expected",
    [
        # "now" is pinned to 2024-01-01 14:00:00 in the test body.
        (1704114000000, "1 hour ago"),  # 2024-01-01 13:00:00
        (1704117600000, "less than 1 second ago"),  # 2024-01-01 14:00:00
        (1704116990000, "10 minutes 10 seconds ago"),  # 2024-01-01 13:49:50
        (None, "<never run>"),
    ],
)
def test_latest_job_status_success_with_time(
    mock_datetime, ws, mock_installation_with_jobs, any_prompt, start_time, expected
):
    """The ``started`` text is relative to a pinned clock, or ``<never run>`` without a start time.

    ``datetime`` in the install module is replaced by a wrapping mock so that
    only ``now()`` is overridden.
    """
    successful_run = BaseRun(
        job_id=123,
        run_name="assessment",
        state=RunState(
            result_state=RunResultState.SUCCESS,
            life_cycle_state=RunLifeCycleState.TERMINATED,
        ),
        start_time=start_time,
    )
    installer = WorkspaceInstallation(
        WorkspaceConfig(inventory_database='ucx'),
        mock_installation_with_jobs,
        MockBackend(),
        create_autospec(WheelsV2),
        ws,
        any_prompt,
        timedelta(seconds=1),
    )
    ws.jobs.list_runs.return_value = [successful_run]
    mock_datetime.now.return_value = datetime(2024, 1, 1, 14, 0, 0)

    statuses = installer.latest_job_status()

    assert statuses[0]["started"] == expected


def test_latest_job_status_list(ws, any_prompt):
    """Three installed jobs yield three entries, including one for a job without runs."""
    running = BaseRun(
        job_id=1,
        run_name="job1",
        state=RunState(result_state=None, life_cycle_state=RunLifeCycleState.RUNNING),
        start_time=1705577671907,
    )
    succeeded = BaseRun(
        job_id=2,
        run_name="job2",
        state=RunState(
            result_state=RunResultState.SUCCESS,
            life_cycle_state=RunLifeCycleState.TERMINATED,
        ),
        start_time=1705577671907,
    )
    # One list_runs answer per job, in order; the last job has no runs.
    runs_per_job = [[running], [succeeded], []]

    installation = MockInstallation(
        {'state.json': {'resources': {'jobs': {"job1": "1", "job2": "2", "job3": "3"}}}}
    )
    installer = WorkspaceInstallation(
        WorkspaceConfig(inventory_database='ucx'),
        installation,
        MockBackend(),
        create_autospec(WheelsV2),
        ws,
        any_prompt,
        timedelta(seconds=1),
    )
    ws.jobs.list_runs.side_effect = iter(runs_per_job)

    statuses = installer.latest_job_status()

    assert len(statuses) == 3
    expected_pairs = [("job1", "RUNNING"), ("job2", "SUCCESS"), ("job3", "UNKNOWN")]
    for entry, (step_name, state_name) in zip(statuses, expected_pairs):
        assert entry["step"] == step_name
        assert entry["state"] == state_name


def test_latest_job_status_no_job_run(ws, mock_installation_with_jobs, any_prompt):
    """A job that has never run is still listed, with the fallback state and start text."""
    sql_backend = MockBackend()
    wheels = create_autospec(WheelsV2)
    config = WorkspaceConfig(inventory_database='ucx')
    timeout = timedelta(seconds=1)
    workspace_installation = WorkspaceInstallation(
        config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout
    )
    # Model "no runs" as an empty collection of runs rather than an empty
    # string, which only worked because list("") happens to be empty.
    ws.jobs.list_runs.return_value = []
    status = workspace_installation.latest_job_status()
    assert len(status) == 1
    assert status[0]["step"] == "assessment"
    # Pin the fallbacks so a regression in the no-run path is caught.
    assert status[0]["state"] == "UNKNOWN"
    assert status[0]["started"] == "<never run>"


def test_latest_job_status_exception(ws, mock_installation_with_jobs, any_prompt):
    """A job whose run lookup raises ``InvalidParameterValue`` is left out of the result."""
    installer = WorkspaceInstallation(
        WorkspaceConfig(inventory_database='ucx'),
        mock_installation_with_jobs,
        MockBackend(),
        create_autospec(WheelsV2),
        ws,
        any_prompt,
        timedelta(seconds=1),
    )
    # Every run lookup fails as if the workflow had been deleted.
    ws.jobs.list_runs.side_effect = InvalidParameterValue("Workflow does not exists")

    statuses = installer.latest_job_status()

    assert len(statuses) == 0

0 comments on commit 50c14cd

Please sign in to comment.