From 4d90f145c848fbb49be6954768da48783caa5c5c Mon Sep 17 00:00:00 2001 From: Success Moses Date: Sat, 30 Nov 2024 16:51:54 +0100 Subject: [PATCH] Make `CloudBatchSubmitJobOperator` fail when job fails (#44425) --- .../google/cloud/hooks/cloud_batch.py | 18 +++++++++++++----- .../google/cloud/hooks/test_cloud_batch.py | 16 +++++++++++++--- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/providers/src/airflow/providers/google/cloud/hooks/cloud_batch.py b/providers/src/airflow/providers/google/cloud/hooks/cloud_batch.py index 3ef24014b6f14..c647ac078abb2 100644 --- a/providers/src/airflow/providers/google/cloud/hooks/cloud_batch.py +++ b/providers/src/airflow/providers/google/cloud/hooks/cloud_batch.py @@ -145,12 +145,20 @@ def wait_for_job( try: job = client.get_job(name=f"{job_name}") status: JobStatus.State = job.status.state - if ( - status == JobStatus.State.SUCCEEDED - or status == JobStatus.State.FAILED - or status == JobStatus.State.DELETION_IN_PROGRESS - ): + if status == JobStatus.State.SUCCEEDED: return job + elif status == JobStatus.State.FAILED: + message = ( + "Unexpected error in the operation: " + "Batch job with name {job_name} has failed its execution." + ) + raise AirflowException(message) + elif status == JobStatus.State.DELETION_IN_PROGRESS: + message = ( + "Unexpected error in the operation: " + "Batch job with name {job_name} is being deleted." + ) + raise AirflowException(message) else: time.sleep(polling_period_seconds) except Exception as e: diff --git a/providers/tests/google/cloud/hooks/test_cloud_batch.py b/providers/tests/google/cloud/hooks/test_cloud_batch.py index 05cc26b6b479b..aaa496215fccf 100644 --- a/providers/tests/google/cloud/hooks/test_cloud_batch.py +++ b/providers/tests/google/cloud/hooks/test_cloud_batch.py @@ -86,9 +86,7 @@ def test_delete_job(self, mock_batch_service_client, cloud_batch_hook): name=f"projects/{project_id}/locations/{region}/jobs/{job_name}" ) - @pytest.mark.parametrize( - "state", [JobStatus.State.SUCCEEDED, JobStatus.State.FAILED, JobStatus.State.DELETION_IN_PROGRESS] - ) + @pytest.mark.parametrize("state", [JobStatus.State.SUCCEEDED]) @mock.patch( "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__", new=mock_base_gcp_hook_default_project_id, @@ -100,6 +98,18 @@ def test_wait_job_succeeded(self, mock_batch_service_client, state, cloud_batch_ actual_job = cloud_batch_hook.wait_for_job("job1") assert actual_job == mock_job + @pytest.mark.parametrize("state", [JobStatus.State.FAILED, JobStatus.State.DELETION_IN_PROGRESS]) + @mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__", + new=mock_base_gcp_hook_default_project_id, + ) + @mock.patch("airflow.providers.google.cloud.hooks.cloud_batch.BatchServiceClient") + def test_wait_job_does_not_succeed(self, mock_batch_service_client, state, cloud_batch_hook): + mock_job = self._mock_job_with_status(state) + mock_batch_service_client.return_value.get_job.return_value = mock_job + with pytest.raises(AirflowException): + cloud_batch_hook.wait_for_job("job1") + @mock.patch( "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__", new=mock_base_gcp_hook_default_project_id,