diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index ae644e6cfc22d..d85c137c6cae8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -132,12 +132,22 @@ def get_dag_run(dag_id: str, dag_run_id: str, session: SessionDep) -> DAGRunResp def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep): """Delete a DAG Run entry.""" dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) + deletable_states = {s.value for s in DAGRunPatchStates} if dag_run is None: raise HTTPException( status.HTTP_404_NOT_FOUND, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found", ) + if dag_run.state not in deletable_states: + raise HTTPException( + status.HTTP_409_CONFLICT, + ( + f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` " + f"cannot be deleted in {dag_run.state} state" + ), + ) + session.delete(dag_run) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 3720994e04df7..87a7b932b749b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1322,6 +1322,23 @@ def test_delete_dag_run_not_found(self, test_client): body = response.json() assert body["detail"] == "The DagRun with dag_id: `test_dag1` and run_id: `invalid` was not found" + def test_delete_dag_run_in_running_state(self, test_client, dag_maker, session): + with dag_maker(dag_id="test_running_dag"): + EmptyOperator(task_id="t1") + + dag_maker.create_dagrun( + run_id="test_running", + state=DagRunState.RUNNING, + ) + session.commit() + response = test_client.delete("/dags/test_running_dag/dagRuns/test_running") + assert response.status_code == 409 + body = response.json() + assert body["detail"] == ( + "The DagRun with dag_id: `test_running_dag` and run_id: `test_running` " + "cannot be deleted in running state" + ) + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.delete(f"/dags/{DAG1_ID}/dagRuns/invalid") assert response.status_code == 401