183 changes: 182 additions & 1 deletion airflow/dag_processing/collection.py
@@ -29,9 +29,11 @@

import itertools
import logging
import traceback
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy import and_, exists, func, select, tuple_
from sqlalchemy import and_, delete, exists, func, select, tuple_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, load_only

from airflow.assets.manager import asset_manager
@@ -45,9 +47,12 @@
)
from airflow.models.dag import DAG, DagModel, DagOwnerAttributes, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.trigger import Trigger
from airflow.sdk.definitions.asset import Asset, AssetAlias
from airflow.triggers.base import BaseTrigger
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.sqlalchemy import with_row_locks
from airflow.utils.timezone import utcnow
from airflow.utils.types import DagRunType
@@ -58,6 +63,7 @@
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.models.dagwarning import DagWarning
from airflow.typing_compat import Self

log = logging.getLogger(__name__)
@@ -163,6 +169,181 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se
)


def _serialize_dag_capturing_errors(dag: DAG, session: Session, processor_subdir: str | None):
"""
Try to serialize the dag to the DB, but make a note of any errors.

We can't place them directly in import_errors, as this may be retried and the write may succeed the next time
"""
from airflow import settings
from airflow.configuration import conf
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel

try:
# We can't use bulk_write_to_db as we want to capture each error individually
dag_was_updated = SerializedDagModel.write_dag(
dag,
min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
session=session,
processor_subdir=processor_subdir,
)
if dag_was_updated:
_sync_dag_perms(dag, session=session)
else:
# Check and update DagCode
DagCode.update_source_code(dag)
return []
except OperationalError:
raise
except Exception:
log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", dag.dag_id, dag.fileloc)
dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
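
As a side note, the pattern above (capturing each failure as data instead of letting it abort the batch) can be sketched standalone. A minimal sketch; `process_one` and `TRACEBACK_DEPTH` are illustrative stand-ins, not Airflow API:

import traceback

TRACEBACK_DEPTH = 2  # stands in for [core] dagbag_import_error_traceback_depth

def process_one(item: str) -> None:
    # Illustrative stand-in for SerializedDagModel.write_dag
    if item == "bad":
        raise ValueError(f"cannot process {item!r}")

def process_all(items: list[str]) -> list[tuple[str, str]]:
    errors: list[tuple[str, str]] = []
    for item in items:
        try:
            process_one(item)
        except Exception:
            # A negative limit keeps only the innermost frames, matching
            # the format_exc(limit=-depth) call above
            errors.append((item, traceback.format_exc(limit=-TRACEBACK_DEPTH)))
    return errors

print(process_all(["good", "bad"]))  # only "bad" produces an error entry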


def _sync_dag_perms(dag: DAG, session: Session):
"""Sync DAG specific permissions."""
dag_id = dag.dag_id

log.debug("Syncing DAG permissions: %s to the DB", dag_id)
from airflow.www.security_appless import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(dag_id, dag.access_control)


def _update_dag_warnings(
dag_ids: list[str], warnings: set[DagWarning], warning_types: tuple[DagWarningType, ...], session: Session
):
from airflow.models.dagwarning import DagWarning

stored_warnings = set(
session.scalars(
select(DagWarning).where(
DagWarning.dag_id.in_(dag_ids),
DagWarning.warning_type.in_(warning_types),
)
)
)

for warning_to_delete in stored_warnings - warnings:
session.delete(warning_to_delete)

for warning_to_add in warnings:
session.merge(warning_to_add)
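
The delete-then-merge loop above is a set reconciliation: anything stored but no longer reported is deleted, and everything reported is merged (insert-or-update). A trivial sketch with plain sets; the values are illustrative:

stored = {"w1", "w2", "w3"}   # stands in for rows loaded from the dag_warning table
current = {"w2", "w4"}        # stands in for warnings produced by this parse run

to_delete = stored - current  # {"w1", "w3"}: session.delete(...) each of these
to_merge = current            # session.merge(...) inserts "w4", refreshes "w2"
print(sorted(to_delete), sorted(to_merge))

This diffing works only because the warning objects compare equal by their identity key rather than by Python object id, which is presumably why ORM entities can be set-subtracted against freshly built warnings here.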


def _update_import_errors(
files_parsed: set[str], import_errors: dict[str, str], processor_subdir: str | None, session: Session
):
from airflow.listeners.listener import get_listener_manager

# We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old
# errors (i.e. from files that are removed) separately

session.execute(delete(ParseImportError).where(ParseImportError.filename.in_(list(files_parsed))))

query = select(ParseImportError.filename).where(ParseImportError.processor_subdir == processor_subdir)
existing_import_error_files = set(session.scalars(query))

# Add the errors of the processed files
for filename, stacktrace in import_errors.items():
if filename in existing_import_error_files:
session.query(ParseImportError).where(ParseImportError.filename == filename).update(
{"filename": filename, "timestamp": utcnow(), "stacktrace": stacktrace},
)
# sending notification when an existing dag import error occurs
get_listener_manager().hook.on_existing_dag_import_error(filename=filename, stacktrace=stacktrace)
else:
session.add(
ParseImportError(
filename=filename,
timestamp=utcnow(),
stacktrace=stacktrace,
processor_subdir=processor_subdir,
)
)
# sending notification when a new dag import error occurs
get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace)
session.query(DagModel).filter(DagModel.fileloc == filename).update({"has_import_errors": True})
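
The branch above is a read-then-write upsert keyed on filename. A self-contained sketch of the same flow against an in-memory SQLite database; `ImportErrorRow` is an illustrative stand-in for `ParseImportError`, not the real model:

from datetime import datetime, timezone

from sqlalchemy import DateTime, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class ImportErrorRow(Base):
    __tablename__ = "import_error"
    filename: Mapped[str] = mapped_column(String, primary_key=True)
    timestamp: Mapped[datetime] = mapped_column(DateTime)
    stacktrace: Mapped[str] = mapped_column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

def record_errors(session: Session, import_errors: dict[str, str]) -> None:
    existing = set(session.scalars(select(ImportErrorRow.filename)))
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    for filename, stacktrace in import_errors.items():
        if filename in existing:
            # UPDATE path: refresh timestamp and stacktrace in place
            row = session.get(ImportErrorRow, filename)
            row.timestamp = now
            row.stacktrace = stacktrace
        else:
            # INSERT path: first error seen for this file
            session.add(ImportErrorRow(filename=filename, timestamp=now, stacktrace=stacktrace))

with Session(engine) as session:
    record_errors(session, {"dags/a.py": "Traceback ..."})
    session.commit()
    print(session.scalars(select(ImportErrorRow.filename)).all())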


def update_dag_parsing_results_in_db(
dags: Collection[DAG],
import_errors: dict[str, str],
processor_subdir: str | None,
warnings: set[DagWarning],
session: Session,
*,
warning_types: tuple[DagWarningType, ...] = (DagWarningType.NONEXISTENT_POOL,),
):
"""
Update everything to do with DAG parsing in the DB.

This function will create or update rows in the following tables:

- DagModel (`dag` table), DagTag, DagCode and DagVersion
- SerializedDagModel (`serialized_dag` table)
- ParseImportError (including any errors that arise from serialization, not just parsing)
- DagWarning
- DAG Permissions

This function will not remove any rows for dags not passed in. It will remove parse errors and warnings
from dags/dag files that are passed in. In other words, if a DAG is passed in with a fileloc of `a.py`
then all warnings and errors related to this file will be removed.

``import_errors`` will be updated in place with any new errors
"""
# Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.write_dag' in case
# of any Operational Errors
# In case of failures, provide_session handles rollback
for attempt in run_with_db_retries(logger=log):
with attempt:
serialize_errors = []
log.debug(
"Running dagbag.bulk_write_to_db with retries. Try %d of %d",
attempt.retry_state.attempt_number,
MAX_DB_RETRIES,
)
log.debug("Calling the DAG.bulk_sync_to_db method")
try:
DAG.bulk_write_to_db(dags, processor_subdir=processor_subdir, session=session)
# Write Serialized DAGs to DB, capturing errors
for dag in dags:
serialize_errors.extend(_serialize_dag_capturing_errors(dag, session, processor_subdir))
except OperationalError:
session.rollback()
raise
# Only now that we are "complete" do we update import_errors - we don't want to record errors
# from previous failed attempts
import_errors.update(dict(serialize_errors))

# Record import errors into the ORM - we don't retry on this one as it's not as critical that it works
try:
# TODO: This won't clear errors for files that exist that no longer contain DAGs. Do we need to pass
# in the list of files parsed?

good_dag_filelocs = {dag.fileloc for dag in dags if dag.fileloc not in import_errors}
_update_import_errors(
files_parsed=good_dag_filelocs,
import_errors=import_errors,
processor_subdir=processor_subdir,
session=session,
)
except Exception:
log.exception("Error logging import errors!")

# Record DAG warnings in the metadatabase.
try:
_update_dag_warnings([dag.dag_id for dag in dags], warnings, warning_types, session)
except Exception:
log.exception("Error logging DAG warnings.")

session.flush()
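
A hypothetical call site for this entry point, shown with empty inputs so it runs against any initialized metadatabase; the `processor_subdir` path is an assumption for the sketch:

from airflow.dag_processing.collection import update_dag_parsing_results_in_db
from airflow.utils.session import create_session

with create_session() as session:
    update_dag_parsing_results_in_db(
        dags=[],                          # DAG objects from a parse run
        import_errors={},                 # fileloc -> formatted stacktrace
        processor_subdir="/files/dags",   # assumed DAG folder
        warnings=set(),                   # DagWarning objects from the parse
        session=session,
    )

In a real caller these would come from whatever parsed the DAG files, e.g. a DagBag-like result.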


class DagModelOperation(NamedTuple):
"""Collect DAG objects and perform database operations for them."""

20 changes: 6 additions & 14 deletions airflow/dag_processing/manager.py
@@ -661,10 +661,7 @@ def _refresh_dag_dir(self) -> bool:
self.set_file_paths(self._file_paths)

try:
self.log.debug("Removing old import errors")
DagFileProcessorManager.clear_nonexistent_import_errors(
file_paths=self._file_paths, processor_subdir=self.get_dag_directory()
)
self.clear_nonexistent_import_errors()
except Exception:
self.log.exception("Error removing old import errors")

@@ -702,24 +699,19 @@ def _print_stat(self):
self._log_file_processing_stats(self._file_paths)
self.last_stat_print_time = time.monotonic()

@staticmethod
@provide_session
def clear_nonexistent_import_errors(
file_paths: list[str] | None, processor_subdir: str | None, session=NEW_SESSION
):
def clear_nonexistent_import_errors(self, session=NEW_SESSION):
"""
Clear import errors for files that no longer exist.

:param file_paths: list of paths to DAG definition files
:param session: session for ORM operations
"""
query = delete(ParseImportError)
self.log.debug("Removing old import errors")
query = delete(ParseImportError).where(ParseImportError.processor_subdir == self.get_dag_directory())

if file_paths:
query = query.where(
~ParseImportError.filename.in_(file_paths),
ParseImportError.processor_subdir == processor_subdir,
)
if self._file_paths:
query = query.where(ParseImportError.filename.notin_(self._file_paths))

session.execute(query.execution_options(synchronize_session="fetch"))
session.commit()
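
The refactored method now derives both the subdirectory scope and the known file list from the manager itself. A standalone sketch of the DELETE it builds, using SQLAlchemy Core only; the table and column definitions are illustrative stand-ins:

from sqlalchemy import Column, MetaData, String, Table, delete

metadata = MetaData()
import_error = Table(
    "import_error",
    metadata,
    Column("filename", String),
    Column("processor_subdir", String),
)

def build_cleanup_query(dag_directory: str, known_files: list[str]):
    # Always scope the delete to this manager's DAG directory...
    query = delete(import_error).where(import_error.c.processor_subdir == dag_directory)
    # ...and, when we know which files still exist, keep their errors
    if known_files:
        query = query.where(import_error.c.filename.notin_(known_files))
    return query

print(build_cleanup_query("/files/dags", ["/files/dags/a.py"]))

Scoping by processor_subdir first means one manager no longer clears import errors recorded by another manager watching a different directory.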