diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 123b6d9841f95..1d48187f1cea9 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -66,6 +66,8 @@ if TYPE_CHECKING: from sqlalchemy.sql import ColumnElement, Select + from airflow.serialization.serialized_objects import SerializedDAG + T = TypeVar("T") @@ -181,6 +183,57 @@ def depends(cls, *args: Any, **kwargs: Any) -> Self: raise NotImplementedError("Use search_param_factory instead , depends is not implemented.") +class QueryTaskInstanceTaskGroupFilter(BaseParam[str]): + """Task group filter - returns all tasks in the specified group.""" + + def __init__(self, dag=None, skip_none: bool = True): + super().__init__(skip_none=skip_none) + self._dag: None | SerializedDAG = dag + + @property + def dag(self) -> None | SerializedDAG: + return self._dag + + @dag.setter + def dag(self, value: None | SerializedDAG) -> None: + self._dag = value + + def to_orm(self, select: Select) -> Select: + if self.value is None and self.skip_none: + return select + + if not self.dag: + raise ValueError("Dag must be set before calling to_orm") + + if not hasattr(self.dag, "task_group"): + return select + + # Exact matching on group_id + task_groups = self.dag.task_group.get_task_group_dict() + task_group = task_groups.get(self.value) + if not task_group: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail={ + "reason": "not_found", + "message": f"Task group {self.value} not found", + }, + ) + + return select.where(TaskInstance.task_id.in_(task.task_id for task in task_group.iter_tasks())) + + @classmethod + def depends( + cls, + value: str | None = Query( + alias="task_group_id", + default=None, + description="Filter by exact task group ID. Returns all tasks within the specified task group.", + ), + ) -> QueryTaskInstanceTaskGroupFilter: + return cls(dag=None).set_value(value) + + def search_param_factory( attribute: ColumnElement, pattern_name: str, @@ -862,6 +915,9 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N QueryTITaskDisplayNamePatternSearch = Annotated[ _SearchParam, Depends(search_param_factory(TaskInstance.task_display_name, "task_display_name_pattern")) ] +QueryTITaskGroupFilter = Annotated[ + QueryTaskInstanceTaskGroupFilter, Depends(QueryTaskInstanceTaskGroupFilter.depends) +] QueryTIDagVersionFilter = Annotated[ FilterParam[list[int]], Depends( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5f7d5144e7ed8..4231c25b16b2b 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -6499,6 +6499,18 @@ paths: title: Task Display Name Pattern description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ \ Regular expressions are **not** supported." + - name: task_group_id + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: Filter by exact task group ID. Returns all tasks within the + specified task group. + title: Task Group Id + description: Filter by exact task group ID. Returns all tasks within the specified + task group. - name: state in: query required: false diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 68764b5456aa6..79e59c009fae3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -49,6 +49,7 @@ QueryTIQueueFilter, QueryTIStateFilter, QueryTITaskDisplayNamePatternSearch, + QueryTITaskGroupFilter, QueryTITryNumberFilter, Range, RangeFilter, @@ -407,6 +408,7 @@ def get_task_instances( update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", TI))], duration_range: Annotated[RangeFilter, Depends(float_range_filter_factory("duration", TI))], task_display_name_pattern: QueryTITaskDisplayNamePatternSearch, + task_group_id: QueryTITaskGroupFilter, state: QueryTIStateFilter, pool: QueryTIPoolFilter, queue: QueryTIQueueFilter, @@ -468,8 +470,10 @@ def get_task_instances( ) query = query.where(TI.run_id == dag_run_id) if dag_id != "~": - get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session) + dag = get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session) query = query.where(TI.dag_id == dag_id) + if dag: + task_group_id.dag = dag task_instance_select, total_entries = paginated_select( statement=query, @@ -486,6 +490,7 @@ def get_task_instances( executor, task_id, task_display_name_pattern, + task_group_id, version_number, readable_ti_filter, try_number, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 994fe2d91c14d..de50c91bd1c09 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -464,7 +464,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceKeyFn = ({ dagId, dagRun export type TaskInstanceServiceGetTaskInstancesDefaultResponse = Awaited>; export type TaskInstanceServiceGetTaskInstancesQueryResult = UseQueryResult; export const useTaskInstanceServiceGetTaskInstancesKey = "TaskInstanceServiceGetTaskInstances"; -export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagRunId: string; durationGt?: number; @@ -497,6 +497,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, d startDateLte?: string; state?: string[]; taskDisplayNamePattern?: string; + taskGroupId?: string; taskId?: string; tryNumber?: number[]; updatedAtGt?: string; @@ -504,7 +505,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, d updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: Array) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }])]; +}, queryKey?: Array) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }])]; export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse = Awaited>; export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult = UseQueryResult; export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey = "TaskInstanceServiceGetTaskInstanceTryDetails"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 41fe0005dcd1b..b376420458833 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -905,6 +905,7 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien * @param data.durationLte * @param data.durationLt * @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.taskGroupId Filter by exact task group ID. Returns all tasks within the specified task group. * @param data.state * @param data.pool * @param data.queue @@ -919,7 +920,7 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagRunId: string; durationGt?: number; @@ -952,6 +953,7 @@ export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu startDateLte?: string; state?: string[]; taskDisplayNamePattern?: string; + taskGroupId?: string; taskId?: string; tryNumber?: number[]; updatedAtGt?: string; @@ -959,7 +961,7 @@ export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index fa6162ec588bf..fbb00335e4d0a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -905,6 +905,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient: * @param data.durationLte * @param data.durationLt * @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.taskGroupId Filter by exact task group ID. Returns all tasks within the specified task group. * @param data.state * @param data.pool * @param data.queue @@ -919,7 +920,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient: * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagRunId: string; durationGt?: number; @@ -952,6 +953,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: Quer startDateLte?: string; state?: string[]; taskDisplayNamePattern?: string; + taskGroupId?: string; taskId?: string; tryNumber?: number[]; updatedAtGt?: string; @@ -959,7 +961,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: Quer updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 955e5049e6049..cd9c9f0e99b20 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -905,6 +905,7 @@ export const useTaskInstanceServiceGetMappedTaskInstance = = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const useTaskInstanceServiceGetTaskInstances = = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagRunId: string; durationGt?: number; @@ -952,6 +953,7 @@ export const useTaskInstanceServiceGetTaskInstances = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index aafe12ed9bcac..460cad0b04923 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -905,6 +905,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceSuspense = = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const useTaskInstanceServiceGetTaskInstancesSuspense = = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagRunId: string; durationGt?: number; @@ -952,6 +953,7 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 91ded2463e7f3..153da21a141fb 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2315,6 +2315,7 @@ export class TaskInstanceService { * @param data.durationLte * @param data.durationLt * @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.taskGroupId Filter by exact task group ID. Returns all tasks within the specified task group. * @param data.state * @param data.pool * @param data.queue @@ -2364,6 +2365,7 @@ export class TaskInstanceService { duration_lte: data.durationLte, duration_lt: data.durationLt, task_display_name_pattern: data.taskDisplayNamePattern, + task_group_id: data.taskGroupId, state: data.state, pool: data.pool, queue: data.queue, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 1a1bf0f50e9ac..eaf8190a369c4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2775,6 +2775,10 @@ export type GetTaskInstancesData = { * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ taskDisplayNamePattern?: string | null; + /** + * Filter by exact task group ID. Returns all tasks within the specified task group. + */ + taskGroupId?: string | null; taskId?: string | null; tryNumber?: Array<(number)>; updatedAtGt?: string | null; diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx index b365a18161281..7821c3598b6d0 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx @@ -66,7 +66,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr { dagId, dagRunId: runId, - taskDisplayNamePattern: groupId, + taskGroupId: groupId, }, undefined, { diff --git a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx index d003174b98752..8940efad08db6 100644 --- a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx @@ -50,7 +50,7 @@ export const Overview = () => { runAfterGte: startDate, runAfterLte: endDate, state: ["failed"], - taskDisplayNamePattern: groupId ?? undefined, + taskGroupId: groupId ?? undefined, taskId: Boolean(groupId) ? undefined : taskId, }); @@ -60,7 +60,7 @@ export const Overview = () => { dagRunId: "~", limit: 14, orderBy: ["-run_after"], - taskDisplayNamePattern: groupId ?? undefined, + taskGroupId: groupId ?? undefined, taskId: Boolean(groupId) ? undefined : taskId, }, undefined, diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index c51430b993575..911a03debe454 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -251,7 +251,8 @@ export const TaskInstances = () => { pool: hasFilteredPool ? pool : undefined, startDateGte: startDate ?? undefined, state: hasFilteredState ? filteredState : undefined, - taskDisplayNamePattern: groupId ?? taskDisplayNamePattern ?? undefined, + taskDisplayNamePattern: taskDisplayNamePattern ?? undefined, + taskGroupId: groupId ?? undefined, taskId: Boolean(groupId) ? undefined : taskId, }, undefined, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index a353b110791a3..ee5e972478482 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1191,6 +1191,24 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint): 3, id="test task_display_name_pattern filter", ), + pytest.param( + "task_group_test", + True, + ("/dags/example_task_group/dagRuns/TEST_DAG_RUN_ID/taskInstances"), + {"task_group_id": "section_1"}, + 3, + 7, + id="test task_group filter with exact match", + ), + pytest.param( + "task_group_test", + True, + ("/dags/example_task_group/dagRuns/TEST_DAG_RUN_ID/taskInstances"), + {"task_group_id": "section_2"}, + 4, # section_2 has 4 tasks: task_1 + inner_section_2 (task_2, task_3, task_4) + 7, + id="test task_group filter exact match on group_id", + ), pytest.param( [ {"task_id": "task_match_id_1"}, @@ -1294,12 +1312,24 @@ def test_should_respond_200( expected_queries_number, session, ): - self.create_task_instances( - session, - update_extras=update_extras, - task_instances=task_instances, - ) - with mock.patch("airflow.models.dag_version.DagBundlesManager"): + # Special handling for dag_id_pattern tests that require multiple DAGs + if task_instances == "dag_id_pattern_test": + # Create task instances for multiple DAGs like the original test_dag_id_pattern_filter + dag1_id = "example_python_operator" + dag2_id = "example_skip_dag" + self.create_task_instances(session, dag_id=dag1_id) + self.create_task_instances(session, dag_id=dag2_id) + elif task_instances == "task_group_test": + # test with task group expansion + self.create_task_instances(session, dag_id="example_task_group") + else: + self.create_task_instances( + session, + update_extras=update_extras, + task_instances=task_instances, + ) + with mock.patch("airflow.models.dag_version.DagBundlesManager") as dag_bundle_manager_mock: + dag_bundle_manager_mock.return_value.view_url.return_value = "some_url" # Mock DagBundlesManager to avoid checking if dags-folder bundle is configured with assert_queries_count(expected_queries_number): response = test_client.get(url, params=params) diff --git a/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json b/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json index 1f74a7b6616c3..b02339b749a0b 100644 --- a/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json +++ b/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json @@ -1 +1 @@ -{"admin": "admin", "viewer": "viewer", "user": "user", "op": "op"} +{"admin": "admin"}