Skip to content

Commit ea456d8

Browse files
authored
Add response migration for consumed_asset_events events field (#48940)
And, like I should have done in #48888, add tests this time. The issue was caused by me misunderstanding how Cadwyn works, the `instructions_to_migrate_to_previous_version` only affect the OpenAPI schema, but since the change in this case results in an "invalid" model being returned, we have to write our own migration for the data. For those curious, the error without this migration function is: > fastapi.exceptions.ResponseValidationError: 1 validation errors: > {'type': 'extra_forbidden', 'loc': ('response', 'dag_run', 'consumed_asset_events'), 'msg': 'Extra inputs are not permitted', 'input': []}
1 parent ca755fb commit ea456d8

File tree

3 files changed

+121
-2
lines changed

3 files changed

+121
-2
lines changed

airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
from __future__ import annotations
1919

20-
from cadwyn import VersionChange, schema
20+
from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema
2121

22-
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
22+
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext
2323

2424

2525
class AddConsumedAssetEventsField(VersionChange):
@@ -28,3 +28,7 @@ class AddConsumedAssetEventsField(VersionChange):
2828
description = __doc__
2929

3030
instructions_to_migrate_to_previous_version = (schema(DagRun).field("consumed_asset_events").didnt_exist,)
31+
32+
@convert_response_to_previous_version_for(TIRunContext) # type: ignore
33+
def remove_consumed_asset_events(response: ResponseInfo): # type: ignore
34+
response.body["dag_run"].pop("consumed_asset_events")
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
import pytest
21+
22+
from airflow.utils import timezone
23+
from airflow.utils.state import State
24+
25+
from tests_common.test_utils.db import clear_db_assets, clear_db_runs
26+
27+
pytestmark = pytest.mark.db_test
28+
29+
30+
DEFAULT_START_DATE = timezone.parse("2024-10-31T11:00:00Z")
31+
DEFAULT_END_DATE = timezone.parse("2024-10-31T12:00:00Z")
32+
33+
34+
@pytest.fixture
35+
def ver_client(client):
36+
client.headers["Airflow-API-Version"] = "2025-03-26"
37+
return client
38+
39+
40+
class TestTIUpdateState:
41+
def setup_method(self):
42+
clear_db_assets()
43+
clear_db_runs()
44+
45+
def teardown_method(self):
46+
clear_db_assets()
47+
clear_db_runs()
48+
49+
def test_ti_run(self, ver_client, session, create_task_instance, time_machine):
50+
"""
51+
Test that this version of the endpoint works.
52+
53+
Later versions add a consumed_asset_events field.
54+
"""
55+
instant_str = "2024-09-30T12:00:00Z"
56+
instant = timezone.parse(instant_str)
57+
time_machine.move_to(instant, tick=False)
58+
59+
ti = create_task_instance(
60+
task_id="test_ti_run_state_to_running",
61+
state=State.QUEUED,
62+
session=session,
63+
start_date=instant,
64+
)
65+
session.commit()
66+
67+
response = ver_client.patch(
68+
f"/execution/task-instances/{ti.id}/run",
69+
json={
70+
"state": "running",
71+
"hostname": "random-hostname",
72+
"unixname": "random-unixname",
73+
"pid": 100,
74+
"start_date": instant_str,
75+
},
76+
)
77+
78+
assert response.status_code == 200
79+
assert response.json() == {
80+
"dag_run": {
81+
"dag_id": "dag",
82+
"run_id": "test",
83+
"clear_number": 0,
84+
"logical_date": instant_str,
85+
"data_interval_start": instant.subtract(days=1).to_iso8601_string(),
86+
"data_interval_end": instant_str,
87+
"run_after": instant_str,
88+
"start_date": instant_str,
89+
"end_date": None,
90+
"run_type": "manual",
91+
"conf": {},
92+
},
93+
"task_reschedule_count": 0,
94+
"max_tries": 0,
95+
"should_retry": False,
96+
"variables": [],
97+
"connections": [],
98+
"xcom_keys_to_clear": [],
99+
}

0 commit comments

Comments
 (0)