From 39cd8c7c60ac8ed8222c394ebcdb54668995d202 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Tue, 13 May 2025 18:07:08 +0200 Subject: [PATCH] Fix patch_task_instance endpoint --- .../openapi/v1-rest-api-generated.yaml | 22 +- .../core_api/routes/public/task_instances.py | 21 +- .../airflow/ui/openapi-gen/queries/queries.ts | 4 +- .../ui/openapi-gen/requests/services.gen.ts | 4 +- .../ui/openapi-gen/requests/types.gen.ts | 16 +- .../routes/public/test_task_instances.py | 370 ++++++++++-------- 6 files changed, 241 insertions(+), 196 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml index 1cb0a8bfdbab1..101f6db7f84b6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml @@ -4543,8 +4543,9 @@ paths: in: query required: false schema: - type: integer - default: -1 + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query @@ -4568,7 +4569,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/TaskInstanceResponse' + $ref: '#/components/schemas/TaskInstanceCollectionResponse' '401': content: application/json: @@ -5256,7 +5257,9 @@ paths: in: path required: true schema: - type: integer + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query @@ -5280,7 +5283,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/TaskInstanceResponse' + $ref: '#/components/schemas/TaskInstanceCollectionResponse' '401': content: application/json: @@ -5853,7 +5856,9 @@ paths: in: path required: true schema: - type: integer + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query @@ -5940,8 +5945,9 @@ paths: in: query required: false schema: - type: integer - default: -1 + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 7021fbaef3fdf..0fcb6530608e9 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -803,10 +803,13 @@ def patch_task_instance_dry_run( dag_bag: DagBagDep, body: PatchTaskInstanceBody, session: SessionDep, - map_index: int = -1, + map_index: int | None = None, update_mask: list[str] | None = Query(None), ) -> TaskInstanceCollectionResponse: """Update a task instance dry_run mode.""" + if map_index is None: + map_index = -1 + dag, ti, data = _patch_ti_validate_request( dag_id, dag_run_id, task_id, dag_bag, body, session, map_index, update_mask ) @@ -874,10 +877,13 @@ def patch_task_instance( body: PatchTaskInstanceBody, user: GetUserDep, session: SessionDep, - map_index: int = -1, + map_index: int | None = None, update_mask: list[str] | None = Query(None), -) -> TaskInstanceResponse: +) -> TaskInstanceCollectionResponse: """Update a task instance.""" + if map_index is None: + map_index = -1 + dag, ti, data = _patch_ti_validate_request( dag_id, dag_run_id, task_id, dag_bag, body, session, map_index, update_mask ) @@ -924,7 +930,14 @@ def patch_task_instance( ti.task_instance_note.user_id = user.get_id() session.commit() - return TaskInstanceResponse.model_validate(ti) + return TaskInstanceCollectionResponse( + task_instances=[ + TaskInstanceResponse.model_validate( + ti, + ) + ], + total_entries=1, + ) @task_instances_router.delete( diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 9fe536cd513d7..ba618c79b3656 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -4066,7 +4066,7 @@ export const useDagServicePatchDag = < * @param data.requestBody * @param data.mapIndex * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ export const useTaskInstanceServicePatchTaskInstance = < @@ -4125,7 +4125,7 @@ export const useTaskInstanceServicePatchTaskInstance = < * @param data.mapIndex * @param data.requestBody * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ export const useTaskInstanceServicePatchTaskInstanceByMapIndex = < diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 59e0b9e193de1..d5b88a7bf2b85 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1923,7 +1923,7 @@ export class TaskInstanceService { * @param data.requestBody * @param data.mapIndex * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ public static patchTaskInstance(data: PatchTaskInstanceData): CancelablePromise { @@ -2231,7 +2231,7 @@ export class TaskInstanceService { * @param data.mapIndex * @param data.requestBody * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ public static patchTaskInstanceByMapIndex( diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 294f9f0b02d41..0625548e395f1 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2270,13 +2270,13 @@ export type GetTaskInstanceResponse = TaskInstanceResponse; export type PatchTaskInstanceData = { dagId: string; dagRunId: string; - mapIndex?: number; + mapIndex?: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array | null; }; -export type PatchTaskInstanceResponse = TaskInstanceResponse; +export type PatchTaskInstanceResponse = TaskInstanceCollectionResponse; export type DeleteTaskInstanceData = { dagId: string; @@ -2363,13 +2363,13 @@ export type GetMappedTaskInstanceResponse = TaskInstanceResponse; export type PatchTaskInstanceByMapIndexData = { dagId: string; dagRunId: string; - mapIndex: number; + mapIndex: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array | null; }; -export type PatchTaskInstanceByMapIndexResponse = TaskInstanceResponse; +export type PatchTaskInstanceByMapIndexResponse = TaskInstanceCollectionResponse; export type GetTaskInstancesData = { dagId: string; @@ -2441,7 +2441,7 @@ export type PostClearTaskInstancesResponse = TaskInstanceCollectionResponse; export type PatchTaskInstanceDryRunByMapIndexData = { dagId: string; dagRunId: string; - mapIndex: number; + mapIndex: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array | null; @@ -2452,7 +2452,7 @@ export type PatchTaskInstanceDryRunByMapIndexResponse = TaskInstanceCollectionRe export type PatchTaskInstanceDryRunData = { dagId: string; dagRunId: string; - mapIndex?: number; + mapIndex?: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array | null; @@ -4307,7 +4307,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: TaskInstanceResponse; + 200: TaskInstanceCollectionResponse; /** * Bad Request */ @@ -4527,7 +4527,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: TaskInstanceResponse; + 200: TaskInstanceCollectionResponse; /** * Bad Request */ diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 20540093960b9..e0b694f748da6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -3113,39 +3113,44 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): ) assert response.status_code == 200 assert response.json() == { - "dag_id": self.DAG_ID, - "dag_display_name": self.DAG_DISPLAY_NAME, - "dag_version": None, - "dag_run_id": self.RUN_ID, - "logical_date": "2020-01-01T00:00:00Z", - "task_id": self.TASK_ID, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "executor": None, - "executor_config": "{}", - "hostname": "", - "id": mock.ANY, - "map_index": -1, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, + "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_version": None, + "dag_run_id": self.RUN_ID, + "logical_date": "2020-01-01T00:00:00Z", + "task_id": self.TASK_ID, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } mock_set_ti_state.assert_called_once_with( @@ -3329,39 +3334,44 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte "failed", 200, { - "dag_id": "example_python_operator", - "dag_display_name": "example_python_operator", - "dag_version": None, - "dag_run_id": "TEST_DAG_RUN_ID", - "logical_date": "2020-01-01T00:00:00Z", - "task_id": "print_the_context", - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "executor": None, - "executor_config": "{}", - "hostname": "", - "id": mock.ANY, - "map_index": -1, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_display_name": "print_the_context", - "try_number": 0, - "unixname": getuser(), - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": "example_python_operator", + "dag_display_name": "example_python_operator", + "dag_version": None, + "dag_run_id": "TEST_DAG_RUN_ID", + "logical_date": "2020-01-01T00:00:00Z", + "task_id": "print_the_context", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_display_name": "print_the_context", + "try_number": 0, + "unixname": getuser(), + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, }, 1, ), @@ -3441,41 +3451,46 @@ def test_update_mask_set_note_should_respond_200( assert response.status_code == 200, response.text response_data = response.json() assert response_data == { - "dag_id": self.DAG_ID, - "dag_display_name": self.DAG_DISPLAY_NAME, - "dag_version": None, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "logical_date": "2020-01-01T00:00:00Z", - "id": mock.ANY, - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": -1, - "max_tries": 0, - "note": new_note_value, - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_id": self.TASK_ID, - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "dag_run_id": self.RUN_ID, - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, + "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_version": None, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "logical_date": "2020-01-01T00:00:00Z", + "id": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "note": new_note_value, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_id": self.TASK_ID, + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "dag_run_id": self.RUN_ID, + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } - _check_task_instance_note(session, response_data["id"], ti_note_data) + _check_task_instance_note(session, response_data["task_instances"][0]["id"], ti_note_data) def test_set_note_should_respond_200(self, test_client, session): self.create_task_instances(session) @@ -3487,43 +3502,48 @@ def test_set_note_should_respond_200(self, test_client, session): assert response.status_code == 200, response.text response_data = response.json() assert response_data == { - "dag_id": self.DAG_ID, - "dag_display_name": self.DAG_DISPLAY_NAME, - "dag_version": None, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "logical_date": "2020-01-01T00:00:00Z", - "id": mock.ANY, - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": -1, - "max_tries": 0, - "note": new_note_value, - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_id": self.TASK_ID, - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "dag_run_id": self.RUN_ID, - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, + "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_version": None, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "logical_date": "2020-01-01T00:00:00Z", + "id": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "note": new_note_value, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_id": self.TASK_ID, + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "dag_run_id": self.RUN_ID, + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } _check_task_instance_note( - session, response_data["id"], {"content": new_note_value, "user_id": "test"} + session, response_data["task_instances"][0]["id"], {"content": new_note_value, "user_id": "test"} ) def test_set_note_should_respond_200_mapped_task_instance_with_rtif(self, test_client, session): @@ -3550,43 +3570,50 @@ def test_set_note_should_respond_200_mapped_task_instance_with_rtif(self, test_c response_data = response.json() assert response_data == { - "dag_id": self.DAG_ID, - "dag_display_name": self.DAG_DISPLAY_NAME, - "dag_version": None, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "logical_date": "2020-01-01T00:00:00Z", - "id": mock.ANY, - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": map_index, - "max_tries": 0, - "note": new_note_value, - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_id": self.TASK_ID, - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "dag_run_id": self.RUN_ID, - "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, - "rendered_map_index": str(map_index), - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, + "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_version": None, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "logical_date": "2020-01-01T00:00:00Z", + "id": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": map_index, + "max_tries": 0, + "note": new_note_value, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_id": self.TASK_ID, + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "dag_run_id": self.RUN_ID, + "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, + "rendered_map_index": str(map_index), + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } _check_task_instance_note( - session, response_data["id"], {"content": new_note_value, "user_id": "test"} + session, + response_data["task_instances"][0]["id"], + {"content": new_note_value, "user_id": "test"}, ) def test_set_note_should_respond_200_when_note_is_empty(self, test_client, session): @@ -3602,10 +3629,9 @@ def test_set_note_should_respond_200_when_note_is_empty(self, test_client, sessi ) assert response.status_code == 200, response.text response_data = response.json() - assert response_data["note"] == new_note_value - _check_task_instance_note( - session, response_data["id"], {"content": new_note_value, "user_id": "test"} - ) + response_ti = response_data["task_instances"][0] + assert response_ti["note"] == new_note_value + _check_task_instance_note(session, response_ti["id"], {"content": new_note_value, "user_id": "test"}) @mock.patch("airflow.models.dag.DAG.set_task_instance_state") def test_should_raise_409_for_updating_same_task_instance_state(