diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index 91bff694a7295..942ea15ab9e61 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -285,21 +285,22 @@ def _update_import_errors(
 ):
     from airflow.listeners.listener import get_listener_manager
 
-    # We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old
-    # errors (i.e. from files that are removed) separately
-
-    session.execute(
-        delete(ParseImportError).where(
-            tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
-        )
-    )
-
-    # the below query has to match (bundle_name, filename) tuple in that order since the
-    # import_errors list is a dict with keys as (bundle_name, relative_fileloc)
+    # Check existing import errors BEFORE deleting, so we can determine whether to update or create
     existing_import_error_files = set(
         session.execute(select(ParseImportError.bundle_name, ParseImportError.filename))
     )
-    # Add the errors of the processed files
+
+    # Delete errors for files that were parsed but don't have errors in import_errors
+    # (i.e., files that were successfully parsed without errors)
+    files_to_clear = files_parsed.difference(import_errors)
+    if files_to_clear:
+        session.execute(
+            delete(ParseImportError).where(
+                tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_to_clear)
+            )
+        )
+
+    # Add or update the errors of the processed files
     for key, stacktrace in import_errors.items():
         bundle_name_, relative_fileloc = key
@@ -371,6 +372,7 @@ def update_dag_parsing_results_in_db(
     session: Session,
     *,
     warning_types: tuple[DagWarningType] = (DagWarningType.NONEXISTENT_POOL,),
+    files_parsed: set[tuple[str, str]] | None = None,
 ):
     """
     Update everything to do with DAG parsing in the DB.
@@ -388,6 +390,10 @@ def update_dag_parsing_results_in_db(
     then all warnings and errors related to this file will be removed.
 
     ``import_errors`` will be updated in place with any new errors
+
+    :param files_parsed: Set of (bundle_name, relative_fileloc) tuples for all files that were parsed.
+        Passing this explicitly ensures that import errors are cleared for files that were parsed but
+        no longer contain DAGs; if None, no stale import errors are cleared.
     """
     # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
     # of any Operational Errors
@@ -423,16 +429,8 @@ def update_dag_parsing_results_in_db(
         import_errors.update(serialize_errors)
     # Record import errors into the ORM - we don't retry on this one as it's not as critical that it works
     try:
-        # TODO: This won't clear errors for files that exist that no longer contain DAGs. Do we need to pass
-        # in the list of file parsed?
-
-        good_dag_filelocs = {
-            (bundle_name, dag.relative_fileloc)
-            for dag in dags
-            if dag.relative_fileloc is not None and (bundle_name, dag.relative_fileloc) not in import_errors
-        }
         _update_import_errors(
-            files_parsed=good_dag_filelocs,
+            files_parsed=files_parsed if files_parsed is not None else set(),
             bundle_name=bundle_name,
             import_errors=import_errors,
             session=session,
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py b/airflow-core/src/airflow/dag_processing/dagbag.py
index 173f5b05b4e2f..0259992dc78f7 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -695,6 +695,16 @@ def sync_bag_to_db(
     from airflow.dag_processing.collection import update_dag_parsing_results_in_db
 
     import_errors = {(bundle_name, rel_path): error for rel_path, error in dagbag.import_errors.items()}
+
+    # Build the set of all files that were parsed; include files with import errors
+    # in case they are not present in file_last_changed.
+    files_parsed = set(import_errors)
+    if dagbag.bundle_path:
+        files_parsed.update(
+            (bundle_name, dagbag._get_relative_fileloc(abs_filepath))
+            for abs_filepath in dagbag.file_last_changed
+        )
+
     update_dag_parsing_results_in_db(
         bundle_name,
         bundle_version,
@@ -703,4 +713,5 @@ def sync_bag_to_db(
         None,  # file parsing duration is not well defined when parsing multiple files / multiple DAGs.
         dagbag.dag_warnings,
         session=session,
+        files_parsed=files_parsed,
     )
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 232c0d978acff..ae45a690074c0 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -858,6 +858,7 @@ def _collect_results(self, session: Session = NEW_SESSION):
                 parsing_result=proc.parsing_result,
                 session=session,
                 is_callback_only=is_callback_only,
+                relative_fileloc=str(file.rel_path),
             )
 
         for file in finished:
@@ -1147,6 +1148,7 @@ def process_parse_results(
     session: Session,
     *,
     is_callback_only: bool = False,
+    relative_fileloc: str | None = None,
 ) -> DagFileStat:
     """Take the parsing result and stats about the parser process and convert it into a DagFileStat."""
     if is_callback_only:
@@ -1181,6 +1183,14 @@ def process_parse_results(
         import_errors = {
             (bundle_name, rel_path): error for rel_path, error in parsing_result.import_errors.items()
         }
+
+        # Build the set of files that were parsed. Include the parsed file itself even if it
+        # no longer contains DAGs, so its stale import errors can be cleared.
+        files_parsed: set[tuple[str, str]] | None = None
+        if relative_fileloc is not None:
+            files_parsed = {(bundle_name, relative_fileloc)}
+            files_parsed.update(import_errors.keys())
+
         update_dag_parsing_results_in_db(
             bundle_name=bundle_name,
             bundle_version=bundle_version,
@@ -1189,6 +1199,7 @@ def process_parse_results(
             parse_duration=run_duration,
             warnings=set(parsing_result.warnings or []),
             session=session,
+            files_parsed=files_parsed,
         )
         stat.num_dags = len(parsing_result.serialized_dags)
         if parsing_result.import_errors:
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index 64e15ede31795..f98cf715b2f29 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -492,7 +492,9 @@ def test_serialized_dag_errors_are_import_errors(
         mock_full_path.return_value = "abc.py"
         import_errors = {}
-        update_dag_parsing_results_in_db("testing", None, [dag], import_errors, None, set(), session)
+        update_dag_parsing_results_in_db(
+            "testing", None, [dag], import_errors, None, set(), session, files_parsed={("testing", "abc.py")}
+        )
 
         assert "SerializationError" in caplog.text
 
         # Should have been edited in place
@@ -656,6 +658,7 @@ def test_new_import_error_replaces_old(
             parse_duration=None,
             warnings=set(),
             session=session,
+            files_parsed={("testing", "abc.py")},
         )
 
         import_error = (
@@ -714,6 +717,7 @@ def test_remove_error_clears_import_error(self, testing_dag_bundle, session):
             parse_duration=None,
             warnings=set(),
             session=session,
+            files_parsed={(bundle_name, "abc.py")},
         )
         dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
         assert dag_model.has_import_errors is False
@@ -758,6 +762,7 @@ def test_remove_error_updates_loaded_dag_model(self, testing_dag_bundle, session
             parse_duration=None,
             warnings=set(),
             session=session,
+            files_parsed={(bundle_name, "abc.py")},
         )
         dag_model = session.get(DagModel, (dag.dag_id,))
         assert dag_model.has_import_errors is True
@@ -774,6 +779,53 @@ def test_remove_error_updates_loaded_dag_model(self, testing_dag_bundle, session
         )
         assert dag_model.has_import_errors is False
 
+    @pytest.mark.usefixtures("clean_db")
+    def test_clear_import_error_for_file_without_dags(self, testing_dag_bundle, session):
+        """
+        Test that import errors are cleared for files that were parsed but no longer contain DAGs.
+        """
+        bundle_name = "testing"
+        filename = "no_dags.py"
+
+        prev_error = ParseImportError(
+            filename=filename,
+            bundle_name=bundle_name,
+            timestamp=tz.utcnow(),
+            stacktrace="Previous import error",
+        )
+        session.add(prev_error)
+
+        # Add an import error for another file we haven't parsed (this one shouldn't be deleted)
+        other_file_error = ParseImportError(
+            filename="other.py",
+            bundle_name=bundle_name,
+            timestamp=tz.utcnow(),
+            stacktrace="Some error",
+        )
+        session.add(other_file_error)
+        session.flush()
+
+        import_errors = set(session.execute(select(ParseImportError.filename, ParseImportError.bundle_name)))
+        assert import_errors == {("no_dags.py", bundle_name), ("other.py", bundle_name)}
+
+        # Simulate parsing the file: it was parsed successfully (no import errors),
+        # but it no longer contains any DAGs. By passing files_parsed, we ensure
+        # the import error is cleared even though there are no DAGs.
+        files_parsed = {(bundle_name, filename)}
+        update_dag_parsing_results_in_db(
+            bundle_name=bundle_name,
+            bundle_version=None,
+            dags=[],  # No DAGs in this file
+            import_errors={},  # No import errors
+            parse_duration=None,
+            warnings=set(),
+            session=session,
+            files_parsed=files_parsed,
+        )
+
+        import_errors = set(session.execute(select(ParseImportError.filename, ParseImportError.bundle_name)))
+        assert import_errors == {("other.py", bundle_name)}, "Import error for parsed file should be cleared"
+
     @pytest.mark.need_serialized_dag(False)
     @pytest.mark.parametrize(
         ("attrs", "expected"),
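
Reviewer note: the behavioral core of this change is the switch from "delete the errors of every file parsed in this batch, then re-insert" to "delete errors only for parsed files that no longer appear in import_errors, then upsert the rest". The following is a minimal pure-Python sketch of those semantics; the helper name update_import_errors_sketch and the dict standing in for the ParseImportError table are illustrative, not part of this PR:

# Sketch of the new clearing semantics, assuming a plain dict keyed by
# (bundle_name, filename) stands in for the ParseImportError table.
def update_import_errors_sketch(
    errors_table: dict[tuple[str, str], str],
    files_parsed: set[tuple[str, str]],
    import_errors: dict[tuple[str, str], str],
) -> None:
    # Clear errors only for files that were parsed and came back clean;
    # errors for files outside this parse batch are left untouched.
    for key in files_parsed.difference(import_errors):
        errors_table.pop(key, None)
    # Add or update errors for files that failed to parse this time.
    errors_table.update(import_errors)


# Mirrors test_clear_import_error_for_file_without_dags: no_dags.py was
# parsed and is now clean (and DAG-free); other.py was not parsed at all.
table = {
    ("testing", "no_dags.py"): "Previous import error",
    ("testing", "other.py"): "Some error",
}
update_import_errors_sketch(table, {("testing", "no_dags.py")}, {})
assert table == {("testing", "other.py"): "Some error"}

Passing files_parsed=None (the default) degrades to an empty set inside update_dag_parsing_results_in_db, so no stale errors are cleared; that is why every call site touched above threads the parsed-file set through explicitly.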