From e16122e9a650eb6f0540957308b0c4cad733a42f Mon Sep 17 00:00:00 2001 From: bggwak Date: Tue, 12 Aug 2025 22:29:37 +0900 Subject: [PATCH 01/14] Fix clearTaskInstances API: Restore include_past/include_future support on UI --- .../core_api/routes/public/task_instances.py | 16 +- .../routes/public/test_task_instances.py | 317 +++++++++++------- 2 files changed, 217 insertions(+), 116 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 437e2ef8d6600..92958ae8503e8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -704,6 +704,7 @@ def post_clear_task_instances( downstream = body.include_downstream upstream = body.include_upstream + # Improved logic - resolve logical_date for scheduled DAGRun if dag_run_id is not None: dag_run: DagRun | None = session.scalar( select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id) @@ -713,13 +714,19 @@ def post_clear_task_instances( raise HTTPException(status.HTTP_404_NOT_FOUND, error_message) # Get the specific dag version: dag = get_dag_for_run(dag_bag, dag_run, session) - if past or future: + if (past or future) and dag_run.logical_date is None: raise HTTPException( status.HTTP_400_BAD_REQUEST, - "Cannot use include_past or include_future when dag_run_id is provided because logical_date is not applicable.", + "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None)." ) - body.start_date = dag_run.logical_date if dag_run.logical_date is not None else None - body.end_date = dag_run.logical_date if dag_run.logical_date is not None else None + + if past or future: + body.start_date = dag_run.logical_date if not past else None + body.end_date = dag_run.logical_date if not future else None + dag_run_id = None # Use date-based clearing + else: + body.start_date = dag_run.logical_date + body.end_date = dag_run.logical_date if past: body.start_date = None @@ -778,6 +785,7 @@ def post_clear_task_instances( ) + @task_instances_router.patch( task_instances_prefix + "/{task_id}/dry_run", responses=create_openapi_http_exception_doc( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index c19b7d20728a3..5bc86ab683dcd 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -26,6 +26,7 @@ import pendulum import pytest +from airflow.models.baseoperator import BaseOperator from sqlalchemy import select from airflow._shared.timezones.timezone import datetime @@ -44,7 +45,6 @@ from airflow.utils.platform import getuser from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.types import DagRunType - from tests_common.test_utils.api_fastapi import _check_task_instance_note from tests_common.test_utils.db import ( clear_db_runs, @@ -778,12 +778,12 @@ def test_offset_limit(self, test_client, one_task_with_many_mapped_tis): ({"order_by": "map_index", "limit": 100}, list(range(100))), ({"order_by": "-map_index", "limit": 100}, list(range(109, 9, -1))), ( - {"order_by": "state", "limit": 108}, - list(range(5, 25)) + list(range(25, 110)) + list(range(3)), + {"order_by": 
"state", "limit": 108}, + list(range(5, 25)) + list(range(25, 110)) + list(range(3)), ), ( - {"order_by": "-state", "limit": 100}, - list(range(5)[::-1]) + list(range(25, 110)[::-1]) + list(range(15, 25)[::-1]), + {"order_by": "-state", "limit": 100}, + list(range(5)[::-1]) + list(range(25, 110)[::-1]) + list(range(15, 25)[::-1]), ), ({"order_by": "logical_date", "limit": 100}, list(range(100))), ({"order_by": "-logical_date", "limit": 100}, list(range(109, 9, -1))), @@ -1422,37 +1422,37 @@ def test_should_respond_empty_non_scheduled(self, test_client, session): "state, dependencies", [ ( - State.SCHEDULED, - { - "dependencies": [ - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " - "before the task's start date 2021-01-01T00:00:00+00:00.", - }, - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " - "before the task's DAG's start date 2021-01-01T00:00:00+00:00.", - }, - ], - }, + State.SCHEDULED, + { + "dependencies": [ + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's DAG's start date 2021-01-01T00:00:00+00:00.", + }, + ], + }, ), ( - State.NONE, - { - "dependencies": [ - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's start date 2021-01-01T00:00:00+00:00.", - }, - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's DAG's start date 2021-01-01T00:00:00+00:00.", - }, - {"name": "Task Instance State", "reason": "Task is in the 'None' state."}, - ] - }, + State.NONE, + { + "dependencies": [ + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's DAG's start date 2021-01-01T00:00:00+00:00.", + }, + {"name": "Task Instance State", "reason": "Task is in the 'None' state."}, + ] + }, ), ], ) @@ -2396,7 +2396,7 @@ def test_should_respond_200( check_last_log(session, dag_id=request_dag, event="post_clear_task_instances", logical_date=None) @pytest.mark.parametrize("flag", ["include_future", "include_past"]) - def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session, flag): + def test_manual_run_with_none_logical_date_returns_400(self, test_client, session, flag): dag_id = "example_python_operator" payload = { "dry_run": True, @@ -2404,7 +2404,7 @@ def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session "only_failed": True, flag: True, } - task_instances = [{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}] + task_instances = [{"logical_date": None, "state": State.FAILED}] self.create_task_instances( session, dag_id=dag_id, @@ -2415,10 +2415,103 @@ def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session response = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert response.status_code == 400 assert ( - "Cannot use include_past or include_future when dag_run_id is provided" + "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None)." 
in response.json()["detail"] ) + @pytest.mark.parametrize( + "flag, expected", + [ + ("include_past", 2), # D0 ~ D1 + ("include_future", 2), # D1 ~ + ], + ) + def test_with_dag_run_id_and_past_future_converts_to_date_range(self, test_client, session, flag, + expected): + dag_id = "example_python_operator" + task_instances = [ + {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # D0 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # D2 + ] + self.create_task_instances( + session, dag_id=dag_id, task_instances=task_instances, update_extras=False + ) + payload = { + "dry_run": True, + "only_failed": True, + "dag_run_id": "TEST_DAG_RUN_ID_1", + flag: True, + } + resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + assert resp.status_code == 200 + assert resp.json()["total_entries"] == expected # include_past => D0,D1 / include_future => D1,D2 + + def test_with_dag_run_id_and_both_past_and_future_means_full_range(self, test_client, session): + dag_id = "example_python_operator" + task_instances = [ + {"logical_date": DEFAULT_DATETIME_1 - dt.timedelta(days=1), "state": State.FAILED}, # D0 + {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, #D1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D2 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # D3 + {"logical_date": None, "state": State.FAILED}, # D4 + ] + self.create_task_instances( + session, dag_id=dag_id, task_instances=task_instances, update_extras=False + ) + payload = { + "dry_run": True, + "only_failed": False, + "dag_run_id": "TEST_DAG_RUN_ID_1", # D1 + "include_past": True, + "include_future": True, + } + resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + assert resp.status_code == 200 + assert resp.json()["total_entries"] == 5 #D0 ~ #D4 + + def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, session): + dag_id = "example_python_operator" + task_instances = [ + {"logical_date": DEFAULT_DATETIME_1, "state": State.SUCCESS}, # D0 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.SUCCESS}, # D2 + ] + self.create_task_instances( + session, dag_id=dag_id, task_instances=task_instances, update_extras=False + ) + payload = { + "dry_run": True, + "only_failed": True, + "dag_run_id": "TEST_DAG_RUN_ID_1", + } + resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + assert resp.status_code == 200 + a = resp.json() + assert resp.json()["total_entries"] == 1 + assert resp.json()["task_instances"][0]["logical_date"] == (DEFAULT_DATETIME_1 + dt.timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ") #D1 + + @pytest.mark.parametrize("flag", ["include_future", "include_past"]) + def test_manual_run_with_none_logical_date_returns_400_kept(self, test_client, session, flag): + dag_id = "example_python_operator" + payload = { + "dry_run": True, + "dag_run_id": "TEST_DAG_RUN_ID_0", + "only_failed": True, + flag: True, + } + task_instances = [{"logical_date": None, "state": State.FAILED}] + self.create_task_instances( + session, dag_id=dag_id, task_instances=task_instances, update_extras=False, + dag_run_state=State.FAILED + ) + resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + assert resp.status_code 
== 400 + assert ( + "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None)." + in resp.json()["detail"] + ) + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.post( "/dags/dag_id/clearTaskInstances", @@ -2891,58 +2984,58 @@ def test_should_respond_404_for_nonexistent_dagrun_id(self, test_client, session "payload, expected", [ ( - {"end_date": "2020-11-10T12:42:39.442973"}, - { - "detail": [ - { - "type": "timezone_aware", - "loc": ["body", "end_date"], - "msg": "Input should have timezone info", - "input": "2020-11-10T12:42:39.442973", - } - ] - }, + {"end_date": "2020-11-10T12:42:39.442973"}, + { + "detail": [ + { + "type": "timezone_aware", + "loc": ["body", "end_date"], + "msg": "Input should have timezone info", + "input": "2020-11-10T12:42:39.442973", + } + ] + }, ), ( - {"end_date": "2020-11-10T12:4po"}, - { - "detail": [ - { - "type": "datetime_from_date_parsing", - "loc": ["body", "end_date"], - "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", - "input": "2020-11-10T12:4po", - "ctx": {"error": "unexpected extra characters at the end of the input"}, - } - ] - }, + {"end_date": "2020-11-10T12:4po"}, + { + "detail": [ + { + "type": "datetime_from_date_parsing", + "loc": ["body", "end_date"], + "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", + "input": "2020-11-10T12:4po", + "ctx": {"error": "unexpected extra characters at the end of the input"}, + } + ] + }, ), ( - {"start_date": "2020-11-10T12:42:39.442973"}, - { - "detail": [ - { - "type": "timezone_aware", - "loc": ["body", "start_date"], - "msg": "Input should have timezone info", - "input": "2020-11-10T12:42:39.442973", - } - ] - }, + {"start_date": "2020-11-10T12:42:39.442973"}, + { + "detail": [ + { + "type": "timezone_aware", + "loc": ["body", "start_date"], + "msg": "Input should have timezone info", + "input": "2020-11-10T12:42:39.442973", + } + ] + }, ), ( - {"start_date": "2020-11-10T12:4po"}, - { - "detail": [ - { - "type": "datetime_from_date_parsing", - "loc": ["body", "start_date"], - "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", - "input": "2020-11-10T12:4po", - "ctx": {"error": "unexpected extra characters at the end of the input"}, - } - ] - }, + {"start_date": "2020-11-10T12:4po"}, + { + "detail": [ + { + "type": "datetime_from_date_parsing", + "loc": ["body", "start_date"], + "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", + "input": "2020-11-10T12:4po", + "ctx": {"error": "unexpected extra characters at the end of the input"}, + } + ] + }, ), ], ) @@ -3643,16 +3736,16 @@ def test_should_raise_404_not_found_task(self, test_client): "payload, expected", [ ( - { - "new_state": "failede", - }, - f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": "failede", + }, + f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ( - { - "new_state": "queued", - }, - f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": "queued", + }, + f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ], ) @@ -3779,12 +3872,12 @@ def test_update_mask_should_call_mocked_api( "new_note_value,ti_note_data", [ ( - "My super cool TaskInstance note.", - {"content": 
"My super cool TaskInstance note.", "user_id": "test"}, + "My super cool TaskInstance note.", + {"content": "My super cool TaskInstance note.", "user_id": "test"}, ), ( - None, - {"content": None, "user_id": "test"}, + None, + {"content": None, "user_id": "test"}, ), ], ) @@ -4339,16 +4432,16 @@ def test_should_raise_404_not_found_task(self, test_client): "payload, expected", [ ( - { - "new_state": "failede", - }, - f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": "failede", + }, + f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ( - { - "new_state": "queued", - }, - f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": "queued", + }, + f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ], ) @@ -4507,19 +4600,19 @@ def test_should_respond_403(self, unauthorized_test_client): "test_url, setup_needed, expected_error", [ ( - f"/dags/non_existent_dag/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}", - False, - "The Task Instance with dag_id: `non_existent_dag`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and map_index: `-1` was not found", + f"/dags/non_existent_dag/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}", + False, + "The Task Instance with dag_id: `non_existent_dag`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and map_index: `-1` was not found", ), ( - f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/non_existent_task", - True, - "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `non_existent_task` and map_index: `-1` was not found", + f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/non_existent_task", + True, + "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `non_existent_task` and map_index: `-1` was not found", ), ( - f"/dags/{DAG_ID}/dagRuns/NON_EXISTENT_DAG_RUN/taskInstances/{TASK_ID}", - True, - "The Task Instance with dag_id: `example_python_operator`, run_id: `NON_EXISTENT_DAG_RUN`, task_id: `print_the_context` and map_index: `-1` was not found", + f"/dags/{DAG_ID}/dagRuns/NON_EXISTENT_DAG_RUN/taskInstances/{TASK_ID}", + True, + "The Task Instance with dag_id: `example_python_operator`, run_id: `NON_EXISTENT_DAG_RUN`, task_id: `print_the_context` and map_index: `-1` was not found", ), ], ) From 01be2fe0c6aa4799ab625006575853223f5d8310 Mon Sep 17 00:00:00 2001 From: bggwak Date: Thu, 21 Aug 2025 01:06:12 +0900 Subject: [PATCH 02/14] style: fix formatting --- .../core_api/routes/public/task_instances.py | 5 +- .../routes/public/test_task_instances.py | 363 +++++++++--------- 2 files changed, 183 insertions(+), 185 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 92958ae8503e8..5d8a9185395a0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -717,13 +717,13 @@ def post_clear_task_instances( if (past or future) and dag_run.logical_date is None: raise HTTPException( status.HTTP_400_BAD_REQUEST, - "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None)." 
+ "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None).", ) if past or future: body.start_date = dag_run.logical_date if not past else None body.end_date = dag_run.logical_date if not future else None - dag_run_id = None # Use date-based clearing + dag_run_id = None # Use date-based clearing else: body.start_date = dag_run.logical_date body.end_date = dag_run.logical_date @@ -785,7 +785,6 @@ def post_clear_task_instances( ) - @task_instances_router.patch( task_instances_prefix + "/{task_id}/dry_run", responses=create_openapi_http_exception_doc( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 5bc86ab683dcd..102208c6c370a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -778,12 +778,12 @@ def test_offset_limit(self, test_client, one_task_with_many_mapped_tis): ({"order_by": "map_index", "limit": 100}, list(range(100))), ({"order_by": "-map_index", "limit": 100}, list(range(109, 9, -1))), ( - {"order_by": "state", "limit": 108}, - list(range(5, 25)) + list(range(25, 110)) + list(range(3)), + {"order_by": "state", "limit": 108}, + list(range(5, 25)) + list(range(25, 110)) + list(range(3)), ), ( - {"order_by": "-state", "limit": 100}, - list(range(5)[::-1]) + list(range(25, 110)[::-1]) + list(range(15, 25)[::-1]), + {"order_by": "-state", "limit": 100}, + list(range(5)[::-1]) + list(range(25, 110)[::-1]) + list(range(15, 25)[::-1]), ), ({"order_by": "logical_date", "limit": 100}, list(range(100))), ({"order_by": "-logical_date", "limit": 100}, list(range(109, 9, -1))), @@ -1422,37 +1422,37 @@ def test_should_respond_empty_non_scheduled(self, test_client, session): "state, dependencies", [ ( - State.SCHEDULED, - { - "dependencies": [ - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " - "before the task's start date 2021-01-01T00:00:00+00:00.", - }, - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " - "before the task's DAG's start date 2021-01-01T00:00:00+00:00.", - }, - ], - }, + State.SCHEDULED, + { + "dependencies": [ + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's DAG's start date 2021-01-01T00:00:00+00:00.", + }, + ], + }, ), ( - State.NONE, - { - "dependencies": [ - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's start date 2021-01-01T00:00:00+00:00.", - }, - { - "name": "Logical Date", - "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's DAG's start date 2021-01-01T00:00:00+00:00.", - }, - {"name": "Task Instance State", "reason": "Task is in the 'None' state."}, - ] - }, + State.NONE, + { + "dependencies": [ + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Logical Date", + "reason": "The logical date is 2020-01-01T00:00:00+00:00 but this is before the task's DAG's start date 
2021-01-01T00:00:00+00:00.", + }, + {"name": "Task Instance State", "reason": "Task is in the 'None' state."}, + ] + }, ), ], ) @@ -2426,17 +2426,16 @@ def test_manual_run_with_none_logical_date_returns_400(self, test_client, sessio ("include_future", 2), # D1 ~ ], ) - def test_with_dag_run_id_and_past_future_converts_to_date_range(self, test_client, session, flag, - expected): + def test_with_dag_run_id_and_past_future_converts_to_date_range( + self, test_client, session, flag, expected + ): dag_id = "example_python_operator" task_instances = [ {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # D0 {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D1 {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # D2 ] - self.create_task_instances( - session, dag_id=dag_id, task_instances=task_instances, update_extras=False - ) + self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) payload = { "dry_run": True, "only_failed": True, @@ -2451,14 +2450,12 @@ def test_with_dag_run_id_and_both_past_and_future_means_full_range(self, test_cl dag_id = "example_python_operator" task_instances = [ {"logical_date": DEFAULT_DATETIME_1 - dt.timedelta(days=1), "state": State.FAILED}, # D0 - {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, #D1 + {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # D1 {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D2 {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # D3 {"logical_date": None, "state": State.FAILED}, # D4 ] - self.create_task_instances( - session, dag_id=dag_id, task_instances=task_instances, update_extras=False - ) + self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) payload = { "dry_run": True, "only_failed": False, @@ -2468,7 +2465,7 @@ def test_with_dag_run_id_and_both_past_and_future_means_full_range(self, test_cl } resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert resp.status_code == 200 - assert resp.json()["total_entries"] == 5 #D0 ~ #D4 + assert resp.json()["total_entries"] == 5 # D0 ~ #D4 def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, session): dag_id = "example_python_operator" @@ -2477,9 +2474,7 @@ def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, sess {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D1 {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.SUCCESS}, # D2 ] - self.create_task_instances( - session, dag_id=dag_id, task_instances=task_instances, update_extras=False - ) + self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) payload = { "dry_run": True, "only_failed": True, @@ -2489,7 +2484,9 @@ def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, sess assert resp.status_code == 200 a = resp.json() assert resp.json()["total_entries"] == 1 - assert resp.json()["task_instances"][0]["logical_date"] == (DEFAULT_DATETIME_1 + dt.timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ") #D1 + assert resp.json()["task_instances"][0]["logical_date"] == ( + DEFAULT_DATETIME_1 + dt.timedelta(days=1) + ).strftime("%Y-%m-%dT%H:%M:%SZ") # D1 @pytest.mark.parametrize("flag", ["include_future", "include_past"]) def test_manual_run_with_none_logical_date_returns_400_kept(self, 
test_client, session, flag): @@ -2502,8 +2499,11 @@ def test_manual_run_with_none_logical_date_returns_400_kept(self, test_client, s } task_instances = [{"logical_date": None, "state": State.FAILED}] self.create_task_instances( - session, dag_id=dag_id, task_instances=task_instances, update_extras=False, - dag_run_state=State.FAILED + session, + dag_id=dag_id, + task_instances=task_instances, + update_extras=False, + dag_run_state=State.FAILED, ) resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert resp.status_code == 400 @@ -2984,58 +2984,58 @@ def test_should_respond_404_for_nonexistent_dagrun_id(self, test_client, session "payload, expected", [ ( - {"end_date": "2020-11-10T12:42:39.442973"}, - { - "detail": [ - { - "type": "timezone_aware", - "loc": ["body", "end_date"], - "msg": "Input should have timezone info", - "input": "2020-11-10T12:42:39.442973", - } - ] - }, + {"end_date": "2020-11-10T12:42:39.442973"}, + { + "detail": [ + { + "type": "timezone_aware", + "loc": ["body", "end_date"], + "msg": "Input should have timezone info", + "input": "2020-11-10T12:42:39.442973", + } + ] + }, ), ( - {"end_date": "2020-11-10T12:4po"}, - { - "detail": [ - { - "type": "datetime_from_date_parsing", - "loc": ["body", "end_date"], - "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", - "input": "2020-11-10T12:4po", - "ctx": {"error": "unexpected extra characters at the end of the input"}, - } - ] - }, + {"end_date": "2020-11-10T12:4po"}, + { + "detail": [ + { + "type": "datetime_from_date_parsing", + "loc": ["body", "end_date"], + "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", + "input": "2020-11-10T12:4po", + "ctx": {"error": "unexpected extra characters at the end of the input"}, + } + ] + }, ), ( - {"start_date": "2020-11-10T12:42:39.442973"}, - { - "detail": [ - { - "type": "timezone_aware", - "loc": ["body", "start_date"], - "msg": "Input should have timezone info", - "input": "2020-11-10T12:42:39.442973", - } - ] - }, + {"start_date": "2020-11-10T12:42:39.442973"}, + { + "detail": [ + { + "type": "timezone_aware", + "loc": ["body", "start_date"], + "msg": "Input should have timezone info", + "input": "2020-11-10T12:42:39.442973", + } + ] + }, ), ( - {"start_date": "2020-11-10T12:4po"}, - { - "detail": [ - { - "type": "datetime_from_date_parsing", - "loc": ["body", "start_date"], - "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", - "input": "2020-11-10T12:4po", - "ctx": {"error": "unexpected extra characters at the end of the input"}, - } - ] - }, + {"start_date": "2020-11-10T12:4po"}, + { + "detail": [ + { + "type": "datetime_from_date_parsing", + "loc": ["body", "start_date"], + "msg": "Input should be a valid datetime or date, unexpected extra characters at the end of the input", + "input": "2020-11-10T12:4po", + "ctx": {"error": "unexpected extra characters at the end of the input"}, + } + ] + }, ), ], ) @@ -3736,16 +3736,16 @@ def test_should_raise_404_not_found_task(self, test_client): "payload, expected", [ ( - { - "new_state": "failede", - }, - f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": "failede", + }, + f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ( - { - "new_state": "queued", - }, - f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": 
"queued", + }, + f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ], ) @@ -3772,50 +3772,49 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte "new_state,expected_status_code,expected_json,set_ti_state_call_count", [ ( - "failed", - 200, - { - "task_instances": [ - { - "dag_id": "example_python_operator", - "dag_display_name": "example_python_operator", - "dag_version": mock.ANY, - "dag_run_id": "TEST_DAG_RUN_ID", - "logical_date": "2020-01-01T00:00:00Z", - "task_id": "print_the_context", - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "executor": None, - "executor_config": "{}", - "hostname": "", - "id": mock.ANY, - "map_index": -1, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "operator_name": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_display_name": "print_the_context", - "try_number": 0, - "unixname": getuser(), - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, - } - ], - "total_entries": 1, - }, - 1, + "failed", + 200, + { + "task_instances": [ + { + "dag_id": "example_python_operator", + "dag_display_name": "example_python_operator", + "dag_version": mock.ANY, + "dag_run_id": "TEST_DAG_RUN_ID", + "logical_date": "2020-01-01T00:00:00Z", + "task_id": "print_the_context", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_display_name": "print_the_context", + "try_number": 0, + "unixname": getuser(), + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, + }, + 1, ), ( None, @@ -3872,12 +3871,12 @@ def test_update_mask_should_call_mocked_api( "new_note_value,ti_note_data", [ ( - "My super cool TaskInstance note.", - {"content": "My super cool TaskInstance note.", "user_id": "test"}, + "My super cool TaskInstance note.", + {"content": "My super cool TaskInstance note.", "user_id": "test"}, ), ( - None, - {"content": None, "user_id": "test"}, + None, + {"content": None, "user_id": "test"}, ), ], ) @@ -4432,16 +4431,16 @@ def test_should_raise_404_not_found_task(self, test_client): "payload, expected", [ ( - { - "new_state": "failede", - }, - f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": "failede", + }, + f"'failede' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ( - { - "new_state": "queued", - }, - f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", + { + "new_state": "queued", + }, + f"'queued' is not one of ['{State.SUCCESS}', '{State.FAILED}', '{State.SKIPPED}']", ), ], ) @@ -4514,20 +4513,20 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte 1, ), ( - None, - 422, - { - "detail": [ - { - "type": 
"value_error", - "loc": ["body", "new_state"], - "msg": "Value error, 'new_state' should not be empty", - "input": None, - "ctx": {"error": {}}, - } - ] - }, - 0, + None, + 422, + { + "detail": [ + { + "type": "value_error", + "loc": ["body", "new_state"], + "msg": "Value error, 'new_state' should not be empty", + "input": None, + "ctx": {"error": {}}, + } + ] + }, + 0, ), ], ) @@ -4600,19 +4599,19 @@ def test_should_respond_403(self, unauthorized_test_client): "test_url, setup_needed, expected_error", [ ( - f"/dags/non_existent_dag/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}", - False, - "The Task Instance with dag_id: `non_existent_dag`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and map_index: `-1` was not found", + f"/dags/non_existent_dag/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}", + False, + "The Task Instance with dag_id: `non_existent_dag`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and map_index: `-1` was not found", ), ( - f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/non_existent_task", - True, - "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `non_existent_task` and map_index: `-1` was not found", + f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/non_existent_task", + True, + "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `non_existent_task` and map_index: `-1` was not found", ), ( - f"/dags/{DAG_ID}/dagRuns/NON_EXISTENT_DAG_RUN/taskInstances/{TASK_ID}", - True, - "The Task Instance with dag_id: `example_python_operator`, run_id: `NON_EXISTENT_DAG_RUN`, task_id: `print_the_context` and map_index: `-1` was not found", + f"/dags/{DAG_ID}/dagRuns/NON_EXISTENT_DAG_RUN/taskInstances/{TASK_ID}", + True, + "The Task Instance with dag_id: `example_python_operator`, run_id: `NON_EXISTENT_DAG_RUN`, task_id: `print_the_context` and map_index: `-1` was not found", ), ], ) From b334972d53fe471e3db68af049388c4b71e79ab0 Mon Sep 17 00:00:00 2001 From: bggwak Date: Thu, 21 Aug 2025 01:12:46 +0900 Subject: [PATCH 03/14] chore: fix typo --- .../routes/public/test_task_instances.py | 25 +------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 102208c6c370a..d20291016ac8d 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2423,7 +2423,7 @@ def test_manual_run_with_none_logical_date_returns_400(self, test_client, sessio "flag, expected", [ ("include_past", 2), # D0 ~ D1 - ("include_future", 2), # D1 ~ + ("include_future", 2), # D1 ~ D2 ], ) def test_with_dag_run_id_and_past_future_converts_to_date_range( @@ -2488,29 +2488,6 @@ def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, sess DEFAULT_DATETIME_1 + dt.timedelta(days=1) ).strftime("%Y-%m-%dT%H:%M:%SZ") # D1 - @pytest.mark.parametrize("flag", ["include_future", "include_past"]) - def test_manual_run_with_none_logical_date_returns_400_kept(self, test_client, session, flag): - dag_id = "example_python_operator" - payload = { - "dry_run": True, - "dag_run_id": "TEST_DAG_RUN_ID_0", - "only_failed": True, - flag: True, - } - task_instances = [{"logical_date": None, "state": State.FAILED}] - self.create_task_instances( - session, - dag_id=dag_id, - task_instances=task_instances, - 
update_extras=False, - dag_run_state=State.FAILED, - ) - resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) - assert resp.status_code == 400 - assert ( - "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None)." - in resp.json()["detail"] - ) def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.post( From 0de3dac1dbfa9b4bd0deffd0699212ccf4cf201b Mon Sep 17 00:00:00 2001 From: bggwak Date: Thu, 21 Aug 2025 01:14:16 +0900 Subject: [PATCH 04/14] chore: remove comment --- .../airflow/api_fastapi/core_api/routes/public/task_instances.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 5d8a9185395a0..6dcde492a1587 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -704,7 +704,6 @@ def post_clear_task_instances( downstream = body.include_downstream upstream = body.include_upstream - # Improved logic - resolve logical_date for scheduled DAGRun if dag_run_id is not None: dag_run: DagRun | None = session.scalar( select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id) From df6adeccc897786c6ac68ccf5d7afcf0d562905a Mon Sep 17 00:00:00 2001 From: bggwak Date: Thu, 21 Aug 2025 01:19:31 +0900 Subject: [PATCH 05/14] chore: fix comment DX to TX --- .../routes/public/test_task_instances.py | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index d20291016ac8d..7083ceaed220c 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2422,8 +2422,8 @@ def test_manual_run_with_none_logical_date_returns_400(self, test_client, sessio @pytest.mark.parametrize( "flag, expected", [ - ("include_past", 2), # D0 ~ D1 - ("include_future", 2), # D1 ~ D2 + ("include_past", 2), # T0 ~ T1 + ("include_future", 2), # T1 ~ T2 ], ) def test_with_dag_run_id_and_past_future_converts_to_date_range( @@ -2431,9 +2431,9 @@ def test_with_dag_run_id_and_past_future_converts_to_date_range( ): dag_id = "example_python_operator" task_instances = [ - {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # D0 - {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D1 - {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # D2 + {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # T0 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # T1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # T2 ] self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) payload = { @@ -2444,35 +2444,35 @@ def test_with_dag_run_id_and_past_future_converts_to_date_range( } resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert resp.status_code == 200 - assert resp.json()["total_entries"] == expected # include_past => D0,D1 / include_future => D1,D2 + assert resp.json()["total_entries"] == expected 
# include_past => T0,1 / include_future => T1,T2 def test_with_dag_run_id_and_both_past_and_future_means_full_range(self, test_client, session): dag_id = "example_python_operator" task_instances = [ - {"logical_date": DEFAULT_DATETIME_1 - dt.timedelta(days=1), "state": State.FAILED}, # D0 - {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # D1 - {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D2 - {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # D3 - {"logical_date": None, "state": State.FAILED}, # D4 + {"logical_date": DEFAULT_DATETIME_1 - dt.timedelta(days=1), "state": State.FAILED}, # T0 + {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # T1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # T2 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # T3 + {"logical_date": None, "state": State.FAILED}, # T4 ] self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) payload = { "dry_run": True, "only_failed": False, - "dag_run_id": "TEST_DAG_RUN_ID_1", # D1 + "dag_run_id": "TEST_DAG_RUN_ID_1", # T1 "include_past": True, "include_future": True, } resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert resp.status_code == 200 - assert resp.json()["total_entries"] == 5 # D0 ~ #D4 + assert resp.json()["total_entries"] == 5 # T0 ~ #T4 def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, session): dag_id = "example_python_operator" task_instances = [ - {"logical_date": DEFAULT_DATETIME_1, "state": State.SUCCESS}, # D0 - {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # D1 - {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.SUCCESS}, # D2 + {"logical_date": DEFAULT_DATETIME_1, "state": State.SUCCESS}, # T0 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # T1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.SUCCESS}, # T2 ] self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) payload = { @@ -2486,7 +2486,7 @@ def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, sess assert resp.json()["total_entries"] == 1 assert resp.json()["task_instances"][0]["logical_date"] == ( DEFAULT_DATETIME_1 + dt.timedelta(days=1) - ).strftime("%Y-%m-%dT%H:%M:%SZ") # D1 + ).strftime("%Y-%m-%dT%H:%M:%SZ") # T1 def test_should_respond_401(self, unauthenticated_test_client): From 268bf4bfb020868c5de941d878fd7896baa13dad Mon Sep 17 00:00:00 2001 From: bggwak Date: Thu, 21 Aug 2025 01:30:17 +0900 Subject: [PATCH 06/14] refactor: remove verbose handling future or past --- .../api_fastapi/core_api/routes/public/task_instances.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 6dcde492a1587..0c2e0d14f256f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -718,14 +718,10 @@ def post_clear_task_instances( status.HTTP_400_BAD_REQUEST, "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None).", ) - if past or future: - 
body.start_date = dag_run.logical_date if not past else None - body.end_date = dag_run.logical_date if not future else None dag_run_id = None # Use date-based clearing - else: - body.start_date = dag_run.logical_date - body.end_date = dag_run.logical_date + body.start_date = dag_run.logical_date + body.end_date = dag_run.logical_date if past: body.start_date = None From 4c75c98f7825a365f65100dbf77fc2d12c422424 Mon Sep 17 00:00:00 2001 From: bggwak Date: Thu, 21 Aug 2025 01:33:06 +0900 Subject: [PATCH 07/14] refactor: remove verbose handling future or past --- .../api_fastapi/core_api/routes/public/task_instances.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 0c2e0d14f256f..dad8faf1394a8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -718,8 +718,6 @@ def post_clear_task_instances( status.HTTP_400_BAD_REQUEST, "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None).", ) - if past or future: - dag_run_id = None # Use date-based clearing body.start_date = dag_run.logical_date body.end_date = dag_run.logical_date From 222f2cb583214067aff56ef4e1215badd96a1306 Mon Sep 17 00:00:00 2001 From: bggwak Date: Thu, 21 Aug 2025 01:38:42 +0900 Subject: [PATCH 08/14] refactor: use hardcoded test result --- .../core_api/routes/public/test_task_instances.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 7083ceaed220c..bd656df4125a6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2480,13 +2480,14 @@ def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, sess "only_failed": True, "dag_run_id": "TEST_DAG_RUN_ID_1", } + a= ( + DEFAULT_DATETIME_1 + dt.timedelta(days=1) + ).strftime("%Y-%m-%dT%H:%M:%SZ") # T1 resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert resp.status_code == 200 a = resp.json() assert resp.json()["total_entries"] == 1 - assert resp.json()["task_instances"][0]["logical_date"] == ( - DEFAULT_DATETIME_1 + dt.timedelta(days=1) - ).strftime("%Y-%m-%dT%H:%M:%SZ") # T1 + assert resp.json()["task_instances"][0]["logical_date"] == '2020-01-02T00:00:00Z' # T1 def test_should_respond_401(self, unauthenticated_test_client): From 9d933c4e2918ccd2c796eac7047528782d6d5d3a Mon Sep 17 00:00:00 2001 From: nothinmin Date: Wed, 3 Sep 2025 00:03:34 +0900 Subject: [PATCH 09/14] fix(test_task_instances.py): correct test comment --- .../routes/public/test_task_instances.py | 123 +++++++++--------- 1 file changed, 59 insertions(+), 64 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index bd656df4125a6..8c5af8d21ea83 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2444,7 +2444,7 @@ def 
test_with_dag_run_id_and_past_future_converts_to_date_range( } resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert resp.status_code == 200 - assert resp.json()["total_entries"] == expected # include_past => T0,1 / include_future => T1,T2 + assert resp.json()["total_entries"] == expected # include_past => T0,T1 / include_future => T1,T2 def test_with_dag_run_id_and_both_past_and_future_means_full_range(self, test_client, session): dag_id = "example_python_operator" @@ -2480,15 +2480,10 @@ def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, sess "only_failed": True, "dag_run_id": "TEST_DAG_RUN_ID_1", } - a= ( - DEFAULT_DATETIME_1 + dt.timedelta(days=1) - ).strftime("%Y-%m-%dT%H:%M:%SZ") # T1 resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert resp.status_code == 200 - a = resp.json() assert resp.json()["total_entries"] == 1 - assert resp.json()["task_instances"][0]["logical_date"] == '2020-01-02T00:00:00Z' # T1 - + assert resp.json()["task_instances"][0]["logical_date"] == "2020-01-02T00:00:00Z" # T1 def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.post( @@ -3750,49 +3745,49 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte "new_state,expected_status_code,expected_json,set_ti_state_call_count", [ ( - "failed", - 200, - { - "task_instances": [ - { - "dag_id": "example_python_operator", - "dag_display_name": "example_python_operator", - "dag_version": mock.ANY, - "dag_run_id": "TEST_DAG_RUN_ID", - "logical_date": "2020-01-01T00:00:00Z", - "task_id": "print_the_context", - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "executor": None, - "executor_config": "{}", - "hostname": "", - "id": mock.ANY, - "map_index": -1, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_display_name": "print_the_context", - "try_number": 0, - "unixname": getuser(), - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, - } - ], - "total_entries": 1, - }, - 1, + "failed", + 200, + { + "task_instances": [ + { + "dag_id": "example_python_operator", + "dag_display_name": "example_python_operator", + "dag_version": mock.ANY, + "dag_run_id": "TEST_DAG_RUN_ID", + "logical_date": "2020-01-01T00:00:00Z", + "task_id": "print_the_context", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_display_name": "print_the_context", + "try_number": 0, + "unixname": getuser(), + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, + }, + 1, ), ( None, @@ -4491,20 +4486,20 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte 1, ), ( - None, - 422, - { - 
"detail": [ - { - "type": "value_error", - "loc": ["body", "new_state"], - "msg": "Value error, 'new_state' should not be empty", - "input": None, - "ctx": {"error": {}}, - } - ] - }, - 0, + None, + 422, + { + "detail": [ + { + "type": "value_error", + "loc": ["body", "new_state"], + "msg": "Value error, 'new_state' should not be empty", + "input": None, + "ctx": {"error": {}}, + } + ] + }, + 0, ), ], ) From 5d1ae497b75694137cfa029343925481b2ce3827 Mon Sep 17 00:00:00 2001 From: nothinmin Date: Wed, 3 Sep 2025 00:21:05 +0900 Subject: [PATCH 10/14] fix(test_task_instances.py): fix typo --- .../api_fastapi/core_api/routes/public/test_task_instances.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 8c5af8d21ea83..de5e44c970cfb 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -26,7 +26,6 @@ import pendulum import pytest -from airflow.models.baseoperator import BaseOperator from sqlalchemy import select from airflow._shared.timezones.timezone import datetime @@ -35,6 +34,7 @@ from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.listeners.listener import get_listener_manager from airflow.models import DagRun, Log, TaskInstance +from airflow.models.baseoperator import BaseOperator from airflow.models.dag_version import DagVersion from airflow.models.dagbag import DagBag, sync_bag_to_db from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF @@ -45,6 +45,7 @@ from airflow.utils.platform import getuser from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.types import DagRunType + from tests_common.test_utils.api_fastapi import _check_task_instance_note from tests_common.test_utils.db import ( clear_db_runs, @@ -3766,6 +3767,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte "max_tries": 0, "note": "placeholder-note", "operator": "PythonOperator", + "operator_name": "PythonOperator", "pid": 100, "pool": "default_pool", "pool_slots": 1, From b25a56e11353ed56c6bc4a126119e7179e0e03c8 Mon Sep 17 00:00:00 2001 From: nothinmin Date: Wed, 3 Sep 2025 00:26:43 +0900 Subject: [PATCH 11/14] fix(test_task_instances.py): fix typo --- .../api_fastapi/core_api/routes/public/test_task_instances.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index de5e44c970cfb..4dc673b8c2097 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -34,7 +34,6 @@ from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.listeners.listener import get_listener_manager from airflow.models import DagRun, Log, TaskInstance -from airflow.models.baseoperator import BaseOperator from airflow.models.dag_version import DagVersion from airflow.models.dagbag import DagBag, sync_bag_to_db from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF From 6a00154776004e5a5900a83e735b34ad2e6637d5 Mon Sep 17 00:00:00 2001 From: nothinmin Date: Wed, 3 Sep 2025 00:36:36 +0900 
Subject: [PATCH 12/14] fix(task_instances.py): make sure start_date, end_date to None --- .../api_fastapi/core_api/routes/public/task_instances.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index dad8faf1394a8..7cecb67c41f80 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -718,8 +718,8 @@ def post_clear_task_instances( status.HTTP_400_BAD_REQUEST, "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None).", ) - body.start_date = dag_run.logical_date - body.end_date = dag_run.logical_date + body.start_date = dag_run.logical_date if dag_run.logical_date is not None else None + body.end_date = dag_run.logical_date if dag_run.logical_date is not None else None if past: body.start_date = None From 320599dde75d3be6faeaca77f8470ad83bc55f19 Mon Sep 17 00:00:00 2001 From: nothinmin Date: Wed, 3 Sep 2025 21:55:11 +0900 Subject: [PATCH 13/14] fix(task_instances.py): fix inaccurate response --- .../api_fastapi/core_api/routes/public/task_instances.py | 2 +- .../api_fastapi/core_api/routes/public/test_task_instances.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 7cecb67c41f80..334413693d0b6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -716,7 +716,7 @@ def post_clear_task_instances( if (past or future) and dag_run.logical_date is None: raise HTTPException( status.HTTP_400_BAD_REQUEST, - "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None).", + "Cannot use include_past or include_future with no logical_date(e.g., manually or asset-triggered).", ) body.start_date = dag_run.logical_date if dag_run.logical_date is not None else None body.end_date = dag_run.logical_date if dag_run.logical_date is not None else None diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 4dc673b8c2097..a657ad809fd2f 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2415,7 +2415,7 @@ def test_manual_run_with_none_logical_date_returns_400(self, test_client, sessio response = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert response.status_code == 400 assert ( - "Cannot use include_past or include_future with a manually triggered DAG run (logical_date is None)." + "Cannot use include_past or include_future with no logical_date(e.g., manually or asset-triggered)." 
in response.json()["detail"] ) From 64fe11446bf27e2415b966f26ac04bb0e51480e7 Mon Sep 17 00:00:00 2001 From: nothinmin Date: Wed, 3 Sep 2025 22:01:09 +0900 Subject: [PATCH 14/14] fix(task_instances.py): fix inaccurate response --- .../api_fastapi/core_api/routes/public/task_instances.py | 2 +- .../api_fastapi/core_api/routes/public/test_task_instances.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 334413693d0b6..87d76fedc921f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -716,7 +716,7 @@ def post_clear_task_instances( if (past or future) and dag_run.logical_date is None: raise HTTPException( status.HTTP_400_BAD_REQUEST, - "Cannot use include_past or include_future with no logical_date(e.g., manually or asset-triggered).", + "Cannot use include_past or include_future with no logical_date(e.g. manually or asset-triggered).", ) body.start_date = dag_run.logical_date if dag_run.logical_date is not None else None body.end_date = dag_run.logical_date if dag_run.logical_date is not None else None diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index a657ad809fd2f..2c10ef1e341ab 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2415,7 +2415,7 @@ def test_manual_run_with_none_logical_date_returns_400(self, test_client, sessio response = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert response.status_code == 400 assert ( - "Cannot use include_past or include_future with no logical_date(e.g., manually or asset-triggered)." + "Cannot use include_past or include_future with no logical_date(e.g. manually or asset-triggered)." in response.json()["detail"] )