diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 437e2ef8d6600..87d76fedc921f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -713,10 +713,10 @@ def post_clear_task_instances( raise HTTPException(status.HTTP_404_NOT_FOUND, error_message) # Get the specific dag version: dag = get_dag_for_run(dag_bag, dag_run, session) - if past or future: + if (past or future) and dag_run.logical_date is None: raise HTTPException( status.HTTP_400_BAD_REQUEST, - "Cannot use include_past or include_future when dag_run_id is provided because logical_date is not applicable.", + "Cannot use include_past or include_future with no logical_date(e.g. manually or asset-triggered).", ) body.start_date = dag_run.logical_date if dag_run.logical_date is not None else None body.end_date = dag_run.logical_date if dag_run.logical_date is not None else None diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index c19b7d20728a3..2c10ef1e341ab 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2396,7 +2396,7 @@ def test_should_respond_200( check_last_log(session, dag_id=request_dag, event="post_clear_task_instances", logical_date=None) @pytest.mark.parametrize("flag", ["include_future", "include_past"]) - def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session, flag): + def test_manual_run_with_none_logical_date_returns_400(self, test_client, session, flag): dag_id = "example_python_operator" payload = { "dry_run": True, @@ -2404,7 +2404,7 @@ def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session "only_failed": True, flag: True, } - task_instances = [{"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}] + task_instances = [{"logical_date": None, "state": State.FAILED}] self.create_task_instances( session, dag_id=dag_id, @@ -2415,10 +2415,76 @@ def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session response = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) assert response.status_code == 400 assert ( - "Cannot use include_past or include_future when dag_run_id is provided" + "Cannot use include_past or include_future with no logical_date(e.g. manually or asset-triggered)." in response.json()["detail"] ) + @pytest.mark.parametrize( + "flag, expected", + [ + ("include_past", 2), # T0 ~ T1 + ("include_future", 2), # T1 ~ T2 + ], + ) + def test_with_dag_run_id_and_past_future_converts_to_date_range( + self, test_client, session, flag, expected + ): + dag_id = "example_python_operator" + task_instances = [ + {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # T0 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # T1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # T2 + ] + self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) + payload = { + "dry_run": True, + "only_failed": True, + "dag_run_id": "TEST_DAG_RUN_ID_1", + flag: True, + } + resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + assert resp.status_code == 200 + assert resp.json()["total_entries"] == expected # include_past => T0,T1 / include_future => T1,T2 + + def test_with_dag_run_id_and_both_past_and_future_means_full_range(self, test_client, session): + dag_id = "example_python_operator" + task_instances = [ + {"logical_date": DEFAULT_DATETIME_1 - dt.timedelta(days=1), "state": State.FAILED}, # T0 + {"logical_date": DEFAULT_DATETIME_1, "state": State.FAILED}, # T1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # T2 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.FAILED}, # T3 + {"logical_date": None, "state": State.FAILED}, # T4 + ] + self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) + payload = { + "dry_run": True, + "only_failed": False, + "dag_run_id": "TEST_DAG_RUN_ID_1", # T1 + "include_past": True, + "include_future": True, + } + resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + assert resp.status_code == 200 + assert resp.json()["total_entries"] == 5 # T0 ~ #T4 + + def test_with_dag_run_id_only_uses_run_id_based_clearing(self, test_client, session): + dag_id = "example_python_operator" + task_instances = [ + {"logical_date": DEFAULT_DATETIME_1, "state": State.SUCCESS}, # T0 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1), "state": State.FAILED}, # T1 + {"logical_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2), "state": State.SUCCESS}, # T2 + ] + self.create_task_instances(session, dag_id=dag_id, task_instances=task_instances, update_extras=False) + payload = { + "dry_run": True, + "only_failed": True, + "dag_run_id": "TEST_DAG_RUN_ID_1", + } + resp = test_client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload) + assert resp.status_code == 200 + assert resp.json()["total_entries"] == 1 + assert resp.json()["task_instances"][0]["logical_date"] == "2020-01-02T00:00:00Z" # T1 + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.post( "/dags/dag_id/clearTaskInstances",