Show import errors in DAG views #17818

Merged 1 commit on Aug 24, 2021
10 changes: 10 additions & 0 deletions airflow/www/utils.py
@@ -23,12 +23,14 @@
import markdown
import sqlalchemy as sqla
from flask import Markup, Response, request, url_for
from flask.helpers import flash
from flask_appbuilder.forms import FieldConverter
from flask_appbuilder.models.sqla import filters as fab_sqlafilters
from flask_appbuilder.models.sqla.interface import SQLAInterface
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter

from airflow.models import errors
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
from airflow.utils.json import AirflowJsonEncoder
@@ -37,6 +39,14 @@
from airflow.www.widgets import AirflowDateTimePickerWidget


def check_import_errors(fileloc, session):
    # Check dag import errors
    import_errors = session.query(errors.ImportError).filter(errors.ImportError.filename == fileloc).all()
    if import_errors:
        for import_error in import_errors:
            flash("Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error), "dag_import_error")


def get_sensitive_variables_fields():
    import warnings

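The helper above only flashes the messages; how they are rendered is not part of this diff. As a rough sketch of what a page can do with the dedicated "dag_import_error" category, assuming nothing beyond stock Flask APIs (the route, file path, and message below are made up for illustration):

from flask import Flask, flash, get_flashed_messages

app = Flask(__name__)
app.secret_key = "dev"  # flash() needs a session, which needs a secret key


@app.route("/broken-dag-demo")
def broken_dag_demo():
    # Stand-in for wwwutils.check_import_errors(): flash one fake import error.
    flash("Broken DAG: [/dags/example.py] ImportError: No module named 'foo'", "dag_import_error")
    # Templates usually call get_flashed_messages(); filtering by category keeps
    # import errors separate from the generic "error" and "warning" flashes.
    return {"import_errors": get_flashed_messages(category_filter=["dag_import_error"])}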
58 changes: 37 additions & 21 deletions airflow/www/views.py
@@ -952,6 +952,8 @@ def code(self, session=None):
escape(all_errors)
)

wwwutils.check_import_errors(dag_orm.fileloc, session)

return self.render_template(
'airflow/dag_code.html',
html_code=html_code,
@@ -977,6 +979,8 @@ def dag_details(self, session=None):
title = "DAG Details"
root = request.args.get('root', '')

wwwutils.check_import_errors(dag.fileloc, session)

states = (
session.query(TaskInstance.state, sqla.func.count(TaskInstance.dag_id))
.filter(TaskInstance.dag_id == dag_id)
@@ -2163,13 +2167,15 @@ def recurse_nodes(task, visited):
)
@gzipped
@action_logging
def tree(self):
@provide_session
def tree(self, session=None):
"""Get Dag as tree."""
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
if not dag:
flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
return redirect(url_for('Airflow.index'))
wwwutils.check_import_errors(dag.fileloc, session)

root = request.args.get('root')
if root:
@@ -2184,14 +2190,13 @@ def tree(self):
except (KeyError, ValueError):
base_date = dag.get_latest_execution_date() or timezone.utcnow()

with create_session() as session:
dag_runs = (
session.query(DagRun)
.filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)
.order_by(DagRun.execution_date.desc())
.limit(num_runs)
.all()
)
dag_runs = (
session.query(DagRun)
.filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)
.order_by(DagRun.execution_date.desc())
.limit(num_runs)
.all()
)
dag_runs = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}

max_date = max(dag_runs.keys(), default=None)
@@ -2238,7 +2243,8 @@ def tree(self):
)
@gzipped
@action_logging
def calendar(self):
@provide_session
def calendar(self, session=None):
"""Get DAG runs as calendar"""

def _convert_to_date(session, column):
@@ -2254,22 +2260,23 @@ def _convert_to_date(session, column):
flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
return redirect(url_for('Airflow.index'))

wwwutils.check_import_errors(dag.fileloc, session)

root = request.args.get('root')
if root:
dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True)

with create_session() as session:
dag_states = (
session.query(
(_convert_to_date(session, DagRun.execution_date)).label('date'),
DagRun.state,
func.count('*').label('count'),
)
.filter(DagRun.dag_id == dag.dag_id)
.group_by(_convert_to_date(session, DagRun.execution_date), DagRun.state)
.order_by(_convert_to_date(session, DagRun.execution_date).asc())
.all()
dag_states = (
session.query(
(_convert_to_date(session, DagRun.execution_date)).label('date'),
DagRun.state,
func.count('*').label('count'),
)
.filter(DagRun.dag_id == dag.dag_id)
.group_by(_convert_to_date(session, DagRun.execution_date), DagRun.state)
.order_by(_convert_to_date(session, DagRun.execution_date).asc())
.all()
)

dag_states = [
{
@@ -2319,6 +2326,7 @@ def graph(self, session=None):
if not dag:
flash(f'DAG "{dag_id}" seems to be missing.', "error")
return redirect(url_for('Airflow.index'))
wwwutils.check_import_errors(dag.fileloc, session)

root = request.args.get('root')
if root:
@@ -2412,6 +2420,8 @@ def duration(self, session=None):
flash(f'DAG "{dag_id}" seems to be missing.', "error")
return redirect(url_for('Airflow.index'))

wwwutils.check_import_errors(dag.fileloc, session)

base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs', default=default_dag_run, type=int)

@@ -2543,6 +2553,8 @@ def tries(self, session=None):
else:
base_date = dag.get_latest_execution_date() or timezone.utcnow()

wwwutils.check_import_errors(dag.fileloc, session)

root = request.args.get('root')
if root:
dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False)
@@ -2615,6 +2627,8 @@ def landing_times(self, session=None):
else:
base_date = dag.get_latest_execution_date() or timezone.utcnow()

wwwutils.check_import_errors(dag.fileloc, session)

root = request.args.get('root')
if root:
dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False)
@@ -2709,6 +2723,8 @@ def gantt(self, session=None):
if root:
dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False)

wwwutils.check_import_errors(dag.fileloc, session)

dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
dttm = dt_nr_dr_data['dttm']

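A pattern worth noting in the views.py changes: tree() and calendar() used to open their own session with create_session(); they now take @provide_session, so one injected session serves both the new check_import_errors() call and the existing DagRun queries. A simplified sketch of that decorator pattern, assuming Airflow's provide_session from airflow.utils.session (the view class and body below are trimmed stand-ins, not the real code):

from airflow.utils.session import provide_session
from airflow.www import utils as wwwutils


class DagViewSketch:
    """Illustrative stand-in for Airflow's class-based DAG views."""

    @provide_session
    def tree(self, dag, session=None):
        # provide_session injects a SQLAlchemy session when the caller passes
        # none, so the view no longer opens its own create_session() block and
        # the import-error check shares the session used by the DagRun queries.
        wwwutils.check_import_errors(dag.fileloc, session)
        return dag

Sharing one session per request also avoids opening a second session just for the import-error lookup.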