Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from airflow.models.dag_favorite import DagFavorite
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
from airflow.models.hitl import HITLDetail
from airflow.models.pool import Pool
from airflow.models.taskinstance import TaskInstance
from airflow.models.variable import Variable
Expand Down Expand Up @@ -750,3 +751,64 @@ def _optional_boolean(value: bool | None) -> bool | None:
QueryConnectionIdPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(Connection.conn_id, "connection_id_pattern"))
]

# Human in the loop
QueryHITLDetailDagIdPatternSearch = Annotated[
_SearchParam,
Depends(
search_param_factory(
TaskInstance.dag_id,
"dag_id_pattern",
)
),
]
QueryHITLDetailDagRunIdFilter = Annotated[
FilterParam[str],
Depends(
filter_param_factory(
TaskInstance.run_id,
str,
filter_name="dag_run_id",
),
),
]
QueryHITLDetailSubjectSearch = Annotated[
_SearchParam,
Depends(
search_param_factory(
HITLDetail.subject,
"subject_search",
)
),
]
QueryHITLDetailBodySearch = Annotated[
_SearchParam,
Depends(
search_param_factory(
HITLDetail.body,
"body_search",
)
),
]
QueryHITLDetailResponseReceivedFilter = Annotated[
FilterParam[bool | None],
Depends(
filter_param_factory(
HITLDetail.response_received,
bool | None,
filter_name="response_received",
)
),
]
QueryHITLDetailUserIdFilter = Annotated[
FilterParam[list[str]],
Depends(
filter_param_factory(
HITLDetail.user_id,
list[str],
FilterOptionEnum.ANY_EQUAL,
default_factory=list,
filter_name="user_id",
)
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -7505,6 +7505,99 @@ paths:
summary: Get Hitl Details
description: Get Human-in-the-loop details.
operationId: get_hitl_details
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
parameters:
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 50
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: ti_id
title: Order By
- name: dag_id_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
title: Dag Id Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
- name: dag_run_id
in: query
required: false
schema:
type: string
title: Dag Run Id
- name: state
in: query
required: false
schema:
type: array
items:
type: string
title: State
- name: response_received
in: query
required: false
schema:
anyOf:
- type: boolean
- type: 'null'
title: Response Received
- name: user_id
in: query
required: false
schema:
type: array
items:
type: string
title: User Id
- name: subject_search
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
title: Subject Search
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
- name: body_search
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
title: Body Search
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
responses:
'200':
description: Successful Response
Expand All @@ -7513,20 +7606,23 @@ paths:
schema:
$ref: '#/components/schemas/HITLDetailCollection'
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
description: Forbidden
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/monitor/health:
get:
tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,27 @@
# under the License.
from __future__ import annotations

from typing import Annotated

import structlog
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import joinedload

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import (
QueryHITLDetailBodySearch,
QueryHITLDetailDagIdPatternSearch,
QueryHITLDetailDagRunIdFilter,
QueryHITLDetailResponseReceivedFilter,
QueryHITLDetailSubjectSearch,
QueryHITLDetailUserIdFilter,
QueryLimit,
QueryOffset,
QueryTIStateFilter,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.hitl import (
HITLDetail,
Expand Down Expand Up @@ -262,17 +276,67 @@ def get_mapped_ti_hitl_detail(
dependencies=[Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def get_hitl_details(
readable_ti_filter: ReadableTIFilterDep,
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(
SortParam(
[
"ti_id",
"subject",
"response_at",
"task_instance.dag_id",
"task_instance.run_id",
],
HITLDetailModel,
to_replace={
"dag_id": TI.dag_id,
"run_id": TI.run_id,
},
).dynamic_depends(),
),
],
session: SessionDep,
# ti related filter
readable_ti_filter: ReadableTIFilterDep,
dag_id_pattern: QueryHITLDetailDagIdPatternSearch,
dag_run_id: QueryHITLDetailDagRunIdFilter,
ti_state: QueryTIStateFilter,
# hitl detail related filter
response_received: QueryHITLDetailResponseReceivedFilter,
user_id: QueryHITLDetailUserIdFilter,
subject_patten: QueryHITLDetailSubjectSearch,
body_patten: QueryHITLDetailBodySearch,
) -> HITLDetailCollection:
"""Get Human-in-the-loop details."""
query = select(HITLDetailModel).join(TI, HITLDetailModel.ti_id == TI.id)
query = (
select(HITLDetailModel)
.join(TI, HITLDetailModel.ti_id == TI.id)
.options(joinedload(HITLDetailModel.task_instance))
)
hitl_detail_select, total_entries = paginated_select(
statement=query,
filters=[readable_ti_filter],
filters=[
# ti related filter
readable_ti_filter,
dag_id_pattern,
dag_run_id,
ti_state,
# hitl detail related filter
response_received,
user_id,
subject_patten,
body_patten,
],
offset=offset,
limit=limit,
order_by=order_by,
session=session,
)

hitl_details = session.scalars(hitl_detail_select)

return HITLDetailCollection(
hitl_details=hitl_details,
total_entries=total_entries,
Expand Down
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/models/hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ class HITLDetail(Base):
@hybrid_property
def response_received(self) -> bool:
return self.response_at is not None

@response_received.expression # type: ignore[no-redef]
def response_received(cls):
return cls.response_at.is_not(None)
13 changes: 12 additions & 1 deletion airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,18 @@ export const UseHumanInTheLoopServiceGetMappedTiHitlDetailKeyFn = ({ dagId, dagR
export type HumanInTheLoopServiceGetHitlDetailsDefaultResponse = Awaited<ReturnType<typeof HumanInTheLoopService.getHitlDetails>>;
export type HumanInTheLoopServiceGetHitlDetailsQueryResult<TData = HumanInTheLoopServiceGetHitlDetailsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useHumanInTheLoopServiceGetHitlDetailsKey = "HumanInTheLoopServiceGetHitlDetails";
export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = (queryKey?: Array<unknown>) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [])];
export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: {
bodySearch?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string;
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
userId?: string[];
} = {}, queryKey?: Array<unknown>) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }])];
export type MonitorServiceGetHealthDefaultResponse = Awaited<ReturnType<typeof MonitorService.getHealth>>;
export type MonitorServiceGetHealthQueryResult<TData = MonitorServiceGetHealthDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,10 +1206,32 @@ export const ensureUseHumanInTheLoopServiceGetMappedTiHitlDetailData = (queryCli
/**
* Get Hitl Details
* Get Human-in-the-loop details.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.dagRunId
* @param data.state
* @param data.responseReceived
* @param data.userId
* @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns HITLDetailCollection Successful Response
* @throws ApiError
*/
export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn(), queryFn: () => HumanInTheLoopService.getHitlDetails() });
export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: {
bodySearch?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string;
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
userId?: string[];
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }) });
/**
* Get Health
* @returns HealthInfoResponse Successful Response
Expand Down
24 changes: 23 additions & 1 deletion airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1206,10 +1206,32 @@ export const prefetchUseHumanInTheLoopServiceGetMappedTiHitlDetail = (queryClien
/**
* Get Hitl Details
* Get Human-in-the-loop details.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.dagRunId
* @param data.state
* @param data.responseReceived
* @param data.userId
* @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns HITLDetailCollection Successful Response
* @throws ApiError
*/
export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn(), queryFn: () => HumanInTheLoopService.getHitlDetails() });
export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }: {
bodySearch?: string;
dagIdPattern?: string;
dagRunId?: string;
limit?: number;
offset?: number;
orderBy?: string;
responseReceived?: boolean;
state?: string[];
subjectSearch?: string;
userId?: string[];
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagIdPattern, dagRunId, limit, offset, orderBy, responseReceived, state, subjectSearch, userId }) });
/**
* Get Health
* @returns HealthInfoResponse Successful Response
Expand Down
Loading