diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py index 9b552c9cf5883..dbbada8cf39c7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py @@ -30,7 +30,7 @@ HistoricalMetricDataResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.security import requires_access_dag +from airflow.api_fastapi.core_api.security import ReadableDagsFilterDep, requires_access_dag from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun, DagRunType from airflow.models.taskinstance import TaskInstance @@ -50,10 +50,12 @@ def historical_metrics( session: SessionDep, start_date: DateTimeQuery, + readable_dags_filter: ReadableDagsFilterDep, end_date: OptionalDateTimeQuery = None, ) -> HistoricalMetricDataResponse: """Return cluster activity historical metrics.""" current_time = timezone.utcnow() + permitted_dag_ids = readable_dags_filter.value # DagRuns dag_run_types = session.execute( select(DagRun.run_type, func.count(DagRun.run_id)) @@ -61,6 +63,7 @@ def historical_metrics( func.coalesce(DagRun.start_date, current_time) >= start_date, func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time), ) + .where(DagRun.dag_id.in_(permitted_dag_ids)) .group_by(DagRun.run_type) ).all() @@ -70,6 +73,7 @@ def historical_metrics( func.coalesce(DagRun.start_date, current_time) >= start_date, func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time), ) + .where(DagRun.dag_id.in_(permitted_dag_ids)) .group_by(DagRun.state) ).all() @@ -81,6 +85,7 @@ def historical_metrics( func.coalesce(DagRun.start_date, current_time) >= start_date, func.coalesce(DagRun.end_date, current_time) <= func.coalesce(end_date, current_time), ) + .where(DagRun.dag_id.in_(permitted_dag_ids)) .group_by(TaskInstance.state) ).all() @@ -110,11 +115,14 @@ def historical_metrics( ) def dag_stats( session: SessionDep, + readable_dags_filter: ReadableDagsFilterDep, ) -> DashboardDagStatsResponse: """Return basic DAG stats with counts of DAGs in various states.""" + permitted_dag_ids = readable_dags_filter.value latest_dates_subq = ( select(DagRun.dag_id, func.max(DagRun.logical_date).label("max_logical_date")) .where(DagRun.logical_date.is_not(None)) + .where(DagRun.dag_id.in_(permitted_dag_ids)) .group_by(DagRun.dag_id) .subquery() ) @@ -131,6 +139,7 @@ def dag_stats( (DagRun.dag_id == latest_dates_subq.c.dag_id) & (DagRun.logical_date == latest_dates_subq.c.max_logical_date), ) + .where(DagRun.dag_id.in_(permitted_dag_ids)) .cte() )