diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index ea4a1e35b2163..989fd9542f8df 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -18,8 +18,9 @@ from marshmallow import ValidationError from airflow import DAG +from airflow._vendor.connexion import NoContent from airflow.api_connexion import security -from airflow.api_connexion.exceptions import BadRequest, NotFound +from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound from airflow.api_connexion.parameters import check_limit, format_parameters from airflow.api_connexion.schemas.dag_schema import ( DAGCollection, @@ -27,9 +28,10 @@ dag_schema, dags_collection_schema, ) -from airflow.exceptions import SerializedDagNotFound +from airflow.exceptions import AirflowException, DagNotFound, SerializedDagNotFound from airflow.models.dag import DagModel from airflow.security import permissions +from airflow.settings import Session from airflow.utils.session import provide_session @@ -100,3 +102,22 @@ def patch_dag(session, dag_id, update_mask=None): setattr(dag, 'is_paused', patch_body['is_paused']) session.commit() return dag_schema.dump(dag) + + +@security.requires_access([(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG)]) +@provide_session +def delete_dag(dag_id: str, session: Session): + """Delete the specific DAG.""" + # TODO: This function is shared with the /delete endpoint used by the web + # UI, so we're reusing it to simplify maintenance. Refactor the function to + # another place when the experimental/legacy API is removed. + from airflow.api.common.experimental import delete_dag + + try: + delete_dag.delete_dag(dag_id, session=session) + except DagNotFound: + raise NotFound(f"Dag with id: '{dag_id}' not found") + except AirflowException: + raise AlreadyExists(detail=f"Task instances of dag with id: '{dag_id}' are still running") + + return NoContent, 204 diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 2ce6804bbd888..39ce36f48d1a6 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -230,7 +230,7 @@ info: ## AlreadyExists The request could not be completed due to a conflict with the current state of the target - resource, meaning that the resource already exists + resource, e.g. the resource it tries to create already exists. ## Unknown @@ -478,6 +478,28 @@ paths: '404': $ref: '#/components/responses/NotFound' + delete: + summary: Delete a DAG + description: > + Deletes all metadata related to the DAG, including finished DAG Runs and Tasks. + Logs are not deleted. This action cannot be undone. + x-openapi-router-controller: airflow.api_connexion.endpoints.dag_endpoint + operationId: delete_dag + tags: [DAG] + responses: + '204': + description: Success. + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthenticated' + '403': + $ref: '#/components/responses/PermissionDenied' + '404': + $ref: '#/components/responses/NotFound' + '409': + $ref: '#/components/responses/AlreadyExists' + /dags/{dag_id}/clearTaskInstances: parameters: - $ref: '#/components/parameters/DAGID' @@ -3522,7 +3544,7 @@ components: $ref: '#/components/schemas/Error' # 409 'AlreadyExists': - description: The resource that a client tried to create already exists. + description: An existing resource conflicts with the request. content: application/json: schema: diff --git a/airflow/exceptions.py b/airflow/exceptions.py index a2b28eefd6cfe..a3ce7e224918e 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -20,6 +20,7 @@ # to be marked in an ERROR state """Exceptions used by Airflow""" import datetime +import warnings from typing import Any, Dict, List, NamedTuple, Optional from airflow.utils.code_utils import prepare_code_snippet @@ -139,6 +140,10 @@ class DagRunAlreadyExists(AirflowBadRequest): class DagFileExists(AirflowBadRequest): """Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2) + class DuplicateTaskIdFound(AirflowException): """Raise when a Task with duplicate task_id is defined in the same DAG""" diff --git a/airflow/www/views.py b/airflow/www/views.py index 146145b2abc98..76b5525cff69e 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1556,7 +1556,7 @@ def run(self): def delete(self): """Deletes DAG.""" from airflow.api.common.experimental import delete_dag - from airflow.exceptions import DagFileExists, DagNotFound + from airflow.exceptions import DagNotFound dag_id = request.values.get('dag_id') origin = get_safe_url(request.values.get('origin')) @@ -1566,9 +1566,6 @@ def delete(self): except DagNotFound: flash(f"DAG with id {dag_id} not found. Cannot delete", 'error') return redirect(request.referrer) - except DagFileExists: - flash(f"Dag id {dag_id} is still in DagBag. Remove the DAG file first.", 'error') - return redirect(request.referrer) except AirflowException: flash( f"Cannot delete DAG with id {dag_id} because some task instances of the DAG "