Skip to content

Commit 7335d52

Browse files
committed
Fix MLflow experiment tracker crash with non-existent runs
Previously, the MLflow experiment tracker would crash with a RESOURCE_DOES_NOT_EXIST error when attempting to resume a run that existed in ZenML's cache but not on the MLflow server. This was particularly problematic with Azure ML deployments. The fix validates that a cached run_id actually exists on the MLflow server before attempting to resume it. If validation fails, the code gracefully creates a new run instead of crashing. Changes: - Added MlflowException import - Added run existence validation in prepare_step_run() - Log warning when creating new run instead of resuming stale one - Added test to verify graceful handling of missing runs Fixes #4207
1 parent 00de1ad commit 7335d52

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

src/zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@

 import mlflow
 from mlflow.entities import Experiment, Run
+from mlflow.exceptions import MlflowException
 from mlflow.store.db.db_types import DATABASE_ENGINES

 import zenml
@@ -195,6 +196,18 @@ def prepare_step_run(self, info: "StepRunInfo") -> None:
            experiment_name=experiment_name, run_name=info.run_name
        )

+        # Validate that the run exists before attempting to resume it
+        if run_id:
+            try:
+                mlflow.get_run(run_id)
+            except MlflowException as e:
+                # Run doesn't exist on the MLflow server, create a new one
+                logger.warning(
+                    f"Run with id {run_id} not found in MLflow tracking server. "
+                    f"Creating a new run instead. Error: {e}"
+                )
+                run_id = None
+
        tags = settings.tags.copy()
        tags.update(self._get_internal_tags())

tests/integration/integrations/mlflow/experiment_trackers/test_mlflow_experiment_tracker.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
 import os
 from contextlib import ExitStack as does_not_raise
 from datetime import datetime
+from unittest.mock import MagicMock, patch
 from uuid import uuid4

 import pytest
+from mlflow.exceptions import MlflowException
 from pydantic import ValidationError

 from zenml.enums import StackComponentType
@@ -245,3 +247,69 @@ def test_mlflow_experiment_tracker_set_config(local_stack: Stack) -> None:
245247
assert os.environ[DATABRICKS_PASSWORD] == "password"
246248
assert os.environ[DATABRICKS_TOKEN] == "token1234"
247249
assert os.environ[DATABRICKS_HOST] == "https://databricks.com"
250+
251+
252+
@patch("mlflow.start_run")
253+
@patch("mlflow.get_run")
254+
@patch("mlflow.get_experiment_by_name")
255+
@patch("mlflow.set_experiment")
256+
def test_mlflow_experiment_tracker_handles_missing_run(
257+
mock_set_experiment: MagicMock,
258+
mock_get_experiment: MagicMock,
259+
mock_get_run: MagicMock,
260+
mock_start_run: MagicMock,
261+
) -> None:
262+
"""Tests that the MLflow experiment tracker handles missing runs gracefully.
263+
264+
This test verifies the fix for issue #4207 where MLflow would crash
265+
when trying to resume a run that doesn't exist on the server.
266+
"""
267+
# Setup mocks
268+
mock_experiment = MagicMock()
269+
mock_experiment.experiment_id = "test_experiment_id"
270+
mock_get_experiment.return_value = mock_experiment
271+
272+
# Simulate a run that doesn't exist on the MLflow server
273+
mock_get_run.side_effect = MlflowException("RESOURCE_DOES_NOT_EXIST")
274+
275+
# Create experiment tracker
276+
tracker = MLFlowExperimentTracker(
277+
name="test_tracker",
278+
id=uuid4(),
279+
config=MLFlowExperimentTrackerConfig(
280+
tracking_uri="file:///tmp/mlflow",
281+
),
282+
flavor="mlflow",
283+
type=StackComponentType.EXPERIMENT_TRACKER,
284+
user=uuid4(),
285+
created=datetime.now(),
286+
updated=datetime.now(),
287+
)
288+
289+
# Create a mock StepRunInfo
290+
mock_step_info = MagicMock()
291+
mock_step_info.pipeline.name = "test_pipeline"
292+
mock_step_info.run_name = "test_run"
293+
mock_step_info.pipeline_step_name = "test_step"
294+
295+
# Mock get_run_id to return a stale run_id
296+
with patch.object(tracker, "get_run_id", return_value="stale_run_id"):
297+
with patch.object(
298+
tracker,
299+
"get_settings",
300+
return_value=MagicMock(
301+
experiment_name=None,
302+
tags={},
303+
nested=False,
304+
),
305+
):
306+
# This should not raise an exception, even though the run doesn't exist
307+
tracker.prepare_step_run(mock_step_info)
308+
309+
# Verify that start_run was called with run_id=None (creating a new run)
310+
mock_start_run.assert_called_once()
311+
call_kwargs = mock_start_run.call_args[1]
312+
assert call_kwargs["run_id"] is None, (
313+
"Expected run_id to be None when run doesn't exist"
314+
)
315+
assert call_kwargs["run_name"] == "test_run"

0 commit comments

Comments (0)