Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,20 +368,31 @@ def sync(self) -> None:
)
self.fail(task[0], e)
except ApiException as e:
body = json.loads(e.body)
try:
if e.body:
body = json.loads(e.body)
else:
# If no body content, use reason as the message
body = {"message": e.reason}
except (json.JSONDecodeError, ValueError, TypeError):
# If the body is a string (e.g., in a 429 error), it can't be parsed as JSON.
# Use the body directly as the message instead.
body = {"message": e.body}

retries = self.task_publish_retries[key]
# In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries
message = body.get("message", "")
if (
(str(e.status) == "403" and "exceeded quota" in body["message"])
or (str(e.status) == "409" and "object has been modified" in body["message"])
(str(e.status) == "403" and "exceeded quota" in message)
or (str(e.status) == "409" and "object has been modified" in message)
) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
self.log.warning(
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
self.task_publish_retries[key] + 1,
self.task_publish_max_retries,
key,
e.reason,
body["message"],
message,
)
self.task_queue.put(task)
self.task_publish_retries[key] = retries + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,27 @@ def execute(self, context):
try:
self.hook.apply_from_yaml_file(yaml_objects=yaml_objects)
except FailToCreateError as ex:
error_bodies = [json.loads(e.body) for e in ex.api_exceptions]
error_bodies = []
for e in ex.api_exceptions:
try:
if e.body:
error_bodies.append(json.loads(e.body))
else:
# If no body content, use reason as the message
reason = getattr(e, "reason", "Unknown")
error_bodies.append({"message": reason, "reason": reason})
except (json.JSONDecodeError, ValueError, TypeError):
# If the body is a string (e.g., in a 429 error), it can't be parsed as JSON.
# Use the body directly as the message instead.
error_bodies.append({"message": e.body, "reason": getattr(e, "reason", "Unknown")})
if next((e for e in error_bodies if e.get("reason") == "AlreadyExists"), None):
self.log.info("Kueue is already enabled for the cluster")

if errors := [e for e in error_bodies if e.get("reason") != "AlreadyExists"]:
error_message = "\n".join(e.get("body") for e in errors)
error_message = "\n".join(
e.get("message") or e.get("body") or f"Unknown error: {e.get('reason', 'Unknown')}"
for e in errors
)
raise AirflowException(error_message)
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,27 @@ def setup_method(self) -> None:
State.SUCCESS,
id="409 conflict",
),
pytest.param(
HTTPResponse(body="Too many requests, please try again later.", status=429),
0,
False,
State.FAILED,
id="429 Too Many Requests (non-JSON body)",
),
pytest.param(
HTTPResponse(body="Too many requests, please try again later.", status=429),
1,
False,
State.FAILED,
id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)",
),
pytest.param(
HTTPResponse(body="", status=429),
0,
False,
State.FAILED,
id="429 Too Many Requests (empty body)",
),
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,29 @@ def test_execute_error(self, mock_hook, mock_log):
mock_hook.return_value.check_kueue_deployment_running.assert_not_called()
mock_log.info.assert_called_once_with("Kueue is already enabled for the cluster")

@mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesInstallKueueOperator.log"))
@mock.patch(KUEUE_OPERATORS_PATH.format("KubernetesHook"))
def test_execute_non_json_response(self, mock_hook, mock_log):
    """Test handling of non-JSON API response bodies (e.g., 429 errors).

    ``apply_from_yaml_file`` is made to raise a ``FailToCreateError`` wrapping
    two mocked API exceptions: one whose body is plain text and one whose body
    is empty. The operator is expected to raise an ``AirflowException`` whose
    message is the plain-text body followed, on a new line, by the ``reason``
    of the empty-body exception.

    ``mock_log`` is unused here; it is only accepted because the outer
    ``mock.patch`` decorator injects it.
    """
    # Function-local import: the module's import block is not part of this change.
    import re

    hook = mock_hook.return_value
    mock_get_yaml_content_from_file = hook.get_yaml_content_from_file
    mock_yaml_objects = mock_get_yaml_content_from_file.return_value
    mock_apply_from_yaml_file = hook.apply_from_yaml_file

    # Create mock exceptions with non-JSON bodies (simulating 429 errors)
    text_body_error = mock.MagicMock(
        body="Too many requests, please try again later.", reason="TooManyRequests"
    )
    empty_body_error = mock.MagicMock(body="", reason="RateLimited")  # Empty body case
    mock_apply_from_yaml_file.side_effect = FailToCreateError([text_body_error, empty_body_error])

    expected_error_message = "Too many requests, please try again later.\nRateLimited"

    # ``match`` is applied as a regular expression via ``re.search``; escape the
    # literal so "." only matches a real dot instead of any character.
    with pytest.raises(AirflowException, match=re.escape(expected_error_message)):
        self.operator.execute(context=mock.MagicMock())

    mock_get_yaml_content_from_file.assert_called_once_with(kueue_yaml_url=KUEUE_YAML_URL)
    mock_apply_from_yaml_file.assert_called_once_with(yaml_objects=mock_yaml_objects)
    hook.check_kueue_deployment_running.assert_not_called()


class TestKubernetesStartKueueJobOperator:
def test_template_fields(self):
Expand Down