Skip to content

Commit

Permalink
Make CloudBatchSubmitJobOperator fail when job fails (apache#44425)
Browse files Browse the repository at this point in the history
  • Loading branch information
SuccessMoses authored Nov 30, 2024
1 parent 693cdd4 commit 4d90f14
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
18 changes: 13 additions & 5 deletions providers/src/airflow/providers/google/cloud/hooks/cloud_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,20 @@ def wait_for_job(
try:
job = client.get_job(name=f"{job_name}")
status: JobStatus.State = job.status.state
if (
status == JobStatus.State.SUCCEEDED
or status == JobStatus.State.FAILED
or status == JobStatus.State.DELETION_IN_PROGRESS
):
if status == JobStatus.State.SUCCEEDED:
return job
elif status == JobStatus.State.FAILED:
message = (
"Unexpected error in the operation: "
"Batch job with name {job_name} has failed its execution."
)
raise AirflowException(message)
elif status == JobStatus.State.DELETION_IN_PROGRESS:
message = (
"Unexpected error in the operation: "
"Batch job with name {job_name} is being deleted."
)
raise AirflowException(message)
else:
time.sleep(polling_period_seconds)
except Exception as e:
Expand Down
16 changes: 13 additions & 3 deletions providers/tests/google/cloud/hooks/test_cloud_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ def test_delete_job(self, mock_batch_service_client, cloud_batch_hook):
name=f"projects/{project_id}/locations/{region}/jobs/{job_name}"
)

@pytest.mark.parametrize(
"state", [JobStatus.State.SUCCEEDED, JobStatus.State.FAILED, JobStatus.State.DELETION_IN_PROGRESS]
)
@pytest.mark.parametrize("state", [JobStatus.State.SUCCEEDED])
@mock.patch(
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
new=mock_base_gcp_hook_default_project_id,
Expand All @@ -100,6 +98,18 @@ def test_wait_job_succeeded(self, mock_batch_service_client, state, cloud_batch_
actual_job = cloud_batch_hook.wait_for_job("job1")
assert actual_job == mock_job

@pytest.mark.parametrize("state", [JobStatus.State.FAILED, JobStatus.State.DELETION_IN_PROGRESS])
@mock.patch(
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
new=mock_base_gcp_hook_default_project_id,
)
@mock.patch("airflow.providers.google.cloud.hooks.cloud_batch.BatchServiceClient")
def test_wait_job_does_not_succeed(self, mock_batch_service_client, state, cloud_batch_hook):
mock_job = self._mock_job_with_status(state)
mock_batch_service_client.return_value.get_job.return_value = mock_job
with pytest.raises(AirflowException):
cloud_batch_hook.wait_for_job("job1")

@mock.patch(
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
new=mock_base_gcp_hook_default_project_id,
Expand Down

0 comments on commit 4d90f14

Please sign in to comment.