diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 4cf35d68871ba..767b9fed46db5 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -52,6 +52,7 @@ from airflow.models.dag_favorite import DagFavorite from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun +from airflow.models.hitl import HITLDetail from airflow.models.pool import Pool from airflow.models.taskinstance import TaskInstance from airflow.models.variable import Variable @@ -750,3 +751,64 @@ def _optional_boolean(value: bool | None) -> bool | None: QueryConnectionIdPatternSearch = Annotated[ _SearchParam, Depends(search_param_factory(Connection.conn_id, "connection_id_pattern")) ] + +# Human in the loop +QueryHITLDetailDagIdPatternSearch = Annotated[ + _SearchParam, + Depends( + search_param_factory( + TaskInstance.dag_id, + "dag_id_pattern", + ) + ), +] +QueryHITLDetailDagRunIdFilter = Annotated[ + FilterParam[str], + Depends( + filter_param_factory( + TaskInstance.run_id, + str, + filter_name="dag_run_id", + ), + ), +] +QueryHITLDetailSubjectSearch = Annotated[ + _SearchParam, + Depends( + search_param_factory( + HITLDetail.subject, + "subject_search", + ) + ), +] +QueryHITLDetailBodySearch = Annotated[ + _SearchParam, + Depends( + search_param_factory( + HITLDetail.body, + "body_search", + ) + ), +] +QueryHITLDetailResponseReceivedFilter = Annotated[ + FilterParam[bool | None], + Depends( + filter_param_factory( + HITLDetail.response_received, + bool | None, + filter_name="response_received", + ) + ), +] +QueryHITLDetailUserIdFilter = Annotated[ + FilterParam[list[str]], + Depends( + filter_param_factory( + HITLDetail.user_id, + list[str], + FilterOptionEnum.ANY_EQUAL, + default_factory=list, + filter_name="user_id", + ) + ), +] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 74363c05fe0bc..1bbb151cc141a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -7505,6 +7505,99 @@ paths: summary: Get Hitl Details description: Get Human-in-the-loop details. operationId: get_hitl_details + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 50 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + - name: order_by + in: query + required: false + schema: + type: string + default: ti_id + title: Order By + - name: dag_id_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + title: Dag Id Pattern + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + - name: dag_run_id + in: query + required: false + schema: + type: string + title: Dag Run Id + - name: state + in: query + required: false + schema: + type: array + items: + type: string + title: State + - name: response_received + in: query + required: false + schema: + anyOf: + - type: boolean + - type: 'null' + title: Response Received + - name: user_id + in: query + required: false + schema: + type: array + items: + type: string + title: User Id + - name: subject_search + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + title: Subject Search + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + - name: body_search + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + title: Body Search + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." responses: '200': description: Successful Response @@ -7513,20 +7606,23 @@ paths: schema: $ref: '#/components/schemas/HITLDetailCollection' '401': - description: Unauthorized content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized '403': - description: Forbidden content: application/json: schema: $ref: '#/components/schemas/HTTPExceptionResponse' - security: - - OAuth2PasswordBearer: [] - - HTTPBearer: [] + description: Forbidden + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/monitor/health: get: tags: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 1d4d20fe64ea8..aea8c8433cd0d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from typing import Annotated + import structlog from fastapi import Depends, HTTPException, status from sqlalchemy import select @@ -23,6 +25,18 @@ from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import ( + QueryHITLDetailBodySearch, + QueryHITLDetailDagIdPatternSearch, + QueryHITLDetailDagRunIdFilter, + QueryHITLDetailResponseReceivedFilter, + QueryHITLDetailSubjectSearch, + QueryHITLDetailUserIdFilter, + QueryLimit, + QueryOffset, + QueryTIStateFilter, + SortParam, +) from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.hitl import ( HITLDetail, @@ -262,17 +276,67 @@ def get_mapped_ti_hitl_detail( dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE))], ) def get_hitl_details( - readable_ti_filter: ReadableTIFilterDep, + limit: QueryLimit, + offset: QueryOffset, + order_by: Annotated[ + SortParam, + Depends( + SortParam( + [ + "ti_id", + "subject", + "response_at", + "task_instance.dag_id", + "task_instance.run_id", + ], + HITLDetailModel, + to_replace={ + "dag_id": TI.dag_id, + "run_id": TI.run_id, + }, + ).dynamic_depends(), + ), + ], session: SessionDep, + # ti related filter + readable_ti_filter: ReadableTIFilterDep, + dag_id_pattern: QueryHITLDetailDagIdPatternSearch, + dag_run_id: QueryHITLDetailDagRunIdFilter, + ti_state: QueryTIStateFilter, + # hitl detail related filter + response_received: QueryHITLDetailResponseReceivedFilter, + user_id: QueryHITLDetailUserIdFilter, + subject_patten: QueryHITLDetailSubjectSearch, + body_patten: QueryHITLDetailBodySearch, ) -> HITLDetailCollection: """Get Human-in-the-loop details.""" - query = select(HITLDetailModel).join(TI, HITLDetailModel.ti_id == TI.id) + query = ( + select(HITLDetailModel) + .join(TI, HITLDetailModel.ti_id == TI.id) + .options(joinedload(HITLDetailModel.task_instance)) + ) hitl_detail_select, total_entries = paginated_select( statement=query, - filters=[readable_ti_filter], + filters=[ + # ti related filter + readable_ti_filter, + dag_id_pattern, + dag_run_id, + ti_state, + # hitl detail related filter + response_received, + user_id, + subject_patten, + body_patten, + ], + offset=offset, + limit=limit, + order_by=order_by, session=session, ) + hitl_details = session.scalars(hitl_detail_select) + return HITLDetailCollection( hitl_details=hitl_details, total_entries=total_entries, diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py index 582edd3a6e381..74d9e3a747b1a 100644 --- a/airflow-core/src/airflow/models/hitl.py +++ b/airflow-core/src/airflow/models/hitl.py @@ -73,3 +73,7 @@ class HITLDetail(Base): @hybrid_property def response_received(self) -> bool: return self.response_at is not None + + @response_received.expression # type: ignore[no-redef] + def response_received(cls): + return cls.response_at.is_not(None) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 91fd42bd07909..91a6d4e4784a6 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -640,7 +640,18 @@ export const UseHumanInTheLoopServiceGetMappedTiHitlDetailKeyFn = ({ dagId, dagR export type HumanInTheLoopServiceGetHitlDetailsDefaultResponse = Awaited>; export type HumanInTheLoopServiceGetHitlDetailsQueryResult = UseQueryResult; export const useHumanInTheLoopServiceGetHitlDetailsKey = "HumanInTheLoopServiceGetHitlDetails"; -export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = (queryKey?: Array) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [])]; +export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: { + bodySearch?: string; + dagIdPattern?: string; + dagRunId?: string; + limit?: number; + offset?: number; + orderBy?: string; + responseReceived?: boolean; + state?: string[]; + subjectSearch?: string; + userId?: string[]; +} = {}, queryKey?: Array) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }])]; export type MonitorServiceGetHealthDefaultResponse = Awaited>; export type MonitorServiceGetHealthQueryResult = UseQueryResult; export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index d10b539687bf5..23072d97e34a8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1206,10 +1206,32 @@ export const ensureUseHumanInTheLoopServiceGetMappedTiHitlDetailData = (queryCli /** * Get Hitl Details * Get Human-in-the-loop details. +* @param data The data for the request. +* @param data.limit +* @param data.offset +* @param data.orderBy +* @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.dagRunId +* @param data.state +* @param data.responseReceived +* @param data.userId +* @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @returns HITLDetailCollection Successful Response * @throws ApiError */ -export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn(), queryFn: () => HumanInTheLoopService.getHitlDetails() }); +export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: { + bodySearch?: string; + dagIdPattern?: string; + dagRunId?: string; + limit?: number; + offset?: number; + orderBy?: string; + responseReceived?: boolean; + state?: string[]; + subjectSearch?: string; + userId?: string[]; +} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }) }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 2eab4c35b3ef4..5775c754dcfc5 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1206,10 +1206,32 @@ export const prefetchUseHumanInTheLoopServiceGetMappedTiHitlDetail = (queryClien /** * Get Hitl Details * Get Human-in-the-loop details. +* @param data The data for the request. +* @param data.limit +* @param data.offset +* @param data.orderBy +* @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.dagRunId +* @param data.state +* @param data.responseReceived +* @param data.userId +* @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @returns HITLDetailCollection Successful Response * @throws ApiError */ -export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn(), queryFn: () => HumanInTheLoopService.getHitlDetails() }); +export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: { + bodySearch?: string; + dagIdPattern?: string; + dagRunId?: string; + limit?: number; + offset?: number; + orderBy?: string; + responseReceived?: boolean; + state?: string[]; + subjectSearch?: string; + userId?: string[]; +} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }) }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index f47a175614451..49f0832aabf8f 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1206,10 +1206,32 @@ export const useHumanInTheLoopServiceGetMappedTiHitlDetail = = unknown[]>(queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn(queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails() as TData, ...options }); +export const useHumanInTheLoopServiceGetHitlDetails = = unknown[]>({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: { + bodySearch?: string; + dagIdPattern?: string; + dagRunId?: string; + limit?: number; + offset?: number; + orderBy?: string; + responseReceived?: boolean; + state?: string[]; + subjectSearch?: string; + userId?: string[]; +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }) as TData, ...options }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 2f9e37e78d6c2..2c7311c16788d 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1206,10 +1206,32 @@ export const useHumanInTheLoopServiceGetMappedTiHitlDetailSuspense = = unknown[]>(queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn(queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails() as TData, ...options }); +export const useHumanInTheLoopServiceGetHitlDetailsSuspense = = unknown[]>({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: { + bodySearch?: string; + dagIdPattern?: string; + dagRunId?: string; + limit?: number; + offset?: number; + orderBy?: string; + responseReceived?: boolean; + state?: string[]; + subjectSearch?: string; + userId?: string[]; +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }) as TData, ...options }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index b935e2042366d..4125642b85215 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, UpdateMappedTiHitlDetailData, UpdateMappedTiHitlDetailResponse, GetMappedTiHitlDetailData, GetMappedTiHitlDetailResponse, GetHitlDetailsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, UpdateMappedTiHitlDetailData, UpdateMappedTiHitlDetailResponse, GetMappedTiHitlDetailData, GetMappedTiHitlDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse, GetCalendarData, GetCalendarResponse } from './types.gen'; export class AssetService { /** @@ -3488,16 +3488,40 @@ export class HumanInTheLoopService { /** * Get Hitl Details * Get Human-in-the-loop details. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.orderBy + * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.dagRunId + * @param data.state + * @param data.responseReceived + * @param data.userId + * @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @returns HITLDetailCollection Successful Response * @throws ApiError */ - public static getHitlDetails(): CancelablePromise { + public static getHitlDetails(data: GetHitlDetailsData = {}): CancelablePromise { return __request(OpenAPI, { method: 'GET', url: '/api/v2/hitl-details/', + query: { + limit: data.limit, + offset: data.offset, + order_by: data.orderBy, + dag_id_pattern: data.dagIdPattern, + dag_run_id: data.dagRunId, + state: data.state, + response_received: data.responseReceived, + user_id: data.userId, + subject_search: data.subjectSearch, + body_search: data.bodySearch + }, errors: { 401: 'Unauthorized', - 403: 'Forbidden' + 403: 'Forbidden', + 422: 'Validation Error' } }); } diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 06887e2caa236..1f496b36fe566 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2928,6 +2928,28 @@ export type GetMappedTiHitlDetailData = { export type GetMappedTiHitlDetailResponse = HITLDetail; +export type GetHitlDetailsData = { + /** + * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + */ + bodySearch?: string | null; + /** + * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + */ + dagIdPattern?: string | null; + dagRunId?: string; + limit?: number; + offset?: number; + orderBy?: string; + responseReceived?: boolean | null; + state?: Array<(string)>; + /** + * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + */ + subjectSearch?: string | null; + userId?: Array<(string)>; +}; + export type GetHitlDetailsResponse = HITLDetailCollection; export type GetHealthResponse = HealthInfoResponse; @@ -5990,6 +6012,7 @@ export type $OpenApiTs = { }; '/api/v2/hitl-details/': { get: { + req: GetHitlDetailsData; res: { /** * Successful Response @@ -6003,6 +6026,10 @@ export type $OpenApiTs = { * Forbidden */ 403: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; }; }; }; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index e851bca4b25fa..695dfcae19015 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -21,6 +21,9 @@ import pytest from sqlalchemy.orm import Session +from airflow.utils.state import TaskInstanceState +from airflow.utils.timezone import utcnow + from tests_common.test_utils.db import AIRFLOW_V_3_1_PLUS if not AIRFLOW_V_3_1_PLUS: @@ -44,11 +47,12 @@ pytestmark = pytest.mark.db_test DAG_ID = "test_hitl_dag" +ANOTHER_DAG_ID = "another_hitl_dag" @pytest.fixture def sample_ti(create_task_instance: CreateTaskInstance) -> TaskInstance: - return create_task_instance() + return create_task_instance(dag_id=DAG_ID) @pytest.fixture @@ -76,6 +80,70 @@ def sample_hitl_detail(sample_ti: TaskInstance, session: Session) -> HITLDetail: return hitl_detail_model +@pytest.fixture +def sample_tis(create_task_instance: CreateTaskInstance) -> list[TaskInstance]: + tis = [ + create_task_instance( + dag_id=f"hitl_dag_{i}", + run_id=f"hitl_run_{i}", + task_id=f"hitl_task_{i}", + state=TaskInstanceState.RUNNING, + ) + for i in range(5) + ] + tis.extend( + [ + create_task_instance( + dag_id=f"other_Dag_{i}", + run_id=f"another_hitl_run_{i}", + task_id=f"another_hitl_task_{i}", + state=TaskInstanceState.SUCCESS, + ) + for i in range(3) + ] + ) + return tis + + +@pytest.fixture +def sample_hitl_details(sample_tis: list[TaskInstance], session: Session) -> list[HITLDetail]: + hitl_detail_models = [ + HITLDetail( + ti_id=ti.id, + options=["Approve", "Reject"], + subject=f"This is subject {i}", + body=f"this is body {i}", + defaults=["Approve"], + multiple=False, + params={"input_1": 1}, + ) + for i, ti in enumerate(sample_tis[:5]) + ] + hitl_detail_models.extend( + [ + HITLDetail( + ti_id=ti.id, + options=["1", "2", "3"], + subject=f"Subject {i} this is", + body=f"Body {i} this is", + defaults=["1"], + multiple=False, + params={"input": 1}, + response_at=utcnow(), + chosen_options=[str(i)], + params_input={"input": i}, + user_id="test", + ) + for i, ti in enumerate(sample_tis[5:]) + ] + ) + + session.add_all(hitl_detail_models) + session.commit() + + return hitl_detail_models + + @pytest.fixture def expected_ti_not_found_error_msg(sample_ti: TaskInstance) -> str: if TYPE_CHECKING: @@ -115,16 +183,16 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "subject": "This is subject", "user_id": None, "task_instance": { - "dag_display_name": "dag", - "dag_id": "dag", + "dag_display_name": DAG_ID, + "dag_id": DAG_ID, "dag_run_id": "test", "dag_version": { "bundle_name": "dag_maker", "bundle_url": None, "bundle_version": None, "created_at": mock.ANY, - "dag_display_name": "dag", - "dag_id": "dag", + "dag_display_name": DAG_ID, + "dag_id": DAG_ID, "id": mock.ANY, "version_number": 1, }, @@ -419,6 +487,47 @@ def test_should_respond_200_with_existing_response( "total_entries": 1, } + @pytest.mark.usefixtures("sample_hitl_details") + @pytest.mark.parametrize( + "params, expected_ti_count", + [ + # ti related filter + ({"dag_id_pattern": "hitl_dag"}, 5), + ({"dag_id_pattern": "other_Dag_"}, 3), + ({"dag_run_id": "hitl_run_0"}, 1), + ({"state": "running"}, 5), + ({"state": "success"}, 3), + # hitl detail related filter + ({"subject_search": "This is subject"}, 5), + ({"body_search": "this is"}, 8), + ({"response_received": False}, 5), + ({"response_received": True}, 3), + ({"user_id": ["test"]}, 3), + ], + ids=[ + "dag_id_hitl_dag", + "dag_id_other_dag", + "dag_run_id", + "ti_state_running", + "ti_state_success", + "subject", + "body", + "response_not_received", + "response_received", + "user_id", + ], + ) + def test_should_respond_200_with_existing_response_and_query( + self, + test_client: TestClient, + params: dict[str, Any], + expected_ti_count: int, + ) -> None: + response = test_client.get("/hitl-details/", params=params) + assert response.status_code == 200 + assert response.json()["total_entries"] == expected_ti_count + assert len(response.json()["hitl_details"]) == expected_ti_count + def test_should_respond_200_without_response(self, test_client: TestClient) -> None: response = test_client.get("/hitl-details/") assert response.status_code == 200