diff --git a/src/authentication/k8s.py b/src/authentication/k8s.py index 4cc4c18c0..31be7f66d 100644 --- a/src/authentication/k8s.py +++ b/src/authentication/k8s.py @@ -10,7 +10,7 @@ from kubernetes.client.rest import ApiException from kubernetes.config import ConfigException -from authentication.interface import AuthInterface +from authentication.interface import NO_AUTH_TUPLE, AuthInterface from authentication.utils import extract_user_token from configuration import configuration from constants import DEFAULT_VIRTUAL_PATH @@ -240,6 +240,12 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]: Raises: HTTPException: If authentication or authorization fails. """ + # LCORE-694: Config option to skip authorization for readiness and liveness probe + if not request.headers.get("Authorization"): + if configuration.authentication_configuration.skip_for_health_probes: + if request.url.path in ("/readiness", "/liveness"): + return NO_AUTH_TUPLE + token = extract_user_token(request.headers) user_info = get_user_info(token) diff --git a/tests/unit/authentication/test_k8s.py b/tests/unit/authentication/test_k8s.py index 6abcad5a2..a200aaad0 100644 --- a/tests/unit/authentication/test_k8s.py +++ b/tests/unit/authentication/test_k8s.py @@ -18,6 +18,8 @@ K8sClientSingleton, ) +from configuration import AppConfig + class MockK8sResponseStatus: """Mock Kubernetes Response Status. @@ -193,6 +195,279 @@ async def test_auth_dependency_invalid_token(mocker: MockerFixture) -> None: assert detail["cause"] == "Invalid or expired Kubernetes token" +async def test_auth_dependency_no_token(mocker: MockerFixture) -> None: + """Test the auth dependency without a token.""" + dependency = K8SAuthDependency() + + # Mock the Kubernetes API calls + mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api") + mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api") + + # Setup mock responses for invalid token + mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse( + authenticated=False + ) + mock_authz_api.return_value.create_subject_access_review.return_value = ( + MockK8sResponse(allowed=False) + ) + + # Simulate a request with an invalid token + request = Request( + scope={ + "type": "http", + "headers": [], + } + ) + + # Expect an HTTPException for invalid tokens + with pytest.raises(HTTPException) as exc_info: + await dependency(request) + + # Check if the correct status code is returned for unauthorized access + assert exc_info.value.status_code == 401 + detail = cast(dict[str, str], exc_info.value.detail) + assert detail["response"] == ("Missing or invalid credentials provided by client") + assert detail["cause"] == "No Authorization header found" + + +async def test_auth_dependency_no_token_readiness_liveness_endpoints_1( + mocker: MockerFixture, +) -> None: + """Test the auth dependency without a token for readiness and liveness endpoints. + + For this test the skip_for_health_probes configuration parameter is set to + True. + """ + config_dict = { + "name": "test", + "service": { + "host": "localhost", + "port": 8080, + "auth_enabled": False, + "workers": 1, + "color_log": True, + "access_log": True, + }, + "llama_stack": { + "api_key": "test-key", + "url": "http://test.com:1234", + "use_as_library_client": False, + }, + "authentication": { + "module": "k8s", + "skip_for_health_probes": True, + }, + "user_data_collection": { + "feedback_enabled": False, + "feedback_storage": ".", + "transcripts_enabled": False, + "transcripts_storage": ".", + }, + } + cfg = AppConfig() + cfg.init_from_dict(config_dict) + # Update configuration for this test + mocker.patch("authentication.k8s.configuration", cfg) + + dependency = K8SAuthDependency() + + # Mock the Kubernetes API calls + mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api") + mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api") + + # Setup mock responses for invalid token + mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse( + authenticated=False + ) + mock_authz_api.return_value.create_subject_access_review.return_value = ( + MockK8sResponse(allowed=False) + ) + + paths = ("/readiness", "/liveness") + + for path in paths: + # Simulate a request with an invalid token + request = Request( + scope={ + "type": "http", + "headers": [], + "path": path, + } + ) + + user_uid, username, skip_userid_check, token = await dependency(request) + + # Check if the correct user info has been returned + assert user_uid == "00000000-0000-0000-0000-000" + assert username == "lightspeed-user" + assert skip_userid_check is True + assert token == "" + + +async def test_auth_dependency_no_token_readiness_liveness_endpoints_2( + mocker: MockerFixture, +) -> None: + """Test the auth dependency without a token. + + For this test the skip_for_health_probes configuration parameter is set to + False. + """ + + config_dict = { + "name": "test", + "service": { + "host": "localhost", + "port": 8080, + "auth_enabled": False, + "workers": 1, + "color_log": True, + "access_log": True, + }, + "llama_stack": { + "api_key": "test-key", + "url": "http://test.com:1234", + "use_as_library_client": False, + }, + "authentication": { + "module": "k8s", + "skip_for_health_probes": False, + }, + "user_data_collection": { + "feedback_enabled": False, + "feedback_storage": ".", + "transcripts_enabled": False, + "transcripts_storage": ".", + }, + } + cfg = AppConfig() + cfg.init_from_dict(config_dict) + # Update configuration for this test + mocker.patch("authentication.k8s.configuration", cfg) + dependency = K8SAuthDependency() + + # Mock the Kubernetes API calls + mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api") + mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api") + + # Setup mock responses for invalid token + mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse( + authenticated=False + ) + mock_authz_api.return_value.create_subject_access_review.return_value = ( + MockK8sResponse(allowed=False) + ) + + # Simulate a request with an invalid token + request = Request( + scope={ + "type": "http", + "headers": [], + } + ) + + paths = ("/readiness", "/liveness") + + for path in paths: + # Simulate a request with an invalid token + request = Request( + scope={ + "type": "http", + "headers": [], + "path": path, + } + ) + + # Expect an HTTPException for invalid tokens + with pytest.raises(HTTPException) as exc_info: + await dependency(request) + + # Check if the correct status code is returned for unauthorized access + assert exc_info.value.status_code == 401 + detail = cast(dict[str, str], exc_info.value.detail) + assert detail["response"] == ( + "Missing or invalid credentials provided by client" + ) + assert detail["cause"] == "No Authorization header found" + + +async def test_auth_dependency_no_token_normal_endpoints( + mocker: MockerFixture, +) -> None: + """Test the auth dependency without a token for endpoints different to readiness and liveness. + + For this test the skip_for_health_probes configuration parameter is set to + True. + """ + config_dict = { + "name": "test", + "service": { + "host": "localhost", + "port": 8080, + "auth_enabled": False, + "workers": 1, + "color_log": True, + "access_log": True, + }, + "llama_stack": { + "api_key": "test-key", + "url": "http://test.com:1234", + "use_as_library_client": False, + }, + "authentication": { + "module": "k8s", + "skip_for_health_probes": True, + }, + "user_data_collection": { + "feedback_enabled": False, + "feedback_storage": ".", + "transcripts_enabled": False, + "transcripts_storage": ".", + }, + } + cfg = AppConfig() + cfg.init_from_dict(config_dict) + # Update configuration for this test + mocker.patch("authentication.k8s.configuration", cfg) + + dependency = K8SAuthDependency() + + # Mock the Kubernetes API calls + mock_authn_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authn_api") + mock_authz_api = mocker.patch("authentication.k8s.K8sClientSingleton.get_authz_api") + + # Setup mock responses for invalid token + mock_authn_api.return_value.create_token_review.return_value = MockK8sResponse( + authenticated=False + ) + mock_authz_api.return_value.create_subject_access_review.return_value = ( + MockK8sResponse(allowed=False) + ) + + paths = ("/", "/v1/info") + + for path in paths: + # Simulate a request with an invalid token + request = Request( + scope={ + "type": "http", + "headers": [], + "path": path, + } + ) + + # Expect an HTTPException for invalid tokens + with pytest.raises(HTTPException) as exc_info: + await dependency(request) + + # Check if the correct status code is returned for unauthorized access + assert exc_info.value.status_code == 401 + detail = cast(dict[str, str], exc_info.value.detail) + assert detail["response"] == ( + "Missing or invalid credentials provided by client" + ) + assert detail["cause"] == "No Authorization header found" + + async def test_cluster_id_is_used_for_kube_admin(mocker: MockerFixture) -> None: """Test the cluster id is used as user_id when user is kube:admin.""" dependency = K8SAuthDependency()