From 0a2df3e803b8aba6e622540c72c30617c6513456 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 13 May 2025 14:36:30 +0530 Subject: [PATCH 1/4] Cadwyn migration for backcompat of /run endpoint --- .../execution_api/versions/__init__.py | 2 + .../execution_api/versions/v2025_05_20.py | 51 +++++++++++++++++++ .../airflow/sdk/api/datamodels/_generated.py | 2 +- 3 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 54329054c1213..5462f10297495 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -20,9 +20,11 @@ from cadwyn import HeadVersion, Version, VersionBundle from airflow.api_fastapi.execution_api.versions.v2025_04_28 import AddRenderedMapIndexField +from airflow.api_fastapi.execution_api.versions.v2025_05_20 import DowngradeUpstreamMapIndexes bundle = VersionBundle( HeadVersion(), + Version("2025-05-20", DowngradeUpstreamMapIndexes), Version("2025-04-28", AddRenderedMapIndexField), Version("2025-04-11"), ) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py new file mode 100644 index 0000000000000..084464f243b51 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema + +from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext + + +class DowngradeUpstreamMapIndexes(VersionChange): + """Downgrade the upstream map indexes type for older clients.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + schema(TIRunContext).field("upstream_map_indexes").had(type=dict[str, int] | None), # type: ignore + ) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def downgrade_upstream_map_indexes(response: ResponseInfo = None) -> None: # type: ignore + """ + Downgrades the `upstream_map_indexes` field when converting to the previous version. + + Ensures that the field is only a dictionary of [str, int] (old format). + """ + resp = response.body["upstream_map_indexes"] + if isinstance(resp, dict): + downgraded = {} + for k, v in resp.items(): + if isinstance(v, int): + downgraded[k] = v + elif isinstance(v, list) and v and all(isinstance(i, int) for i in v): + downgraded[k] = v[0] + else: + downgraded[k] = 0 + response.body["upstream_map_indexes"] = downgraded diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index dbf39fa4ae146..3efae80e5b671 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -27,7 +27,7 @@ from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue -API_VERSION: Final[str] = "2025-04-28" +API_VERSION: Final[str] = "2025-05-20" class AssetAliasReferenceAssetEventDagRun(BaseModel): From f3b1cc6734f25d47ccbe1a1cfd1bf66715bf64a1 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 13 May 2025 14:53:08 +0530 Subject: [PATCH 2/4] fixing startup issues --- .../airflow/api_fastapi/execution_api/versions/v2025_05_20.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py index 084464f243b51..daa0ac0270096 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py @@ -17,6 +17,8 @@ from __future__ import annotations +from typing import Optional + from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext @@ -28,7 +30,7 @@ class DowngradeUpstreamMapIndexes(VersionChange): description = __doc__ instructions_to_migrate_to_previous_version = ( - schema(TIRunContext).field("upstream_map_indexes").had(type=dict[str, int] | None), # type: ignore + schema(TIRunContext).field("upstream_map_indexes").had(type=Optional[dict[str, int]]), # type: ignore ) @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] From a8eb16974fcb8f7e44994508f26f2206fe15243b Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 13 May 2025 15:00:38 +0530 Subject: [PATCH 3/4] ash comments Co-authored-by: Ash Berlin-Taylor --- .../airflow/api_fastapi/execution_api/versions/v2025_05_20.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py index daa0ac0270096..d5d5009560fb9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py @@ -40,7 +40,7 @@ def downgrade_upstream_map_indexes(response: ResponseInfo = None) -> None: # ty Ensures that the field is only a dictionary of [str, int] (old format). """ - resp = response.body["upstream_map_indexes"] + resp = response.body.get("upstream_map_indexes") if isinstance(resp, dict): downgraded = {} for k, v in resp.items(): From 8a6859a07b956d49d08f931bbdea4147a4bf2d0f Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 13 May 2025 17:03:25 +0530 Subject: [PATCH 4/4] adding test cases for execution API --- .../execution_api/versions/v2025_05_20.py | 3 +- .../v2025_04_28/test_task_instances.py | 142 ++++++++++++++++++ 2 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py index d5d5009560fb9..f8039f0a4d3fb 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py @@ -49,5 +49,6 @@ def downgrade_upstream_map_indexes(response: ResponseInfo = None) -> None: # ty elif isinstance(v, list) and v and all(isinstance(i, int) for i in v): downgraded[k] = v[0] else: - downgraded[k] = 0 + # for cases like None, make it -1 + downgraded[k] = -1 response.body["upstream_map_indexes"] = downgraded diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py new file mode 100644 index 0000000000000..fce29c4b2e531 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from airflow.api_fastapi.common.dagbag import create_dag_bag, dag_bag_from_app +from airflow.utils import timezone +from airflow.utils.state import State + +from tests_common.test_utils.db import clear_db_assets, clear_db_runs + +pytestmark = pytest.mark.db_test + + +DEFAULT_START_DATE = timezone.parse("2024-10-31T11:00:00Z") +DEFAULT_END_DATE = timezone.parse("2024-10-31T12:00:00Z") + + +@pytest.fixture +def ver_client(client): + client.headers["Airflow-API-Version"] = "2025-04-28" + return client + + +class TestTIUpdateState: + def setup_method(self): + clear_db_assets() + clear_db_runs() + + def teardown_method(self): + clear_db_assets() + clear_db_runs() + + @pytest.mark.parametrize( + "mock_indexes, expected_response_indexes", + [ + pytest.param( + [("task_a", 5), ("task_b", 10)], + {"task_a": 5, "task_b": 10}, + id="plain ints", + ), + pytest.param( + [("task_a", [3, 4]), ("task_b", [9])], + {"task_a": 3, "task_b": 9}, + id="list of ints", + ), + pytest.param( + [("task_a", None), ("task_b", [6, 7]), ("task_c", 2)], + {"task_a": -1, "task_b": 6, "task_c": 2}, + id="mixed types", + ), + ], + ) + @patch("airflow.api_fastapi.execution_api.routes.task_instances._get_upstream_map_indexes") + def test_ti_run( + self, + mock_get_upstream_map_indexes, + ver_client, + session, + create_task_instance, + time_machine, + mock_indexes, + expected_response_indexes, + ): + """ + Test that this version of the endpoint works. + + Later versions modified the type of upstream_map_indexes. + """ + mock_get_upstream_map_indexes.return_value = mock_indexes + + instant_str = "2024-09-30T12:00:00Z" + instant = timezone.parse(instant_str) + time_machine.move_to(instant, tick=False) + + ti = create_task_instance( + task_id="test_ti_run_state_to_running", + state=State.QUEUED, + session=session, + start_date=instant, + ) + + dag = ti.task.dag + dagbag = create_dag_bag() + dagbag.dags = {dag.dag_id: dag} + execution_app = next(route.app for route in ver_client.app.routes if route.path == "/execution") + execution_app.dependency_overrides[dag_bag_from_app] = lambda: dagbag + session.commit() + + response = ver_client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "random-hostname", + "unixname": "random-unixname", + "pid": 100, + "start_date": instant_str, + }, + ) + + assert response.status_code == 200 + assert response.json() == { + "dag_run": { + "dag_id": "dag", + "run_id": "test", + "clear_number": 0, + "logical_date": instant_str, + "data_interval_start": instant.subtract(days=1).to_iso8601_string(), + "data_interval_end": instant_str, + "run_after": instant_str, + "start_date": instant_str, + "end_date": None, + "run_type": "manual", + "conf": {}, + "consumed_asset_events": [], + }, + "task_reschedule_count": 0, + "upstream_map_indexes": expected_response_indexes, + "max_tries": 0, + "should_retry": False, + "variables": [], + "connections": [], + "xcom_keys_to_clear": [], + }