Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 6 additions & 27 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,14 +987,14 @@ def _optional_boolean(value: bool | None) -> bool | None:
)
),
]
QueryHITLDetailDagRunIdFilter = Annotated[
FilterParam[str],
QueryHITLDetailTaskIdFilter = Annotated[
FilterParam[str | None],
Depends(
filter_param_factory(
TaskInstance.run_id,
str,
filter_name="dag_run_id",
),
TaskInstance.task_id,
str | None,
filter_name="task_id",
)
),
]
QueryHITLDetailSubjectSearch = Annotated[
Expand Down Expand Up @@ -1025,7 +1025,6 @@ def _optional_boolean(value: bool | None) -> bool | None:
)
),
]

QueryHITLDetailRespondedUserIdFilter = Annotated[
FilterParam[list[str]],
Depends(
Expand All @@ -1050,23 +1049,3 @@ def _optional_boolean(value: bool | None) -> bool | None:
)
),
]
QueryHITLDetailDagIdFilter = Annotated[
FilterParam[str | None],
Depends(
filter_param_factory(
TaskInstance.dag_id,
str | None,
filter_name="dag_id",
)
),
]
QueryHITLDetailTaskIdFilter = Annotated[
FilterParam[str | None],
Depends(
filter_param_factory(
TaskInstance.task_id,
str | None,
filter_name="task_id",
)
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -7968,10 +7968,10 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/hitlDetails/{dag_id}/{dag_run_id}/{task_id}:
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/hitlDetails:
patch:
tags:
- HumanInTheLoop
- Task Instance
summary: Update Hitl Detail
description: Update a Human-in-the-loop detail.
operationId: update_hitl_detail
Expand All @@ -7998,11 +7998,10 @@ paths:
type: string
title: Task Id
- name: map_index
in: query
required: false
in: path
required: true
schema:
type: integer
default: -1
title: Map Index
requestBody:
required: true
Expand Down Expand Up @@ -8049,7 +8048,7 @@ paths:
$ref: '#/components/schemas/HTTPValidationError'
get:
tags:
- HumanInTheLoop
- Task Instance
summary: Get Hitl Detail
description: Get a Human-in-the-loop detail of a specific task instance.
operationId: get_hitl_detail
Expand All @@ -8076,11 +8075,10 @@ paths:
type: string
title: Task Id
- name: map_index
in: query
required: false
in: path
required: true
schema:
type: integer
default: -1
title: Map Index
responses:
'200':
Expand Down Expand Up @@ -8113,17 +8111,29 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/hitlDetails/:
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/hitlDetails:
get:
tags:
- HumanInTheLoop
- Task Instance
summary: Get Hitl Details
description: Get Human-in-the-loop details.
operationId: get_hitl_details
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: limit
in: query
required: false
Expand All @@ -8150,14 +8160,6 @@ paths:
default:
- ti_id
title: Order By
- name: dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: dag_id_pattern
in: query
required: false
Expand All @@ -8170,12 +8172,6 @@ paths:
title: Dag Id Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
- name: dag_run_id
in: query
required: false
schema:
type: string
title: Dag Run Id
- name: task_id
in: query
required: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
from airflow.api_fastapi.core_api.routes.public.extra_links import extra_links_router
from airflow.api_fastapi.core_api.routes.public.hitl import hitl_router
from airflow.api_fastapi.core_api.routes.public.hitl import task_instances_hitl_router
from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router
from airflow.api_fastapi.core_api.routes.public.job import job_router
from airflow.api_fastapi.core_api.routes.public.log import task_instances_log_router
Expand Down Expand Up @@ -84,7 +84,7 @@
authenticated_router.include_router(dag_parsing_router)
authenticated_router.include_router(dag_tags_router)
authenticated_router.include_router(dag_versions_router)
authenticated_router.include_router(hitl_router)
authenticated_router.include_router(task_instances_hitl_router)


# Include authenticated router in public router
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import (
QueryHITLDetailBodySearch,
QueryHITLDetailDagIdFilter,
QueryHITLDetailDagIdPatternSearch,
QueryHITLDetailDagRunIdFilter,
QueryHITLDetailRespondedUserIdFilter,
QueryHITLDetailRespondedUserNameFilter,
QueryHITLDetailResponseReceivedFilter,
Expand All @@ -56,7 +54,11 @@
from airflow.models.hitl import HITLDetail as HITLDetailModel, HITLUser
from airflow.models.taskinstance import TaskInstance as TI

hitl_router = AirflowRouter(tags=["HumanInTheLoop"], prefix="/hitlDetails")
task_instances_hitl_router = AirflowRouter(
tags=["Task Instance"],
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}",
)
task_instance_hitl_path = "/taskInstances/{task_id}/{map_index}/hitlDetails"

log = structlog.get_logger(__name__)

Expand Down Expand Up @@ -100,8 +102,8 @@ def _get_task_instance_with_hitl_detail(
return task_instance


@hitl_router.patch(
"/{dag_id}/{dag_run_id}/{task_id}",
@task_instances_hitl_router.patch(
task_instance_hitl_path,
responses=create_openapi_http_exception_doc(
[
status.HTTP_403_FORBIDDEN,
Expand Down Expand Up @@ -165,8 +167,8 @@ def update_hitl_detail(
return HITLDetailResponse.model_validate(hitl_detail_model)


@hitl_router.get(
"/{dag_id}/{dag_run_id}/{task_id}",
@task_instances_hitl_router.get(
task_instance_hitl_path,
status_code=status.HTTP_200_OK,
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.HITL_DETAIL))],
Expand All @@ -189,12 +191,14 @@ def get_hitl_detail(
return task_instance.hitl_detail


@hitl_router.get(
"/",
@task_instances_hitl_router.get(
"/hitlDetails",
status_code=status.HTTP_200_OK,
dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.HITL_DETAIL))],
)
def get_hitl_details(
dag_id: str,
dag_run_id: str,
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand All @@ -220,9 +224,7 @@ def get_hitl_details(
session: SessionDep,
# ti related filter
readable_ti_filter: ReadableTIFilterDep,
dag_id: QueryHITLDetailDagIdFilter,
dag_id_pattern: QueryHITLDetailDagIdPatternSearch,
dag_run_id: QueryHITLDetailDagRunIdFilter,
task_id: QueryHITLDetailTaskIdFilter,
task_id_pattern: QueryHITLDetailTaskIdPatternSearch,
ti_state: QueryTIStateFilter,
Expand All @@ -244,14 +246,16 @@ def get_hitl_details(
)
)
)
if dag_id != "~":
query = query.where(TI.dag_id == dag_id)
if dag_run_id != "~":
query = query.where(TI.run_id == dag_run_id)
hitl_detail_select, total_entries = paginated_select(
statement=query,
filters=[
# ti related filter
readable_ti_filter,
dag_id,
dag_id_pattern,
dag_run_id,
task_id,
task_id_pattern,
ti_state,
Expand Down
60 changes: 30 additions & 30 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// generated with @7nohe/openapi-react-query-codegen@1.6.2

import { UseQueryResult } from "@tanstack/react-query";
import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, HumanInTheLoopService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen";
import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagReportService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, VariableService, VersionService, XcomService } from "../requests/services.gen";
import { DagRunState, DagWarningType } from "../requests/types.gen";
export type AssetServiceGetAssetsDefaultResponse = Awaited<ReturnType<typeof AssetService.getAssets>>;
export type AssetServiceGetAssetsQueryResult<TData = AssetServiceGetAssetsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
Expand Down Expand Up @@ -553,6 +553,34 @@ export const UseTaskInstanceServiceGetExternalLogUrlKeyFn = ({ dagId, dagRunId,
taskId: string;
tryNumber: number;
}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetExternalLogUrlKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, tryNumber }])];
export type TaskInstanceServiceGetHitlDetailDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getHitlDetail>>;
export type TaskInstanceServiceGetHitlDetailQueryResult<TData = TaskInstanceServiceGetHitlDetailDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetHitlDetailKey = "TaskInstanceServiceGetHitlDetail";
export const UseTaskInstanceServiceGetHitlDetailKeyFn = ({ dagId, dagRunId, mapIndex, taskId }: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetHitlDetailKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }])];
export type TaskInstanceServiceGetHitlDetailsDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getHitlDetails>>;
export type TaskInstanceServiceGetHitlDetailsQueryResult<TData = TaskInstanceServiceGetHitlDetailsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetHitlDetailsKey = "TaskInstanceServiceGetHitlDetails";
export const UseTaskInstanceServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: {
bodySearch?: string;
dagId: string;
dagIdPattern?: string;
dagRunId: string;
limit?: number;
offset?: number;
orderBy?: string[];
respondedByUserId?: string[];
respondedByUserName?: string[];
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
taskId?: string;
taskIdPattern?: string;
}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }])];
export type ImportErrorServiceGetImportErrorDefaultResponse = Awaited<ReturnType<typeof ImportErrorService.getImportError>>;
export type ImportErrorServiceGetImportErrorQueryResult<TData = ImportErrorServiceGetImportErrorDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useImportErrorServiceGetImportErrorKey = "ImportErrorServiceGetImportError";
Expand Down Expand Up @@ -706,34 +734,6 @@ export const UseDagVersionServiceGetDagVersionsKeyFn = ({ bundleName, bundleVers
orderBy?: string[];
versionNumber?: number;
}, queryKey?: Array<unknown>) => [useDagVersionServiceGetDagVersionsKey, ...(queryKey ?? [{ bundleName, bundleVersion, dagId, limit, offset, orderBy, versionNumber }])];
export type HumanInTheLoopServiceGetHitlDetailDefaultResponse = Awaited<ReturnType<typeof HumanInTheLoopService.getHitlDetail>>;
export type HumanInTheLoopServiceGetHitlDetailQueryResult<TData = HumanInTheLoopServiceGetHitlDetailDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useHumanInTheLoopServiceGetHitlDetailKey = "HumanInTheLoopServiceGetHitlDetail";
export const UseHumanInTheLoopServiceGetHitlDetailKeyFn = ({ dagId, dagRunId, mapIndex, taskId }: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
}, queryKey?: Array<unknown>) => [useHumanInTheLoopServiceGetHitlDetailKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }])];
export type HumanInTheLoopServiceGetHitlDetailsDefaultResponse = Awaited<ReturnType<typeof HumanInTheLoopService.getHitlDetails>>;
export type HumanInTheLoopServiceGetHitlDetailsQueryResult<TData = HumanInTheLoopServiceGetHitlDetailsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useHumanInTheLoopServiceGetHitlDetailsKey = "HumanInTheLoopServiceGetHitlDetails";
export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: {
bodySearch?: string;
dagId?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string[];
respondedByUserId?: string[];
respondedByUserName?: string[];
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
taskId?: string;
taskIdPattern?: string;
} = {}, queryKey?: Array<unknown>) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }])];
export type MonitorServiceGetHealthDefaultResponse = Awaited<ReturnType<typeof MonitorService.getHealth>>;
export type MonitorServiceGetHealthQueryResult<TData = MonitorServiceGetHealthDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth";
Expand Down Expand Up @@ -871,12 +871,12 @@ export type TaskInstanceServicePatchTaskInstanceByMapIndexMutationResult = Await
export type TaskInstanceServiceBulkTaskInstancesMutationResult = Awaited<ReturnType<typeof TaskInstanceService.bulkTaskInstances>>;
export type TaskInstanceServicePatchTaskInstanceDryRunByMapIndexMutationResult = Awaited<ReturnType<typeof TaskInstanceService.patchTaskInstanceDryRunByMapIndex>>;
export type TaskInstanceServicePatchTaskInstanceDryRunMutationResult = Awaited<ReturnType<typeof TaskInstanceService.patchTaskInstanceDryRun>>;
export type TaskInstanceServiceUpdateHitlDetailMutationResult = Awaited<ReturnType<typeof TaskInstanceService.updateHitlDetail>>;
export type PoolServicePatchPoolMutationResult = Awaited<ReturnType<typeof PoolService.patchPool>>;
export type PoolServiceBulkPoolsMutationResult = Awaited<ReturnType<typeof PoolService.bulkPools>>;
export type XcomServiceUpdateXcomEntryMutationResult = Awaited<ReturnType<typeof XcomService.updateXcomEntry>>;
export type VariableServicePatchVariableMutationResult = Awaited<ReturnType<typeof VariableService.patchVariable>>;
export type VariableServiceBulkVariablesMutationResult = Awaited<ReturnType<typeof VariableService.bulkVariables>>;
export type HumanInTheLoopServiceUpdateHitlDetailMutationResult = Awaited<ReturnType<typeof HumanInTheLoopService.updateHitlDetail>>;
export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited<ReturnType<typeof AssetService.deleteAssetQueuedEvents>>;
export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited<ReturnType<typeof AssetService.deleteDagAssetQueuedEvents>>;
export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited<ReturnType<typeof AssetService.deleteDagAssetQueuedEvent>>;
Expand Down
Loading