diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index 7fa329e0f32b5..ae813bca5a2c3 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -299,6 +299,7 @@ class DagRun(StrictBaseModel): run_type: DagRunType state: DagRunState conf: Annotated[dict[str, Any], Field(default_factory=dict)] + triggering_user_name: str | None = None consumed_asset_events: list[AssetEventDagRunReference] diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 84d676347ac87..2fc618bb7a616 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -26,9 +26,11 @@ AddIncludePriorDatesToGetXComSlice, ) from airflow.api_fastapi.execution_api.versions.v2025_09_23 import AddDagVersionIdField +from airflow.api_fastapi.execution_api.versions.v2025_10_10 import AddTriggeringUserNameField bundle = VersionBundle( HeadVersion(), + Version("2025-10-10", AddTriggeringUserNameField), Version("2025-09-23", AddDagVersionIdField), Version( "2025-08-10", diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_10_10.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_10_10.py new file mode 100644 index 0000000000000..ced2917afda05 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_10_10.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema + +from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext + + +class AddTriggeringUserNameField(VersionChange): + """Add the `triggering_user_name` field to DagRun model.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = (schema(DagRun).field("triggering_user_name").didnt_exist,) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def remove_triggering_user_name_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] + """Remove the `triggering_user_name` field from the dag_run object when converting to the previous version.""" + if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): + response.body["dag_run"].pop("triggering_user_name", None) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 66ce18e2d3554..c0915664f6207 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -189,6 +189,7 @@ def test_ti_run_state_to_running( "end_date": None, "run_type": "manual", "conf": {}, + "triggering_user_name": None, "consumed_asset_events": [], }, "task_reschedule_count": 0, @@ -637,6 +638,62 @@ def test_xcom_not_cleared_for_deferral(self, client, session, create_task_instan assert response.status_code == 200 assert ti.xcom_pull(task_ids="test_xcom_not_cleared_for_deferral", key="key") == "value" + def test_ti_run_with_triggering_user_name( + self, + client, + session, + dag_maker, + time_machine, + ): + """ + Test that the triggering_user_name field is correctly returned when it has a non-None value. + """ + instant_str = "2024-09-30T12:00:00Z" + instant = timezone.parse(instant_str) + time_machine.move_to(instant, tick=False) + + with dag_maker(dag_id=str(uuid4()), session=session): + EmptyOperator(task_id="test_ti_run_with_triggering_user_name") + + # Create DagRun with triggering_user_name set to a specific value + dr = dag_maker.create_dagrun( + run_id="test", + logical_date=instant, + state=DagRunState.RUNNING, + start_date=instant, + triggering_user_name="test_user", + ) + + ti = dr.get_task_instance(task_id="test_ti_run_with_triggering_user_name") + ti.set_state(State.QUEUED) + session.commit() + + response = client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "test-hostname", + "unixname": "test-unixname", + "pid": 12345, + "start_date": instant_str, + }, + ) + + assert response.status_code == 200 + json_response = response.json() + + # Verify the dag_run is present + assert "dag_run" in json_response + dag_run = json_response["dag_run"] + + # The triggering_user_name field should be present with the correct value + assert dag_run["triggering_user_name"] == "test_user" + + # Verify other expected fields are still present + assert dag_run["dag_id"] == ti.dag_id + assert dag_run["run_id"] == "test" + assert dag_run["state"] == "running" + class TestTIUpdateState: def setup_method(self): diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/__init__.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/test_task_instances.py new file mode 100644 index 0000000000000..79891731b770d --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_09_23/test_task_instances.py @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pytest + +from airflow._shared.timezones import timezone +from airflow.utils.state import DagRunState, State + +from tests_common.test_utils.db import clear_db_runs + +pytestmark = pytest.mark.db_test + + +@pytest.fixture +def ver_client(client): + """Client configured to use API version 2025-09-23.""" + client.headers["Airflow-API-Version"] = "2025-09-23" + return client + + +class TestTIRunStateV20250923: + """Test that API version 2025-09-23 does NOT include triggering_user_name field.""" + + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + def test_ti_run_excludes_triggering_user_name( + self, + ver_client, + session, + create_task_instance, + time_machine, + ): + """ + Test that the triggering_user_name field is NOT present in API version 2025-09-23. + + This field was added in version 2025-10-10, so older API clients should not + receive it in the response. + """ + instant_str = "2024-09-30T12:00:00Z" + instant = timezone.parse(instant_str) + time_machine.move_to(instant, tick=False) + + ti = create_task_instance( + task_id="test_triggering_user_exclusion", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=instant, + ) + session.commit() + + response = ver_client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "test-hostname", + "unixname": "test-user", + "pid": 12345, + "start_date": instant_str, + }, + ) + + assert response.status_code == 200 + json_response = response.json() + + # Verify the dag_run is present + assert "dag_run" in json_response + dag_run = json_response["dag_run"] + + # The triggering_user_name field should NOT be present in this API version + assert "triggering_user_name" not in dag_run, ( + "triggering_user_name should not be present in API version 2025-09-23" + ) + + # Verify other expected fields are still present + assert dag_run["dag_id"] == ti.dag_id + assert dag_run["run_id"] == "test" + assert dag_run["state"] == "running" + assert dag_run["conf"] == {} diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 8e4c4207b2742..464bea8eeacd0 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -27,7 +27,7 @@ from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue, RootModel -API_VERSION: Final[str] = "2025-09-23" +API_VERSION: Final[str] = "2025-10-10" class AssetAliasReferenceAssetEventDagRun(BaseModel): @@ -539,6 +539,7 @@ class DagRun(BaseModel): run_type: DagRunType state: DagRunState conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None + triggering_user_name: Annotated[str | None, Field(title="Triggering User Name")] = None consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")] diff --git a/task-sdk/src/airflow/sdk/types.py b/task-sdk/src/airflow/sdk/types.py index c7084629085dd..124fb560cc592 100644 --- a/task-sdk/src/airflow/sdk/types.py +++ b/task-sdk/src/airflow/sdk/types.py @@ -52,6 +52,7 @@ class DagRunProtocol(Protocol): run_type: Any run_after: AwareDatetime conf: dict[str, Any] | None + triggering_user_name: str | None class RuntimeTaskInstanceProtocol(Protocol): diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 8a5e06e0e8445..1580e7697fdac 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -1846,6 +1846,7 @@ class RequestTestCase: "end_date": None, "clear_number": 0, "conf": None, + "triggering_user_name": None, }, "type": "PreviousDagRunResult", }, @@ -1866,6 +1867,7 @@ class RequestTestCase: run_after=timezone.parse("2024-01-15T12:00:00Z"), consumed_asset_events=[], state=DagRunState.SUCCESS, + triggering_user_name=None, ) ), ),