diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
index 3f71e5c9cd08a..c7c23c3c21d10 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -36,7 +36,7 @@
 from google.cloud.dataproc_v1 import Batch, Cluster, ClusterStatus, JobStatus
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.common.compat.sdk import AirflowException, conf
+from airflow.providers.common.compat.sdk import AirflowException, conf, timezone
 from airflow.providers.google.cloud.hooks.dataproc import (
     DataprocHook,
     DataProcJobBuilder,
@@ -63,7 +63,6 @@
 )
 from airflow.providers.google.cloud.utils.dataproc import DataprocOperationType
 from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
-from airflow.utils import timezone
 
 if TYPE_CHECKING:
     from google.api_core import operation
@@ -995,21 +994,24 @@ def __init__(
 
     def execute(self, context: Context) -> None:
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
-        operation = self._delete_cluster(hook)
+        try:
+            op: operation.Operation = self._delete_cluster(hook)
+
+        except NotFound:
+            self.log.info(
+                "Cluster %s not found in region %s. might have been deleted already.",
+                self.cluster_name,
+                self.region,
+            )
+            return
+
+        except Exception as e:
+            raise AirflowException(str(e))
+
         if not self.deferrable:
-            hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation)
+            hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=op)
             self.log.info("Cluster deleted.")
         else:
-            try:
-                hook.get_cluster(
-                    project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
-                )
-            except NotFound:
-                self.log.info("Cluster deleted.")
-                return
-            except Exception as e:
-                raise AirflowException(str(e))
-
             end_time: float = time.time() + self.timeout
             self.defer(
                 trigger=DataprocDeleteClusterTrigger(
diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index e2db6935fc204..c85003bc32357 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -33,7 +33,11 @@
 from airflow import __version__ as AIRFLOW_VERSION
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import DAG, DagBag
-from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout, TaskDeferred
+from airflow.providers.common.compat.sdk import (
+    AirflowException,
+    AirflowTaskTimeout,
+    TaskDeferred,
+)
 from airflow.providers.google.cloud.links.dataproc import (
     DATAPROC_BATCH_LINK,
     DATAPROC_CLUSTER_LINK_DEPRECATED,
@@ -1269,7 +1273,67 @@ def test_create_execute_call_finished_before_defer(self, mock_trigger_hook, mock
         )
 
         mock_hook.return_value.wait_for_operation.assert_not_called()
-        assert not mock_defer.called
+        assert mock_defer.called
+
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    def test_execute_cluster_not_found(self, mock_hook):
+        mock_hook.return_value.create_cluster.return_value = None
+        mock_hook.return_value.delete_cluster.side_effect = NotFound("test")
+        delete_cluster_op = DataprocDeleteClusterOperator(
+            task_id="test_task",
+            region=GCP_REGION,
+            cluster_name=CLUSTER_NAME,
+            project_id=GCP_PROJECT,
+            cluster_uuid=None,
+            request_id=REQUEST_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        delete_cluster_op.execute(context=mock.MagicMock())
+        mock_hook.return_value.delete_cluster.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            region=GCP_REGION,
+            cluster_name=CLUSTER_NAME,
+            cluster_uuid=None,
+            request_id=REQUEST_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    @mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))
+    def test_execute_cluster_not_found_deferred(self, mock_defer, mock_hook):
+        mock_hook.return_value.create_cluster.return_value = None
+        mock_hook.return_value.delete_cluster.side_effect = NotFound("test")
+        delete_cluster_op = DataprocDeleteClusterOperator(
+            task_id="test_task",
+            region=GCP_REGION,
+            cluster_name=CLUSTER_NAME,
+            project_id=GCP_PROJECT,
+            cluster_uuid=None,
+            request_id=REQUEST_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            deferrable=True,
+        )
+
+        delete_cluster_op.execute(context=mock.MagicMock())
+        mock_hook.return_value.delete_cluster.assert_called_once_with(
+            project_id=GCP_PROJECT,
+            region=GCP_REGION,
+            cluster_name=CLUSTER_NAME,
+            cluster_uuid=None,
+            request_id=REQUEST_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+        )
+
+        assert not mock_defer.called
 
 
 class TestDataprocSubmitJobOperator(DataprocJobTestBase):