Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/authentication/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from kubernetes.client.rest import ApiException
from kubernetes.config import ConfigException

from authentication.interface import AuthInterface
from authentication.interface import NO_AUTH_TUPLE, AuthInterface
from authentication.utils import extract_user_token
from configuration import configuration
from constants import DEFAULT_VIRTUAL_PATH
Expand Down Expand Up @@ -240,6 +240,12 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
Raises:
HTTPException: If authentication or authorization fails.
"""
# LCORE-694: Config option to skip authorization for readiness and liveness probe
if not request.headers.get("Authorization"):
if configuration.authentication_configuration.skip_for_health_probes:
if request.url.path in ("/readiness", "/liveness"):
return NO_AUTH_TUPLE

token = extract_user_token(request.headers)
user_info = get_user_info(token)

Expand Down
275 changes: 275 additions & 0 deletions tests/unit/authentication/test_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
K8sClientSingleton,
)

from configuration import AppConfig


class MockK8sResponseStatus:
"""Mock Kubernetes Response Status.
Expand Down Expand Up @@ -193,6 +195,279 @@ async def test_auth_dependency_invalid_token(mocker: MockerFixture) -> None:
assert detail["cause"] == "Invalid or expired Kubernetes token"


async def test_auth_dependency_no_token(mocker: MockerFixture) -> None:
"""Test the auth dependency without a token."""
dependency = K8SAuthDependency()

# Mock the Kubernetes API calls
mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api")
mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api")

# Setup mock responses for invalid token
mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse(
authenticated=False
)
mock_authz_api.return_value.create_subject_access_review.return_value = (
MockK8sResponse(allowed=False)
)

# Simulate a request with an invalid token
request = Request(
scope={
"type": "http",
"headers": [],
}
)

# Expect an HTTPException for invalid tokens
with pytest.raises(HTTPException) as exc_info:
await dependency(request)

# Check if the correct status code is returned for unauthorized access
assert exc_info.value.status_code == 401
detail = cast(dict[str, str], exc_info.value.detail)
assert detail["response"] == ("Missing or invalid credentials provided by client")
assert detail["cause"] == "No Authorization header found"


async def test_auth_dependency_no_token_readiness_liveness_endpoints_1(
mocker: MockerFixture,
) -> None:
"""Test the auth dependency without a token for readiness and liveness endpoints.

For this test the skip_for_health_probes configuration parameter is set to
True.
"""
config_dict = {
"name": "test",
"service": {
"host": "localhost",
"port": 8080,
"auth_enabled": False,
"workers": 1,
"color_log": True,
"access_log": True,
},
"llama_stack": {
"api_key": "test-key",
"url": "http://test.com:1234",
"use_as_library_client": False,
},
"authentication": {
"module": "k8s",
"skip_for_health_probes": True,
},
"user_data_collection": {
"feedback_enabled": False,
"feedback_storage": ".",
"transcripts_enabled": False,
"transcripts_storage": ".",
},
}
cfg = AppConfig()
cfg.init_from_dict(config_dict)
# Update configuration for this test
mocker.patch("authentication.k8s.configuration", cfg)

dependency = K8SAuthDependency()

# Mock the Kubernetes API calls
mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api")
mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api")

# Setup mock responses for invalid token
mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse(
authenticated=False
)
mock_authz_api.return_value.create_subject_access_review.return_value = (
MockK8sResponse(allowed=False)
)

paths = ("/readiness", "/liveness")

for path in paths:
# Simulate a request with an invalid token
request = Request(
scope={
"type": "http",
"headers": [],
"path": path,
}
)

user_uid, username, skip_userid_check, token = await dependency(request)

# Check if the correct user info has been returned
assert user_uid == "00000000-0000-0000-0000-000"
assert username == "lightspeed-user"
assert skip_userid_check is True
assert token == ""


async def test_auth_dependency_no_token_readiness_liveness_endpoints_2(
mocker: MockerFixture,
) -> None:
"""Test the auth dependency without a token.

For this test the skip_for_health_probes configuration parameter is set to
False.
"""

config_dict = {
"name": "test",
"service": {
"host": "localhost",
"port": 8080,
"auth_enabled": False,
"workers": 1,
"color_log": True,
"access_log": True,
},
"llama_stack": {
"api_key": "test-key",
"url": "http://test.com:1234",
"use_as_library_client": False,
},
"authentication": {
"module": "k8s",
"skip_for_health_probes": False,
},
"user_data_collection": {
"feedback_enabled": False,
"feedback_storage": ".",
"transcripts_enabled": False,
"transcripts_storage": ".",
},
}
cfg = AppConfig()
cfg.init_from_dict(config_dict)
# Update configuration for this test
mocker.patch("authentication.k8s.configuration", cfg)
dependency = K8SAuthDependency()

# Mock the Kubernetes API calls
mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api")
mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api")

# Setup mock responses for invalid token
mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse(
authenticated=False
)
mock_authz_api.return_value.create_subject_access_review.return_value = (
MockK8sResponse(allowed=False)
)

# Simulate a request with an invalid token
request = Request(
scope={
"type": "http",
"headers": [],
}
)

paths = ("/readiness", "/liveness")

for path in paths:
# Simulate a request with an invalid token
request = Request(
scope={
"type": "http",
"headers": [],
"path": path,
}
)

# Expect an HTTPException for invalid tokens
with pytest.raises(HTTPException) as exc_info:
await dependency(request)

# Check if the correct status code is returned for unauthorized access
assert exc_info.value.status_code == 401
detail = cast(dict[str, str], exc_info.value.detail)
assert detail["response"] == (
"Missing or invalid credentials provided by client"
)
assert detail["cause"] == "No Authorization header found"

Comment on lines +307 to +391
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove dead code and use a more descriptive function name.

Two issues:

  1. Rename to test_auth_dependency_no_token_readiness_liveness_endpoints_skip_disabled for clarity.

  2. Lines 360-366 create a request variable that's immediately overwritten inside the loop (lines 372-378). This dead code should be removed.

🔧 Proposed fix
-async def test_auth_dependency_no_token_readiness_liveness_endpoints_2(
+async def test_auth_dependency_no_token_readiness_liveness_endpoints_skip_disabled(
     mocker: MockerFixture,
 ) -> None:
     """Test the auth dependency without a token.

     For this test the skip_for_health_probes configuration parameter is set to
     False.
     """
     config_dict = {
         # ... config ...
     }
     # ... mock setup ...

-    # Simulate a request with an invalid token
-    request = Request(
-        scope={
-            "type": "http",
-            "headers": [],
-        }
-    )
-
     paths = ("/readiness", "/liveness")

     for path in paths:
🤖 Prompt for AI Agents
In `@tests/unit/authentication/test_k8s.py` around lines 307 - 391, Rename the
test function from test_auth_dependency_no_token_readiness_liveness_endpoints_2
to test_auth_dependency_no_token_readiness_liveness_endpoints_skip_disabled and
remove the dead pre-loop Request creation: delete the initial "request =
Request(scope={ 'type': 'http', 'headers': [], })" that appears before the paths
loop since a new request is constructed for each path inside the loop; keep the
mocked config, K8SAuthDependency instantiation, and looped per-path Request
creation unchanged.


async def test_auth_dependency_no_token_normal_endpoints(
mocker: MockerFixture,
) -> None:
"""Test the auth dependency without a token for endpoints different to readiness and liveness.

For this test the skip_for_health_probes configuration parameter is set to
True.
"""
config_dict = {
"name": "test",
"service": {
"host": "localhost",
"port": 8080,
"auth_enabled": False,
"workers": 1,
"color_log": True,
"access_log": True,
},
"llama_stack": {
"api_key": "test-key",
"url": "http://test.com:1234",
"use_as_library_client": False,
},
"authentication": {
"module": "k8s",
"skip_for_health_probes": True,
},
"user_data_collection": {
"feedback_enabled": False,
"feedback_storage": ".",
"transcripts_enabled": False,
"transcripts_storage": ".",
},
}
cfg = AppConfig()
cfg.init_from_dict(config_dict)
# Update configuration for this test
mocker.patch("authentication.k8s.configuration", cfg)

dependency = K8SAuthDependency()

# Mock the Kubernetes API calls
mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api")
mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api")

# Setup mock responses for invalid token
mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse(
authenticated=False
)
mock_authz_api.return_value.create_subject_access_review.return_value = (
MockK8sResponse(allowed=False)
)

paths = ("/", "/v1/info")

for path in paths:
# Simulate a request with an invalid token
request = Request(
scope={
"type": "http",
"headers": [],
"path": path,
}
)

# Expect an HTTPException for invalid tokens
with pytest.raises(HTTPException) as exc_info:
await dependency(request)

# Check if the correct status code is returned for unauthorized access
assert exc_info.value.status_code == 401
detail = cast(dict[str, str], exc_info.value.detail)
assert detail["response"] == (
"Missing or invalid credentials provided by client"
)
assert detail["cause"] == "No Authorization header found"


async def test_cluster_id_is_used_for_kube_admin(mocker: MockerFixture) -> None:
"""Test the cluster id is used as user_id when user is kube:admin."""
dependency = K8SAuthDependency()
Expand Down
Loading