diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index ee840ec21f2d0..b6b5a4a9270c4 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -c05cefbe080ed889ebe132a0285756db477ff28256bbc1e86da1e053873f6478 \ No newline at end of file +e491b0c58188f06ab4696cc09c765413065069f90d78e27eb53fdac5e2e92c82 \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index a129862ad113f..f08f88ff3f632 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -1437,72 +1437,68 @@ hitl_detail - -hitl_detail - -ti_id - - [UUID] - NOT NULL - -body - - [TEXT] - -chosen_options - - [JSON] - -defaults - - [JSON] - -multiple - - [BOOLEAN] - -options - - [JSON] - NOT NULL - -params - - [JSON] - NOT NULL - -params_input - - [JSON] - NOT NULL - -responded_user_id - - [VARCHAR(128)] - -responded_user_name - - [VARCHAR(128)] - -respondents - - [JSON] - -response_at - - [TIMESTAMP] - -subject - - [TEXT] - NOT NULL + +hitl_detail + +ti_id + + [UUID] + NOT NULL + +assignees + + [JSON] + +body + + [TEXT] + +chosen_options + + [JSON] + +defaults + + [JSON] + +multiple + + [BOOLEAN] + +options + + [JSON] + NOT NULL + +params + + [JSON] + NOT NULL + +params_input + + [JSON] + NOT NULL + +responded_by + + [JSON] + +response_at + + [TIMESTAMP] + +subject + + [TEXT] + NOT NULL task_instance--hitl_detail - -1 -1 + +1 +1 diff --git a/airflow-core/docs/tutorial/hitl.rst b/airflow-core/docs/tutorial/hitl.rst index b104bc7d03366..a84ef73559879 100644 --- a/airflow-core/docs/tutorial/hitl.rst +++ b/airflow-core/docs/tutorial/hitl.rst @@ -95,6 +95,9 @@ Approval or Rejection --------------------- A specialized form of option selection, which has only 'Approval' and 'Rejection' as options. +You can also set the ``assigned_users`` to restrict the users allowed to respond for a HITL operator. +It should be a list of user ids and user names (both needed) (e.g., ``[{"id": "1", "name": "user1"}, {"id": "2", "name": "user2"}]``. +ONLY the users within this list will be allowed to respond. .. exampleinclude:: /../../providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py :language: python diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index f1d876f29b631..b51e6688d66c2 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -1022,15 +1022,16 @@ def _optional_boolean(value: bool | None) -> bool | None: ) ), ] + QueryHITLDetailRespondedUserIdFilter = Annotated[ FilterParam[list[str]], Depends( filter_param_factory( - HITLDetail.responded_user_id, + HITLDetail.responded_by_user_id, list[str], FilterOptionEnum.ANY_EQUAL, default_factory=list, - filter_name="responded_user_id", + filter_name="responded_by_user_id", ) ), ] @@ -1038,11 +1039,11 @@ def _optional_boolean(value: bool | None) -> bool | None: FilterParam[list[str]], Depends( filter_param_factory( - HITLDetail.responded_user_name, + HITLDetail.responded_by_user_name, list[str], FilterOptionEnum.ANY_EQUAL, default_factory=list, - filter_name="responded_user_name", + filter_name="responded_by_user_name", ) ), ] diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py index ea0f045539ee6..24a18821f132c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py @@ -37,13 +37,19 @@ class UpdateHITLDetailPayload(BaseModel): class HITLDetailResponse(BaseModel): """Response of updating a Human-in-the-loop detail.""" - responded_user_id: str - responded_user_name: str + responded_by: HITLUser response_at: datetime chosen_options: list[str] = Field(min_length=1) params_input: Mapping = Field(default_factory=dict) +class HITLUser(BaseModel): + """Schema for a Human-in-the-loop users.""" + + id: str + name: str + + class HITLDetail(BaseModel): """Schema for Human-in-the-loop detail.""" @@ -56,11 +62,10 @@ class HITLDetail(BaseModel): defaults: list[str] | None = None multiple: bool = False params: dict[str, Any] = Field(default_factory=dict) - respondents: list[str] | None = None + assigned_users: list[HITLUser] = Field(default_factory=list) # Response Content Detail - responded_user_id: str | None = None - responded_user_name: str | None = None + responded_by_user: HITLUser | None = None response_at: datetime | None = None chosen_options: list[str] | None = None params_input: dict[str, Any] = Field(default_factory=dict) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index a4d85b16dd3c0..383b9fca62f56 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1994,23 +1994,15 @@ components: additionalProperties: true type: object title: Params - respondents: - anyOf: - - items: - type: string - type: array - - type: 'null' - title: Respondents - responded_user_id: - anyOf: - - type: string - - type: 'null' - title: Responded User Id - responded_user_name: + assigned_users: + items: + $ref: '#/components/schemas/HITLUser' + type: array + title: Assigned Users + responded_by_user: anyOf: - - type: string + - $ref: '#/components/schemas/HITLUser' - type: 'null' - title: Responded User Name response_at: anyOf: - type: string @@ -2039,6 +2031,20 @@ components: - subject title: HITLDetail description: Schema for Human-in-the-loop detail. + HITLUser: + properties: + id: + type: string + title: Id + name: + type: string + title: Name + type: object + required: + - id + - name + title: HITLUser + description: Schema for a Human-in-the-loop users. HTTPExceptionResponse: properties: detail: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 7c23f712f1c7a..b1b5faf649391 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -8199,22 +8199,22 @@ paths: - type: boolean - type: 'null' title: Response Received - - name: responded_user_id + - name: responded_by_user_id in: query required: false schema: type: array items: type: string - title: Responded User Id - - name: responded_user_name + title: Responded By User Id + - name: responded_by_user_name in: query required: false schema: type: array items: type: string - title: Responded User Name + title: Responded By User Name - name: subject_search in: query required: false @@ -10855,23 +10855,15 @@ components: additionalProperties: true type: object title: Params - respondents: - anyOf: - - items: - type: string - type: array - - type: 'null' - title: Respondents - responded_user_id: - anyOf: - - type: string - - type: 'null' - title: Responded User Id - responded_user_name: + assigned_users: + items: + $ref: '#/components/schemas/HITLUser' + type: array + title: Assigned Users + responded_by_user: anyOf: - - type: string + - $ref: '#/components/schemas/HITLUser' - type: 'null' - title: Responded User Name response_at: anyOf: - type: string @@ -10918,12 +10910,8 @@ components: description: Schema for a collection of Human-in-the-loop details. HITLDetailResponse: properties: - responded_user_id: - type: string - title: Responded User Id - responded_user_name: - type: string - title: Responded User Name + responded_by: + $ref: '#/components/schemas/HITLUser' response_at: type: string format: date-time @@ -10940,12 +10928,25 @@ components: title: Params Input type: object required: - - responded_user_id - - responded_user_name + - responded_by - response_at - chosen_options title: HITLDetailResponse description: Response of updating a Human-in-the-loop detail. + HITLUser: + properties: + id: + type: string + title: Id + name: + type: string + title: Name + type: object + required: + - id + - name + title: HITLUser + description: Schema for a Human-in-the-loop users. HTTPExceptionResponse: properties: detail: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index e04714e0a9137..5f39e18dd13e6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -53,7 +53,7 @@ from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag from airflow.api_fastapi.logging.decorators import action_logging from airflow.models.dagrun import DagRun -from airflow.models.hitl import HITLDetail as HITLDetailModel +from airflow.models.hitl import HITLDetail as HITLDetailModel, HITLUser from airflow.models.taskinstance import TaskInstance as TI hitl_router = AirflowRouter(tags=["HumanInTheLoop"], prefix="/hitlDetails") @@ -144,19 +144,19 @@ def update_hitl_detail( user_id = user.get_id() user_name = user.get_name() - if hitl_detail_model.respondents: - if isinstance(user_id, int): - # FabAuthManager (ab_user) store user id as integer, but common interface is string type - user_id = str(user_id) - if user_id not in hitl_detail_model.respondents: + if isinstance(user_id, int): + # FabAuthManager (ab_user) store user id as integer, but common interface is string type + user_id = str(user_id) + hitl_user = HITLUser(id=user_id, name=user_name) + if hitl_detail_model.assigned_users: + if hitl_user not in hitl_detail_model.assigned_users: log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id) raise HTTPException( status.HTTP_403_FORBIDDEN, f"User={user_name} (id={user_id}) is not a respondent for the task.", ) - hitl_detail_model.responded_user_id = user_id - hitl_detail_model.responded_user_name = user_name + hitl_detail_model.responded_by = hitl_user hitl_detail_model.response_at = timezone.utcnow() hitl_detail_model.chosen_options = update_hitl_detail_payload.chosen_options hitl_detail_model.params_input = update_hitl_detail_payload.params_input diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py index 4162d2cf7717c..a5f16eb881809 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py @@ -26,6 +26,13 @@ from airflow.models.hitl import HITLDetail +class HITLUser(BaseModel): + """Schema for a Human-in-the-loop users.""" + + id: str + name: str + + class HITLDetailRequest(BaseModel): """Schema for the request part of a Human-in-the-loop detail for a specific task instance.""" @@ -36,7 +43,7 @@ class HITLDetailRequest(BaseModel): defaults: list[str] | None = None multiple: bool = False params: dict[str, Any] = Field(default_factory=dict) - respondents: list[str] | None = None + assigned_users: list[HITLUser] = Field(default_factory=list) class UpdateHITLDetailPayload(BaseModel): @@ -51,8 +58,7 @@ class HITLDetailResponse(BaseModel): """Schema for the response part of a Human-in-the-loop detail for a specific task instance.""" response_received: bool - responded_user_name: str | None - responded_user_id: str | None + responded_by_user: HITLUser | None = None response_at: datetime | None # It's empty if the user has not yet responded. chosen_options: list[str] | None @@ -60,11 +66,19 @@ class HITLDetailResponse(BaseModel): @classmethod def from_hitl_detail_orm(cls, hitl_detail: HITLDetail) -> HITLDetailResponse: + hitl_user = ( + HITLUser( + id=hitl_detail.responded_by_user_id, + name=hitl_detail.responded_by_user_name, + ) + if hitl_detail.responded_by_user + else None + ) + return HITLDetailResponse( response_received=hitl_detail.response_received, response_at=hitl_detail.response_at, - responded_user_id=hitl_detail.responded_user_id, - responded_user_name=hitl_detail.responded_user_name, + responded_by_user=hitl_user, chosen_options=hitl_detail.chosen_options, params_input=hitl_detail.params_input or {}, ) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py index c9efbda78f04a..8a6606bd28028 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py @@ -71,7 +71,7 @@ def upsert_hitl_detail( defaults=payload.defaults, multiple=payload.multiple, params=payload.params, - respondents=payload.respondents, + assignees=[user.model_dump() for user in payload.assigned_users], ) session.add(hitl_detail_model) elif hitl_detail_model.response_received: @@ -116,8 +116,7 @@ def update_hitl_detail( f"Human-in-the-loop detail for Task Instance with id {ti_id_str} already exists.", ) - hitl_detail_model.responded_user_id = HITLDetail.DEFAULT_USER_NAME - hitl_detail_model.responded_user_name = HITLDetail.DEFAULT_USER_NAME + hitl_detail_model.responded_by = HITLDetail.DEFAULT_USER hitl_detail_model.response_at = datetime.now(timezone.utc) hitl_detail_model.chosen_options = payload.chosen_options hitl_detail_model.params_input = payload.params_input diff --git a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py index 3e0ede0183ee1..60f870ff44ccd 100644 --- a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py +++ b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py @@ -59,10 +59,9 @@ def upgrade(): Column("defaults", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), Column("multiple", Boolean, unique=False, default=False), Column("params", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}), - Column("respondents", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), + Column("assignees", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), Column("response_at", UtcDateTime, nullable=True), - Column("responded_user_id", String(128), nullable=True), - Column("responded_user_name", String(128), nullable=True), + Column("responded_by", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), Column("chosen_options", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), Column("params_input", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}), ForeignKeyConstraint( diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py index 448939e2c6f61..e2055344bc3ec 100644 --- a/airflow-core/src/airflow/models/hitl.py +++ b/airflow-core/src/airflow/models/hitl.py @@ -16,16 +16,68 @@ # under the License. from __future__ import annotations +from typing import TYPE_CHECKING, Any, TypedDict + import sqlalchemy_jsonfield -from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text +from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text, func, literal from sqlalchemy.dialects import postgresql +from sqlalchemy.ext.compiler import compiles from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship +from sqlalchemy.sql.functions import FunctionElement from airflow.models.base import Base from airflow.settings import json from airflow.utils.sqlalchemy import UtcDateTime +if TYPE_CHECKING: + from sqlalchemy.sql import ColumnElement + from sqlalchemy.sql.compiler import SQLCompiler + + +class JSONExtract(FunctionElement): + """ + Cross-dialect JSON key extractor. + + :meta: private + """ + + type = String() + inherit_cache = True + + def __init__(self, column: ColumnElement[Any], key: str, **kwargs: dict[str, Any]) -> None: + super().__init__(column, literal(key), **kwargs) + + +@compiles(JSONExtract, "postgresql") +def compile_postgres(element: JSONExtract, compiler: SQLCompiler, **kwargs: dict[str, Any]) -> str: + """ + Compile JSONExtract for PostgreSQL. + + :meta: private + """ + column, key = element.clauses + return compiler.process(func.json_extract_path_text(column, key), **kwargs) + + +@compiles(JSONExtract, "sqlite") +@compiles(JSONExtract, "mysql") +def compile_sqlite_mysql(element: JSONExtract, compiler: SQLCompiler, **kwargs: dict[str, Any]) -> str: + """ + Compile JSONExtract for SQLite/MySQL. + + :meta: private + """ + column, key = element.clauses + return compiler.process(func.json_extract(column, f"$.{key.value}"), **kwargs) + + +class HITLUser(TypedDict): + """Typed dict for saving a Human-in-the-loop user information.""" + + id: str + name: str + class HITLDetail(Base): """Human-in-the-loop request and corresponding response.""" @@ -44,12 +96,11 @@ class HITLDetail(Base): defaults = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) multiple = Column(Boolean, unique=False, default=False) params = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) - respondents = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + assignees = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) # Response Content Detail response_at = Column(UtcDateTime, nullable=True) - responded_user_id = Column(String(128), nullable=True) - responded_user_name = Column(String(128), nullable=True) + responded_by = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) chosen_options = Column( sqlalchemy_jsonfield.JSONField(json=json), nullable=True, @@ -80,4 +131,44 @@ def response_received(self) -> bool: def response_received(cls): return cls.response_at.is_not(None) - DEFAULT_USER_NAME = "Fallback to defaults" + @hybrid_property + def responded_by_user_id(self) -> str | None: + return self.responded_by["id"] if self.responded_by else None + + @responded_by_user_id.expression # type: ignore[no-redef] + def responded_by_user_id(cls): + return JSONExtract(cls.responded_by, "id") + + @hybrid_property + def responded_by_user_name(self) -> str | None: + return self.responded_by["name"] if self.responded_by else None + + @responded_by_user_name.expression # type: ignore[no-redef] + def responded_by_user_name(cls): + return JSONExtract(cls.responded_by, "name") + + @hybrid_property + def assigned_users(self) -> list[HITLUser]: + if not self.assignees: + return [] + return [ + HITLUser( + id=assignee["id"], + name=assignee["name"], + ) + for assignee in self.assignees + ] + + @hybrid_property + def responded_by_user(self) -> HITLUser | None: + if self.responded_by is None: + return None + return HITLUser( + id=self.responded_by["id"], + name=self.responded_by["name"], + ) + + DEFAULT_USER = HITLUser( + id="Fallback to defaults", + name="Fallback to defaults", + ) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 457aac893eef9..d4ffa8098f9f0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -717,7 +717,7 @@ export const UseHumanInTheLoopServiceGetHitlDetailKeyFn = ({ dagId, dagRunId, ma export type HumanInTheLoopServiceGetHitlDetailsDefaultResponse = Awaited>; export type HumanInTheLoopServiceGetHitlDetailsQueryResult = UseQueryResult; export const useHumanInTheLoopServiceGetHitlDetailsKey = "HumanInTheLoopServiceGetHitlDetails"; -export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { +export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { bodySearch?: string; dagId?: string; dagIdPattern?: string; @@ -725,14 +725,14 @@ export const UseHumanInTheLoopServiceGetHitlDetailsKeyFn = ({ bodySearch, dagId, limit?: number; offset?: number; orderBy?: string[]; - respondedUserId?: string[]; - respondedUserName?: string[]; + respondedByUserId?: string[]; + respondedByUserName?: string[]; responseReceived?: boolean; state?: string[]; subjectSearch?: string; taskId?: string; taskIdPattern?: string; -} = {}, queryKey?: Array) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }])]; +} = {}, queryKey?: Array) => [useHumanInTheLoopServiceGetHitlDetailsKey, ...(queryKey ?? [{ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }])]; export type MonitorServiceGetHealthDefaultResponse = Awaited>; export type MonitorServiceGetHealthQueryResult = UseQueryResult; export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 75803561cfa32..923eae15e158e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1371,14 +1371,14 @@ export const ensureUseHumanInTheLoopServiceGetHitlDetailData = (queryClient: Que * @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state * @param data.responseReceived -* @param data.respondedUserId -* @param data.respondedUserName +* @param data.respondedByUserId +* @param data.respondedByUserName * @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @returns HITLDetailCollection Successful Response * @throws ApiError */ -export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { +export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { bodySearch?: string; dagId?: string; dagIdPattern?: string; @@ -1386,14 +1386,14 @@ export const ensureUseHumanInTheLoopServiceGetHitlDetailsData = (queryClient: Qu limit?: number; offset?: number; orderBy?: string[]; - respondedUserId?: string[]; - respondedUserName?: string[]; + respondedByUserId?: string[]; + respondedByUserName?: string[]; responseReceived?: boolean; state?: string[]; subjectSearch?: string; taskId?: string; taskIdPattern?: string; -} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) }); +} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 180eb82621082..96408ad07c85b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1371,14 +1371,14 @@ export const prefetchUseHumanInTheLoopServiceGetHitlDetail = (queryClient: Query * @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state * @param data.responseReceived -* @param data.respondedUserId -* @param data.respondedUserName +* @param data.respondedByUserId +* @param data.respondedByUserName * @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @returns HITLDetailCollection Successful Response * @throws ApiError */ -export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { +export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: QueryClient, { bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { bodySearch?: string; dagId?: string; dagIdPattern?: string; @@ -1386,14 +1386,14 @@ export const prefetchUseHumanInTheLoopServiceGetHitlDetails = (queryClient: Quer limit?: number; offset?: number; orderBy?: string[]; - respondedUserId?: string[]; - respondedUserName?: string[]; + respondedByUserId?: string[]; + respondedByUserName?: string[]; responseReceived?: boolean; state?: string[]; subjectSearch?: string; taskId?: string; taskIdPattern?: string; -} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) }); +} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 1b133994fecf5..346b4ae43a4fd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1371,14 +1371,14 @@ export const useHumanInTheLoopServiceGetHitlDetail = = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { +export const useHumanInTheLoopServiceGetHitlDetails = = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { bodySearch?: string; dagId?: string; dagIdPattern?: string; @@ -1386,14 +1386,14 @@ export const useHumanInTheLoopServiceGetHitlDetails = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) as TData, ...options }); +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) as TData, ...options }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 45fa755cb8d16..5d3d0f2f19c26 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1371,14 +1371,14 @@ export const useHumanInTheLoopServiceGetHitlDetailSuspense = = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { +export const useHumanInTheLoopServiceGetHitlDetailsSuspense = = unknown[]>({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }: { bodySearch?: string; dagId?: string; dagIdPattern?: string; @@ -1386,14 +1386,14 @@ export const useHumanInTheLoopServiceGetHitlDetailsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedUserId, respondedUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) as TData, ...options }); +} = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseHumanInTheLoopServiceGetHitlDetailsKeyFn({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }, queryKey), queryFn: () => HumanInTheLoopService.getHitlDetails({ bodySearch, dagId, dagIdPattern, dagRunId, limit, offset, orderBy, respondedByUserId, respondedByUserName, responseReceived, state, subjectSearch, taskId, taskIdPattern }) as TData, ...options }); /** * Get Health * @returns HealthInfoResponse Successful Response diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 72795ca45515a..332d112cb7c1d 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3571,41 +3571,22 @@ export const $HITLDetail = { type: 'object', title: 'Params' }, - respondents: { - anyOf: [ - { - items: { - type: 'string' - }, - type: 'array' - }, - { - type: 'null' - } - ], - title: 'Respondents' - }, - responded_user_id: { - anyOf: [ - { - type: 'string' - }, - { - type: 'null' - } - ], - title: 'Responded User Id' + assigned_users: { + items: { + '$ref': '#/components/schemas/HITLUser' + }, + type: 'array', + title: 'Assigned Users' }, - responded_user_name: { + responded_by_user: { anyOf: [ { - type: 'string' + '$ref': '#/components/schemas/HITLUser' }, { type: 'null' } - ], - title: 'Responded User Name' + ] }, response_at: { anyOf: [ @@ -3672,13 +3653,8 @@ export const $HITLDetailCollection = { export const $HITLDetailResponse = { properties: { - responded_user_id: { - type: 'string', - title: 'Responded User Id' - }, - responded_user_name: { - type: 'string', - title: 'Responded User Name' + responded_by: { + '$ref': '#/components/schemas/HITLUser' }, response_at: { type: 'string', @@ -3700,11 +3676,28 @@ export const $HITLDetailResponse = { } }, type: 'object', - required: ['responded_user_id', 'responded_user_name', 'response_at', 'chosen_options'], + required: ['responded_by', 'response_at', 'chosen_options'], title: 'HITLDetailResponse', description: 'Response of updating a Human-in-the-loop detail.' } as const; +export const $HITLUser = { + properties: { + id: { + type: 'string', + title: 'Id' + }, + name: { + type: 'string', + title: 'Name' + } + }, + type: 'object', + required: ['id', 'name'], + title: 'HITLUser', + description: 'Schema for a Human-in-the-loop users.' +} as const; + export const $HTTPExceptionResponse = { properties: { detail: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 1f074a4d5cc72..5f38c32a9d297 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3623,8 +3623,8 @@ export class HumanInTheLoopService { * @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.state * @param data.responseReceived - * @param data.respondedUserId - * @param data.respondedUserName + * @param data.respondedByUserId + * @param data.respondedByUserName * @param data.subjectSearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @param data.bodySearch SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. * @returns HITLDetailCollection Successful Response @@ -3645,8 +3645,8 @@ export class HumanInTheLoopService { task_id_pattern: data.taskIdPattern, state: data.state, response_received: data.responseReceived, - responded_user_id: data.respondedUserId, - responded_user_name: data.respondedUserName, + responded_by_user_id: data.respondedByUserId, + responded_by_user_name: data.respondedByUserName, subject_search: data.subjectSearch, body_search: data.bodySearch }, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index c1320c1c63512..e88e743d5b5e0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -940,9 +940,8 @@ export type HITLDetail = { params?: { [key: string]: unknown; }; - respondents?: Array<(string)> | null; - responded_user_id?: string | null; - responded_user_name?: string | null; + assigned_users?: Array; + responded_by_user?: HITLUser | null; response_at?: string | null; chosen_options?: Array<(string)> | null; params_input?: { @@ -963,8 +962,7 @@ export type HITLDetailCollection = { * Response of updating a Human-in-the-loop detail. */ export type HITLDetailResponse = { - responded_user_id: string; - responded_user_name: string; + responded_by: HITLUser; response_at: string; chosen_options: Array<(string)>; params_input?: { @@ -972,6 +970,14 @@ export type HITLDetailResponse = { }; }; +/** + * Schema for a Human-in-the-loop users. + */ +export type HITLUser = { + id: string; + name: string; +}; + /** * HTTPException Model used for error response. */ @@ -3100,8 +3106,8 @@ export type GetHitlDetailsData = { limit?: number; offset?: number; orderBy?: Array<(string)>; - respondedUserId?: Array<(string)>; - respondedUserName?: Array<(string)>; + respondedByUserId?: Array<(string)>; + respondedByUserName?: Array<(string)>; responseReceived?: boolean | null; state?: Array<(string)>; /** diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index 9dfd355992093..e20f9fabba45c 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -31,6 +31,7 @@ from airflow._shared.timezones.timezone import utcnow from airflow.models.hitl import HITLDetail from airflow.models.log import Log +from airflow.sdk.execution_time.hitl import HITLUser from airflow.utils.state import TaskInstanceState if TYPE_CHECKING: @@ -77,7 +78,7 @@ def sample_hitl_detail(sample_ti: TaskInstance, session: Session) -> HITLDetail: defaults=["Approve"], multiple=False, params={"input_1": 1}, - respondents=None, + assignees=None, ) session.add(hitl_detail_model) session.commit() @@ -95,7 +96,7 @@ def sample_hitl_detail_non_respondent(sample_ti: TaskInstance, session: Session) defaults=["Approve"], multiple=False, params={"input_1": 1}, - respondents=["non_test"], + assignees=[HITLUser(id="non_test", name="non_test")], ) session.add(hitl_detail_model) session.commit() @@ -113,7 +114,7 @@ def sample_hitl_detail_respondent(sample_ti: TaskInstance, session: Session) -> defaults=["Approve"], multiple=False, params={"input_1": 1}, - respondents=["test"], + assignees=[HITLUser(id="test", name="test")], ) session.add(hitl_detail_model) session.commit() @@ -173,8 +174,7 @@ def sample_hitl_details(sample_tis: list[TaskInstance], session: Session) -> lis response_at=utcnow(), chosen_options=[str(i)], params_input={"input": i}, - responded_user_id="test", - responded_user_name="test", + responded_by={"id": "test", "name": "test"}, ) for i, ti in enumerate(sample_tis[5:]) ] @@ -208,14 +208,13 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "multiple": False, "options": ["Approve", "Reject"], "params": {"input_1": 1}, - "respondents": None, + "assigned_users": [], "params_input": {}, "response_at": None, "chosen_options": None, "response_received": False, "subject": "This is subject", - "responded_user_id": None, - "responded_user_name": None, + "responded_by_user": None, "task_instance": { "dag_display_name": DAG_ID, "dag_id": DAG_ID, @@ -321,8 +320,7 @@ def test_should_respond_200_with_existing_response( assert response.json() == { "params_input": {"input_1": 2}, "chosen_options": ["Approve"], - "responded_user_id": "test", - "responded_user_name": "test", + "responded_by": {"id": "test", "name": "test"}, "response_at": "2025-07-03T00:00:00Z", } @@ -332,7 +330,7 @@ def test_should_respond_200_with_existing_response( @time_machine.travel(datetime(2025, 7, 3, 0, 0, 0), tick=False) @pytest.mark.usefixtures("sample_hitl_detail_respondent") @pytest.mark.parametrize("map_index", [None, -1]) - def test_should_respond_200_to_respondent_user( + def test_should_respond_200_to_assigned_users( self, test_client: TestClient, sample_ti_url_identifier: str, @@ -351,8 +349,7 @@ def test_should_respond_200_to_respondent_user( assert response.json() == { "params_input": {"input_1": 2}, "chosen_options": ["Approve"], - "responded_user_id": "test", - "responded_user_name": "test", + "responded_by": {"id": "test", "name": "test"}, "response_at": "2025-07-03T00:00:00Z", } @@ -453,8 +450,7 @@ def test_should_respond_409( expected_response = { "params_input": {"input_1": 2}, "chosen_options": ["Approve"], - "responded_user_id": "test", - "responded_user_name": "test", + "responded_by": {"id": "test", "name": "test"}, "response_at": "2025-07-03T00:00:00Z", } assert response.status_code == 200 @@ -579,8 +575,8 @@ def test_should_respond_200_with_existing_response( ({"body_search": "this is"}, 8), ({"response_received": False}, 5), ({"response_received": True}, 3), - ({"responded_user_id": ["test"]}, 3), - ({"responded_user_name": ["test"]}, 3), + ({"responded_by_user_id": ["test"]}, 3), + ({"responded_by_user_name": ["test"]}, 3), ], ids=[ "dag_id_pattern_hitl_dag", diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index f85d7bda49b0b..fa972d6384d1c 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -160,7 +160,7 @@ def setup_hitl_data(self, create_task_instance: TaskInstance, session: Session): defaults=["Approve"], response_at=utcnow(), chosen_options=["Approve"], - responded_user_id="test", + responded_by={"id": "test", "name": "test"}, ) for i in range(3, 5) ] @@ -186,6 +186,7 @@ def setup_hitl_data(self, create_task_instance: TaskInstance, session: Session): "params": {}, "params_input": {}, "response_received": False, + "assigned_users": [], } for i in range(3) ], diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py index 085fc7ae6a8b3..764d5a48ccdc6 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py @@ -45,13 +45,12 @@ "defaults": ["Approve"], "multiple": False, "params": {"input_1": 1}, - "respondents": None, + "assignees": None, } expected_empty_hitl_detail_response_part: dict[str, Any] = { "response_at": None, "chosen_options": None, - "responded_user_id": None, - "responded_user_name": None, + "responded_by_user": None, "params_input": {}, "response_received": False, } @@ -94,8 +93,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "params_input": {"input_1": 2}, "response_at": convert_to_utc(datetime(2025, 7, 3, 0, 0, 0)), "chosen_options": ["Reject"], - "responded_user_id": "Fallback to defaults", - "responded_user_name": "Fallback to defaults", + "responded_by": {"id": "Fallback to defaults", "name": "Fallback to defaults"}, }, }, ], @@ -125,11 +123,15 @@ def test_upsert_hitl_detail( **default_hitl_detail_request_kwargs, }, ) - assert response.status_code == 201 - assert response.json() == { + + expected_json = { "ti_id": ti.id, **default_hitl_detail_request_kwargs, } + expected_json["assigned_users"] = expected_json.pop("assignees") or [] + + assert response.status_code == 201 + assert response.json() == expected_json def test_upsert_hitl_detail_with_empty_option( @@ -172,8 +174,10 @@ def test_update_hitl_detail(client: Client, sample_ti: TaskInstance) -> None: "response_at": "2025-07-03T00:00:00Z", "chosen_options": ["Reject"], "response_received": True, - "responded_user_id": "Fallback to defaults", - "responded_user_name": "Fallback to defaults", + "responded_by_user": { + "id": "Fallback to defaults", + "name": "Fallback to defaults", + }, } diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 37ecd597a5c88..ccad0f23ed898 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -525,16 +525,13 @@ class FastAPIRootMiddlewareResponse(BaseModel): name: Annotated[str, Field(title="Name")] -class HITLDetailResponse(BaseModel): +class HITLUser(BaseModel): """ - Response of updating a Human-in-the-loop detail. + Schema for a Human-in-the-loop users. """ - responded_user_id: Annotated[str, Field(title="Responded User Id")] - responded_user_name: Annotated[str, Field(title="Responded User Name")] - response_at: Annotated[datetime, Field(title="Response At")] - chosen_options: Annotated[list[str], Field(min_length=1, title="Chosen Options")] - params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None + id: Annotated[str, Field(title="Id")] + name: Annotated[str, Field(title="Name")] class HTTPExceptionResponse(BaseModel): @@ -1434,6 +1431,17 @@ class EventLogCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] +class HITLDetailResponse(BaseModel): + """ + Response of updating a Human-in-the-loop detail. + """ + + responded_by: HITLUser + response_at: Annotated[datetime, Field(title="Response At")] + chosen_options: Annotated[list[str], Field(min_length=1, title="Chosen Options")] + params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None + + class HTTPValidationError(BaseModel): detail: Annotated[list[ValidationError] | None, Field(title="Detail")] = None @@ -1828,9 +1836,8 @@ class HITLDetail(BaseModel): defaults: Annotated[list[str] | None, Field(title="Defaults")] = None multiple: Annotated[bool | None, Field(title="Multiple")] = False params: Annotated[dict[str, Any] | None, Field(title="Params")] = None - respondents: Annotated[list[str] | None, Field(title="Respondents")] = None - responded_user_id: Annotated[str | None, Field(title="Responded User Id")] = None - responded_user_name: Annotated[str | None, Field(title="Responded User Name")] = None + assigned_users: Annotated[list[HITLUser] | None, Field(title="Assigned Users")] = None + responded_by_user: HITLUser | None = None response_at: Annotated[datetime | None, Field(title="Response At")] = None chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py index dd7b4dc277fdd..44d77ba4959a2 100644 --- a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py +++ b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py @@ -140,6 +140,7 @@ def notify(self, context: Context) -> None: notifiers=[hitl_request_callback], on_success_callback=hitl_success_callback, on_failure_callback=hitl_failure_callback, + assigned_users=[{"id": "admin", "name": "admin"}], ) # [END howto_hitl_approval_operator] diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py index 440abf9a516c4..f8e1c631ea63e 100644 --- a/providers/standard/src/airflow/providers/standard/operators/hitl.py +++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py @@ -41,6 +41,7 @@ if TYPE_CHECKING: from airflow.sdk.definitions.context import Context + from airflow.sdk.execution_time.hitl import HITLUser from airflow.sdk.types import RuntimeTaskInstanceProtocol @@ -70,7 +71,7 @@ def __init__( multiple: bool = False, params: ParamsDict | dict[str, Any] | None = None, notifiers: Sequence[BaseNotifier] | BaseNotifier | None = None, - respondents: str | list[str] | None = None, + assigned_users: HITLUser | list[HITLUser] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -86,7 +87,7 @@ def __init__( self.notifiers: Sequence[BaseNotifier] = ( [notifiers] if isinstance(notifiers, BaseNotifier) else notifiers or [] ) - self.respondents = [respondents] if isinstance(respondents, str) else respondents + self.assigned_users = [assigned_users] if isinstance(assigned_users, dict) else assigned_users self.validate_options() self.validate_params() @@ -138,7 +139,7 @@ def execute(self, context: Context): defaults=self.defaults, multiple=self.multiple, params=self.serialized_params, - respondents=self.respondents, + assigned_users=self.assigned_users, ) if self.execution_timeout: @@ -178,6 +179,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any: return HITLTriggerEventSuccessPayload( chosen_options=chosen_options, params_input=params_input, + responded_by_user=event["responded_by_user"], ) def process_trigger_event_error(self, event: dict[str, Any]) -> None: diff --git a/providers/standard/src/airflow/providers/standard/triggers/hitl.py b/providers/standard/src/airflow/providers/standard/triggers/hitl.py index c5f32800058c2..a9299ae240756 100644 --- a/providers/standard/src/airflow/providers/standard/triggers/hitl.py +++ b/providers/standard/src/airflow/providers/standard/triggers/hitl.py @@ -25,12 +25,13 @@ import asyncio from collections.abc import AsyncIterator from datetime import datetime -from typing import Any, Literal, TypedDict +from typing import TYPE_CHECKING, Any, Literal, TypedDict from uuid import UUID from asgiref.sync import sync_to_async from airflow.sdk.execution_time.hitl import ( + HITLUser, get_hitl_detail_content_detail, update_hitl_detail_response, ) @@ -43,6 +44,7 @@ class HITLTriggerEventSuccessPayload(TypedDict, total=False): chosen_options: list[str] params_input: dict[str, Any] + responded_by_user: HITLUser timedout: bool @@ -100,12 +102,15 @@ async def run(self) -> AsyncIterator[TriggerEvent]: if self.timeout_datetime and self.timeout_datetime < utcnow(): # Fetch latest HITL detail before fallback resp = await sync_to_async(get_hitl_detail_content_detail)(ti_id=self.ti_id) + # Response already received, yield success and exit if resp.response_received and resp.chosen_options: - # Response already received, yield success and exit + if TYPE_CHECKING: + assert resp.responded_by_user is not None + self.log.info( "[HITL] responded_by=%s (id=%s) options=%s at %s (timeout fallback skipped)", - resp.responded_user_name, - resp.responded_user_id, + resp.responded_by_user.name, + resp.responded_by_user.id, resp.chosen_options, resp.response_at, ) @@ -113,6 +118,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]: HITLTriggerEventSuccessPayload( chosen_options=resp.chosen_options, params_input=resp.params_input or {}, + responded_by_user=HITLUser( + id=resp.responded_by_user.id, + name=resp.responded_by_user.name, + ), timedout=False, ) ) @@ -139,6 +148,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]: HITLTriggerEventSuccessPayload( chosen_options=self.defaults, params_input=self.params, + responded_by_user=HITLUser( + id="Fallback to defaults", + name="Fallback to defaults", + ), timedout=True, ) ) @@ -146,10 +159,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]: resp = await sync_to_async(get_hitl_detail_content_detail)(ti_id=self.ti_id) if resp.response_received and resp.chosen_options: + if TYPE_CHECKING: + assert resp.responded_by_user is not None self.log.info( "[HITL] responded_by=%s (id=%s) options=%s at %s", - resp.responded_user_name, - resp.responded_user_id, + resp.responded_by_user.name, + resp.responded_by_user.id, resp.chosen_options, resp.response_at, ) @@ -157,6 +172,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]: HITLTriggerEventSuccessPayload( chosen_options=resp.chosen_options, params_input=resp.params_input or {}, + responded_by_user=HITLUser( + id=resp.responded_by_user.id, + name=resp.responded_by_user.name, + ), timedout=False, ) ) diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py index 18646ab118dd6..98d0bf66864b7 100644 --- a/providers/standard/tests/unit/standard/operators/test_hitl.py +++ b/providers/standard/tests/unit/standard/operators/test_hitl.py @@ -43,6 +43,7 @@ ) from airflow.sdk import Param, timezone from airflow.sdk.definitions.param import ParamsDict +from airflow.sdk.execution_time.hitl import HITLUser from tests_common.test_utils.config import conf_vars @@ -66,7 +67,7 @@ def hitl_task_and_ti_for_generating_link(dag_maker: DagMaker) -> tuple[HITLOpera options=["1", "2", "3", "4", "5"], body="This is body", defaults=["1"], - respondents="test", + assigned_users=HITLUser(id="test", name="test"), multiple=True, params=ParamsDict({"input_1": 1, "input_2": 2, "input_3": 3}), ) @@ -165,7 +166,7 @@ def test_execute(self, dag_maker: DagMaker, session: Session) -> None: options=["1", "2", "3", "4", "5"], body="This is body", defaults=["1"], - respondents="test", + assigned_users=HITLUser(id="test", name="test"), multiple=False, params=ParamsDict({"input_1": 1}), notifiers=[notifier], @@ -181,10 +182,9 @@ def test_execute(self, dag_maker: DagMaker, session: Session) -> None: assert hitl_detail_model.defaults == ["1"] assert hitl_detail_model.multiple is False assert hitl_detail_model.params == {"input_1": 1} - assert hitl_detail_model.respondents == ["test"] + assert hitl_detail_model.assignees == [{"id": "test", "name": "test"}] assert hitl_detail_model.response_at is None - assert hitl_detail_model.responded_user_id is None - assert hitl_detail_model.responded_user_name is None + assert hitl_detail_model.responded_by is None assert hitl_detail_model.chosen_options is None assert hitl_detail_model.params_input == {} @@ -232,7 +232,11 @@ def test_execute_complete(self) -> None: ret = hitl_op.execute_complete( context={}, - event={"chosen_options": ["1"], "params_input": {"input": 2}}, + event={ + "chosen_options": ["1"], + "params_input": {"input": 2}, + "responded_by_user": {"id": "test", "name": "test"}, + }, ) assert ret["chosen_options"] == ["1"] @@ -253,6 +257,7 @@ def test_validate_chosen_options_with_invalid_content(self) -> None: event={ "chosen_options": ["not exists"], "params_input": {"input": 2}, + "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -271,6 +276,7 @@ def test_validate_params_input_with_invalid_input(self) -> None: event={ "chosen_options": ["1"], "params_input": {"no such key": 2, "input": 333}, + "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -397,12 +403,17 @@ def test_execute_complete(self) -> None: ret = hitl_op.execute_complete( context={}, - event={"chosen_options": ["Approve"], "params_input": {}}, + event={ + "chosen_options": ["Approve"], + "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, + }, ) assert ret == { "chosen_options": ["Approve"], "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, } def test_execute_complete_with_downstream_tasks(self, dag_maker) -> None: @@ -419,7 +430,11 @@ def test_execute_complete_with_downstream_tasks(self, dag_maker) -> None: with pytest.raises(DownstreamTasksSkipped) as exc_info: hitl_op.execute_complete( context={"ti": ti, "task": ti.task}, - event={"chosen_options": ["Reject"], "params_input": {}}, + event={ + "chosen_options": ["Reject"], + "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, + }, ) assert set(exc_info.value.tasks) == {"op1"} @@ -480,6 +495,7 @@ def test_execute_complete(self, dag_maker) -> None: event={ "chosen_options": ["branch_1"], "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, }, ) assert set(exc_info.value.tasks) == set((f"branch_{i}", -1) for i in range(2, 6)) @@ -503,6 +519,7 @@ def test_execute_complete_with_multiple_branches(self, dag_maker) -> None: event={ "chosen_options": [f"branch_{i}" for i in range(1, 4)], "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, }, ) assert set(exc_info.value.tasks) == set((f"branch_{i}", -1) for i in range(4, 6)) @@ -524,7 +541,11 @@ def test_mapping_applies_for_single_choice(self, dag_maker): with pytest.raises(DownstreamTasksSkipped) as exc: op.execute_complete( context={"ti": ti, "task": ti.task}, - event={"chosen_options": ["Approve"], "params_input": {}}, + event={ + "chosen_options": ["Approve"], + "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, + }, ) # checks to see that the "archive" task was skipped assert set(exc.value.tasks) == {("archive", -1)} @@ -551,7 +572,11 @@ def test_mapping_with_multiple_choices(self, dag_maker): with pytest.raises(DownstreamTasksSkipped) as exc: op.execute_complete( context={"ti": ti, "task": ti.task}, - event={"chosen_options": ["Approve", "KeepAsIs"], "params_input": {}}, + event={ + "chosen_options": ["Approve", "KeepAsIs"], + "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, + }, ) # publish + keep chosen → only "other" skipped assert set(exc.value.tasks) == {("other", -1)} @@ -572,7 +597,11 @@ def test_fallback_to_option_when_not_mapped(self, dag_maker): with pytest.raises(DownstreamTasksSkipped) as exc: op.execute_complete( context={"ti": ti, "task": ti.task}, - event={"chosen_options": ["branch_2"], "params_input": {}}, + event={ + "chosen_options": ["branch_2"], + "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, + }, ) assert set(exc.value.tasks) == {("branch_1", -1)} @@ -593,7 +622,11 @@ def test_error_if_mapped_branch_not_direct_downstream(self, dag_maker): with pytest.raises(AirflowException, match="downstream|not found"): op.execute_complete( context={"ti": ti, "task": ti.task}, - event={"chosen_options": ["Approve"], "params_input": {}}, + event={ + "chosen_options": ["Approve"], + "params_input": {}, + "responded_by_user": {"id": "test", "name": "test"}, + }, ) @pytest.mark.parametrize("bad", [123, ["publish"], {"x": "y"}, b"publish"]) diff --git a/providers/standard/tests/unit/standard/triggers/test_hitl.py b/providers/standard/tests/unit/standard/triggers/test_hitl.py index 7b33715e68d35..ef82f9e09d9b0 100644 --- a/providers/standard/tests/unit/standard/triggers/test_hitl.py +++ b/providers/standard/tests/unit/standard/triggers/test_hitl.py @@ -31,7 +31,7 @@ from uuid6 import uuid7 from airflow._shared.timezones.timezone import utc, utcnow -from airflow.api_fastapi.execution_api.datamodels.hitl import HITLDetailResponse +from airflow.api_fastapi.execution_api.datamodels.hitl import HITLDetailResponse, HITLUser from airflow.providers.standard.triggers.hitl import ( HITLTrigger, HITLTriggerEventFailurePayload, @@ -110,8 +110,7 @@ async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_lo ) mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=False, - responded_user_id=None, - responded_user_name=None, + responded_by_user=None, response_at=None, chosen_options=None, params_input={}, @@ -123,7 +122,12 @@ async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_lo event = await trigger_task assert event == TriggerEvent( - HITLTriggerEventSuccessPayload(chosen_options=["1"], params_input={"input": 1}, timedout=True) + HITLTriggerEventSuccessPayload( + chosen_options=["1"], + params_input={"input": 1}, + responded_by_user={"id": "Fallback to defaults", "name": "Fallback to defaults"}, + timedout=True, + ) ) assert mock_log.info.call_args == mock.call( @@ -149,8 +153,7 @@ async def test_run_should_check_response_in_timeout_handler( ) mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=True, - responded_user_id="1", - responded_user_name="test", + responded_by_user=HITLUser(id="1", name="test"), response_at=action_datetime, chosen_options=["2"], params_input={}, @@ -162,7 +165,12 @@ async def test_run_should_check_response_in_timeout_handler( event = await trigger_task assert event == TriggerEvent( - HITLTriggerEventSuccessPayload(chosen_options=["2"], params_input={}, timedout=False) + HITLTriggerEventSuccessPayload( + chosen_options=["2"], + params_input={}, + responded_by_user={"id": "1", "name": "test"}, + timedout=False, + ) ) assert mock_log.info.call_args == mock.call( @@ -188,8 +196,7 @@ async def test_run(self, mock_update, mock_log, mock_supervisor_comms, time_mach ) mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=True, - responded_user_id="test", - responded_user_name="test", + responded_by_user=HITLUser(id="test", name="test"), response_at=utcnow(), chosen_options=["3"], params_input={"input": 50}, @@ -203,6 +210,7 @@ async def test_run(self, mock_update, mock_log, mock_supervisor_comms, time_mach HITLTriggerEventSuccessPayload( chosen_options=["3"], params_input={"input": 50}, + responded_by_user={"id": "test", "name": "test"}, timedout=False, ) ) diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index 04575fa770d05..0a353271c1025 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -44,6 +44,7 @@ DagRunStateResponse, DagRunType, HITLDetailResponse, + HITLUser, InactiveAssetsResponse, PrevSuccessfulDagRunResponse, TaskInstanceState, @@ -723,7 +724,7 @@ def add_response( defaults: list[str] | None = None, multiple: bool = False, params: dict[str, Any] | None = None, - respondents: list[str] | None = None, + assigned_users: list[HITLUser] | None = None, ) -> HITLDetailRequestResult: """Add a Human-in-the-loop response that waits for human response for a specific Task Instance.""" payload = CreateHITLDetailPayload( @@ -734,7 +735,7 @@ def add_response( defaults=defaults, multiple=multiple, params=params, - respondents=respondents, + assigned_users=assigned_users, ) resp = self.client.post( f"/hitlDetails/{ti_id}", diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index cd861bf07d124..d2151d59d028a 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -154,32 +154,13 @@ class DagRunType(str, Enum): ASSET_TRIGGERED = "asset_triggered" -class HITLDetailRequest(BaseModel): +class HITLUser(BaseModel): """ - Schema for the request part of a Human-in-the-loop detail for a specific task instance. + Schema for a Human-in-the-loop users. """ - ti_id: Annotated[UUID, Field(title="Ti Id")] - options: Annotated[list[str], Field(min_length=1, title="Options")] - subject: Annotated[str, Field(title="Subject")] - body: Annotated[str | None, Field(title="Body")] = None - defaults: Annotated[list[str] | None, Field(title="Defaults")] = None - multiple: Annotated[bool | None, Field(title="Multiple")] = False - params: Annotated[dict[str, Any] | None, Field(title="Params")] = None - respondents: Annotated[list[str] | None, Field(title="Respondents")] = None - - -class HITLDetailResponse(BaseModel): - """ - Schema for the response part of a Human-in-the-loop detail for a specific task instance. - """ - - response_received: Annotated[bool, Field(title="Response Received")] - responded_user_name: Annotated[str | None, Field(title="Responded User Name")] = None - responded_user_id: Annotated[str | None, Field(title="Responded User Id")] = None - response_at: Annotated[AwareDatetime | None, Field(title="Response At")] = None - chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None - params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None + id: Annotated[str, Field(title="Id")] + name: Annotated[str, Field(title="Name")] class InactiveAssetsResponse(BaseModel): @@ -561,6 +542,33 @@ class DagRun(BaseModel): consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")] +class HITLDetailRequest(BaseModel): + """ + Schema for the request part of a Human-in-the-loop detail for a specific task instance. + """ + + ti_id: Annotated[UUID, Field(title="Ti Id")] + options: Annotated[list[str], Field(min_length=1, title="Options")] + subject: Annotated[str, Field(title="Subject")] + body: Annotated[str | None, Field(title="Body")] = None + defaults: Annotated[list[str] | None, Field(title="Defaults")] = None + multiple: Annotated[bool | None, Field(title="Multiple")] = False + params: Annotated[dict[str, Any] | None, Field(title="Params")] = None + assigned_users: Annotated[list[HITLUser] | None, Field(title="Assigned Users")] = None + + +class HITLDetailResponse(BaseModel): + """ + Schema for the response part of a Human-in-the-loop detail for a specific task instance. + """ + + response_received: Annotated[bool, Field(title="Response Received")] + responded_by_user: HITLUser | None = None + response_at: Annotated[AwareDatetime | None, Field(title="Response At")] = None + chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None + params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None + + class HTTPValidationError(BaseModel): detail: Annotated[list[ValidationError] | None, Field(title="Detail")] = None diff --git a/task-sdk/src/airflow/sdk/execution_time/hitl.py b/task-sdk/src/airflow/sdk/execution_time/hitl.py index 500be78ab4e22..07f94e63c0529 100644 --- a/task-sdk/src/airflow/sdk/execution_time/hitl.py +++ b/task-sdk/src/airflow/sdk/execution_time/hitl.py @@ -17,9 +17,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypedDict from uuid import UUID +from airflow.sdk.api.datamodels._generated import HITLUser as APIHITLUser from airflow.sdk.execution_time.comms import ( CreateHITLDetailPayload, GetHITLDetailResponse, @@ -30,6 +31,11 @@ from airflow.sdk.api.datamodels._generated import HITLDetailResponse +class HITLUser(TypedDict): + id: str + name: str + + def upsert_hitl_detail( ti_id: UUID, options: list[str], @@ -38,7 +44,7 @@ def upsert_hitl_detail( defaults: list[str] | None = None, multiple: bool = False, params: dict[str, Any] | None = None, - respondents: list[str] | None = None, + assigned_users: list[HITLUser] | None = None, ) -> None: from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS @@ -51,7 +57,11 @@ def upsert_hitl_detail( defaults=defaults, params=params, multiple=multiple, - respondents=respondents, + assigned_users=( + [APIHITLUser(id=user["id"], name=user["name"]) for user in assigned_users] + if assigned_users + else [] + ), ) ) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index e3e4a1cbe838c..e4818e7756fa9 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1343,7 +1343,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: defaults=msg.defaults, params=msg.params, multiple=msg.multiple, - respondents=msg.respondents, + assigned_users=msg.assigned_users, ) self.send_msg(resp, request_id=req_id, error=None, **dump_opts) elif isinstance(msg, MaskSecret): diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py index 986ab5fe5e8bf..9464296934dba 100644 --- a/task-sdk/tests/task_sdk/api/test_client.py +++ b/task-sdk/tests/task_sdk/api/test_client.py @@ -38,6 +38,7 @@ DagRunState, DagRunStateResponse, HITLDetailResponse, + HITLUser, VariableResponse, XComResponse, ) @@ -1289,7 +1290,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert result.defaults == ["Approval"] assert result.params is None assert result.multiple is False - assert result.respondents is None + assert result.assigned_users is None def test_update_response(self, time_machine: TimeMachineFixture) -> None: time_machine.move_to(datetime(2025, 7, 3, 0, 0, 0)) @@ -1302,8 +1303,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: json={ "chosen_options": ["Approval"], "params_input": {}, - "responded_user_id": "admin", - "responded_user_name": "admin", + "responded_by_user": {"id": "admin", "name": "admin"}, "response_received": True, "response_at": "2025-07-03T00:00:00Z", }, @@ -1320,8 +1320,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert result.response_received is True assert result.chosen_options == ["Approval"] assert result.params_input == {} - assert result.responded_user_id == "admin" - assert result.responded_user_name == "admin" + assert result.responded_by_user == HITLUser(id="admin", name="admin") assert result.response_at == timezone.datetime(2025, 7, 3, 0, 0, 0) def test_get_detail_response(self, time_machine: TimeMachineFixture) -> None: @@ -1335,8 +1334,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: json={ "chosen_options": ["Approval"], "params_input": {}, - "responded_user_id": "admin", - "responded_user_name": "admin", + "responded_by_user": {"id": "admin", "name": "admin"}, "response_received": True, "response_at": "2025-07-03T00:00:00Z", }, @@ -1349,6 +1347,5 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert result.response_received is True assert result.chosen_options == ["Approval"] assert result.params_input == {} - assert result.responded_user_id == "admin" - assert result.responded_user_name == "admin" + assert result.responded_by_user == HITLUser(id="admin", name="admin") assert result.response_at == timezone.datetime(2025, 7, 3, 0, 0, 0) diff --git a/task-sdk/tests/task_sdk/execution_time/test_hitl.py b/task-sdk/tests/task_sdk/execution_time/test_hitl.py index a5ed016f44de8..ad74ade5cd2e6 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_hitl.py +++ b/task-sdk/tests/task_sdk/execution_time/test_hitl.py @@ -20,9 +20,10 @@ from uuid6 import uuid7 from airflow.sdk import timezone -from airflow.sdk.api.datamodels._generated import HITLDetailResponse +from airflow.sdk.api.datamodels._generated import HITLDetailResponse, HITLUser as APIHITLUser from airflow.sdk.execution_time.comms import CreateHITLDetailPayload from airflow.sdk.execution_time.hitl import ( + HITLUser, get_hitl_detail_content_detail, update_hitl_detail_response, upsert_hitl_detail, @@ -39,7 +40,7 @@ def test_upsert_hitl_detail(mock_supervisor_comms) -> None: body="Optional body", defaults=["Approve", "Reject"], params={"input_1": 1}, - respondents=["test"], + assigned_users=[HITLUser(id="test", name="test")], multiple=False, ) mock_supervisor_comms.send.assert_called_with( @@ -50,7 +51,7 @@ def test_upsert_hitl_detail(mock_supervisor_comms) -> None: body="Optional body", defaults=["Approve", "Reject"], params={"input_1": 1}, - respondents=["test"], + assigned_users=[APIHITLUser(id="test", name="test")], multiple=False, ) ) @@ -62,8 +63,7 @@ def test_update_hitl_detail_response(mock_supervisor_comms) -> None: response_received=True, chosen_options=["Approve"], response_at=timestamp, - responded_user_id="admin", - responded_user_name="admin", + responded_by_user=APIHITLUser(id="admin", name="admin"), params_input={"input_1": 1}, ) resp = update_hitl_detail_response( @@ -75,8 +75,7 @@ def test_update_hitl_detail_response(mock_supervisor_comms) -> None: response_received=True, chosen_options=["Approve"], response_at=timestamp, - responded_user_id="admin", - responded_user_name="admin", + responded_by_user=APIHITLUser(id="admin", name="admin"), params_input={"input_1": 1}, ) @@ -86,8 +85,7 @@ def test_get_hitl_detail_content_detail(mock_supervisor_comms) -> None: response_received=False, chosen_options=None, response_at=None, - responded_user_id=None, - responded_user_name=None, + responded_by_user=None, params_input={}, ) resp = get_hitl_detail_content_detail(TI_ID) @@ -95,7 +93,6 @@ def test_get_hitl_detail_content_detail(mock_supervisor_comms) -> None: response_received=False, chosen_options=None, response_at=None, - responded_user_id=None, - responded_user_name=None, + responded_by_user=None, params_input={}, ) diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index e6b0a3dccd95b..f36bc8a191aed 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2000,7 +2000,7 @@ class RequestTestCase: "defaults": ["Approve"], "multiple": False, "params": {}, - "respondents": None, + "assigned_users": None, "type": "HITLDetailRequestResult", }, client_mock=ClientMock( @@ -2011,7 +2011,7 @@ class RequestTestCase: "multiple": False, "options": ["Approve", "Reject"], "params": {}, - "respondents": None, + "assigned_users": None, "subject": "This is subject", "ti_id": TI_ID, },