From 7992d62c9debc776038b064f7c0ad48c439b2bc7 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Tue, 13 Jan 2026 00:27:04 +0200 Subject: [PATCH] Add 'ignore_if_missing' to DataprocDeleteClusterOperator --- .../google/cloud/operators/dataproc.py | 18 +++++++---- .../google/cloud/operators/test_dataproc.py | 32 +++++++++++++++++++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py index c7c23c3c21d10..55aa554ecdb4e 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py @@ -949,6 +949,8 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator): account from the list granting this role to the originating account (templated). :param deferrable: Run operator in the deferrable mode. :param polling_interval_seconds: Time (seconds) to wait between calls to check the cluster status. + :param ignore_if_missing: If True, the operator will not raise an exception if the cluster does not exist. + Defaults to False. """ template_fields: Sequence[str] = ( @@ -974,6 +976,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), polling_interval_seconds: int = 10, + ignore_if_missing: bool = False, **kwargs, ): super().__init__(**kwargs) @@ -991,6 +994,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.deferrable = deferrable self.polling_interval_seconds = polling_interval_seconds + self.ignore_if_missing = ignore_if_missing def execute(self, context: Context) -> None: hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) @@ -998,12 +1002,14 @@ def execute(self, context: Context) -> None: op: operation.Operation = self._delete_cluster(hook) except NotFound: - self.log.info( - "Cluster %s not found in region %s. might have been deleted already.", - self.cluster_name, - self.region, - ) - return + if self.ignore_if_missing: + self.log.info( + "Cluster %s not found in region %s. Ignoring.", + self.cluster_name, + self.region, + ) + return + raise except Exception as e: raise AirflowException(str(e)) diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py index bbfc702914b4d..b4cb2a1906145 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py +++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py @@ -1289,6 +1289,7 @@ def test_execute_cluster_not_found(self, mock_hook): retry=RETRY, timeout=TIMEOUT, metadata=METADATA, + ignore_if_missing=True, ) delete_cluster_op.execute(context=mock.MagicMock()) @@ -1319,6 +1320,7 @@ def test_execute_cluster_not_found_deffered(self, mock_deffer, mock_hook): timeout=TIMEOUT, metadata=METADATA, deferrable=True, + ignore_if_missing=True, ) delete_cluster_op.execute(context=mock.MagicMock()) @@ -1335,6 +1337,36 @@ def test_execute_cluster_not_found_deffered(self, mock_deffer, mock_hook): assert not mock_deffer.called + @mock.patch(DATAPROC_PATH.format("DataprocHook")) + def test_execute_cluster_not_found_raises_when_ignore_if_missing_false(self, mock_hook): + mock_hook.return_value.delete_cluster.side_effect = NotFound("test") + delete_cluster_op = DataprocDeleteClusterOperator( + task_id="test_task", + region=GCP_REGION, + cluster_name=CLUSTER_NAME, + project_id=GCP_PROJECT, + cluster_uuid=None, + request_id=REQUEST_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ignore_if_missing=False, + ) + + with pytest.raises(NotFound): + delete_cluster_op.execute(context=mock.MagicMock()) + + mock_hook.return_value.delete_cluster.assert_called_once_with( + project_id=GCP_PROJECT, + region=GCP_REGION, + cluster_name=CLUSTER_NAME, + cluster_uuid=None, + request_id=REQUEST_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + class TestDataprocSubmitJobOperator(DataprocJobTestBase): @mock.patch(DATAPROC_PATH.format("DataprocHook"))