diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py index 185359239d..4c8bc69709 100644 --- a/src/databricks/labs/ucx/install.py +++ b/src/databricks/labs/ucx/install.py @@ -799,7 +799,7 @@ def _job_clusters(self, names: set[str]): @staticmethod def _readable_timedelta(epoch): - when = datetime.fromtimestamp(epoch) + when = datetime.utcfromtimestamp(epoch) duration = datetime.now() - when data = {} data["days"], remaining = divmod(duration.total_seconds(), 86_400) @@ -808,7 +808,8 @@ def _readable_timedelta(epoch): time_parts = ((name, round(value)) for name, value in data.items()) time_parts = [f"{value} {name[:-1] if value == 1 else name}" for name, value in time_parts if value > 0] - time_parts.append("ago") + if len(time_parts) > 0: + time_parts.append("ago") if time_parts: return " ".join(time_parts) return "less than 1 second ago" @@ -816,33 +817,32 @@ def _readable_timedelta(epoch): def latest_job_status(self) -> list[dict]: latest_status = [] for step, job_id in self._state.jobs.items(): + job_state = None + start_time = None try: - step_status = self._step_status(job_id, step) - latest_status.append(step_status) + job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1)) except InvalidParameterValue as e: logger.warning(f"skipping {step}: {e}") continue + if job_runs: + state = job_runs[0].state + if state and state.result_state: + job_state = state.result_state.name + elif state and state.life_cycle_state: + job_state = state.life_cycle_state.name + if job_runs[0].start_time: + start_time = job_runs[0].start_time / 1000 + latest_status.append( + { + "step": step, + "state": "UNKNOWN" if not (job_runs and job_state) else job_state, + "started": ( + "" if not (job_runs and start_time) else self._readable_timedelta(start_time) + ), + } + ) return latest_status - def _step_status(self, job_id, step): - job_state = None - start_time = None - job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1)) - if job_runs: - state = job_runs[0].state - job_state = None - if state and state.result_state: - job_state = state.result_state.name - elif state and state.life_cycle_state: - job_state = state.life_cycle_state.name - if job_runs[0].start_time: - start_time = job_runs[0].start_time / 1000 - return { - "step": step, - "state": "UNKNOWN" if not (job_runs and job_state) else job_state, - "started": "" if not job_runs else self._readable_timedelta(start_time), - } - def _get_result_state(self, job_id): job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1)) latest_job_run = job_runs[0] diff --git a/tests/unit/test_install.py b/tests/unit/test_install.py index 4bf25e4582..eef8fef662 100644 --- a/tests/unit/test_install.py +++ b/tests/unit/test_install.py @@ -1,6 +1,6 @@ import json -from datetime import timedelta -from unittest.mock import MagicMock, create_autospec +from datetime import datetime, timedelta +from unittest.mock import MagicMock, create_autospec, patch import pytest from databricks.labs.blueprint.installation import Installation, MockInstallation @@ -18,7 +18,12 @@ ) from databricks.sdk.service import compute, iam, jobs, sql from databricks.sdk.service.compute import CreatePolicyResponse, Policy, State -from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState +from databricks.sdk.service.jobs import ( + BaseRun, + RunLifeCycleState, + RunResultState, + RunState, +) from databricks.sdk.service.sql import ( Dashboard, DataSource, @@ -1045,3 +1050,170 @@ def test_repair_run_result_state(ws, caplog, mock_installation_with_jobs, any_pr workspace_installation.repair_run("assessment") assert "Please try after sometime" in caplog.text + + +@pytest.mark.parametrize( + "state,expected", + [ + ( + RunState( + result_state=None, + life_cycle_state=RunLifeCycleState.RUNNING, + ), + "RUNNING", + ), + ( + RunState( + result_state=RunResultState.SUCCESS, + life_cycle_state=RunLifeCycleState.TERMINATED, + ), + "SUCCESS", + ), + ( + RunState( + result_state=RunResultState.FAILED, + life_cycle_state=RunLifeCycleState.TERMINATED, + ), + "FAILED", + ), + ( + RunState( + result_state=None, + life_cycle_state=None, + ), + "UNKNOWN", + ), + ], +) +def test_latest_job_status_states(ws, mock_installation_with_jobs, any_prompt, state, expected): + base = [ + BaseRun( + job_id=123, + run_name="assessment", + state=state, + start_time=1704114000000, + ) + ] + sql_backend = MockBackend() + wheels = create_autospec(WheelsV2) + config = WorkspaceConfig(inventory_database='ucx') + timeout = timedelta(seconds=1) + workspace_installation = WorkspaceInstallation( + config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout + ) + ws.jobs.list_runs.return_value = base + status = workspace_installation.latest_job_status() + assert len(status) == 1 + assert status[0]["state"] == expected + + +@patch(f"{databricks.labs.ucx.install.__name__}.datetime", wraps=datetime) +@pytest.mark.parametrize( + "start_time,expected", + [ + (1704114000000, "1 hour ago"), # 2024-01-01 13:00:00 + (1704117600000, "less than 1 second ago"), # 2024-01-01 14:00:00 + (1704116990000, "10 minutes 10 seconds ago"), # 2024-01-01 13:49:50 + (None, ""), + ], +) +def test_latest_job_status_success_with_time( + mock_datetime, ws, mock_installation_with_jobs, any_prompt, start_time, expected +): + base = [ + BaseRun( + job_id=123, + run_name="assessment", + state=RunState( + result_state=RunResultState.SUCCESS, + life_cycle_state=RunLifeCycleState.TERMINATED, + ), + start_time=start_time, + ) + ] + sql_backend = MockBackend() + wheels = create_autospec(WheelsV2) + config = WorkspaceConfig(inventory_database='ucx') + timeout = timedelta(seconds=1) + workspace_installation = WorkspaceInstallation( + config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout + ) + ws.jobs.list_runs.return_value = base + faked_now = datetime(2024, 1, 1, 14, 0, 0) + mock_datetime.now.return_value = faked_now + status = workspace_installation.latest_job_status() + assert status[0]["started"] == expected + + +def test_latest_job_status_list(ws, any_prompt): + runs = [ + [ + BaseRun( + job_id=1, + run_name="job1", + state=RunState( + result_state=None, + life_cycle_state=RunLifeCycleState.RUNNING, + ), + start_time=1705577671907, + ) + ], + [ + BaseRun( + job_id=2, + run_name="job2", + state=RunState( + result_state=RunResultState.SUCCESS, + life_cycle_state=RunLifeCycleState.TERMINATED, + ), + start_time=1705577671907, + ) + ], + [], # the last job has no runs + ] + sql_backend = MockBackend() + wheels = create_autospec(WheelsV2) + config = WorkspaceConfig(inventory_database='ucx') + timeout = timedelta(seconds=1) + mock_installation = MockInstallation( + {'state.json': {'resources': {'jobs': {"job1": "1", "job2": "2", "job3": "3"}}}} + ) + workspace_installation = WorkspaceInstallation( + config, mock_installation, sql_backend, wheels, ws, any_prompt, timeout + ) + ws.jobs.list_runs.side_effect = iter(runs) + status = workspace_installation.latest_job_status() + assert len(status) == 3 + assert status[0]["step"] == "job1" + assert status[0]["state"] == "RUNNING" + assert status[1]["step"] == "job2" + assert status[1]["state"] == "SUCCESS" + assert status[2]["step"] == "job3" + assert status[2]["state"] == "UNKNOWN" + + +def test_latest_job_status_no_job_run(ws, mock_installation_with_jobs, any_prompt): + sql_backend = MockBackend() + wheels = create_autospec(WheelsV2) + config = WorkspaceConfig(inventory_database='ucx') + timeout = timedelta(seconds=1) + workspace_installation = WorkspaceInstallation( + config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout + ) + ws.jobs.list_runs.return_value = "" + status = workspace_installation.latest_job_status() + assert len(status) == 1 + assert status[0]["step"] == "assessment" + + +def test_latest_job_status_exception(ws, mock_installation_with_jobs, any_prompt): + sql_backend = MockBackend() + wheels = create_autospec(WheelsV2) + config = WorkspaceConfig(inventory_database='ucx') + timeout = timedelta(seconds=1) + workspace_installation = WorkspaceInstallation( + config, mock_installation_with_jobs, sql_backend, wheels, ws, any_prompt, timeout + ) + ws.jobs.list_runs.side_effect = InvalidParameterValue("Workflow does not exists") + status = workspace_installation.latest_job_status() + assert len(status) == 0