Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations


class AuthManagerRefreshTokenExpiredException(Exception):
"""Exception to throw when the user refresh token is expired."""
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN
from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.core_api.security import resolve_user_from_token
from airflow.configuration import conf
Expand All @@ -40,26 +41,34 @@ class JWTRefreshMiddleware(BaseHTTPMiddleware):
"""

async def dispatch(self, request: Request, call_next):
new_user = None
new_token = None
current_token = request.cookies.get(COOKIE_NAME_JWT_TOKEN)
try:
if current_token:
new_user, current_user = await self._refresh_user(current_token)
if user := (new_user or current_user):
request.state.user = user
if current_token is not None:
try:
new_user, current_user = await self._refresh_user(current_token)
if user := (new_user or current_user):
request.state.user = user
if new_user:
# If we created a new user, serialize it and set it as a cookie
new_token = get_auth_manager().generate_jwt(new_user)
except (HTTPException, AuthManagerRefreshTokenExpiredException):
# Receive a HTTPException when the Airflow token is expired
# Receive a AuthManagerRefreshTokenExpiredException when the potential underlying refresh
# token used by the auth manager is expired
new_token = ""

response = await call_next(request)

if new_user:
# If we created a new user, serialize it and set it as a cookie
new_token = get_auth_manager().generate_jwt(new_user)
if new_token is not None:
secure = bool(conf.get("api", "ssl_cert", fallback=""))
response.set_cookie(
COOKIE_NAME_JWT_TOKEN,
new_token,
httponly=True,
secure=secure,
samesite="lax",
max_age=0 if new_token == "" else None,
)
except HTTPException as exc:
# If any HTTPException is raised during user resolution or refresh, return it as response
Expand All @@ -68,9 +77,5 @@ async def dispatch(self, request: Request, call_next):

@staticmethod
async def _refresh_user(current_token: str) -> tuple[BaseUser | None, BaseUser | None]:
try:
user = await resolve_user_from_token(current_token)
except HTTPException:
return None, None

user = await resolve_user_from_token(current_token)
return get_auth_manager().refresh_user(user=user), user
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ async def test_dispatch_no_token(self, mock_refresh_user, middleware, mock_reque
@pytest.mark.asyncio
async def test_dispatch_invalid_token(self, mock_refresh_user, middleware, mock_request):
mock_request.cookies = {COOKIE_NAME_JWT_TOKEN: "valid_token"}
call_next = AsyncMock(return_value=Response())
call_next = AsyncMock(return_value=Response(status_code=401))

response = await middleware.dispatch(mock_request, call_next)
assert response.status_code == 403
assert response.body == b'{"detail":"Invalid JWT token"}'
assert response.status_code == 401
assert '_token=""; HttpOnly; Max-Age=0; Path=/; SameSite=lax' in response.headers.get("set-cookie")

@patch("airflow.api_fastapi.auth.middlewares.refresh_token.get_auth_manager")
@patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

import requests
from fastapi import FastAPI
from keycloak import KeycloakOpenID, KeycloakPostError
from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakPostError
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

Expand Down Expand Up @@ -162,13 +163,14 @@ def refresh_tokens(self, *, user: KeycloakAuthManagerUser) -> dict[str, str]:
client = self.get_keycloak_client()
return client.refresh_token(user.refresh_token)
except KeycloakPostError as exc:
log.warning(
"KeycloakPostError encountered during token refresh. "
"Suppressing the exception and returning None.",
exc_info=exc,
)

return {}
try:
from airflow.api_fastapi.auth.managers.exceptions import (
AuthManagerRefreshTokenExpiredException,
)
except ImportError:
return {}
else:
raise AuthManagerRefreshTokenExpiredException(exc)

def is_authorized_configuration(
self,
Expand Down Expand Up @@ -396,6 +398,9 @@ def _is_batch_authorized(

if resp.status_code == 200:
return {(perm["scopes"][0], perm["rsname"]) for perm in resp.json()}
if resp.status_code == 401:
log.debug("Received 401 from Keycloak: %s", resp.text)
return set()
if resp.status_code == 403:
return set()
if resp.status_code == 400:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,29 @@
import logging
from typing import Annotated, cast

from fastapi import Depends, Request
from fastapi import Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from starlette.status import HTTP_401_UNAUTHORIZED

from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN
from airflow.providers.keycloak.version_compat import AIRFLOW_V_3_1_1_PLUS

try:
from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException
except ImportError:

class AuthManagerRefreshTokenExpiredException(Exception): # type: ignore[no-redef]
"""In case it is using a version of Airflow without ``AuthManagerRefreshTokenExpiredException``."""

pass


from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.security import get_user
from airflow.providers.common.compat.sdk import conf
from airflow.providers.keycloak.auth_manager.keycloak_auth_manager import KeycloakAuthManager
from airflow.providers.keycloak.auth_manager.user import KeycloakAuthManagerUser
from airflow.providers.keycloak.version_compat import AIRFLOW_V_3_1_1_PLUS

log = logging.getLogger(__name__)
login_router = AirflowRouter(tags=["KeycloakAuthManagerLogin"])
Expand Down Expand Up @@ -134,7 +146,10 @@ def refresh(
) -> RedirectResponse:
"""Refresh the token."""
auth_manager = cast("KeycloakAuthManager", get_auth_manager())
refreshed_user = auth_manager.refresh_user(user=user)
try:
refreshed_user = auth_manager.refresh_user(user=user)
except AuthManagerRefreshTokenExpiredException:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Refresh token Expired")
redirect_url = request.query_params.get("next", conf.get("api", "base_url", fallback="/"))
response = RedirectResponse(url=redirect_url, status_code=303)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
from airflow.providers.keycloak.auth_manager.user import KeycloakAuthManagerUser

from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS


class TestLoginRouter:
@patch("airflow.providers.keycloak.auth_manager.routes.login.KeycloakAuthManager.get_keycloak_client")
Expand Down Expand Up @@ -124,3 +126,17 @@ def test_refresh_token(self, mock_get_auth_manager, client):
assert response.cookies["_token"] == "token"
mock_auth_manager.refresh_user.assert_called_once()
mock_auth_manager.generate_jwt.assert_called_once()

@pytest.mark.skipif(
not AIRFLOW_V_3_2_PLUS, reason="``AuthManagerRefreshTokenExpiredException`` has been added in 3.2.0"
)
@patch("airflow.providers.keycloak.auth_manager.routes.login.get_auth_manager")
def test_refresh_token_expired(self, mock_get_auth_manager, client):
from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException

mock_auth_manager = Mock()
mock_auth_manager.refresh_user.side_effect = AuthManagerRefreshTokenExpiredException()
mock_get_auth_manager.return_value = mock_auth_manager

response = client.get(AUTH_MANAGER_FASTAPI_APP_PREFIX + "/refresh", follow_redirects=False)
assert response.status_code == 401
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from airflow.providers.keycloak.auth_manager.user import KeycloakAuthManagerUser

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS


@pytest.fixture
Expand Down Expand Up @@ -160,7 +161,13 @@ def test_refresh_user_expired_with_invalid_token(

mock_get_keycloak_client.return_value = keycloak_client

assert auth_manager.refresh_user(user=user) is None
if AIRFLOW_V_3_2_PLUS:
from airflow.api_fastapi.auth.managers.exceptions import AuthManagerRefreshTokenExpiredException

with pytest.raises(AuthManagerRefreshTokenExpiredException):
auth_manager.refresh_user(user=user)
else:
auth_manager.refresh_user(user=user)

keycloak_client.refresh_token.assert_called_with("refresh_token")

Expand Down Expand Up @@ -449,6 +456,7 @@ def test_is_authorized_custom_view(
],
[200, [{"scopes": ["MENU"], "rsname": "Assets"}], {MenuItem.ASSETS}],
[200, [], set()],
[401, [{"scopes": ["MENU"], "rsname": "Assets"}], set()],
[403, [{"scopes": ["MENU"], "rsname": "Assets"}], set()],
],
)
Expand Down
Loading