From e58ab2f9c8119a1cffa67c69cf427587f5941a2f Mon Sep 17 00:00:00 2001 From: vincbeck Date: Fri, 16 Jan 2026 15:32:55 -0500 Subject: [PATCH] Logout the user when the refresh token is no longer valid --- .../api_fastapi/auth/managers/exceptions.py | 22 +++++++++++++ .../auth/middlewares/refresh_token.py | 31 +++++++++++-------- .../auth/middlewares/test_refresh_token.py | 6 ++-- .../auth_manager/keycloak_auth_manager.py | 21 ++++++++----- .../keycloak/auth_manager/routes/login.py | 21 +++++++++++-- .../auth_manager/routes/test_login.py | 16 ++++++++++ .../test_keycloak_auth_manager.py | 10 +++++- 7 files changed, 99 insertions(+), 28 deletions(-) create mode 100644 airflow-core/src/airflow/api_fastapi/auth/managers/exceptions.py diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/exceptions.py b/airflow-core/src/airflow/api_fastapi/auth/managers/exceptions.py new file mode 100644 index 0000000000000..711b2a6334db0 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/exceptions.py @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + + +class AuthManagerRefreshTokenExpiredException(Exception): + """Exception to throw when the user refresh token is expired.""" diff --git a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py index a8386f4013844..a64da351d25b5 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py +++ b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py @@ -23,6 +23,7 @@ from airflow.api_fastapi.app import get_auth_manager from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN +from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException from airflow.api_fastapi.auth.managers.models.base_user import BaseUser from airflow.api_fastapi.core_api.security import resolve_user_from_token from airflow.configuration import conf @@ -40,19 +41,26 @@ class JWTRefreshMiddleware(BaseHTTPMiddleware): """ async def dispatch(self, request: Request, call_next): - new_user = None + new_token = None current_token = request.cookies.get(COOKIE_NAME_JWT_TOKEN) try: - if current_token: - new_user, current_user = await self._refresh_user(current_token) - if user := (new_user or current_user): - request.state.user = user + if current_token is not None: + try: + new_user, current_user = await self._refresh_user(current_token) + if user := (new_user or current_user): + request.state.user = user + if new_user: + # If we created a new user, serialize it and set it as a cookie + new_token = get_auth_manager().generate_jwt(new_user) + except (HTTPException, AuthManagerRefreshTokenExpiredException): + # Receive a HTTPException when the Airflow token is expired + # Receive a AuthManagerRefreshTokenExpiredException when the potential underlying refresh + # token used by the auth manager is expired + new_token = "" response = await call_next(request) - if new_user: - # If we created a new user, serialize it and set it as a cookie - new_token = get_auth_manager().generate_jwt(new_user) + if new_token is not None: secure = bool(conf.get("api", "ssl_cert", fallback="")) response.set_cookie( COOKIE_NAME_JWT_TOKEN, @@ -60,6 +68,7 @@ async def dispatch(self, request: Request, call_next): httponly=True, secure=secure, samesite="lax", + max_age=0 if new_token == "" else None, ) except HTTPException as exc: # If any HTTPException is raised during user resolution or refresh, return it as response @@ -68,9 +77,5 @@ async def dispatch(self, request: Request, call_next): @staticmethod async def _refresh_user(current_token: str) -> tuple[BaseUser | None, BaseUser | None]: - try: - user = await resolve_user_from_token(current_token) - except HTTPException: - return None, None - + user = await resolve_user_from_token(current_token) return get_auth_manager().refresh_user(user=user), user diff --git a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py index 834f864e40977..b8f0d7c7726b9 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py +++ b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py @@ -61,11 +61,11 @@ async def test_dispatch_no_token(self, mock_refresh_user, middleware, mock_reque @pytest.mark.asyncio async def test_dispatch_invalid_token(self, mock_refresh_user, middleware, mock_request): mock_request.cookies = {COOKIE_NAME_JWT_TOKEN: "valid_token"} - call_next = AsyncMock(return_value=Response()) + call_next = AsyncMock(return_value=Response(status_code=401)) response = await middleware.dispatch(mock_request, call_next) - assert response.status_code == 403 - assert response.body == b'{"detail":"Invalid JWT token"}' + assert response.status_code == 401 + assert '_token=""; HttpOnly; Max-Age=0; Path=/; SameSite=lax' in response.headers.get("set-cookie") @patch("airflow.api_fastapi.auth.middlewares.refresh_token.get_auth_manager") @patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token") diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 168f570039fd9..955b431742f5e 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -25,7 +25,8 @@ import requests from fastapi import FastAPI -from keycloak import KeycloakOpenID, KeycloakPostError +from keycloak import KeycloakOpenID +from keycloak.exceptions import KeycloakPostError from requests.adapters import HTTPAdapter from urllib3.util import Retry @@ -162,13 +163,14 @@ def refresh_tokens(self, *, user: KeycloakAuthManagerUser) -> dict[str, str]: client = self.get_keycloak_client() return client.refresh_token(user.refresh_token) except KeycloakPostError as exc: - log.warning( - "KeycloakPostError encountered during token refresh. " - "Suppressing the exception and returning None.", - exc_info=exc, - ) - - return {} + try: + from airflow.api_fastapi.auth.managers.exceptions import ( + AuthManagerRefreshTokenExpiredException, + ) + except ImportError: + return {} + else: + raise AuthManagerRefreshTokenExpiredException(exc) def is_authorized_configuration( self, @@ -396,6 +398,9 @@ def _is_batch_authorized( if resp.status_code == 200: return {(perm["scopes"][0], perm["rsname"]) for perm in resp.json()} + if resp.status_code == 401: + log.debug("Received 401 from Keycloak: %s", resp.text) + return set() if resp.status_code == 403: return set() if resp.status_code == 400: diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/routes/login.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/routes/login.py index a957d6da646f8..8d38ff6c6c469 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/routes/login.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/routes/login.py @@ -20,17 +20,29 @@ import logging from typing import Annotated, cast -from fastapi import Depends, Request +from fastapi import Depends, HTTPException, Request from fastapi.responses import HTMLResponse, RedirectResponse +from starlette.status import HTTP_401_UNAUTHORIZED from airflow.api_fastapi.app import get_auth_manager from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN +from airflow.providers.keycloak.version_compat import AIRFLOW_V_3_1_1_PLUS + +try: + from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException +except ImportError: + + class AuthManagerRefreshTokenExpiredException(Exception): # type: ignore[no-redef] + """In case it is using a version of Airflow without ``AuthManagerRefreshTokenExpiredException``.""" + + pass + + from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.security import get_user from airflow.providers.common.compat.sdk import conf from airflow.providers.keycloak.auth_manager.keycloak_auth_manager import KeycloakAuthManager from airflow.providers.keycloak.auth_manager.user import KeycloakAuthManagerUser -from airflow.providers.keycloak.version_compat import AIRFLOW_V_3_1_1_PLUS log = logging.getLogger(__name__) login_router = AirflowRouter(tags=["KeycloakAuthManagerLogin"]) @@ -134,7 +146,10 @@ def refresh( ) -> RedirectResponse: """Refresh the token.""" auth_manager = cast("KeycloakAuthManager", get_auth_manager()) - refreshed_user = auth_manager.refresh_user(user=user) + try: + refreshed_user = auth_manager.refresh_user(user=user) + except AuthManagerRefreshTokenExpiredException: + raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Refresh token Expired") redirect_url = request.query_params.get("next", conf.get("api", "base_url", fallback="/")) response = RedirectResponse(url=redirect_url, status_code=303) diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/routes/test_login.py b/providers/keycloak/tests/unit/keycloak/auth_manager/routes/test_login.py index c63f60a1128b8..f45c6a9e03a71 100644 --- a/providers/keycloak/tests/unit/keycloak/auth_manager/routes/test_login.py +++ b/providers/keycloak/tests/unit/keycloak/auth_manager/routes/test_login.py @@ -23,6 +23,8 @@ from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX from airflow.providers.keycloak.auth_manager.user import KeycloakAuthManagerUser +from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS + class TestLoginRouter: @patch("airflow.providers.keycloak.auth_manager.routes.login.KeycloakAuthManager.get_keycloak_client") @@ -124,3 +126,17 @@ def test_refresh_token(self, mock_get_auth_manager, client): assert response.cookies["_token"] == "token" mock_auth_manager.refresh_user.assert_called_once() mock_auth_manager.generate_jwt.assert_called_once() + + @pytest.mark.skipif( + not AIRFLOW_V_3_2_PLUS, reason="``AuthManagerRefreshTokenExpiredException`` has been added in 3.2.0" + ) + @patch("airflow.providers.keycloak.auth_manager.routes.login.get_auth_manager") + def test_refresh_token_expired(self, mock_get_auth_manager, client): + from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException + + mock_auth_manager = Mock() + mock_auth_manager.refresh_user.side_effect = AuthManagerRefreshTokenExpiredException() + mock_get_auth_manager.return_value = mock_auth_manager + + response = client.get(AUTH_MANAGER_FASTAPI_APP_PREFIX + "/refresh", follow_redirects=False) + assert response.status_code == 401 diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py index 8e7fb92433375..9b6746f27e226 100644 --- a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py +++ b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py @@ -51,6 +51,7 @@ from airflow.providers.keycloak.auth_manager.user import KeycloakAuthManagerUser from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS @pytest.fixture @@ -160,7 +161,13 @@ def test_refresh_user_expired_with_invalid_token( mock_get_keycloak_client.return_value = keycloak_client - assert auth_manager.refresh_user(user=user) is None + if AIRFLOW_V_3_2_PLUS: + from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException + + with pytest.raises(AuthManagerRefreshTokenExpiredException): + auth_manager.refresh_user(user=user) + else: + auth_manager.refresh_user(user=user) keycloak_client.refresh_token.assert_called_with("refresh_token") @@ -449,6 +456,7 @@ def test_is_authorized_custom_view( ], [200, [{"scopes": ["MENU"], "rsname": "Assets"}], {MenuItem.ASSETS}], [200, [], set()], + [401, [{"scopes": ["MENU"], "rsname": "Assets"}], set()], [403, [{"scopes": ["MENU"], "rsname": "Assets"}], set()], ], )