diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index dd80027cf0c59..924206602fc3a 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -32,7 +32,7 @@ overload, ) -from fastapi import Depends, HTTPException, Query +from fastapi import Depends, HTTPException, Query, status from pendulum.parsing.exceptions import ParserError from pydantic import AfterValidator, BaseModel, NonNegativeInt from sqlalchemy import Column, and_, func, not_, or_, select as sql_select @@ -69,6 +69,7 @@ from sqlalchemy.orm.attributes import InstrumentedAttribute from sqlalchemy.sql import ColumnElement, Select + from airflow.serialization.serialized_objects import SerializedDAG T = TypeVar("T") @@ -185,6 +186,57 @@ def depends(cls, *args: Any, **kwargs: Any) -> Self: raise NotImplementedError("Use search_param_factory instead , depends is not implemented.") +class QueryTaskInstanceTaskGroupFilter(BaseParam[str]): + """Task group filter - returns all tasks in the specified group.""" + + def __init__(self, dag=None, skip_none: bool = True): + super().__init__(skip_none=skip_none) + self._dag: None | SerializedDAG = dag + + @property + def dag(self) -> None | SerializedDAG: + return self._dag + + @dag.setter + def dag(self, value: None | SerializedDAG) -> None: + self._dag = value + + def to_orm(self, select: Select) -> Select: + if self.value is None and self.skip_none: + return select + + if not self.dag: + raise ValueError("Dag must be set before calling to_orm") + + if not hasattr(self.dag, "task_group"): + return select + + # Exact matching on group_id + task_groups = self.dag.task_group.get_task_group_dict() + task_group = task_groups.get(self.value) + if not task_group: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail={ + "reason": "not_found", + "message": f"Task group {self.value} not found", + }, + ) + + return select.where(TaskInstance.task_id.in_(task.task_id for task in task_group.iter_tasks())) + + @classmethod + def depends( + cls, + value: str | None = Query( + alias="task_group_id", + default=None, + description="Filter by exact task group ID. Returns all tasks within the specified task group.", + ), + ) -> QueryTaskInstanceTaskGroupFilter: + return cls(dag=None).set_value(value) + + def search_param_factory( attribute: ColumnElement, pattern_name: str, @@ -888,6 +940,9 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N QueryTITaskDisplayNamePatternSearch = Annotated[ _SearchParam, Depends(search_param_factory(TaskInstance.task_display_name, "task_display_name_pattern")) ] +QueryTITaskGroupFilter = Annotated[ + QueryTaskInstanceTaskGroupFilter, Depends(QueryTaskInstanceTaskGroupFilter.depends) +] QueryTIDagVersionFilter = Annotated[ FilterParam[list[int]], Depends( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5bcc026a26e56..a19e33230fa57 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -6597,6 +6597,18 @@ paths: title: Task Display Name Pattern description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ \ Regular expressions are **not** supported." + - name: task_group_id + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: Filter by exact task group ID. Returns all tasks within the + specified task group. + title: Task Group Id + description: Filter by exact task group ID. Returns all tasks within the specified + task group. - name: dag_id_pattern in: query required: false diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index f0cea4100d753..7ccebd396dd53 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -52,6 +52,7 @@ QueryTIQueueNamePatternSearch, QueryTIStateFilter, QueryTITaskDisplayNamePatternSearch, + QueryTITaskGroupFilter, QueryTITryNumberFilter, Range, RangeFilter, @@ -424,6 +425,7 @@ def get_task_instances( update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", TI))], duration_range: Annotated[RangeFilter, Depends(float_range_filter_factory("duration", TI))], task_display_name_pattern: QueryTITaskDisplayNamePatternSearch, + task_group_id: QueryTITaskGroupFilter, dag_id_pattern: Annotated[_SearchParam, Depends(search_param_factory(TI.dag_id, "dag_id_pattern"))], run_id_pattern: Annotated[_SearchParam, Depends(search_param_factory(TI.run_id, "run_id_pattern"))], state: QueryTIStateFilter, @@ -490,8 +492,10 @@ def get_task_instances( ) query = query.where(TI.run_id == dag_run_id) if dag_id != "~": - get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session) + dag = get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session) query = query.where(TI.dag_id == dag_id) + if dag: + task_group_id.dag = dag task_instance_select, total_entries = paginated_select( statement=query, @@ -510,6 +514,7 @@ def get_task_instances( executor, task_id, task_display_name_pattern, + task_group_id, dag_id_pattern, run_id_pattern, version_number, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 3a3ea3dd54eda..aa544d1108aaf 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -474,7 +474,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceKeyFn = ({ dagId, dagRun export type TaskInstanceServiceGetTaskInstancesDefaultResponse = Awaited>; export type TaskInstanceServiceGetTaskInstancesQueryResult = UseQueryResult; export const useTaskInstanceServiceGetTaskInstancesKey = "TaskInstanceServiceGetTaskInstances"; -export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagIdPattern?: string; dagRunId: string; @@ -512,6 +512,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagIdPatter startDateLte?: string; state?: string[]; taskDisplayNamePattern?: string; + taskGroupId?: string; taskId?: string; tryNumber?: number[]; updatedAtGt?: string; @@ -519,7 +520,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagIdPatter updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: Array) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? [{ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }])]; +}, queryKey?: Array) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? [{ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }])]; export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse = Awaited>; export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult = UseQueryResult; export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey = "TaskInstanceServiceGetTaskInstanceTryDetails"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index fb3c855f1d5e5..3e921c98eab42 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -925,6 +925,7 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien * @param data.durationLte * @param data.durationLt * @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.taskGroupId Filter by exact task group ID. Returns all tasks within the specified task group. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state @@ -944,7 +945,7 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagIdPattern?: string; dagRunId: string; @@ -982,6 +983,7 @@ export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu startDateLte?: string; state?: string[]; taskDisplayNamePattern?: string; + taskGroupId?: string; taskId?: string; tryNumber?: number[]; updatedAtGt?: string; @@ -989,7 +991,7 @@ export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 39452099cae66..4a2fa7fc2c2bc 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -925,6 +925,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient: * @param data.durationLte * @param data.durationLt * @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.taskGroupId Filter by exact task group ID. Returns all tasks within the specified task group. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state @@ -944,7 +945,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient: * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagIdPattern?: string; dagRunId: string; @@ -982,6 +983,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: Quer startDateLte?: string; state?: string[]; taskDisplayNamePattern?: string; + taskGroupId?: string; taskId?: string; tryNumber?: number[]; updatedAtGt?: string; @@ -989,7 +991,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: Quer updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 2c290c1daf6db..ccd2ff54d9947 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -925,6 +925,7 @@ export const useTaskInstanceServiceGetMappedTaskInstance = = unknown[]>({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const useTaskInstanceServiceGetTaskInstances = = unknown[]>({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagIdPattern?: string; dagRunId: string; @@ -982,6 +983,7 @@ export const useTaskInstanceServiceGetTaskInstances = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 4dcb5ddfb25d8..51a339c4d7b08 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -925,6 +925,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceSuspense = = unknown[]>({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { +export const useTaskInstanceServiceGetTaskInstancesSuspense = = unknown[]>({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }: { dagId: string; dagIdPattern?: string; dagRunId: string; @@ -982,6 +983,7 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }, queryKey), queryFn: () => TaskInstanceService.getTaskInstances({ dagId, dagIdPattern, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, operatorNamePattern, orderBy, pool, poolNamePattern, queue, queueNamePattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskGroupId, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionNumber }) as TData, ...options }); /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 0d3210084ab34..d016eaa00f4fd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2335,6 +2335,7 @@ export class TaskInstanceService { * @param data.durationLte * @param data.durationLt * @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.taskGroupId Filter by exact task group ID. Returns all tasks within the specified task group. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state @@ -2389,6 +2390,7 @@ export class TaskInstanceService { duration_lte: data.durationLte, duration_lt: data.durationLt, task_display_name_pattern: data.taskDisplayNamePattern, + task_group_id: data.taskGroupId, dag_id_pattern: data.dagIdPattern, run_id_pattern: data.runIdPattern, state: data.state, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 05391098ac158..6e2d8edb4f505 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2926,6 +2926,10 @@ export type GetTaskInstancesData = { * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ taskDisplayNamePattern?: string | null; + /** + * Filter by exact task group ID. Returns all tasks within the specified task group. + */ + taskGroupId?: string | null; taskId?: string | null; tryNumber?: Array<(number)>; updatedAtGt?: string | null; diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx index b365a18161281..7821c3598b6d0 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx @@ -66,7 +66,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr { dagId, dagRunId: runId, - taskDisplayNamePattern: groupId, + taskGroupId: groupId, }, undefined, { diff --git a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx index cee8163cb94b4..164b34c02d7ba 100644 --- a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx @@ -50,7 +50,7 @@ export const Overview = () => { runAfterGte: startDate, runAfterLte: endDate, state: ["failed"], - taskDisplayNamePattern: groupId ?? undefined, + taskGroupId: groupId ?? undefined, taskId: Boolean(groupId) ? undefined : taskId, }); @@ -60,7 +60,7 @@ export const Overview = () => { dagRunId: "~", limit: 14, orderBy: ["-run_after"], - taskDisplayNamePattern: groupId ?? undefined, + taskGroupId: groupId ?? undefined, taskId: Boolean(groupId) ? undefined : taskId, }, undefined, diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx index f4e0e14aca6c5..ba952c0e419e5 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx @@ -278,7 +278,8 @@ export const TaskInstances = () => { runIdPattern: filteredRunId ?? undefined, startDateGte: startDate ?? undefined, state: hasFilteredState ? filteredState : undefined, - taskDisplayNamePattern: groupId ?? taskDisplayNamePattern ?? undefined, + taskDisplayNamePattern: taskDisplayNamePattern ?? undefined, + taskGroupId: groupId ?? undefined, taskId: Boolean(groupId) ? undefined : taskId, tryNumber: tryNumberFilter !== null && tryNumberFilter !== "" ? [Number(tryNumberFilter)] : undefined, versionNumber: diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 3f61061453c90..778024eebc6bb 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1223,6 +1223,24 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint): 3, id="test task_display_name_pattern filter", ), + pytest.param( + "task_group_test", + True, + ("/dags/example_task_group/dagRuns/TEST_DAG_RUN_ID/taskInstances"), + {"task_group_id": "section_1"}, + 3, + 7, + id="test task_group filter with exact match", + ), + pytest.param( + "task_group_test", + True, + ("/dags/example_task_group/dagRuns/TEST_DAG_RUN_ID/taskInstances"), + {"task_group_id": "section_2"}, + 4, # section_2 has 4 tasks: task_1 + inner_section_2 (task_2, task_3, task_4) + 7, + id="test task_group filter exact match on group_id", + ), pytest.param( [ {"task_id": "task_match_id_1"}, @@ -1396,6 +1414,9 @@ def test_should_respond_200( dag2_id = "example_skip_dag" self.create_task_instances(session, dag_id=dag1_id) self.create_task_instances(session, dag_id=dag2_id) + elif task_instances == "task_group_test": + # test with task group expansion + self.create_task_instances(session, dag_id="example_task_group") else: self.create_task_instances( session,