Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,9 @@ def update_dag_parsing_results_in_db(
# Only now we are "complete" do we update import_errors - don't want to record errors from
# previous failed attempts
import_errors.update(serialize_errors)
# Record import errors into the ORM - we don't retry on this one as it's not as critical that it works
# Record import errors into the ORM.
# While this operation is not retried (unlike DAG serialization above), failures here
# should not crash the DAG processor - they are logged and the processor continues.
try:
_update_import_errors(
files_parsed=files_parsed if files_parsed is not None else set(),
Expand All @@ -437,14 +439,23 @@ def update_dag_parsing_results_in_db(
)
except Exception:
log.exception("Error logging import errors!")
session.rollback() # Clean up session state to prevent subsequent operations from failing

# Record DAG warnings in the metadatabase.
try:
_update_dag_warnings([dag.dag_id for dag in dags], warnings, warning_types, session)
except Exception:
log.exception("Error logging DAG warnings.")
session.rollback() # Clean up session state to prevent subsequent operations from failing

session.flush()
# Flush all changes to the database.
# This is wrapped in error handling to prevent crashes from session state issues
# that could occur if earlier operations failed without proper cleanup.
try:
session.flush()
except Exception:
log.exception("Error flushing session after parsing results")
session.rollback() # Clean up session state even on flush failure


class DagModelOperation(NamedTuple):
Expand Down
174 changes: 174 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,3 +1025,177 @@ def test_update_dag_tags(self, testing_dag_bundle, session, initial_tags, new_ta
session.commit()

assert {t.name for t in dag_model.tags} == expected_tags


@pytest.mark.db_test
class TestDagProcessorCrashFix:
"""Test that DAG processor doesn't crash when database operations fail during import error recording."""

def test_update_dag_parsing_results_handles_db_failure_gracefully(self, session):
"""Test that update_dag_parsing_results_in_db doesn't crash when DB operations fail."""
import_errors = {("test_dag.py", "default"): "ModuleNotFoundError: No module named 'nonexistent_module'"}
warnings = set()
bundle_name = "test_bundle"

# Mock _update_import_errors to raise an exception (simulating DB failure)
with mock.patch(
"airflow.dag_processing.collection._update_import_errors",
side_effect=Exception("Simulated database connection failure")
):
# Mock _update_dag_warnings to succeed
with mock.patch(
"airflow.dag_processing.collection._update_dag_warnings",
return_value=None
):
# This should NOT crash with our fix - it should handle the exception gracefully
try:
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=None,
dags=[],
import_errors=import_errors,
parse_duration=None,
warnings=warnings,
session=session,
)
# If we get here, the fix worked - no crash occurred
assert True, "Function completed without crashing"
except Exception as e:
pytest.fail(f"Function should not crash but raised: {e}")

def test_update_dag_parsing_results_handles_dag_warnings_db_failure_gracefully(self, session):
"""Test that update_dag_parsing_results_in_db handles DAG warnings DB failures gracefully."""
import_errors = {}
warnings = set()
bundle_name = "test_bundle"

# Mock _update_import_errors to succeed
with mock.patch(
"airflow.dag_processing.collection._update_import_errors",
return_value=None
):
# Mock _update_dag_warnings to raise an exception (simulating DB failure)
with mock.patch(
"airflow.dag_processing.collection._update_dag_warnings",
side_effect=Exception("Simulated DAG warnings database failure")
):
# This should NOT crash with our fix
try:
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=None,
dags=[],
import_errors=import_errors,
parse_duration=None,
warnings=warnings,
session=session,
)
# If we get here, the fix worked - no crash occurred
assert True, "Function completed without crashing"
except Exception as e:
pytest.fail(f"Function should not crash but raised: {e}")

def test_update_dag_parsing_results_handles_session_flush_failure_gracefully(self, session):
"""Test that update_dag_parsing_results_in_db handles session.flush() failures gracefully."""
import_errors = {}
warnings = set()
bundle_name = "test_bundle"

# Mock all DB operations to succeed, but session.flush to fail
with mock.patch(
"airflow.dag_processing.collection._update_import_errors",
return_value=None
):
with mock.patch(
"airflow.dag_processing.collection._update_dag_warnings",
return_value=None
):
# Mock session.flush to raise an exception
with mock.patch.object(
session,
"flush",
side_effect=Exception("Simulated session flush failure")
):
# This should NOT crash with our fix - it should handle the flush error gracefully
try:
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=None,
dags=[],
import_errors=import_errors,
parse_duration=None,
warnings=warnings,
session=session,
)
# If we get here, the fix worked - no crash occurred
assert True, "Function completed without crashing despite flush failure"
except Exception as e:
pytest.fail(f"Function should not crash but raised: {e}")

def test_session_rollback_called_on_import_errors_failure(self, session):
"""Test that session.rollback() is called when _update_import_errors fails."""
import_errors = {("test_dag.py", "default"): "ModuleNotFoundError: No module named 'nonexistent_module'"}
warnings = set()
bundle_name = "test_bundle"

# Mock _update_import_errors to raise an exception
with mock.patch(
"airflow.dag_processing.collection._update_import_errors",
side_effect=Exception("Simulated database connection failure")
):
# Mock _update_dag_warnings to succeed
with mock.patch(
"airflow.dag_processing.collection._update_dag_warnings",
return_value=None
):
# Mock session.rollback to track if it's called
rollback_mock = mock.Mock()
with mock.patch.object(session, "rollback", rollback_mock):
# Mock session.flush to succeed
with mock.patch.object(session, "flush", return_value=None):
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=None,
dags=[],
import_errors=import_errors,
parse_duration=None,
warnings=warnings,
session=session,
)

# Verify that session.rollback was called (our fix)
rollback_mock.assert_called_once()

def test_session_rollback_called_on_dag_warnings_failure(self, session):
"""Test that session.rollback() is called when _update_dag_warnings fails."""
import_errors = {}
warnings = set()
bundle_name = "test_bundle"

# Mock _update_import_errors to succeed
with mock.patch(
"airflow.dag_processing.collection._update_import_errors",
return_value=None
):
# Mock _update_dag_warnings to raise an exception
with mock.patch(
"airflow.dag_processing.collection._update_dag_warnings",
side_effect=Exception("Simulated DAG warnings database failure")
):
# Mock session.rollback to track if it's called
rollback_mock = mock.Mock()
with mock.patch.object(session, "rollback", rollback_mock):
# Mock session.flush to succeed
with mock.patch.object(session, "flush", return_value=None):
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=None,
dags=[],
import_errors=import_errors,
parse_duration=None,
warnings=warnings,
session=session,
)

# Verify that session.rollback was called (our fix)
rollback_mock.assert_called_once()
Loading