diff --git a/airflow/providers/cncf/kubernetes/operators/job.py b/airflow/providers/cncf/kubernetes/operators/job.py
index 6f06f88ea5cf5f..aa55eb4ebb5550 100644
--- a/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/airflow/providers/cncf/kubernetes/operators/job.py
@@ -25,8 +25,10 @@
 from kubernetes.client import BatchV1Api, models as k8s
 from kubernetes.client.api_client import ApiClient
+from kubernetes.client.rest import ApiException
 
 from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
     add_unique_suffix,
@@ -305,3 +307,72 @@ def reconcile_job_specs(
             return merge_objects(base_spec, client_spec)
 
         return None
+
+
+class KubernetesDeleteJobOperator(BaseOperator):
+    """
+    Delete a Kubernetes Job.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:KubernetesDeleteJobOperator`
+
+    :param name: name of the Job.
+    :param namespace: the namespace to run within kubernetes.
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
+        for the Kubernetes cluster.
+    :param config_file: The path to the Kubernetes config file. (templated)
+        If not specified, default value is ``~/.kube/config``
+    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param cluster_context: context that points to kubernetes cluster.
+        Ignored when in_cluster is True. If None, current-context is used. (templated)
+    """
+
+    template_fields: Sequence[str] = (
+        "config_file",
+        "namespace",
+        "cluster_context",
+    )
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        namespace: str,
+        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
+        config_file: str | None = None,
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.namespace = namespace
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.config_file = config_file
+        self.in_cluster = in_cluster
+        self.cluster_context = cluster_context
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        return KubernetesHook(
+            conn_id=self.kubernetes_conn_id,
+            in_cluster=self.in_cluster,
+            config_file=self.config_file,
+            cluster_context=self.cluster_context,
+        )
+
+    @cached_property
+    def client(self) -> BatchV1Api:
+        return self.hook.batch_v1_client
+
+    def execute(self, context: Context):
+        try:
+            self.log.info("Deleting kubernetes Job: %s", self.name)
+            self.client.delete_namespaced_job(name=self.name, namespace=self.namespace)
+            self.log.info("Kubernetes job was deleted.")
+        except ApiException as e:
+            if e.status == 404:
+                self.log.info("The Kubernetes job %s does not exist.", self.name)
+            else:
+                raise e
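Reviewer note on the hunk above: `execute` calls `delete_namespaced_job` with no delete options, so dependent Pods are cleaned up according to the API server's default propagation policy, and the `e.status == 404` branch makes the delete idempotent. Below is a minimal sketch of the equivalent raw client call with an explicit foreground policy; the job name, namespace, and kube-config location are illustrative assumptions, not values from this PR:

```python
from kubernetes import client, config

# Load credentials the way KubernetesHook would for a local kube config.
config.load_kube_config()
batch = client.BatchV1Api()

# "Foreground" blocks the delete until the Job's Pods are removed,
# so no orphaned Pods outlive the Job object.
batch.delete_namespaced_job(
    name="test-pi",       # assumed job name
    namespace="default",  # assumed namespace
    propagation_policy="Foreground",
)
```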
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 0fc4db56c8aee2..809e7420214a47 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -33,7 +33,7 @@
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
-from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator, KubernetesJobOperator
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from airflow.providers.cncf.kubernetes.operators.resource import (
     KubernetesCreateResourceOperator,
@@ -1345,3 +1345,110 @@ def __init__(
         self.suspend = True
         self.labels.update({"kueue.x-k8s.io/queue-name": queue_name})
         self.annotations.update({"kueue.x-k8s.io/queue-name": queue_name})
+
+
+class GKEDeleteJobOperator(KubernetesDeleteJobOperator):
+    """
+    Delete a Kubernetes job in the specified Google Kubernetes Engine cluster.
+
+    This Operator assumes that the system has gcloud installed and has configured a
+    connection id with a service account.
+
+    The **minimum** required parameters to delete a job are the variables
+    ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
+    ``namespace``.
+
+    .. seealso::
+        For more detail about Kubernetes Engine authentication have a look at the reference:
+        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKEDeleteJobOperator`
+
+    :param location: The name of the Google Kubernetes Engine zone or region in which the
+        cluster resides, e.g. 'us-central1-a'
+    :param cluster_name: The name of the Google Kubernetes Engine cluster
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param project_id: The Google Developers Console project id
+    :param gcp_conn_id: The Google cloud connection id to use. This allows for
+        users to specify a service account.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"project_id", "location", "cluster_name"} | set(KubernetesDeleteJobOperator.template_fields)
+    )
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        cluster_name: str,
+        use_internal_ip: bool = False,
+        project_id: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.cluster_name = cluster_name
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.use_internal_ip = use_internal_ip
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+        # There is no need to manage the kube_config file, as it will be generated automatically.
+        # All Kubernetes parameters (except config_file) are also valid for the GKEDeleteJobOperator.
+        if self.config_file:
+            raise AirflowException("config_file is not an allowed parameter for the GKEDeleteJobOperator.")
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKEJobHook:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
+            )
+
+        return GKEJobHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+        )
+
+    def execute(self, context: Context):
+        """Execute process of deleting Job."""
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+
+        return super().execute(context)
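For orientation, here is a sketch of how the new GKE operator might be wired into a DAG, exercising the `impersonation_chain` parameter documented above. The project, location, cluster, and service-account values are placeholders, not part of this PR:

```python
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEDeleteJobOperator

with DAG(dag_id="gke_job_cleanup_sketch", schedule=None, start_date=datetime(2024, 1, 1)):
    delete_job = GKEDeleteJobOperator(
        task_id="delete_job",
        project_id="my-project",    # placeholder
        location="us-central1-a",   # placeholder
        cluster_name="my-cluster",  # placeholder
        name="test-pi",
        namespace="default",
        # The account running the task must hold Service Account Token Creator
        # on this identity; short-lived credentials are minted per request.
        impersonation_chain="sa@my-project.iam.gserviceaccount.com",  # placeholder
    )
```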
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index bfd629a49bef72..4307aad95dffa2 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -628,3 +628,18 @@ Instead of ``template`` parameter for Pod creating this operator uses :class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator`.
 It means that user can use all parameters from :class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator`
 in :class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator`.
 
 More information about the Jobs here: `Kubernetes Job Documentation <https://kubernetes.io/docs/concepts/workloads/controllers/job/>`__
+
+
+.. _howto/operator:KubernetesDeleteJobOperator:
+
+KubernetesDeleteJobOperator
+===========================
+
+The :class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesDeleteJobOperator` allows
+you to delete Jobs on a Kubernetes cluster.
+
+.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_delete_k8s_job]
+    :end-before: [END howto_operator_delete_k8s_job]
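A point the new docs section leaves implicit: `KubernetesDeleteJobOperator` deletes whatever Job matches `name`, even one still running. Here is a sketch of a create-then-delete chain, under the assumption that `KubernetesJobOperator`'s `wait_until_job_complete` flag (exercised by `test_wait_until_job_complete` in the tests further down this diff) gates the deletion until the Job finishes; all names are placeholders:

```python
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.job import (
    KubernetesDeleteJobOperator,
    KubernetesJobOperator,
)

with DAG(dag_id="job_with_cleanup_sketch", schedule=None, start_date=datetime(2024, 1, 1)):
    run_job = KubernetesJobOperator(
        task_id="run_job",
        name="test-pi",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
        wait_until_job_complete=True,  # assumed flag; avoid deleting a running Job
    )
    delete_job = KubernetesDeleteJobOperator(
        task_id="delete_job",
        name="test-pi",
        namespace="default",
    )
    run_job >> delete_job
```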
diff --git a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
index 4d5a7d0b63dd36..53681ddceee8fe 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
@@ -222,6 +222,27 @@ For run Job on a GKE cluster with Kueue enabled use ``GKEStartKueueJobOperator``
     :end-before: [END howto_operator_kueue_start_job]
 
+.. _howto/operator:GKEDeleteJobOperator:
+
+Delete a Job on a GKE cluster
+"""""""""""""""""""""""""""""
+
+There are two operators available in order to delete a job on a GKE cluster:
+
+* :class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesDeleteJobOperator`
+* :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEDeleteJobOperator`
+
+``GKEDeleteJobOperator`` extends ``KubernetesDeleteJobOperator`` to provide authorization using Google Cloud credentials.
+There is no need to manage the ``kube_config`` file, as it will be generated automatically.
+All Kubernetes parameters (except ``config_file``) are also valid for the ``GKEDeleteJobOperator``.
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gke_delete_job]
+    :end-before: [END howto_operator_gke_delete_job]
+
+
 .. _howto/operator:GKEDescribeJobOperator:
 
 Retrieve information about Job by given name
diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py b/tests/providers/cncf/kubernetes/operators/test_job.py
index 1d6d47e5cb1322..cd9f3835af0e0e 100644
--- a/tests/providers/cncf/kubernetes/operators/test_job.py
+++ b/tests/providers/cncf/kubernetes/operators/test_job.py
@@ -26,7 +26,7 @@
 
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagModel, DagRun, TaskInstance
-from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator, KubernetesJobOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
@@ -519,3 +519,29 @@ def test_wait_until_job_complete(
             namespace=mock_job_expected.metadata.namespace,
             job_poll_interval=POLL_INTERVAL,
         )
+
+
+@pytest.mark.execution_timeout(300)
+class TestKubernetesDeleteJobOperator:
+    @pytest.fixture(autouse=True)
+    def setup_tests(self):
+        self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
+        self._default_client_mock = self._default_client_patch.start()
+
+        yield
+
+        patch.stopall()
+
+    @patch("kubernetes.config.load_kube_config")
+    @patch("kubernetes.client.api.BatchV1Api.delete_namespaced_job")
+    def test_delete_execute(self, mock_delete_namespaced_job, mock_load_kube_config):
+        op = KubernetesDeleteJobOperator(
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_delete_job",
+            name="test_job_name",
+            namespace="test_job_namespace",
+        )
+
+        op.execute(None)
+
+        mock_delete_namespaced_job.assert_called()
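The new test covers the happy path only. Below is a sketch of a companion method for `TestKubernetesDeleteJobOperator` that pins down the 404-swallowing branch of `execute`; it assumes the same autouse `setup_tests` fixture (which stubs `_get_default_client`) and the module's existing `patch` and `KubernetesDeleteJobOperator` imports are in scope:

```python
    @patch("kubernetes.config.load_kube_config")
    @patch("kubernetes.client.api.BatchV1Api.delete_namespaced_job")
    def test_delete_execute_job_absent(self, mock_delete_namespaced_job, mock_load_kube_config):
        from kubernetes.client.rest import ApiException

        # The API server reports the Job as already gone.
        mock_delete_namespaced_job.side_effect = ApiException(status=404)

        op = KubernetesDeleteJobOperator(
            kubernetes_conn_id="kubernetes_default",
            task_id="test_delete_missing_job",
            name="test_job_name",
            namespace="test_job_namespace",
        )

        # execute() should log and return instead of re-raising the 404.
        op.execute(None)
        mock_delete_namespaced_job.assert_called()
```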
"test_conn" + + +class TestGKEDeleteJobOperator: + def setup_method(self): + self.gke_op = GKEDeleteJobOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + name=TASK_NAME, + namespace=NAMESPACE, + ) + + def test_template_fields(self): + assert set(KubernetesDeleteJobOperator.template_fields).issubset(GKEDeleteJobOperator.template_fields) + + @mock.patch.dict(os.environ, {}) + @mock.patch(DEL_KUB_JOB_OPERATOR_EXEC) + @mock.patch(TEMP_FILE) + @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") + @mock.patch(GKE_HOOK_PATH) + def test_execute(self, mock_hook, fetch_cluster_info_mock, file_mock, exec_mock): + fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) + self.gke_op.execute(context=mock.MagicMock()) + fetch_cluster_info_mock.assert_called_once() + + def test_config_file_throws_error(self): + with pytest.raises(AirflowException): + GKEDeleteJobOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + name=TASK_NAME, + namespace=NAMESPACE, + config_file="/path/to/alternative/kubeconfig", + ) + + @mock.patch.dict(os.environ, {}) + @mock.patch( + "airflow.hooks.base.BaseHook.get_connections", + return_value=[Connection(extra=json.dumps({"keyfile_dict": '{"private_key": "r4nd0m_k3y"}'}))], + ) + @mock.patch(DEL_KUB_JOB_OPERATOR_EXEC) + @mock.patch(TEMP_FILE) + @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") + @mock.patch(GKE_HOOK_PATH) + def test_execute_with_impersonation_service_account( + self, mock_hook, fetch_cluster_info_mock, file_mock, exec_mock, get_con_mock + ): + fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) + self.gke_op.impersonation_chain = "test_account@example.com" + self.gke_op.execute(context=mock.MagicMock()) + fetch_cluster_info_mock.assert_called_once() + + @mock.patch.dict(os.environ, {}) + @mock.patch( + "airflow.hooks.base.BaseHook.get_connections", + return_value=[Connection(extra=json.dumps({"keyfile_dict": '{"private_key": "r4nd0m_k3y"}'}))], + ) + @mock.patch(DEL_KUB_JOB_OPERATOR_EXEC) + @mock.patch(TEMP_FILE) + @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") + @mock.patch(GKE_HOOK_PATH) + def test_execute_with_impersonation_service_chain_one_element( + self, mock_hook, fetch_cluster_info_mock, file_mock, exec_mock, get_con_mock + ): + fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) + self.gke_op.impersonation_chain = ["test_account@example.com"] + self.gke_op.execute(context=mock.MagicMock()) + + fetch_cluster_info_mock.assert_called_once() + + @pytest.mark.db_test + def test_default_gcp_conn_id(self): + gke_op = GKEDeleteJobOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + name=TASK_NAME, + namespace=NAMESPACE, + ) + gke_op._cluster_url = CLUSTER_URL + gke_op._ssl_ca_cert = SSL_CA_CERT + hook = gke_op.hook + + assert hook.gcp_conn_id == "google_cloud_default" + + @mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.get_connection", + return_value=Connection(conn_id="test_conn"), + ) + def test_gcp_conn_id(self, get_con_mock): + gke_op = GKEDeleteJobOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + name=TASK_NAME, + namespace=NAMESPACE, + gcp_conn_id="test_conn", + ) + gke_op._cluster_url = CLUSTER_URL + gke_op._ssl_ca_cert = 
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
index 8801f5ddaf6a6a..5b68dedba2d2b8 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
@@ -24,11 +24,14 @@
 from datetime import datetime
 
 from airflow import DAG
-from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator, KubernetesJobOperator
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "example_kubernetes_job_operator"
 
+JOB_NAME = "test-pi"
+JOB_NAMESPACE = "default"
+
 with DAG(
     dag_id=DAG_ID,
     schedule=None,
@@ -38,13 +41,23 @@
     # [START howto_operator_k8s_job]
     k8s_job = KubernetesJobOperator(
         task_id="job-task",
-        namespace="default",
+        namespace=JOB_NAMESPACE,
         image="perl:5.34.0",
         cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
-        name="test-pi",
+        name=JOB_NAME,
     )
     # [END howto_operator_k8s_job]
 
+    # [START howto_operator_delete_k8s_job]
+    delete_job_task = KubernetesDeleteJobOperator(
+        task_id="delete_job_task",
+        name=JOB_NAME,
+        namespace=JOB_NAMESPACE,
+    )
+    # [END howto_operator_delete_k8s_job]
+
+    k8s_job >> delete_job_task
+
     from tests.system.utils.watcher import watcher
 
     # This test needs watcher in order to properly mark success/failure
diff --git a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
index a118bda06ba60a..e28951a8858ddb 100644
--- a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
+++ b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
@@ -27,6 +27,7 @@
 from airflow.providers.google.cloud.operators.kubernetes_engine import (
     GKECreateClusterOperator,
     GKEDeleteClusterOperator,
+    GKEDeleteJobOperator,
     GKEDescribeJobOperator,
     GKEListJobsOperator,
     GKEStartJobOperator,
@@ -40,6 +41,9 @@
 CLUSTER_NAME = f"cluster-name-test-build-{ENV_ID}"
 CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
 
+JOB_NAME = "test-pi"
+JOB_NAMESPACE = "default"
+
 with DAG(
     DAG_ID,
     schedule="@once",  # Override to match your needs
@@ -60,10 +64,10 @@
         project_id=GCP_PROJECT_ID,
         location=GCP_LOCATION,
         cluster_name=CLUSTER_NAME,
-        namespace="default",
+        namespace=JOB_NAMESPACE,
         image="perl:5.34.0",
         cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
-        name="test-pi",
+        name=JOB_NAME,
     )
     # [END howto_operator_gke_start_job]
 
@@ -84,6 +88,17 @@
     )
     # [END howto_operator_gke_describe_job]
 
+    # [START howto_operator_gke_delete_job]
+    delete_job = GKEDeleteJobOperator(
+        task_id="delete_job",
+        project_id=GCP_PROJECT_ID,
+        location=GCP_LOCATION,
+        cluster_name=CLUSTER_NAME,
+        name=JOB_NAME,
+        namespace=JOB_NAMESPACE,
+    )
+    # [END howto_operator_gke_delete_job]
+
     delete_cluster = GKEDeleteClusterOperator(
         task_id="delete_cluster",
         name=CLUSTER_NAME,
@@ -91,7 +106,7 @@
         location=GCP_LOCATION,
     )
 
-    create_cluster >> job_task >> delete_cluster
+    (create_cluster >> job_task >> list_job_task >> describe_job_task >> delete_job >> delete_cluster)
 
     from tests.system.utils.watcher import watcher
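One last review observation on the GKE system test above: with the default trigger rule, `delete_job` and `delete_cluster` are skipped whenever an upstream task fails, which leaves the test cluster running. A hedged sketch of a failure-tolerant teardown, with placeholder values standing in for the example's variables:

```python
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEDeleteClusterOperator
from airflow.utils.trigger_rule import TriggerRule

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name="cluster-name-test-build-example",  # placeholder for CLUSTER_NAME
    project_id="my-project",                 # placeholder for GCP_PROJECT_ID
    location="us-central1-a",                # placeholder for GCP_LOCATION
    trigger_rule=TriggerRule.ALL_DONE,       # run cleanup even after an upstream failure
)
```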