Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ def _create_kb():
# It may also be that permissions haven't even propagated yet to check for the index
or "server returned 401" in error_message
or "user does not have permissions" in error_message
or "status code: 403" in error_message
or "bad authorization" in error_message
)
if all(
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,64 @@ def test_returns_id(self, mock_conn):
def test_template_fields(self):
validate_template_fields(self.operator)

def _create_validation_error(self, message: str) -> ClientError:
"""Helper to create ValidationException with specific message."""
return ClientError(
error_response={"Error": {"Message": message, "Code": "ValidationException"}},
operation_name="CreateKnowledgeBase",
)

@pytest.mark.parametrize(
"error_message, should_retry",
[
("no such index [bedrock-kb-index]", True),
("server returned 401", True),
("user does not have permissions", True),
("status code: 403", True),
("Bad Authorization", True),
("Some other validation error", False),
],
)
def test_retry_condition_validation(self, error_message, should_retry, mock_conn):
"""Test which error messages trigger retries."""
self.operator.wait_for_completion = False

validation_error = self._create_validation_error(error_message)
mock_conn.create_knowledge_base.side_effect = [validation_error]

if should_retry:
# For retryable errors, provide a success response for the retry
success_response = {"knowledgeBase": {"knowledgeBaseId": self.KNOWLEDGE_BASE_ID}}
mock_conn.create_knowledge_base.side_effect = [validation_error, success_response]

with mock.patch("airflow.providers.amazon.aws.operators.bedrock.sleep"):
result = self.operator.execute({})
assert result == self.KNOWLEDGE_BASE_ID
assert mock_conn.create_knowledge_base.call_count == 2
else:
# For non-retryable errors, the original error should be raised immediately
with pytest.raises(ClientError):
self.operator.execute({})
assert mock_conn.create_knowledge_base.call_count == 1

@mock.patch("airflow.providers.amazon.aws.operators.bedrock.sleep")
def test_retry_exhaustion_raises_original_error(self, mock_sleep, mock_conn):
"""Test that original error is raised when retries are exhausted."""
error_403 = self._create_validation_error(
"Dependency error document status code: 403, error message: Bad Authorization"
)

# Default number of waiter attempts is 20
mock_conn.create_knowledge_base.side_effect = [error_403] * 21

with pytest.raises(ClientError) as exc_info:
self.operator.execute({})

assert exc_info.value.response["Error"]["Code"] == "ValidationException"
assert "status code: 403" in exc_info.value.response["Error"]["Message"]
assert mock_conn.create_knowledge_base.call_count == 21
assert mock_sleep.call_count == 20


class TestBedrockCreateDataSourceOperator:
DATA_SOURCE_ID = "data_source_id"
Expand Down