From eb2632ca80b205b909f97519307de0723671f9e9 Mon Sep 17 00:00:00 2001
From: Niko Oliveira
Date: Mon, 3 Nov 2025 14:18:38 -0800
Subject: [PATCH] Add missing failure retry case for Bedrock

Bedrock Operators already have retries for the various authorizations
but it was missing the 403 error code that we see periodically (in a
race condition)
---
 .../providers/amazon/aws/operators/bedrock.py |  2 +
 .../unit/amazon/aws/operators/test_bedrock.py | 58 +++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
index f6e6378cc41bf..d49c379d4bd91 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
@@ -482,6 +482,8 @@ def _create_kb():
                 # It may also be that permissions haven't even propagated yet to check for the index
                 or "server returned 401" in error_message
                 or "user does not have permissions" in error_message
+                or "status code: 403" in error_message
+                or "bad authorization" in error_message
             )
             if all(
                 [
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
index d1ced1bc57c0b..4cf029f04db87 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
@@ -301,6 +301,64 @@ def test_returns_id(self, mock_conn):
     def test_template_fields(self):
         validate_template_fields(self.operator)
 
+    def _create_validation_error(self, message: str) -> ClientError:
+        """Helper to create ValidationException with specific message."""
+        return ClientError(
+            error_response={"Error": {"Message": message, "Code": "ValidationException"}},
+            operation_name="CreateKnowledgeBase",
+        )
+
+    @pytest.mark.parametrize(
+        "error_message, should_retry",
+        [
+            ("no such index [bedrock-kb-index]", True),
+            ("server returned 401", True),
+            ("user does not have permissions", True),
+            ("status code: 403", True),
+            ("Bad Authorization", True),
+            ("Some other validation error", False),
+        ],
+    )
+    def test_retry_condition_validation(self, error_message, should_retry, mock_conn):
+        """Test which error messages trigger retries."""
+        self.operator.wait_for_completion = False
+
+        validation_error = self._create_validation_error(error_message)
+        mock_conn.create_knowledge_base.side_effect = [validation_error]
+
+        if should_retry:
+            # For retryable errors, provide a success response for the retry
+            success_response = {"knowledgeBase": {"knowledgeBaseId": self.KNOWLEDGE_BASE_ID}}
+            mock_conn.create_knowledge_base.side_effect = [validation_error, success_response]
+
+            with mock.patch("airflow.providers.amazon.aws.operators.bedrock.sleep"):
+                result = self.operator.execute({})
+                assert result == self.KNOWLEDGE_BASE_ID
+                assert mock_conn.create_knowledge_base.call_count == 2
+        else:
+            # For non-retryable errors, the original error should be raised immediately
+            with pytest.raises(ClientError):
+                self.operator.execute({})
+            assert mock_conn.create_knowledge_base.call_count == 1
+
+    @mock.patch("airflow.providers.amazon.aws.operators.bedrock.sleep")
+    def test_retry_exhaustion_raises_original_error(self, mock_sleep, mock_conn):
+        """Test that original error is raised when retries are exhausted."""
+        error_403 = self._create_validation_error(
+            "Dependency error document status code: 403, error message: Bad Authorization"
+        )
+
+        # Default number of waiter attempts is 20
+        mock_conn.create_knowledge_base.side_effect = [error_403] * 21
+
+        with pytest.raises(ClientError) as exc_info:
+            self.operator.execute({})
+
+        assert exc_info.value.response["Error"]["Code"] == "ValidationException"
+        assert "status code: 403" in exc_info.value.response["Error"]["Message"]
+        assert mock_conn.create_knowledge_base.call_count == 21
+        assert mock_sleep.call_count == 20
+
 
 class TestBedrockCreateDataSourceOperator:
     DATA_SOURCE_ID = "data_source_id"