Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 19 additions & 21 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,21 +285,22 @@ def _update_import_errors(
):
from airflow.listeners.listener import get_listener_manager

# We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old
# errors (i.e. from files that are removed) separately

session.execute(
delete(ParseImportError).where(
tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
)
)

# the below query has to match (bundle_name, filename) tuple in that order since the
# import_errors list is a dict with keys as (bundle_name, relative_fileloc)
# Check existing import errors BEFORE deleting, so we can determine if we should update or create
existing_import_error_files = set(
session.execute(select(ParseImportError.bundle_name, ParseImportError.filename))
)
# Add the errors of the processed files

# Delete errors for files that were parsed but don't have errors in import_errors
# (i.e., files that were successfully parsed without errors)
files_to_clear = files_parsed.difference(import_errors)
if files_to_clear:
session.execute(
delete(ParseImportError).where(
tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_to_clear)
)
)

# Add or update the errors of the processed files
for key, stacktrace in import_errors.items():
bundle_name_, relative_fileloc = key

Expand Down Expand Up @@ -371,6 +372,7 @@ def update_dag_parsing_results_in_db(
session: Session,
*,
warning_types: tuple[DagWarningType] = (DagWarningType.NONEXISTENT_POOL,),
files_parsed: set[tuple[str, str]] | None = None,
):
"""
Update everything to do with DAG parsing in the DB.
Expand All @@ -388,6 +390,10 @@ def update_dag_parsing_results_in_db(
then all warnings and errors related to this file will be removed.

``import_errors`` will be updated in place with any new errors

:param files_parsed: Set of (bundle_name, relative_fileloc) tuples for all files that were parsed.
If None, will be inferred from dags and import_errors. Passing this explicitly ensures that
import errors are cleared for files that were parsed but no longer contain DAGs.
"""
# Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
# of any Operational Errors
Expand Down Expand Up @@ -423,16 +429,8 @@ def update_dag_parsing_results_in_db(
import_errors.update(serialize_errors)
# Record import errors into the ORM - we don't retry on this one as it's not as critical that it works
try:
# TODO: This won't clear errors for files that exist that no longer contain DAGs. Do we need to pass
# in the list of file parsed?

good_dag_filelocs = {
(bundle_name, dag.relative_fileloc)
for dag in dags
if dag.relative_fileloc is not None and (bundle_name, dag.relative_fileloc) not in import_errors
}
_update_import_errors(
files_parsed=good_dag_filelocs,
files_parsed=files_parsed if files_parsed is not None else set(),
bundle_name=bundle_name,
import_errors=import_errors,
session=session,
Expand Down
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/dag_processing/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,16 @@ def sync_bag_to_db(
from airflow.dag_processing.collection import update_dag_parsing_results_in_db

import_errors = {(bundle_name, rel_path): error for rel_path, error in dagbag.import_errors.items()}

# Build the set of all files that were parsed and include files with import errors
# in case they are not in file_last_changed
files_parsed = set(import_errors)
if dagbag.bundle_path:
files_parsed.update(
(bundle_name, dagbag._get_relative_fileloc(abs_filepath))
for abs_filepath in dagbag.file_last_changed
)

update_dag_parsing_results_in_db(
bundle_name,
bundle_version,
Expand All @@ -703,4 +713,5 @@ def sync_bag_to_db(
None, # file parsing duration is not well defined when parsing multiple files / multiple DAGs.
dagbag.dag_warnings,
session=session,
files_parsed=files_parsed,
)
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ def _collect_results(self, session: Session = NEW_SESSION):
parsing_result=proc.parsing_result,
session=session,
is_callback_only=is_callback_only,
relative_fileloc=str(file.rel_path),
)

for file in finished:
Expand Down Expand Up @@ -1147,6 +1148,7 @@ def process_parse_results(
session: Session,
*,
is_callback_only: bool = False,
relative_fileloc: str | None = None,
) -> DagFileStat:
"""Take the parsing result and stats about the parser process and convert it into a DagFileStat."""
if is_callback_only:
Expand Down Expand Up @@ -1181,6 +1183,14 @@ def process_parse_results(
import_errors = {
(bundle_name, rel_path): error for rel_path, error in parsing_result.import_errors.items()
}

# Build the set of files that were parsed. This includes the file that was parsed,
# even if it no longer contains DAGs, so we can clear old import errors.
files_parsed: set[tuple[str, str]] | None = None
if relative_fileloc is not None:
files_parsed = {(bundle_name, relative_fileloc)}
files_parsed.update(import_errors.keys())

update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=bundle_version,
Expand All @@ -1189,6 +1199,7 @@ def process_parse_results(
parse_duration=run_duration,
warnings=set(parsing_result.warnings or []),
session=session,
files_parsed=files_parsed,
)
stat.num_dags = len(parsing_result.serialized_dags)
if parsing_result.import_errors:
Expand Down
54 changes: 53 additions & 1 deletion airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,9 @@ def test_serialized_dag_errors_are_import_errors(
mock_full_path.return_value = "abc.py"

import_errors = {}
update_dag_parsing_results_in_db("testing", None, [dag], import_errors, None, set(), session)
update_dag_parsing_results_in_db(
"testing", None, [dag], import_errors, None, set(), session, files_parsed={("testing", "abc.py")}
)
assert "SerializationError" in caplog.text

# Should have been edited in place
Expand Down Expand Up @@ -656,6 +658,7 @@ def test_new_import_error_replaces_old(
parse_duration=None,
warnings=set(),
session=session,
files_parsed={("testing", "abc.py")},
)

import_error = (
Expand Down Expand Up @@ -714,6 +717,7 @@ def test_remove_error_clears_import_error(self, testing_dag_bundle, session):
parse_duration=None,
warnings=set(),
session=session,
files_parsed={(bundle_name, "abc.py")},
)
dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
assert dag_model.has_import_errors is False
Expand Down Expand Up @@ -758,6 +762,7 @@ def test_remove_error_updates_loaded_dag_model(self, testing_dag_bundle, session
parse_duration=None,
warnings=set(),
session=session,
files_parsed={(bundle_name, "abc.py")},
)
dag_model = session.get(DagModel, (dag.dag_id,))
assert dag_model.has_import_errors is True
Expand All @@ -774,6 +779,53 @@ def test_remove_error_updates_loaded_dag_model(self, testing_dag_bundle, session
)
assert dag_model.has_import_errors is False

@pytest.mark.usefixtures("clean_db")
def test_clear_import_error_for_file_without_dags(self, testing_dag_bundle, session):
    """
    Check that a stale import error is dropped when its file is parsed again without DAGs.

    An import error that belongs to a file outside ``files_parsed`` must survive the update.
    """
    bundle_name = "testing"
    filename = "no_dags.py"

    def stored_errors():
        # Snapshot of the (filename, bundle_name) pairs currently recorded as import errors.
        return set(session.execute(select(ParseImportError.filename, ParseImportError.bundle_name)))

    # Stale error for the file that is about to be re-parsed.
    stale_error = ParseImportError(
        filename=filename,
        bundle_name=bundle_name,
        timestamp=tz.utcnow(),
        stacktrace="Previous import error",
    )
    session.add(stale_error)

    # An import error for a different file that is not part of this parse run; it must be kept.
    untouched_error = ParseImportError(
        filename="other.py",
        bundle_name=bundle_name,
        timestamp=tz.utcnow(),
        stacktrace="Some error",
    )
    session.add(untouched_error)
    session.flush()

    assert stored_errors() == {("no_dags.py", bundle_name), ("other.py", bundle_name)}

    # The file parsed cleanly (no import errors) but produced no DAGs. Listing it in
    # files_parsed is what allows its old import error to be cleared.
    update_dag_parsing_results_in_db(
        bundle_name=bundle_name,
        bundle_version=None,
        dags=[],
        import_errors={},
        parse_duration=None,
        warnings=set(),
        session=session,
        files_parsed={(bundle_name, filename)},
    )

    remaining = stored_errors()
    assert remaining == {("other.py", bundle_name)}, "Import error for parsed file should be cleared"

@pytest.mark.need_serialized_dag(False)
@pytest.mark.parametrize(
("attrs", "expected"),
Expand Down