diff --git a/src/authentication/rh_identity.py b/src/authentication/rh_identity.py index 9a315b14a..fe54ceda3 100644 --- a/src/authentication/rh_identity.py +++ b/src/authentication/rh_identity.py @@ -11,7 +11,8 @@ from fastapi import HTTPException, Request -from authentication.interface import AuthInterface, AuthTuple +from authentication.interface import NO_AUTH_TUPLE, AuthInterface, AuthTuple +from configuration import configuration from constants import DEFAULT_VIRTUAL_PATH, NO_USER_TOKEN logger = logging.getLogger(__name__) @@ -215,6 +216,10 @@ async def __call__(self, request: Request) -> AuthTuple: # Extract header identity_header = request.headers.get("x-rh-identity") if not identity_header: + # Skip auth for health probes when configured + if request.url.path in ("/readiness", "/liveness"): + if configuration.authentication_configuration.skip_for_health_probes: + return NO_AUTH_TUPLE logger.warning("Missing x-rh-identity header") raise HTTPException(status_code=401, detail="Missing x-rh-identity header") diff --git a/tests/unit/authentication/test_rh_identity.py b/tests/unit/authentication/test_rh_identity.py index fd3c76a17..6ac517b9b 100644 --- a/tests/unit/authentication/test_rh_identity.py +++ b/tests/unit/authentication/test_rh_identity.py @@ -9,7 +9,9 @@ import pytest from fastapi import HTTPException, Request +from pytest_mock import MockerFixture +from authentication.interface import NO_AUTH_TUPLE from authentication.rh_identity import RHIdentityAuthDependency, RHIdentityData from constants import NO_USER_TOKEN @@ -448,3 +450,62 @@ async def test_no_entitlement_data(self, user_identity_data: dict) -> None: user_id, username, _, _ = await auth_dep(request) assert user_id == "abc123" assert username == "user@redhat.com" + + +class TestRHIdentityHealthProbeSkip: + """Test suite for health probe skip functionality in RH Identity auth.""" + + @staticmethod + def _mock_configuration( + mocker: MockerFixture, skip_for_health_probes: bool + ) -> None: + """Patch the configuration singleton with a mock for probe skip tests.""" + mock_config = mocker.MagicMock() + mock_config.authentication_configuration.skip_for_health_probes = ( + skip_for_health_probes + ) + mocker.patch("authentication.rh_identity.configuration", mock_config) + + @pytest.mark.asyncio + @pytest.mark.parametrize("path", ["/readiness", "/liveness"]) + async def test_probe_paths_skip_auth_when_enabled( + self, mocker: MockerFixture, path: str + ) -> None: + """Test health probe paths bypass auth when skip_for_health_probes is True.""" + self._mock_configuration(mocker, skip_for_health_probes=True) + + auth_dep = RHIdentityAuthDependency() + request = Request(scope={"type": "http", "headers": [], "path": path}) + + result = await auth_dep(request) + assert result == NO_AUTH_TUPLE + + @pytest.mark.asyncio + @pytest.mark.parametrize("path", ["/readiness", "/liveness"]) + async def test_probe_paths_require_auth_when_disabled( + self, mocker: MockerFixture, path: str + ) -> None: + """Test health probe paths still require auth when skip_for_health_probes is False.""" + self._mock_configuration(mocker, skip_for_health_probes=False) + + auth_dep = RHIdentityAuthDependency() + request = Request(scope={"type": "http", "headers": [], "path": path}) + + with pytest.raises(HTTPException) as exc_info: + await auth_dep(request) + assert exc_info.value.status_code == 401 + + @pytest.mark.asyncio + @pytest.mark.parametrize("path", ["/", "/v1/query"]) + async def test_non_probe_paths_require_auth_when_skip_enabled( + self, mocker: MockerFixture, path: str + ) -> None: + """Test non-probe paths still require auth even when skip_for_health_probes is True.""" + self._mock_configuration(mocker, skip_for_health_probes=True) + + auth_dep = RHIdentityAuthDependency() + request = Request(scope={"type": "http", "headers": [], "path": path}) + + with pytest.raises(HTTPException) as exc_info: + await auth_dep(request) + assert exc_info.value.status_code == 401