diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 0caa4b4cb2b48..b08e78b286d7a 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -427,7 +427,9 @@ def update_dag_parsing_results_in_db( # Only now we are "complete" do we update import_errors - don't want to record errors from # previous failed attempts import_errors.update(serialize_errors) - # Record import errors into the ORM - we don't retry on this one as it's not as critical that it works + # Record import errors into the ORM. + # While this operation is not retried (unlike DAG serialization above), failures here + # should not crash the DAG processor - they are logged and the processor continues. try: _update_import_errors( files_parsed=files_parsed if files_parsed is not None else set(), @@ -437,14 +439,23 @@ def update_dag_parsing_results_in_db( ) except Exception: log.exception("Error logging import errors!") + session.rollback() # Clean up session state to prevent subsequent operations from failing # Record DAG warnings in the metadatabase. try: _update_dag_warnings([dag.dag_id for dag in dags], warnings, warning_types, session) except Exception: log.exception("Error logging DAG warnings.") + session.rollback() # Clean up session state to prevent subsequent operations from failing - session.flush() + # Flush all changes to the database. + # This is wrapped in error handling to prevent crashes from session state issues + # that could occur if earlier operations failed without proper cleanup. + try: + session.flush() + except Exception: + log.exception("Error flushing session after parsing results") + session.rollback() # Clean up session state even on flush failure class DagModelOperation(NamedTuple): diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index f98cf715b2f29..b17baea21868b 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -1025,3 +1025,177 @@ def test_update_dag_tags(self, testing_dag_bundle, session, initial_tags, new_ta session.commit() assert {t.name for t in dag_model.tags} == expected_tags + + +@pytest.mark.db_test +class TestDagProcessorCrashFix: + """Test that DAG processor doesn't crash when database operations fail during import error recording.""" + + def test_update_dag_parsing_results_handles_db_failure_gracefully(self, session): + """Test that update_dag_parsing_results_in_db doesn't crash when DB operations fail.""" + import_errors = {("test_dag.py", "default"): "ModuleNotFoundError: No module named 'nonexistent_module'"} + warnings = set() + bundle_name = "test_bundle" + + # Mock _update_import_errors to raise an exception (simulating DB failure) + with mock.patch( + "airflow.dag_processing.collection._update_import_errors", + side_effect=Exception("Simulated database connection failure") + ): + # Mock _update_dag_warnings to succeed + with mock.patch( + "airflow.dag_processing.collection._update_dag_warnings", + return_value=None + ): + # This should NOT crash with our fix - it should handle the exception gracefully + try: + update_dag_parsing_results_in_db( + bundle_name=bundle_name, + bundle_version=None, + dags=[], + import_errors=import_errors, + parse_duration=None, + warnings=warnings, + session=session, + ) + # If we get here, the fix worked - no crash occurred + assert True, "Function completed without crashing" + except Exception as e: + pytest.fail(f"Function should not crash but raised: {e}") + + def test_update_dag_parsing_results_handles_dag_warnings_db_failure_gracefully(self, session): + """Test that update_dag_parsing_results_in_db handles DAG warnings DB failures gracefully.""" + import_errors = {} + warnings = set() + bundle_name = "test_bundle" + + # Mock _update_import_errors to succeed + with mock.patch( + "airflow.dag_processing.collection._update_import_errors", + return_value=None + ): + # Mock _update_dag_warnings to raise an exception (simulating DB failure) + with mock.patch( + "airflow.dag_processing.collection._update_dag_warnings", + side_effect=Exception("Simulated DAG warnings database failure") + ): + # This should NOT crash with our fix + try: + update_dag_parsing_results_in_db( + bundle_name=bundle_name, + bundle_version=None, + dags=[], + import_errors=import_errors, + parse_duration=None, + warnings=warnings, + session=session, + ) + # If we get here, the fix worked - no crash occurred + assert True, "Function completed without crashing" + except Exception as e: + pytest.fail(f"Function should not crash but raised: {e}") + + def test_update_dag_parsing_results_handles_session_flush_failure_gracefully(self, session): + """Test that update_dag_parsing_results_in_db handles session.flush() failures gracefully.""" + import_errors = {} + warnings = set() + bundle_name = "test_bundle" + + # Mock all DB operations to succeed, but session.flush to fail + with mock.patch( + "airflow.dag_processing.collection._update_import_errors", + return_value=None + ): + with mock.patch( + "airflow.dag_processing.collection._update_dag_warnings", + return_value=None + ): + # Mock session.flush to raise an exception + with mock.patch.object( + session, + "flush", + side_effect=Exception("Simulated session flush failure") + ): + # This should NOT crash with our fix - it should handle the flush error gracefully + try: + update_dag_parsing_results_in_db( + bundle_name=bundle_name, + bundle_version=None, + dags=[], + import_errors=import_errors, + parse_duration=None, + warnings=warnings, + session=session, + ) + # If we get here, the fix worked - no crash occurred + assert True, "Function completed without crashing despite flush failure" + except Exception as e: + pytest.fail(f"Function should not crash but raised: {e}") + + def test_session_rollback_called_on_import_errors_failure(self, session): + """Test that session.rollback() is called when _update_import_errors fails.""" + import_errors = {("test_dag.py", "default"): "ModuleNotFoundError: No module named 'nonexistent_module'"} + warnings = set() + bundle_name = "test_bundle" + + # Mock _update_import_errors to raise an exception + with mock.patch( + "airflow.dag_processing.collection._update_import_errors", + side_effect=Exception("Simulated database connection failure") + ): + # Mock _update_dag_warnings to succeed + with mock.patch( + "airflow.dag_processing.collection._update_dag_warnings", + return_value=None + ): + # Mock session.rollback to track if it's called + rollback_mock = mock.Mock() + with mock.patch.object(session, "rollback", rollback_mock): + # Mock session.flush to succeed + with mock.patch.object(session, "flush", return_value=None): + update_dag_parsing_results_in_db( + bundle_name=bundle_name, + bundle_version=None, + dags=[], + import_errors=import_errors, + parse_duration=None, + warnings=warnings, + session=session, + ) + + # Verify that session.rollback was called (our fix) + rollback_mock.assert_called_once() + + def test_session_rollback_called_on_dag_warnings_failure(self, session): + """Test that session.rollback() is called when _update_dag_warnings fails.""" + import_errors = {} + warnings = set() + bundle_name = "test_bundle" + + # Mock _update_import_errors to succeed + with mock.patch( + "airflow.dag_processing.collection._update_import_errors", + return_value=None + ): + # Mock _update_dag_warnings to raise an exception + with mock.patch( + "airflow.dag_processing.collection._update_dag_warnings", + side_effect=Exception("Simulated DAG warnings database failure") + ): + # Mock session.rollback to track if it's called + rollback_mock = mock.Mock() + with mock.patch.object(session, "rollback", rollback_mock): + # Mock session.flush to succeed + with mock.patch.object(session, "flush", return_value=None): + update_dag_parsing_results_in_db( + bundle_name=bundle_name, + bundle_version=None, + dags=[], + import_errors=import_errors, + parse_duration=None, + warnings=warnings, + session=session, + ) + + # Verify that session.rollback was called (our fix) + rollback_mock.assert_called_once()