Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pydantic import Field, field_validator

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.task_instance_history import TaskInstanceHistoryResponse
from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse


Expand Down Expand Up @@ -101,3 +102,9 @@ class HITLDetailCollection(BaseModel):

hitl_details: Iterable[HITLDetail]
total_entries: int


class HITLDetailHistory(BaseHITLDetail):
"""Schema for Human-in-the-loop detail history."""

task_instance: TaskInstanceHistoryResponse
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,9 @@

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.api_fastapi.core_api.datamodels.hitl import BaseHITLDetail
from airflow.utils.state import TaskInstanceState


class HITLDetailHistory(BaseHITLDetail):
"""Schema for Human-in-the-loop detail history."""


class TaskInstanceHistoryResponse(BaseModel):
"""TaskInstanceHistory serializer for responses."""

Expand Down Expand Up @@ -67,7 +62,6 @@ class TaskInstanceHistoryResponse(BaseModel):
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
dag_version: DagVersionResponse | None
hitl_detail: HITLDetailHistory | None


class TaskInstanceHistoryCollectionResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8442,6 +8442,80 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/hitlDetails/tries/{try_number}:
get:
tags:
- Task Instance
summary: Get Hitl Detail Try Detail
description: Get a Human-in-the-loop detail of a specific task instance.
operationId: get_hitl_detail_try_detail
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: map_index
in: path
required: true
schema:
type: integer
title: Map Index
- name: try_number
in: path
required: true
schema:
anyOf:
- type: integer
- type: 'null'
title: Try Number
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/HITLDetailHistory'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/hitlDetails:
get:
tags:
Expand Down Expand Up @@ -11476,11 +11550,14 @@ components:
type: boolean
title: Response Received
default: false
task_instance:
$ref: '#/components/schemas/TaskInstanceHistoryResponse'
type: object
required:
- options
- subject
- created_at
- task_instance
title: HITLDetailHistory
description: Schema for Human-in-the-loop detail history.
HITLDetailResponse:
Expand Down Expand Up @@ -12375,10 +12452,6 @@ components:
anyOf:
- $ref: '#/components/schemas/DagVersionResponse'
- type: 'null'
hitl_detail:
anyOf:
- $ref: '#/components/schemas/HITLDetailHistory'
- type: 'null'
type: object
required:
- task_id
Expand Down Expand Up @@ -12407,7 +12480,6 @@ components:
- executor
- executor_config
- dag_version
- hitl_detail
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
TaskInstanceResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from airflow.api_fastapi.core_api.datamodels.hitl import (
HITLDetail,
HITLDetailCollection,
HITLDetailHistory,
HITLDetailResponse,
UpdateHITLDetailPayload,
)
Expand All @@ -58,10 +59,12 @@
requires_access_dag,
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.models.base import Base
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
from airflow.models.hitl import HITLDetail as HITLDetailModel, HITLUser
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH

task_instances_hitl_router = AirflowRouter(
tags=["Task Instance"],
Expand All @@ -78,22 +81,32 @@ def _get_task_instance_with_hitl_detail(
task_id: str,
session: SessionDep,
map_index: int,
) -> TI:
query = (
select(TI)
.where(
TI.dag_id == dag_id,
TI.run_id == dag_run_id,
TI.task_id == task_id,
try_number: int | None = None,
) -> TI | TIH:
def _query(orm_object: Base) -> TI | TIH | None:
query = (
select(orm_object)
.where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.map_index == map_index,
)
.options(joinedload(orm_object.hitl_detail))
)
.options(joinedload(TI.hitl_detail))
)

if map_index is not None:
query = query.where(TI.map_index == map_index)
if try_number is not None:
query = query.where(orm_object.try_number == try_number)

task_instance = session.scalar(query)
if task_instance is None:
ti_or_tih = session.scalar(query)
return ti_or_tih

if try_number is None:
ti_or_tih = _query(TI)
else:
ti_or_tih = _query(TIH) or _query(TI)

if ti_or_tih is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=(
Expand All @@ -102,13 +115,13 @@ def _get_task_instance_with_hitl_detail(
),
)

if not task_instance.hitl_detail:
if not ti_or_tih.hitl_detail:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Human-in-the-loop detail does not exist for Task Instance with id {task_instance.id}",
detail=f"Human-in-the-loop detail does not exist for Task Instance with id {ti_or_tih.id}",
)

return task_instance
return ti_or_tih


@task_instances_hitl_router.patch(
Expand Down Expand Up @@ -198,10 +211,37 @@ def get_hitl_detail(
task_id=task_id,
session=session,
map_index=map_index,
try_number=None,
)
return task_instance.hitl_detail


@task_instances_hitl_router.get(
task_instance_hitl_path + "/tries/{try_number}",
status_code=status.HTTP_200_OK,
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.HITL_DETAIL))],
)
def get_hitl_detail_try_detail(
dag_id: str,
dag_run_id: str,
task_id: str,
session: SessionDep,
map_index: int = -1,
try_number: int | None = None,
) -> HITLDetailHistory:
"""Get a Human-in-the-loop detail of a specific task instance."""
task_instance_history = _get_task_instance_with_hitl_detail(
dag_id=dag_id,
dag_run_id=dag_run_id,
task_id=task_id,
session=session,
map_index=map_index,
try_number=try_number,
)
return task_instance_history.hitl_detail


@task_instances_hitl_router.get(
"/hitlDetails",
status_code=status.HTTP_200_OK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,28 +669,24 @@ def get_task_instance_try_details(
"""Get task instance details by try number."""

def _query(orm_object: Base) -> TI | TIH | None:
query = (
select(orm_object)
.where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.try_number == task_try_number,
orm_object.map_index == map_index,
)
.options(joinedload(orm_object.hitl_detail))
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.try_number == task_try_number,
orm_object.map_index == map_index,
)

task_instance = session.scalar(query)
return task_instance
ti_or_tih = session.scalar(query)
return ti_or_tih

result = _query(TI) or _query(TIH)
if result is None:
ti_or_tih = _query(TI) or _query(TIH)
if ti_or_tih is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found",
)
return result
return ti_or_tih


@task_instances_router.get(
Expand Down
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,16 @@ export const UseTaskInstanceServiceGetHitlDetailKeyFn = ({ dagId, dagRunId, mapI
mapIndex: number;
taskId: string;
}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetHitlDetailKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }])];
export type TaskInstanceServiceGetHitlDetailTryDetailDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getHitlDetailTryDetail>>;
export type TaskInstanceServiceGetHitlDetailTryDetailQueryResult<TData = TaskInstanceServiceGetHitlDetailTryDetailDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetHitlDetailTryDetailKey = "TaskInstanceServiceGetHitlDetailTryDetail";
export const UseTaskInstanceServiceGetHitlDetailTryDetailKeyFn = ({ dagId, dagRunId, mapIndex, taskId, tryNumber }: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
tryNumber: number;
}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetHitlDetailTryDetailKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, tryNumber }])];
export type TaskInstanceServiceGetHitlDetailsDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getHitlDetails>>;
export type TaskInstanceServiceGetHitlDetailsQueryResult<TData = TaskInstanceServiceGetHitlDetailsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetHitlDetailsKey = "TaskInstanceServiceGetHitlDetails";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,25 @@ export const ensureUseTaskInstanceServiceGetHitlDetailData = (queryClient: Query
taskId: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetHitlDetailKeyFn({ dagId, dagRunId, mapIndex, taskId }), queryFn: () => TaskInstanceService.getHitlDetail({ dagId, dagRunId, mapIndex, taskId }) });
/**
* Get Hitl Detail Try Detail
* Get a Human-in-the-loop detail of a specific task instance.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @param data.tryNumber
* @returns HITLDetailHistory Successful Response
* @throws ApiError
*/
export const ensureUseTaskInstanceServiceGetHitlDetailTryDetailData = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId, tryNumber }: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
tryNumber: number;
}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetHitlDetailTryDetailKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }), queryFn: () => TaskInstanceService.getHitlDetailTryDetail({ dagId, dagRunId, mapIndex, taskId, tryNumber }) });
/**
* Get Hitl Details
* Get Human-in-the-loop details.
* @param data The data for the request.
Expand Down
19 changes: 19 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,25 @@ export const prefetchUseTaskInstanceServiceGetHitlDetail = (queryClient: QueryCl
taskId: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetHitlDetailKeyFn({ dagId, dagRunId, mapIndex, taskId }), queryFn: () => TaskInstanceService.getHitlDetail({ dagId, dagRunId, mapIndex, taskId }) });
/**
* Get Hitl Detail Try Detail
* Get a Human-in-the-loop detail of a specific task instance.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.mapIndex
* @param data.tryNumber
* @returns HITLDetailHistory Successful Response
* @throws ApiError
*/
export const prefetchUseTaskInstanceServiceGetHitlDetailTryDetail = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId, tryNumber }: {
dagId: string;
dagRunId: string;
mapIndex: number;
taskId: string;
tryNumber: number;
}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetHitlDetailTryDetailKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }), queryFn: () => TaskInstanceService.getHitlDetailTryDetail({ dagId, dagRunId, mapIndex, taskId, tryNumber }) });
/**
* Get Hitl Details
* Get Human-in-the-loop details.
* @param data The data for the request.
Expand Down
Loading
Loading