diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py index 358d1ea4901c6..5420a34c9a9d0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py @@ -161,6 +161,7 @@ class DAGDetailsResponse(DAGResponse): default_args: Mapping | None owner_links: dict[str, str] | None = None is_favorite: bool = False + active_runs_count: int = 0 @field_validator("timezone", mode="before") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index a909f386137ca..3faec5af18ece 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -10036,6 +10036,10 @@ components: type: boolean title: Is Favorite default: false + active_runs_count: + type: integer + title: Active Runs Count + default: 0 file_token: type: string title: File Token diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py index b3ca1b7750941..22d6e48d894dd 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py @@ -22,7 +22,7 @@ from fastapi import Depends, HTTPException, Query, Response, status from fastapi.exceptions import RequestValidationError from pydantic import ValidationError -from sqlalchemy import delete, insert, select, update +from sqlalchemy import delete, func, insert, select, update from airflow.api.common import delete_dag as delete_dag_module from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag @@ -75,6 +75,7 @@ from airflow.models import DagModel from airflow.models.dag_favorite import DagFavorite from airflow.models.dagrun import DagRun +from airflow.utils.state import DagRunState dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") @@ -233,8 +234,19 @@ def get_dag_details( is not None ) - # Add is_favorite field to the DAG model + # Count active (running + queued) DAG runs for this DAG + active_runs_count = ( + session.scalar( + select(func.count()) + .select_from(DagRun) + .where(DagRun.dag_id == dag_id, DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])) + ) + or 0 + ) + + # Add is_favorite and active_runs_count fields to the DAG model setattr(dag_model, "is_favorite", is_favorite) + setattr(dag_model, "active_runs_count", active_runs_count) return DAGDetailsResponse.model_validate(dag_model) diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 030a4cec221c7..c78c896459908 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2073,6 +2073,11 @@ export const $DAGDetailsResponse = { title: 'Is Favorite', default: false }, + active_runs_count: { + type: 'integer', + title: 'Active Runs Count', + default: 0 + }, file_token: { type: 'string', title: 'File Token', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 790edb0cf09c6..e424fa33108e9 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -568,6 +568,7 @@ export type DAGDetailsResponse = { [key: string]: (string); } | null; is_favorite?: boolean; + active_runs_count?: number; /** * Return file token. */ diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx index c7bb74e68355d..aec2be02260ff 100644 --- a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx @@ -55,16 +55,29 @@ export const Dag = () => { ...externalTabs, ]; + const refetchInterval = useAutoRefresh({ dagId }); + const [hasPendingRuns, setHasPendingRuns] = useState(false); + const { data: dag, error, isLoading, - } = useDagServiceGetDagDetails({ - dagId, - }); + } = useDagServiceGetDagDetails( + { + dagId, + }, + undefined, + { + refetchInterval: (query) => { + // Auto-refresh when there are active runs or pending runs + if (hasPendingRuns ?? (query.state.data && (query.state.data.active_runs_count ?? 0) > 0)) { + return refetchInterval; + } - const refetchInterval = useAutoRefresh({ dagId }); - const [hasPendingRuns, setHasPendingRuns] = useState(false); + return false; + }, + }, + ); // Ensures continuous refresh to detect new runs when there's no // pending state and new runs are initiated from other page diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Header.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Header.tsx index 8d55c0f0c397f..ad86404fdd22e 100644 --- a/airflow-core/src/airflow/ui/src/pages/Dag/Header.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Dag/Header.tsx @@ -101,6 +101,13 @@ export const Header = ({ /> ) : undefined, }, + { + label: translate("dagDetails.maxActiveRuns"), + value: + dag?.max_active_runs === undefined + ? undefined + : `${dag.active_runs_count ?? 0} of ${dag.max_active_runs}`, + }, { label: translate("dagDetails.owner"), value: , diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index a1e790a167ff8..891d173aeba19 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -954,6 +954,7 @@ def test_dag_details( "timetable_description": "Never, external triggers only", "timezone": UTC_JSON_REPR, "is_favorite": False, + "active_runs_count": 0, } assert res_json == expected @@ -1043,6 +1044,7 @@ def test_dag_details_with_view_url_template( "timetable_description": "Never, external triggers only", "timezone": UTC_JSON_REPR, "is_favorite": False, + "active_runs_count": 0, } assert res_json == expected @@ -1078,6 +1080,63 @@ def test_dag_details_includes_is_favorite_field(self, session, test_client): assert isinstance(body["is_favorite"], bool) assert body["is_favorite"] is False + def test_dag_details_includes_active_runs_count(self, session, test_client): + """Test that DAG details include the active_runs_count field.""" + # Create running and queued DAG runs for DAG2 + session.add( + DagRun( + dag_id=DAG2_ID, + run_id="running_run_1", + logical_date=datetime(2021, 6, 15, 1, 0, 0, tzinfo=timezone.utc), + start_date=datetime(2021, 6, 15, 1, 0, 0, tzinfo=timezone.utc), + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, + triggered_by=DagRunTriggeredByType.TEST, + ) + ) + session.add( + DagRun( + dag_id=DAG2_ID, + run_id="queued_run_1", + logical_date=datetime(2021, 6, 15, 2, 0, 0, tzinfo=timezone.utc), + start_date=datetime(2021, 6, 15, 2, 0, 0, tzinfo=timezone.utc), + run_type=DagRunType.MANUAL, + state=DagRunState.QUEUED, + triggered_by=DagRunTriggeredByType.TEST, + ) + ) + # Add a successful DAG run (should not be counted) + session.add( + DagRun( + dag_id=DAG2_ID, + run_id="success_run_1", + logical_date=datetime(2021, 6, 15, 3, 0, 0, tzinfo=timezone.utc), + start_date=datetime(2021, 6, 15, 3, 0, 0, tzinfo=timezone.utc), + run_type=DagRunType.MANUAL, + state=DagRunState.SUCCESS, + triggered_by=DagRunTriggeredByType.TEST, + ) + ) + session.commit() + + response = test_client.get(f"/dags/{DAG2_ID}/details") + assert response.status_code == 200 + body = response.json() + + # Verify active_runs_count field is present and correct + assert "active_runs_count" in body + assert isinstance(body["active_runs_count"], int) + assert body["active_runs_count"] == 2 # 1 running + 1 queued + + # Test with DAG that has no active runs + response = test_client.get(f"/dags/{DAG1_ID}/details") + assert response.status_code == 200 + body = response.json() + + assert "active_runs_count" in body + assert isinstance(body["active_runs_count"], int) + assert body["active_runs_count"] == 0 + class TestGetDag(TestDagEndpoint): """Unit tests for Get DAG.""" diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index eb9c9586e680b..fae4e48be634d 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1312,6 +1312,7 @@ class DAGDetailsResponse(BaseModel): default_args: Annotated[dict[str, Any] | None, Field(title="Default Args")] = None owner_links: Annotated[dict[str, str] | None, Field(title="Owner Links")] = None is_favorite: Annotated[bool | None, Field(title="Is Favorite")] = False + active_runs_count: Annotated[int | None, Field(title="Active Runs Count")] = 0 file_token: Annotated[str, Field(description="Return file token.", title="File Token")] concurrency: Annotated[ int,