2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api/common/delete_dag.py
@@ -76,7 +76,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
    session.execute(
        delete(ParseImportError)
        .where(
-            ParseImportError.filename == dag.fileloc,
+            ParseImportError.filename == dag.relative_fileloc,
            ParseImportError.bundle_name == dag.bundle_name,
        )
        .execution_options(synchronize_session="fetch")
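Note on the change above: with DAG bundles, a relative fileloc is only unique within a bundle, so import errors have to be addressed by the (bundle_name, relative_fileloc) pair rather than by path alone. A toy illustration of the collision this avoids (values invented, not from the PR):

    # Two bundles can ship a file at the same relative path.
    errors = {
        ("bundle_a", "dags/etl.py"): "ImportError: no module named foo",
        ("bundle_b", "dags/etl.py"): "ModuleNotFoundError: bar",
    }
    # Keyed by relative path alone, the two entries would collide:
    assert len({path for _, path in errors}) == 1
    # The (bundle_name, relative_fileloc) pair keeps them distinct:
    assert len(errors) == 2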
@@ -151,15 +151,23 @@ def get_import_errors(
    # if the user doesn't have access to all DAGs, only display errors from visible DAGs
    readable_dag_ids = auth_manager.get_authorized_dag_ids(method="GET", user=user)
    # Build a cte that fetches dag_ids for each file location
-    visiable_files_cte = (
-        select(DagModel.fileloc, DagModel.dag_id).where(DagModel.dag_id.in_(readable_dag_ids)).cte()
+    visible_files_cte = (
+        select(DagModel.relative_fileloc, DagModel.dag_id, DagModel.bundle_name)
+        .where(DagModel.dag_id.in_(readable_dag_ids))
+        .cte()
    )

    # Prepare the import errors query by joining with the cte.
    # Each returned row will be a tuple: (ParseImportError, dag_id)
    import_errors_stmt = (
-        select(ParseImportError, visiable_files_cte.c.dag_id)
-        .join(visiable_files_cte, ParseImportError.filename == visiable_files_cte.c.fileloc)
+        select(ParseImportError, visible_files_cte.c.dag_id)
+        .join(
+            visible_files_cte,
+            and_(
+                ParseImportError.filename == visible_files_cte.c.relative_fileloc,
+                ParseImportError.bundle_name == visible_files_cte.c.bundle_name,
+            ),
+        )
        .order_by(ParseImportError.id)
    )

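One detail in the rewritten join: SQLAlchemy's Select.join() takes a single ON clause, so the two conditions above are combined with and_(). A self-contained sketch of the pattern, using toy models rather than Airflow's (all names here are illustrative):

    from sqlalchemy import Column, Integer, String, and_, create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Session

    class Base(DeclarativeBase):
        pass

    class ImportErrorRow(Base):
        __tablename__ = "import_error"
        id = Column(Integer, primary_key=True)
        filename = Column(String)
        bundle_name = Column(String)

    class DagRow(Base):
        __tablename__ = "dag"
        dag_id = Column(String, primary_key=True)
        relative_fileloc = Column(String)
        bundle_name = Column(String)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        visible = select(DagRow.relative_fileloc, DagRow.dag_id, DagRow.bundle_name).cte()
        stmt = select(ImportErrorRow, visible.c.dag_id).join(
            visible,
            # join() accepts one ON clause, so composite conditions are and_()-ed together
            and_(
                ImportErrorRow.filename == visible.c.relative_fileloc,
                ImportErrorRow.bundle_name == visible.c.bundle_name,
            ),
        )
        rows = session.execute(stmt).all()  # empty here, but the query compiles and runs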
86 changes: 56 additions & 30 deletions airflow-core/src/airflow/dag_processing/collection.py
@@ -208,8 +208,12 @@ def _serialize_dag_capturing_errors(
    except Exception:
        log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", dag.dag_id, dag.fileloc)
        dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
-        # todo AIP-66: this needs to use bundle name / rel fileloc instead
-        return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
+        return [
+            (
+                (bundle_name, dag.relative_fileloc),
+                traceback.format_exc(limit=-dagbag_import_error_traceback_depth),
+            )
+        ]


def _sync_dag_perms(dag: MaybeSerializedDAG, session: Session):
@@ -245,7 +249,10 @@ def _update_dag_warnings(


def _update_import_errors(
-    files_parsed: set[str], bundle_name: str, import_errors: dict[str, str], session: Session
+    files_parsed: set[tuple[str, str]],
+    bundle_name: str,
+    import_errors: dict[tuple[str, str], str],
+    session: Session,
):
    from airflow.listeners.listener import get_listener_manager

@@ -254,51 +261,67 @@
    session.execute(
        delete(ParseImportError).where(
-            ParseImportError.filename.in_(list(files_parsed)), ParseImportError.bundle_name == bundle_name
+            tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
        )
    )

+    # the select below has to return (bundle_name, filename) tuples in that order, since
+    # import_errors is keyed by (bundle_name, relative_fileloc) tuples
    existing_import_error_files = set(
-        session.execute(select(ParseImportError.filename, ParseImportError.bundle_name))
+        session.execute(select(ParseImportError.bundle_name, ParseImportError.filename))
    )

    # Add the errors of the processed files
-    for filename, stacktrace in import_errors.items():
-        if (filename, bundle_name) in existing_import_error_files:
-            session.query(ParseImportError).where(
-                ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name
-            ).update(
-                {
-                    "filename": filename,
-                    "bundle_name": bundle_name,
-                    "timestamp": utcnow(),
-                    "stacktrace": stacktrace,
-                },
+    for key, stacktrace in import_errors.items():
+        bundle_name_, relative_fileloc = key
+
+        if key in existing_import_error_files:
+            session.execute(
+                update(ParseImportError)
+                .where(
+                    ParseImportError.filename == relative_fileloc,
+                    ParseImportError.bundle_name == bundle_name_,
+                )
+                .values(
+                    filename=relative_fileloc,
+                    bundle_name=bundle_name_,
+                    timestamp=utcnow(),
+                    stacktrace=stacktrace,
+                ),
            )
            # sending notification when an existing dag import error occurs
            try:
+                # todo: make listener accept bundle_name and relative_filename
+                import_error = session.scalar(
+                    select(ParseImportError).where(
+                        ParseImportError.bundle_name == bundle_name_,
+                        ParseImportError.filename == relative_fileloc,
+                    )
+                )
                get_listener_manager().hook.on_existing_dag_import_error(
-                    filename=filename, stacktrace=stacktrace
+                    filename=import_error.full_file_path(), stacktrace=stacktrace
                )
            except Exception:
                log.exception("error calling listener")
        else:
-            session.add(
-                ParseImportError(
-                    filename=filename,
-                    bundle_name=bundle_name,
-                    timestamp=utcnow(),
-                    stacktrace=stacktrace,
-                )
+            import_error = ParseImportError(
+                filename=relative_fileloc,
+                bundle_name=bundle_name,
+                timestamp=utcnow(),
+                stacktrace=stacktrace,
            )
+            session.add(import_error)
            # sending notification when a new dag import error occurs
            try:
-                get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace)
+                get_listener_manager().hook.on_new_dag_import_error(
+                    filename=import_error.full_file_path(), stacktrace=stacktrace
+                )
            except Exception:
                log.exception("error calling listener")
        session.execute(
            update(DagModel)
-            .where(DagModel.fileloc == filename)
+            .where(
+                DagModel.relative_fileloc == relative_fileloc,
+            )
            .values(
                has_import_errors=True,
                bundle_name=bundle_name,
@@ -312,7 +335,7 @@ def update_dag_parsing_results_in_db(
    bundle_name: str,
    bundle_version: str | None,
    dags: Collection[MaybeSerializedDAG],
-    import_errors: dict[str, str],
+    import_errors: dict[tuple[str, str], str],
    warnings: set[DagWarning],
    session: Session,
    *,
@@ -362,13 +385,16 @@
    # Only now we are "complete" do we update import_errors - don't want to record errors from
    # previous failed attempts
    import_errors.update(dict(serialize_errors))

    # Record import errors into the ORM - we don't retry on this one as it's not as critical that it works
    try:
        # TODO: This won't clear errors for files that exist but no longer contain DAGs. Do we need to pass
        # in the list of files parsed?

-        good_dag_filelocs = {dag.fileloc for dag in dags if dag.fileloc not in import_errors}
+        good_dag_filelocs = {
+            (bundle_name, dag.relative_fileloc)
+            for dag in dags
+            if dag.relative_fileloc is not None and (bundle_name, dag.relative_fileloc) not in import_errors
+        }
        _update_import_errors(
            files_parsed=good_dag_filelocs,
            bundle_name=bundle_name,
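On the tuple_() rewrite in _update_import_errors(): tuple_(...).in_(...) deletes every row matching a set of composite keys in one statement, instead of filtering each column independently. A minimal sketch mirroring the PR's usage (the model and session here are illustrative stand-ins):

    from sqlalchemy import delete, tuple_

    files_parsed = {("bundle_a", "dags/etl.py"), ("bundle_b", "dags/report.py")}
    stmt = delete(ParseImportError).where(
        # Column order must match the key order (bundle_name, filename);
        # a swapped order would silently match nothing.
        tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
    )
    session.execute(stmt)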
9 changes: 7 additions & 2 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -573,7 +573,7 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
            self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
            self.clear_orphaned_import_errors(
                bundle_name=bundle.name,
-                observed_filelocs={str(x.absolute_path) for x in found_files},  # todo: make relative
+                observed_filelocs={str(x.rel_path) for x in found_files},
            )

    def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
@@ -1141,11 +1141,16 @@ def process_parse_results(
        stat.import_errors = 1
    else:
        # record DAGs and import errors to database
+        import_errors = {}
+        if parsing_result.import_errors:
+            import_errors = {
+                (bundle_name, rel_path): error for rel_path, error in parsing_result.import_errors.items()
+            }
        update_dag_parsing_results_in_db(
            bundle_name=bundle_name,
            bundle_version=bundle_version,
            dags=parsing_result.serialized_dags,
-            import_errors=parsing_result.import_errors or {},
+            import_errors=import_errors,
            warnings=set(parsing_result.warnings or []),
            session=session,
        )
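The re-keying above adapts the processor's per-file error map (keyed by relative path) to the (bundle_name, relative_path) keys that update_dag_parsing_results_in_db() now expects. A toy illustration (values invented):

    bundle_name = "bundle_a"
    parse_errors = {"dags/etl.py": "ImportError: no module named foo"}
    import_errors = {(bundle_name, rel_path): error for rel_path, error in parse_errors.items()}
    assert import_errors == {("bundle_a", "dags/etl.py"): "ImportError: no module named foo"}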
38 changes: 26 additions & 12 deletions airflow-core/src/airflow/models/dagbag.py
@@ -130,7 +130,7 @@
        bundle_path: Path | None = None,
    ):
        super().__init__()
-        self.bundle_path: Path | None = bundle_path
+        self.bundle_path = bundle_path
        include_examples = (
            include_examples
            if isinstance(include_examples, bool)
@@ -145,6 +145,7 @@ def __init__(
        self.dags: dict[str, DAG] = {}
        # the file's last modified timestamp when we last read it
        self.file_last_changed: dict[str, datetime] = {}
+        # Store import errors with relative file paths as keys (relative to bundle_path)
        self.import_errors: dict[str, str] = {}
        self.captured_warnings: dict[str, tuple[str, ...]] = {}
        self.has_logged = False
@@ -356,14 +357,26 @@ def get_pools(dag) -> dict[str, set[str]]:
        )
        return warnings

+    def _get_relative_fileloc(self, filepath: str) -> str:
+        """
+        Get the relative file location for a given filepath.
+
+        :param filepath: Absolute path to the file
+        :return: Relative path from bundle_path, or the original filepath if no bundle_path
+        """
+        if self.bundle_path:
+            return str(Path(filepath).relative_to(self.bundle_path))
+        return filepath
+
    def _load_modules_from_file(self, filepath, safe_mode):
        from airflow.sdk.definitions._internal.contextmanager import DagContext

        def handler(signum, frame):
            """Handle SIGSEGV signal and let the user know that the import failed."""
            msg = f"Received SIGSEGV signal while processing {filepath}."
            self.log.error(msg)
-            self.import_errors[filepath] = msg
+            relative_filepath = self._get_relative_fileloc(filepath)
+            self.import_errors[relative_filepath] = msg

        try:
            signal.signal(signal.SIGSEGV, handler)
@@ -402,12 +415,13 @@ def parse(mod_name, filepath):
                # This would also catch `exit()` in a dag file
                DagContext.autoregistered_dags.clear()
                self.log.exception("Failed to import: %s", filepath)
+                relative_filepath = self._get_relative_fileloc(filepath)
                if self.dagbag_import_error_tracebacks:
-                    self.import_errors[filepath] = traceback.format_exc(
+                    self.import_errors[relative_filepath] = traceback.format_exc(
                        limit=-self.dagbag_import_error_traceback_depth
                    )
                else:
-                    self.import_errors[filepath] = str(e)
+                    self.import_errors[relative_filepath] = str(e)
                return []

        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
@@ -467,12 +481,13 @@ def _load_modules_from_zip(self, filepath, safe_mode):
                DagContext.autoregistered_dags.clear()
                fileloc = os.path.join(filepath, zip_info.filename)
                self.log.exception("Failed to import: %s", fileloc)
+                relative_fileloc = self._get_relative_fileloc(fileloc)
                if self.dagbag_import_error_tracebacks:
-                    self.import_errors[fileloc] = traceback.format_exc(
+                    self.import_errors[relative_fileloc] = traceback.format_exc(
                        limit=-self.dagbag_import_error_traceback_depth
                    )
                else:
-                    self.import_errors[fileloc] = str(e)
+                    self.import_errors[relative_fileloc] = str(e)
            finally:
                if sys.path[0] == filepath:
                    del sys.path[0]
@@ -494,18 +509,16 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):

        for dag, mod in top_level_dags:
            dag.fileloc = mod.__file__
-            if self.bundle_path:
-                dag.relative_fileloc = str(Path(mod.__file__).relative_to(self.bundle_path))
-            else:
-                dag.relative_fileloc = dag.fileloc
+            relative_fileloc = self._get_relative_fileloc(dag.fileloc)
+            dag.relative_fileloc = relative_fileloc
            try:
                dag.validate()
                self.bag_dag(dag=dag)
            except AirflowClusterPolicySkipDag:
                pass
            except Exception as e:
                self.log.exception("Failed to bag_dag: %s", dag.fileloc)
-                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
+                self.import_errors[relative_fileloc] = f"{type(e).__name__}: {e}"
                self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
            else:
                found_dags.append(dag)
@@ -665,12 +678,13 @@ def sync_to_db(self, bundle_name: str, bundle_version: str | None, session: Sess
            else LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
            for dag in self.dags.values()
        ]
+        import_errors = {(bundle_name, rel_path): error for rel_path, error in self.import_errors.items()}

        update_dag_parsing_results_in_db(
            bundle_name,
            bundle_version,
            dags,
-            self.import_errors,
+            import_errors,
            self.dag_warnings,
            session=session,
        )
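One assumption baked into the new _get_relative_fileloc() helper: Path.relative_to() raises ValueError when the file does not live under bundle_path, so every parsed file is expected to sit inside the bundle. A quick illustration:

    from pathlib import Path

    bundle_path = Path("/opt/airflow/bundles/my_bundle")  # illustrative path
    print(Path("/opt/airflow/bundles/my_bundle/dags/etl.py").relative_to(bundle_path))
    # -> dags/etl.py

    try:
        Path("/tmp/outside.py").relative_to(bundle_path)
    except ValueError as err:
        print(err)  # files outside the bundle raise ValueError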
8 changes: 7 additions & 1 deletion airflow-core/src/airflow/models/errors.py
@@ -19,6 +19,7 @@

from sqlalchemy import Column, Integer, String, Text

+from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.models.base import Base, StringID
from airflow.utils.sqlalchemy import UtcDateTime

@@ -29,6 +30,11 @@ class ParseImportError(Base):
    __tablename__ = "import_error"
    id = Column(Integer, primary_key=True)
    timestamp = Column(UtcDateTime)
-    filename = Column(String(1024))  # todo AIP-66: make this bundle and relative fileloc
+    filename = Column(String(1024))
    bundle_name = Column(StringID())
    stacktrace = Column(Text)

+    def full_file_path(self) -> str:
+        """Return the full file path of the DAG file that failed to import."""
+        bundle = DagBundlesManager().get_bundle(self.bundle_name)
+        return "/".join([str(bundle.path), self.filename])
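full_file_path() rebuilds an absolute path by joining the bundle root with the stored relative filename; the plain "/" join assumes relative filelocs are stored with forward slashes. A hedged usage sketch (the bundle configuration and query are illustrative):

    # Assumes a bundle is configured for the stored bundle_name; illustrative only.
    err = session.scalar(select(ParseImportError).limit(1))
    if err is not None:
        print(err.full_file_path())  # e.g. /opt/airflow/bundles/my_bundle/dags/etl.py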
@@ -51,7 +51,7 @@
@pytest.fixture(scope="class")
@provide_session
def permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
-    dag_model = DagModel(fileloc=FILENAME1, dag_id="dag_id1", is_paused=False)
+    dag_model = DagModel(fileloc=FILENAME1, relative_fileloc=FILENAME1, dag_id="dag_id1", is_paused=False)
    session.add(dag_model)
    session.commit()
    return dag_model
@@ -60,7 +60,7 @@ def permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
@pytest.fixture(scope="class")
@provide_session
def not_permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
-    dag_model = DagModel(fileloc=FILENAME1, dag_id="dag_id4", is_paused=False)
+    dag_model = DagModel(fileloc=FILENAME1, relative_fileloc=FILENAME1, dag_id="dag_id4", is_paused=False)
    session.add(dag_model)
    session.commit()
    return dag_model