From f7d6659a5854471e3d55ca43bf536289cf5e406c Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Sun, 10 Aug 2025 14:07:08 +0800 Subject: [PATCH] Add exact match filters for dag_id and task_id in HITL details endpoint --- .../airflow/api_fastapi/common/parameters.py | 20 +++++++++++++++++++ .../openapi/v2-rest-api-generated.yaml | 16 +++++++++++++++ .../core_api/routes/public/hitl.py | 8 +++++++- .../airflow/ui/openapi-gen/queries/common.ts | 6 ++++-- .../ui/openapi-gen/queries/ensureQueryData.ts | 8 ++++++-- .../ui/openapi-gen/queries/prefetch.ts | 8 ++++++-- .../airflow/ui/openapi-gen/queries/queries.ts | 8 ++++++-- .../ui/openapi-gen/queries/suspense.ts | 8 ++++++-- .../ui/openapi-gen/requests/services.gen.ts | 4 ++++ .../ui/openapi-gen/requests/types.gen.ts | 2 ++ .../core_api/routes/public/test_hitl.py | 8 ++++++-- 11 files changed, 83 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index ccc27fe53bef7..5494fe0d304d2 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -853,3 +853,23 @@ def _optional_boolean(value: bool | None) -> bool | None: ) ), ] +QueryHITLDetailDagIdFilter = Annotated[ + FilterParam[str | None], + Depends( + filter_param_factory( + TaskInstance.dag_id, + str | None, + filter_name="dag_id", + ) + ), +] +QueryHITLDetailTaskIdFilter = Annotated[ + FilterParam[str | None], + Depends( + filter_param_factory( + TaskInstance.task_id, + str | None, + filter_name="task_id", + ) + ), +] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5c413bf03b937..b06dd3cb515bc 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -7614,6 +7614,14 @@ paths: default: - ti_id title: Order By + - name: dag_id + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Id - name: dag_id_pattern in: query required: false @@ -7632,6 +7640,14 @@ paths: schema: type: string title: Dag Run Id + - name: task_id + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Task Id - name: task_id_pattern in: query required: false diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 63a5f575b2d85..9f17bc3fd714d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -28,10 +28,12 @@ from airflow.api_fastapi.common.db.common import SessionDep, paginated_select from airflow.api_fastapi.common.parameters import ( QueryHITLDetailBodySearch, + QueryHITLDetailDagIdFilter, QueryHITLDetailDagIdPatternSearch, QueryHITLDetailDagRunIdFilter, QueryHITLDetailResponseReceivedFilter, QueryHITLDetailSubjectSearch, + QueryHITLDetailTaskIdFilter, QueryHITLDetailTaskIdPatternSearch, QueryHITLDetailUserIdFilter, QueryLimit, @@ -301,9 +303,11 @@ def get_hitl_details( session: SessionDep, # ti related filter readable_ti_filter: ReadableTIFilterDep, + dag_id: QueryHITLDetailDagIdFilter, dag_id_pattern: QueryHITLDetailDagIdPatternSearch, dag_run_id: QueryHITLDetailDagRunIdFilter, - task_id: QueryHITLDetailTaskIdPatternSearch, + task_id: QueryHITLDetailTaskIdFilter, + task_id_pattern: QueryHITLDetailTaskIdPatternSearch, ti_state: QueryTIStateFilter, # hitl detail related filter response_received: QueryHITLDetailResponseReceivedFilter, @@ -322,9 +326,11 @@ def get_hitl_details( filters=[ # ti related filter readable_ti_filter, + dag_id, dag_id_pattern, dag_run_id, task_id, + task_id_pattern, ti_state, # hitl detail related filter response_received, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 5621fb8c5e5ed..65a87575ad64e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -651,8 +651,9 @@ export const UseHumanInTheLoopServiceGetMappedTiHitlDetailKeyFn = ({ dagId, dagR export type HumanInTheLoopServiceGetHitlDetailsDefaultResponse = Awaited>; export type HumanInTheLoopServiceGetHitlDetailsQueryResult = UseQueryResult; export const useHumanInTheLoopServiceGetHitlDetailsKey = "HumanInTheLoopServiceGetHitlDetails"; -export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }: { +export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: { bodySearch?: string; + dagId?: string; dagIdPattern?: string; dagRunId?: string; limit?: number; @@ -661,9 +662,10 @@ export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagIdP responseReceived?: boolean; state?: string[]; subjectSearch?: string; + taskId?: string; taskIdPattern?: string; userId?: string[]; -} = {}, queryKey?: Array) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }])]; +} = {}, queryKey?: Array) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }])]; export type MonitorServiceGetHealthDefaultResponse = Awaited>; export type MonitorServiceGetHealthQueryResult = UseQueryResult; export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 7024c46f61084..7a18c205ce06b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1231,8 +1231,10 @@ export const ensureUseHumanInTheLoopServiceGetMappedTiHitlDetailData = (queryCli * @param data.limit * @param data.offset * @param data.orderBy +* @param data.dagId * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.dagRunId +* @param data.taskId * @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state * @param data.responseReceived @@ -1242,8 +1244,9 @@ export const ensureUseHumanInTheLoopServiceGetMappedTiHitlDetailData = (queryCli * @returns HITLDetailCollection Successful Response * @throws ApiError */ -export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }: { +export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: { bodySearch?: string; + dagId?: string; dagIdPattern?: string; dagRunId?: string; limit?: number; @@ -1252,9 +1255,10 @@ export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: Qu responseReceived?: boolean; state?: string[]; subjectSearch?: string; + taskId?: string; taskIdPattern?: string; userId?: string[]; -} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }) }); +} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }) }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index caa59c7135d23..d4df09ab1c730 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1231,8 +1231,10 @@ export const prefetchUseHumanInTheLoopServiceGetMappedTiHitlDetail = (queryClien * @param data.limit * @param data.offset * @param data.orderBy +* @param data.dagId * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.dagRunId +* @param data.taskId * @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state * @param data.responseReceived @@ -1242,8 +1244,9 @@ export const prefetchUseHumanInTheLoopServiceGetMappedTiHitlDetail = (queryClien * @returns HITLDetailCollection Successful Response * @throws ApiError */ -export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }: { +export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: { bodySearch?: string; + dagId?: string; dagIdPattern?: string; dagRunId?: string; limit?: number; @@ -1252,9 +1255,10 @@ export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: Quer responseReceived?: boolean; state?: string[]; subjectSearch?: string; + taskId?: string; taskIdPattern?: string; userId?: string[]; -} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }) }); +} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }) }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index d48e9342d43ec..f5b65f7596b10 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1231,8 +1231,10 @@ export const useHumanInTheLoopServiceGetMappedTiHitlDetail = = unknown[]>({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }: { +export const useHumanInTheLoopServiceGetHitlDetails = = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: { bodySearch?: string; + dagId?: string; dagIdPattern?: string; dagRunId?: string; limit?: number; @@ -1252,9 +1255,10 @@ export const useHumanInTheLoopServiceGetHitlDetails = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }) as TData, ...options }); +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }) as TData, ...options }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index dabcabf0b94a7..d2b2ee9fcd6bd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1231,8 +1231,10 @@ export const useHumanInTheLoopServiceGetMappedTiHitlDetailSuspense = = unknown[]>({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }: { +export const useHumanInTheLoopServiceGetHitlDetailsSuspense = = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }: { bodySearch?: string; + dagId?: string; dagIdPattern?: string; dagRunId?: string; limit?: number; @@ -1252,9 +1255,10 @@ export const useHumanInTheLoopServiceGetHitlDetailsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskIdPattern, userId }) as TData, ...options }); +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, taskId, taskIdPattern, userId }) as TData, ...options }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 942628787a27a..3ddb842f32238 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3524,8 +3524,10 @@ export class HumanInTheLoopService { * @param data.limit * @param data.offset * @param data.orderBy + * @param data.dagId * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.dagRunId + * @param data.taskId * @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state * @param data.responseReceived @@ -3543,8 +3545,10 @@ export class HumanInTheLoopService { limit: data.limit, offset: data.offset, order_by: data.orderBy, + dag_id: data.dagId, dag_id_pattern: data.dagIdPattern, dag_run_id: data.dagRunId, + task_id: data.taskId, task_id_pattern: data.taskIdPattern, state: data.state, response_received: data.responseReceived, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 5a7cd9d3cad2d..15c706dc41f9b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2953,6 +2953,7 @@ export type GetHitlDetailsData = { * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ bodySearch?: string | null; + dagId?: string | null; /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ @@ -2967,6 +2968,7 @@ export type GetHitlDetailsData = { * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ subjectSearch?: string | null; + taskId?: string | null; /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index 237410ab808b0..d6a1764592c80 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -487,7 +487,9 @@ def test_should_respond_200_with_existing_response( # ti related filter ({"dag_id_pattern": "hitl_dag"}, 5), ({"dag_id_pattern": "other_Dag_"}, 3), + ({"dag_id": "hitl_dag_0"}, 1), ({"dag_run_id": "hitl_run_0"}, 1), + ({"task_id": "hitl_task_0"}, 1), ({"task_id_pattern": "another_hitl"}, 3), ({"state": "running"}, 5), ({"state": "success"}, 3), @@ -499,9 +501,11 @@ def test_should_respond_200_with_existing_response( ({"user_id": ["test"]}, 3), ], ids=[ - "dag_id_hitl_dag", - "dag_id_other_dag", + "dag_id_pattern_hitl_dag", + "dag_id_pattern_other_dag", + "dag_id", "dag_run_id", + "task_id_pattern", "task_id", "ti_state_running", "ti_state_success",