+
{{ dag_docs(doc_md) }}
{{ dataset_next_run_modal(id='dataset-next-run-modal') }}
diff --git a/airflow/www/templates/airflow/dag_audit_log.html b/airflow/www/templates/airflow/dag_audit_log.html
deleted file mode 100644
index 07766ea32d2fb..0000000000000
--- a/airflow/www/templates/airflow/dag_audit_log.html
+++ /dev/null
@@ -1,119 +0,0 @@
-{#
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-#}
-
-{% extends "airflow/dag.html" %}
-{% block title %}Dag Audit Log{% endblock %}
-
-{%- macro sortable_column(display_name, attribute_name) -%}
- {% set curr_ordering_direction = (request.args.get('sorting_direction', 'desc')) %}
- {% set new_ordering_direction = ('asc' if (request.args.get('sorting_key') != attribute_name or curr_ordering_direction == 'desc') else 'desc') %}
-
- {{ display_name }}
-
-
- {% if curr_ordering_direction == 'desc' and request.args.get('sorting_key') == attribute_name %}
- expand_more
- {% elif curr_ordering_direction == 'asc' and request.args.get('sorting_key') == attribute_name %}
- expand_less
- {% else %}
- unfold_more
- {% endif %}
-
-
-{%- endmacro -%}
-
-{% block head_css %}
-{{ super() }}
-
-{% endblock %}
-
-{% block content %}
- {{ super() }}
-
Dag Audit Log
-
-
- This view displays selected events and operations that have been taken on this dag.
- The included and excluded events are set in the Airflow configuration,
- which by default remove view only actions. For a full list of events regarding this DAG, click
- here.
-
{{ log.execution_date if log.execution_date else None }}
-
-
{{ log.run_id if log.run_id else None }}
-
-
{{ log.owner if log.owner else None }}
-
-
{{ log.extra if log.extra else None }}
-
- {% endfor %}
-
-
-
-
- {{ paging }}
-
-
- Showing {{ num_log_from }}-{{ num_log_to }} of {{ audit_logs_count }} Dag Audit Log
-
-
-
-
-
-
-
-{% endblock %}
-
-{% block tail %}
- {{ super() }}
-
-
-{% endblock %}
diff --git a/airflow/www/templates/airflow/duration_chart.html b/airflow/www/templates/airflow/duration_chart.html
deleted file mode 100644
index 97f7d0b03fcdb..0000000000000
--- a/airflow/www/templates/airflow/duration_chart.html
+++ /dev/null
@@ -1,71 +0,0 @@
-{#
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-#}
-
-{% extends "airflow/dag.html" %}
-{% block page_title %}{{ dag.dag_id }} - Duration chart - {{ appbuilder.app_name }}{% endblock %}
-
-{% block head_css %}
- {{ super() }}
-
-
-
-
-{% endblock %}
-
-{% block content %}
- {{ super() }}
-
-
-
-
-
-
-
-
-
-
-
{{ chart }}
-
{{ cum_chart }}
-{% endblock %}
-
-{% block tail %}
-
- {{ super() }}
-{% endblock %}
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index c7e64fdba5599..8cbb19d17694a 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -655,17 +655,6 @@ def get_attr_renderer():
}
-def get_chart_height(dag):
- """
- Use the number of tasks in the DAG to approximate the size of generated chart.
-
- Without this the charts are tiny and unreadable when DAGs have a large number of tasks).
- Ideally nvd3 should allow for dynamic-height charts, that is charts that take up space
- based on the size of the components within.
- """
- return 600 + len(dag.tasks) * 10
-
-
class UtcAwareFilterMixin:
"""Mixin for filter for UTC time."""
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 00361f397e491..5a8da5f28a10d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -41,7 +41,6 @@
import configupdater
import flask.json
import lazy_object_proxy
-import nvd3
import re2
import sqlalchemy as sqla
from croniter import croniter
@@ -102,7 +101,7 @@
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
-from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, Trigger, XCom, errors
+from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, Trigger, XCom, errors
from airflow.models.dag import get_dataset_triggered_next_run_info
from airflow.models.dagrun import RUN_ID_REGEX, DagRun, DagRunType
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
@@ -120,7 +119,6 @@
from airflow.utils import json as utils_json, timezone, yaml
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.dag_edges import dag_edges
-from airflow.utils.dates import infer_time_unit, scale_time_units
from airflow.utils.db import get_query_count
from airflow.utils.docs import get_doc_url_for_provider, get_docs_url
from airflow.utils.helpers import exactly_one
@@ -140,7 +138,6 @@
from airflow.www.forms import (
DagRunEditForm,
DateTimeForm,
- DateTimeWithNumRunsForm,
TaskInstanceEditForm,
create_connection_form_class,
)
@@ -167,7 +164,7 @@
logger = logging.getLogger(__name__)
-def sanitize_args(args: dict[str, str]) -> dict[str, str]:
+def sanitize_args(args: dict[str, Any]) -> dict[str, Any]:
"""
Remove all parameters starting with `_`.
@@ -2835,111 +2832,11 @@ def legacy_calendar(self):
return redirect(url_for("Airflow.calendar", **sanitize_args(request.args)))
@expose("/dags//calendar")
- @auth.has_access_dag("GET", DagAccessEntity.RUN)
- @gzipped
- @provide_session
- def calendar(self, dag_id: str, session: Session = NEW_SESSION):
- """Get DAG runs as calendar."""
- dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
- dag_model = DagModel.get_dagmodel(dag_id, session=session)
- if not dag:
- flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
- return redirect(url_for("Airflow.index"))
-
- wwwutils.check_import_errors(dag.fileloc, session)
- wwwutils.check_dag_warnings(dag.dag_id, session)
-
- root = request.args.get("root")
- if root:
- dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True)
-
- dag_states = session.execute(
- select(
- func.date(DagRun.execution_date).label("date"),
- DagRun.state,
- func.max(DagRun.data_interval_start).label("data_interval_start"),
- func.max(DagRun.data_interval_end).label("data_interval_end"),
- func.count("*").label("count"),
- )
- .where(DagRun.dag_id == dag.dag_id)
- .group_by(func.date(DagRun.execution_date), DagRun.state)
- .order_by(func.date(DagRun.execution_date).asc())
- ).all()
-
- data_dag_states = [
- {
- # DATE() in SQLite and MySQL behave differently:
- # SQLite returns a string, MySQL returns a date.
- "date": dr.date if isinstance(dr.date, str) else dr.date.isoformat(),
- "state": dr.state,
- "count": dr.count,
- }
- for dr in dag_states
- ]
-
- if dag_states and dag_states[-1].data_interval_start and dag_states[-1].data_interval_end:
- last_automated_data_interval = DataInterval(
- timezone.coerce_datetime(dag_states[-1].data_interval_start),
- timezone.coerce_datetime(dag_states[-1].data_interval_end),
- )
-
- year = last_automated_data_interval.end.year
- restriction = TimeRestriction(dag.start_date, dag.end_date, False)
- dates: dict[datetime.date, int] = collections.Counter()
-
- if isinstance(dag.timetable, CronMixin):
- # Optimized calendar generation for timetables based on a cron expression.
- dates_iter: Iterator[datetime.datetime | None] = croniter(
- dag.timetable._expression,
- start_time=last_automated_data_interval.end,
- ret_type=datetime.datetime,
- )
- for dt in dates_iter:
- if dt is None:
- break
- if dt.year != year:
- break
- if dag.end_date and dt > dag.end_date:
- break
- dates[dt.date()] += 1
- else:
- prev_logical_date = DateTime.min
- while True:
- curr_info = dag.timetable.next_dagrun_info(
- last_automated_data_interval=last_automated_data_interval,
- restriction=restriction,
- )
- if curr_info is None:
- break # Reached the end.
- if curr_info.logical_date <= prev_logical_date:
- break # We're not progressing. Maybe a malformed timetable? Give up.
- if curr_info.logical_date.year != year:
- break # Crossed the year boundary.
- last_automated_data_interval = curr_info.data_interval
- dates[curr_info.logical_date] += 1
- prev_logical_date = curr_info.logical_date
-
- data_dag_states.extend(
- {"date": date.isoformat(), "state": "planned", "count": count}
- for (date, count) in dates.items()
- )
+ def calendar(self, dag_id: str):
+ """Redirect to the replacement - grid + calendar. Kept for backwards compatibility."""
+ kwargs = {**sanitize_args(request.args), "dag_id": dag_id, "tab": "calendar"}
- now = timezone.utcnow()
- data = {
- "dag_states": data_dag_states,
- "start_date": (dag.start_date or now).date().isoformat(),
- "end_date": (dag.end_date or now).date().isoformat(),
- }
-
- return self.render_template(
- "airflow/calendar.html",
- dag=dag,
- show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
- doc_md=wwwutils.wrapped_markdown(getattr(dag, "doc_md", None)),
- data=htmlsafe_json_dumps(data, separators=(",", ":")), # Avoid spaces to reduce payload size.
- root=root,
- dag_model=dag_model,
- )
+ return redirect(url_for("Airflow.grid", **kwargs))
@expose("/object/calendar_data")
@auth.has_access_dag("GET", DagAccessEntity.RUN)
@@ -3079,150 +2976,9 @@ def legacy_duration(self):
return redirect(url_for("Airflow.duration", **sanitize_args(request.args)))
@expose("/dags//duration")
- @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
- @provide_session
- def duration(self, dag_id: str, session: Session = NEW_SESSION):
- """Get Dag as duration graph."""
- dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
- dag_model = DagModel.get_dagmodel(dag_id, session=session)
- if not dag:
- flash(f'DAG "{dag_id}" seems to be missing.', "error")
- return redirect(url_for("Airflow.index"))
-
- wwwutils.check_import_errors(dag.fileloc, session)
- wwwutils.check_dag_warnings(dag.dag_id, session)
-
- default_dag_run = conf.getint("webserver", "default_dag_run_display_number")
- base_date_str = request.args.get("base_date")
- num_runs = request.args.get("num_runs", default=default_dag_run, type=int)
-
- if base_date_str:
- base_date = _safe_parse_datetime(base_date_str)
- else:
- base_date = dag.get_latest_execution_date() or timezone.utcnow()
-
- root = request.args.get("root")
- if root:
- dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False)
- chart_height = wwwutils.get_chart_height(dag)
- chart = nvd3.lineChart(
- name="lineChart",
- x_custom_format=True,
- x_axis_date=True,
- x_axis_format=LINECHART_X_AXIS_TICKFORMAT,
- height=chart_height,
- chart_attr=self.line_chart_attr,
- )
- cum_chart = nvd3.lineChart(
- name="cumLineChart",
- x_custom_format=True,
- x_axis_date=True,
- x_axis_format=LINECHART_X_AXIS_TICKFORMAT,
- height=chart_height,
- chart_attr=self.line_chart_attr,
- )
-
- y_points = defaultdict(list)
- x_points = defaultdict(list)
-
- task_instances = dag.get_task_instances_before(base_date, num_runs, session=session)
- if task_instances:
- min_date = task_instances[0].execution_date
- else:
- min_date = timezone.utc_epoch()
- ti_fails = (
- select(TaskFail)
- .join(TaskFail.dag_run)
- .where(
- TaskFail.dag_id == dag.dag_id,
- DagRun.execution_date >= min_date,
- DagRun.execution_date <= base_date,
- )
- )
- if dag.partial:
- ti_fails = ti_fails.where(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))
- ti_fails = session.scalars(ti_fails)
- fails_totals: dict[tuple[str, str, str], int] = defaultdict(int)
- for failed_task_instance in ti_fails:
- dict_key = (
- failed_task_instance.dag_id,
- failed_task_instance.task_id,
- failed_task_instance.run_id,
- )
- if failed_task_instance.duration:
- fails_totals[dict_key] += failed_task_instance.duration
-
- # We must group any mapped TIs by dag_id, task_id, run_id
- def grouping_key(ti: TaskInstance):
- return ti.dag_id, ti.task_id, ti.run_id
-
- mapped_tis = set()
- for _, group in itertools.groupby(sorted(task_instances, key=grouping_key), key=grouping_key):
- tis = list(group)
- duration = sum(x.duration for x in tis if x.duration)
- if duration:
- first_ti = tis[0]
- if first_ti.map_index >= 0:
- mapped_tis.add(first_ti.task_id)
- date_time = wwwutils.epoch(first_ti.execution_date)
- x_points[first_ti.task_id].append(date_time)
- fails_dict_key = (first_ti.dag_id, first_ti.task_id, first_ti.run_id)
- fails_total = fails_totals[fails_dict_key]
- y_points[first_ti.task_id].append(float(duration + fails_total))
-
- cumulative_y = {k: list(itertools.accumulate(v)) for k, v in y_points.items()}
-
- # determine the most relevant time unit for the set of task instance
- # durations for the DAG
- y_unit = infer_time_unit([d for t in y_points.values() for d in t])
- cum_y_unit = infer_time_unit([d for t in cumulative_y.values() for d in t])
- # update the y Axis on both charts to have the correct time units
- chart.create_y_axis("yAxis", format=".02f", custom_format=False, label=f"Duration ({y_unit})")
- chart.axislist["yAxis"]["axisLabelDistance"] = "-15"
- cum_chart.create_y_axis("yAxis", format=".02f", custom_format=False, label=f"Duration ({cum_y_unit})")
- cum_chart.axislist["yAxis"]["axisLabelDistance"] = "-15"
-
- for task_id in x_points:
- chart.add_serie(
- name=task_id + "[]" if task_id in mapped_tis else task_id,
- x=x_points[task_id],
- y=scale_time_units(y_points[task_id], y_unit),
- )
- cum_chart.add_serie(
- name=task_id + "[]" if task_id in mapped_tis else task_id,
- x=x_points[task_id],
- y=scale_time_units(cumulative_y[task_id], cum_y_unit),
- )
-
- max_date = max((ti.execution_date for ti in task_instances), default=None)
-
- session.commit()
-
- form = DateTimeWithNumRunsForm(
- data={
- "base_date": max_date or timezone.utcnow(),
- "num_runs": num_runs,
- }
- )
- chart.buildcontent()
- cum_chart.buildcontent()
- s_index = cum_chart.htmlcontent.rfind("});")
- cum_chart.htmlcontent = (
- f"{cum_chart.htmlcontent[:s_index]}"
- "$( document ).trigger('chartload')"
- f"{cum_chart.htmlcontent[s_index:]}"
- )
-
- return self.render_template(
- "airflow/duration_chart.html",
- dag=dag,
- show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
- root=root,
- form=form,
- chart=Markup(chart.htmlcontent),
- cum_chart=Markup(cum_chart.htmlcontent),
- dag_model=dag_model,
- )
+ def duration(self, dag_id: str):
+ """Redirect to Grid view."""
+ return redirect(url_for("Airflow.grid", dag_id=dag_id))
@expose("/tries")
def legacy_tries(self):
@@ -3230,80 +2986,13 @@ def legacy_tries(self):
return redirect(url_for("Airflow.tries", **sanitize_args(request.args)))
@expose("/dags//tries")
- @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
- @provide_session
- def tries(self, dag_id: str, session: Session = NEW_SESSION):
- """Show all tries."""
- dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
- dag_model = DagModel.get_dagmodel(dag_id, session=session)
- if not dag:
- flash(f'DAG "{dag_id}" seems to be missing.', "error")
- return redirect(url_for("Airflow.index"))
-
- wwwutils.check_import_errors(dag.fileloc, session)
- wwwutils.check_dag_warnings(dag.dag_id, session)
-
- default_dag_run = conf.getint("webserver", "default_dag_run_display_number")
- base_date_str = request.args.get("base_date")
- num_runs = request.args.get("num_runs", default=default_dag_run, type=int)
-
- if base_date_str:
- base_date = _safe_parse_datetime(base_date_str)
- else:
- base_date = dag.get_latest_execution_date() or timezone.utcnow()
-
- root = request.args.get("root")
- if root:
- dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False)
-
- chart_height = wwwutils.get_chart_height(dag)
- chart = nvd3.lineChart(
- name="lineChart",
- x_custom_format=True,
- x_axis_date=True,
- x_axis_format=LINECHART_X_AXIS_TICKFORMAT,
- height=chart_height,
- chart_attr=self.line_chart_attr,
- )
-
- tis = dag.get_task_instances_before(base_date, num_runs, session=session)
- for task in dag.tasks:
- y_points = []
- x_points = []
- for ti in tis:
- if ti.task_id == task.task_id:
- dttm = wwwutils.epoch(ti.execution_date)
- x_points.append(dttm)
- # y value should reflect completed tries to have a 0 baseline.
- y_points.append(ti.prev_attempted_tries)
- if x_points:
- chart.add_serie(name=task.task_id, x=x_points, y=y_points)
-
- max_date = max((ti.execution_date for ti in tis), default=None)
- chart.create_y_axis("yAxis", format=".02f", custom_format=False, label="Tries")
- chart.axislist["yAxis"]["axisLabelDistance"] = "-15"
-
- session.commit()
-
- form = DateTimeWithNumRunsForm(
- data={
- "base_date": max_date or timezone.utcnow(),
- "num_runs": num_runs,
- }
- )
-
- chart.buildcontent()
-
- return self.render_template(
- "airflow/chart.html",
- dag=dag,
- show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
- root=root,
- form=form,
- chart=Markup(chart.htmlcontent),
- tab_title="Tries",
- dag_model=dag_model,
- )
+ def tries(self, dag_id: str):
+ """Redirect to grid view."""
+ kwargs = {
+ **sanitize_args(request.args),
+ "dag_id": dag_id,
+ }
+ return redirect(url_for("Airflow.grid", **kwargs))
@expose("/landing_times")
def legacy_landing_times(self):
@@ -3311,93 +3000,15 @@ def legacy_landing_times(self):
return redirect(url_for("Airflow.landing_times", **sanitize_args(request.args)))
@expose("/dags//landing-times")
- @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
- @provide_session
- def landing_times(self, dag_id: str, session: Session = NEW_SESSION):
- """Show landing times."""
- dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
- dag_model = DagModel.get_dagmodel(dag_id, session=session)
- if not dag:
- flash(f'DAG "{dag_id}" seems to be missing.', "error")
- return redirect(url_for("Airflow.index"))
-
- wwwutils.check_import_errors(dag.fileloc, session)
- wwwutils.check_dag_warnings(dag.dag_id, session)
-
- default_dag_run = conf.getint("webserver", "default_dag_run_display_number")
- base_date_str = request.args.get("base_date")
- num_runs = request.args.get("num_runs", default=default_dag_run, type=int)
-
- if base_date_str:
- base_date = _safe_parse_datetime(base_date_str)
- else:
- base_date = dag.get_latest_execution_date() or timezone.utcnow()
-
- root = request.args.get("root")
- if root:
- dag = dag.partial_subset(task_ids_or_regex=root, include_upstream=True, include_downstream=False)
-
- tis = dag.get_task_instances_before(base_date, num_runs, session=session)
-
- chart_height = wwwutils.get_chart_height(dag)
- chart = nvd3.lineChart(
- name="lineChart",
- x_custom_format=True,
- x_axis_date=True,
- x_axis_format=LINECHART_X_AXIS_TICKFORMAT,
- height=chart_height,
- chart_attr=self.line_chart_attr,
- )
-
- y_points: dict[str, list[float]] = defaultdict(list)
- x_points: dict[str, list[tuple[int]]] = defaultdict(list)
- for task in dag.tasks:
- task_id = task.task_id
- for ti in tis:
- if ti.task_id == task.task_id:
- ts = dag.get_run_data_interval(ti.dag_run).end
- if ti.end_date:
- dttm = wwwutils.epoch(ti.execution_date)
- secs = (ti.end_date - ts).total_seconds()
- x_points[task_id].append(dttm)
- y_points[task_id].append(secs)
-
- # determine the most relevant time unit for the set of landing times
- # for the DAG
- y_unit = infer_time_unit([d for t in y_points.values() for d in t])
- # update the y Axis to have the correct time units
- chart.create_y_axis("yAxis", format=".02f", custom_format=False, label=f"Landing Time ({y_unit})")
- chart.axislist["yAxis"]["axisLabelDistance"] = "-15"
-
- for task_id in x_points:
- chart.add_serie(
- name=task_id,
- x=x_points[task_id],
- y=scale_time_units(y_points[task_id], y_unit),
- )
- max_date = max(ti.execution_date for ti in tis) if tis else None
-
- session.commit()
-
- form = DateTimeWithNumRunsForm(
- data={
- "base_date": max_date or timezone.utcnow(),
- "num_runs": num_runs,
- }
- )
- chart.buildcontent()
+ def landing_times(self, dag_id: str):
+ """Redirect to run duration page."""
+ kwargs = {
+ **sanitize_args(request.args),
+ "dag_id": dag_id,
+ "tab": "run_duration",
+ }
- return self.render_template(
- "airflow/chart.html",
- dag=dag,
- show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
- chart=Markup(chart.htmlcontent),
- height=f"{chart_height + 100}px",
- root=root,
- form=form,
- tab_title="Landing times",
- dag_model=dag_model,
- )
+ return redirect(url_for("Airflow.grid", **kwargs))
@expose("/paused", methods=["POST"])
@auth.has_access_dag("PUT")
@@ -3861,65 +3472,22 @@ def legacy_audit_log(self):
return redirect(url_for("Airflow.audit_log", **sanitize_args(request.args)))
@expose("/dags//audit_log")
- @auth.has_access_dag("GET", DagAccessEntity.AUDIT_LOG)
- @provide_session
- def audit_log(self, dag_id: str, session: Session = NEW_SESSION):
- dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
- dag_model = DagModel.get_dagmodel(dag_id, session=session)
- if not dag:
- flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
- return redirect(url_for("Airflow.index"))
-
- included_events_raw = conf.get("webserver", "audit_view_included_events", fallback=None)
- excluded_events_raw = conf.get("webserver", "audit_view_excluded_events", fallback=None)
-
- query = select(Log).where(Log.dag_id == dag_id)
- if included_events_raw:
- included_events = {event.strip() for event in included_events_raw.split(",")}
- query = query.where(Log.event.in_(included_events))
- elif excluded_events_raw:
- excluded_events = {event.strip() for event in excluded_events_raw.split(",")}
- query = query.where(Log.event.notin_(excluded_events))
-
- current_page = request.args.get("page", default=0, type=int)
- arg_sorting_key = request.args.get("sorting_key", "dttm")
- arg_sorting_direction = request.args.get("sorting_direction", default="desc")
-
- logs_per_page = PAGE_SIZE
- audit_logs_count = get_query_count(query, session=session)
- num_of_pages = math.ceil(audit_logs_count / logs_per_page)
-
- start = current_page * logs_per_page
- end = start + logs_per_page
-
- sort_column = Log.__table__.c.get(arg_sorting_key)
- if sort_column is not None:
- if arg_sorting_direction == "desc":
- sort_column = sort_column.desc()
- query = query.order_by(sort_column)
+ def audit_log(self, dag_id: str):
+ current_page = request.args.get("page")
+ arg_sorting_key = request.args.get("sorting_key")
+ arg_sorting_direction = request.args.get("sorting_direction")
+ sort_args = {
+ "offset": current_page,
+ f"sort.{arg_sorting_key}": arg_sorting_direction,
+ "limit": PAGE_SIZE,
+ }
+ kwargs = {
+ **sanitize_args(sort_args),
+ "dag_id": dag_id,
+ "tab": "audit_log",
+ }
- dag_audit_logs = session.scalars(query.offset(start).limit(logs_per_page)).all()
- return self.render_template(
- "airflow/dag_audit_log.html",
- dag=dag,
- show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
- dag_model=dag_model,
- root=request.args.get("root"),
- dag_id=dag_id,
- dag_logs=dag_audit_logs,
- num_log_from=min(start + 1, audit_logs_count),
- num_log_to=min(end, audit_logs_count),
- audit_logs_count=audit_logs_count,
- page_size=PAGE_SIZE,
- paging=wwwutils.generate_pages(
- current_page,
- num_of_pages,
- sorting_key=arg_sorting_key or None,
- sorting_direction=arg_sorting_direction or None,
- ),
- sorting_key=arg_sorting_key,
- sorting_direction=arg_sorting_direction,
- )
+ return redirect(url_for("Airflow.grid", **kwargs))
class ConfigurationView(AirflowBaseView):
diff --git a/airflow/www/webpack.config.js b/airflow/www/webpack.config.js
index 4e3de410bc8d0..c690749ad921c 100644
--- a/airflow/www/webpack.config.js
+++ b/airflow/www/webpack.config.js
@@ -76,8 +76,6 @@ const config = {
grid: `${JS_DIR}/dag/index.tsx`,
clusterActivity: `${JS_DIR}/cluster-activity/index.tsx`,
datasets: `${JS_DIR}/datasets/index.tsx`,
- calendar: [`${CSS_DIR}/calendar.css`, `${JS_DIR}/calendar.js`],
- durationChart: `${JS_DIR}/duration_chart.js`,
trigger: `${JS_DIR}/trigger.js`,
variableEdit: `${JS_DIR}/variable_edit.js`,
},
@@ -202,10 +200,6 @@ const config = {
// we'll have the dependencies imported within the custom JS
new CopyWebpackPlugin({
patterns: [
- {
- from: "node_modules/nvd3/build/*.min.*",
- flatten: true,
- },
{
from: "node_modules/d3/d3.min.*",
flatten: true,
diff --git a/airflow/www/yarn.lock b/airflow/www/yarn.lock
index 62dd14d358de5..600a4b7a3f852 100644
--- a/airflow/www/yarn.lock
+++ b/airflow/www/yarn.lock
@@ -9020,11 +9020,6 @@ nth-check@^2.0.1:
dependencies:
boolbase "^1.0.0"
-nvd3@^1.8.6:
- version "1.8.6"
- resolved "https://registry.yarnpkg.com/nvd3/-/nvd3-1.8.6.tgz#2d3eba74bf33363b5101ebf1d093c59a53ae73c4"
- integrity sha512-YGQ9hAQHuQCF0JmYkT2GhNMHb5pA+vDfQj6C2GdpQPzdRPj/srPG3mh/3fZzUFt+at1NusLk/RqICUWkxm4viQ==
-
nwsapi@^2.2.0:
version "2.2.0"
resolved "https://registry.yarnpkg.com/nwsapi/-/nwsapi-2.2.0.tgz#204879a9e3d068ff2a55139c2c772780681a38b7"
diff --git a/newsfragments/37988.significant.rst b/newsfragments/37988.significant.rst
new file mode 100644
index 0000000000000..2cf6ced182aa5
--- /dev/null
+++ b/newsfragments/37988.significant.rst
@@ -0,0 +1 @@
+Remove all remaining flask DAG views and redirect to their React equivalents. Page navigation now happens via grid view selection and the tabs to the right of the grid.
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index 79d205c335796..a588cb1864a93 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -507,18 +507,6 @@ def test_get_task_stats_from_query():
"dags/example_bash_operator/graph?execution_date=invalid",
INVALID_DATETIME_RESPONSE,
),
- (
- "dags/example_bash_operator/duration?base_date=invalid",
- INVALID_DATETIME_RESPONSE,
- ),
- (
- "dags/example_bash_operator/tries?base_date=invalid",
- INVALID_DATETIME_RESPONSE,
- ),
- (
- "dags/example_bash_operator/landing-times?base_date=invalid",
- INVALID_DATETIME_RESPONSE,
- ),
(
"dags/example_bash_operator/gantt?execution_date=invalid",
INVALID_DATETIME_RESPONSE,
diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py
index a8f199f595d0a..508e7ef32d687 100644
--- a/tests/www/views/test_views_decorators.py
+++ b/tests/www/views/test_views_decorators.py
@@ -151,14 +151,3 @@ def test_action_logging_variables_masked_secrets(session, admin_client):
admin_client.post("/variable/add", data=form)
session.commit()
_check_last_log_masked_variable(session, dag_id=None, event="variable.create", execution_date=None)
-
-
-def test_calendar(admin_client, dagruns):
- url = "calendar?dag_id=example_bash_operator"
- resp = admin_client.get(url, follow_redirects=True)
-
- bash_dagrun, _, _ = dagruns
-
- datestr = bash_dagrun.execution_date.date().isoformat()
- expected = rf"{{\"date\":\"{datestr}\",\"state\":\"running\",\"count\":1}}"
- check_content_in_response(expected, resp)
diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py
index a60e44c3420b8..5ddcb65a871f5 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -436,16 +436,6 @@ def test_dashboard_flash_messages_type(user_client):
check_content_in_response("alert-foo", resp)
-def test_audit_log_view_admin(admin_client, working_dags):
- resp = admin_client.get("/dags/filter_test_1/audit_log")
- check_content_in_response("Dag Audit Log", resp)
-
-
-def test_audit_log_view_user(user_client, working_dags):
- resp = user_client.get("/dags/filter_test_1/audit_log")
- check_content_not_in_response("Dag Audit Log", resp, resp_code=302)
-
-
@pytest.mark.parametrize(
"url, lower_key, greater_key",
[
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 1998dffc58c7d..61661e89419aa 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -19,7 +19,6 @@
import html
import json
-import re
import unittest.mock
import urllib.parse
from getpass import getuser
@@ -29,11 +28,9 @@
import time_machine
from airflow import settings
-from airflow.exceptions import AirflowException
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagcode import DagCode
-from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import XCom
@@ -1012,49 +1009,6 @@ def test_action_muldelete_task_instance(session, admin_client, task_search_tuple
assert session.query(TaskReschedule).count() == 0
-def test_task_fail_duration(app, admin_client, dag_maker, session):
- """Task duration page with a TaskFail entry should render without error."""
- with dag_maker() as dag:
- op1 = BashOperator(task_id="fail", bash_command="exit 1")
- op2 = BashOperator(task_id="success", bash_command="exit 0")
-
- with pytest.raises(AirflowException):
- op1.run()
- op2.run()
-
- op1_fails = (
- session.query(TaskFail)
- .filter(
- TaskFail.task_id == "fail",
- TaskFail.dag_id == dag.dag_id,
- )
- .all()
- )
-
- op2_fails = (
- session.query(TaskFail)
- .filter(
- TaskFail.task_id == "success",
- TaskFail.dag_id == dag.dag_id,
- )
- .all()
- )
-
- assert len(op1_fails) == 1
- assert len(op2_fails) == 0
-
- with unittest.mock.patch.object(app, "dag_bag") as mocked_dag_bag:
- mocked_dag_bag.get_dag.return_value = dag
- resp = admin_client.get(f"dags/{dag.dag_id}/duration", follow_redirects=True)
- html = resp.get_data().decode()
- cumulative_chart = json.loads(re.search("data_cumlinechart=(.*);", html).group(1))
- line_chart = json.loads(re.search("data_linechart=(.*);", html).group(1))
-
- assert resp.status_code == 200
- assert sorted(item["key"] for item in cumulative_chart) == ["fail", "success"]
- assert sorted(item["key"] for item in line_chart) == ["fail", "success"]
-
-
def test_graph_view_doesnt_fail_on_recursion_error(app, dag_maker, admin_client):
"""Test that the graph view doesn't fail on a recursion error."""
from airflow.models.baseoperator import chain