Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions airflow/api_fastapi/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@

import logging
from abc import ABCMeta, abstractmethod
from functools import cache
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from jwt import InvalidTokenError
from sqlalchemy import select

from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.resource_details import BackfillDetails, DagDetails
from airflow.api_fastapi.auth.tokens import (
JWTGenerator,
JWTValidator,
get_sig_validation_args,
get_signing_args,
)
from airflow.api_fastapi.common.types import ExtraMenuItem, MenuItem
from airflow.configuration import conf
from airflow.models import DagModel
from airflow.typing_compat import Literal
from airflow.utils.jwt_signer import JWTSigner, get_signing_key
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session

Expand Down Expand Up @@ -86,24 +92,24 @@ def deserialize_user(self, token: dict[str, Any]) -> T:

@abstractmethod
def serialize_user(self, user: T) -> dict[str, Any]:
"""Create a dict from a user object."""
"""Create a subject and extra claims dict from a user object."""

def get_user_from_token(self, token: str) -> BaseUser:
async def get_user_from_token(self, token: str) -> BaseUser:
"""Verify the JWT token is valid and create a user object from it if valid."""
try:
payload: dict[str, Any] = self._get_token_signer().verify_token(token)
payload: dict[str, Any] = await self._get_token_validator().avalidated_claims(token)
return self.deserialize_user(payload)
except InvalidTokenError as e:
log.error("JWT token is not valid")
log.error("JWT token is not valid: %s", e)
raise e

def get_jwt_token(
self, user: T, *, expiration_time_in_seconds: int = conf.getint("api", "auth_jwt_expiration_time")
def generate_jwt(
self, user: T, *, expiration_time_in_seconds: int = conf.getint("api_auth", "jwt_expiration_time")
) -> str:
"""Return the JWT token from a user object."""
return self._get_token_signer(
expiration_time_in_seconds=expiration_time_in_seconds
).generate_signed_token(self.serialize_user(user))
return self._get_token_signer(expiration_time_in_seconds=expiration_time_in_seconds).generate(
self.serialize_user(user)
)

@abstractmethod
def get_url_login(self, **kwargs) -> str:
Expand Down Expand Up @@ -450,19 +456,35 @@ def get_extra_menu_items(self, *, user: T) -> list[ExtraMenuItem]:
"""
return []

@staticmethod
@classmethod
@cache
def _get_token_signer(
expiration_time_in_seconds: int = conf.getint("api", "auth_jwt_expiration_time"),
) -> JWTSigner:
cls,
expiration_time_in_seconds: int = conf.getint("api_auth", "jwt_expiration_time"),
) -> JWTGenerator:
"""
Return the signer used to sign JWT token.

:meta private:

:param expiration_time_in_seconds: expiration time in seconds of the token
"""
return JWTSigner(
secret_key=get_signing_key("api", "auth_jwt_secret"),
expiration_time_in_seconds=expiration_time_in_seconds,
audience="front-apis",
return JWTGenerator(
**get_signing_args(),
valid_for=expiration_time_in_seconds,
audience=conf.get("api", "jwt_audience", fallback="apache-airflow"),
)

@classmethod
@cache
def _get_token_validator(cls) -> JWTValidator:
"""
Return the signer used to sign JWT token.

:meta private:
"""
return JWTValidator(
**get_sig_validation_args(),
leeway=conf.getint("api_auth", "jwt_leeway"),
audience=conf.get("api_auth", "jwt_audience", fallback="apache-airflow"),
)
4 changes: 2 additions & 2 deletions airflow/api_fastapi/auth/managers/simple/routes/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def create_token_all_admins() -> RedirectResponse:
username="Anonymous",
role="ADMIN",
)
url = urljoin(conf.get("api", "base_url"), f"?token={get_auth_manager().get_jwt_token(user)}")
url = urljoin(conf.get("api", "base_url"), f"?token={get_auth_manager().generate_jwt(user)}")
return RedirectResponse(url=url)


Expand All @@ -76,5 +76,5 @@ def create_token_cli(
) -> LoginResponse:
"""Authenticate the user for the CLI."""
return SimpleAuthManagerLogin.create_token(
body=body, expiration_time_in_sec=conf.getint("api", "auth_jwt_cli_expiration_time")
body=body, expiration_time_in_sec=conf.getint("api_auth", "jwt_cli_expiration_time")
)
4 changes: 2 additions & 2 deletions airflow/api_fastapi/auth/managers/simple/services/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SimpleAuthManagerLogin:

@classmethod
def create_token(
cls, body: LoginBody, expiration_time_in_sec: int = conf.getint("api", "auth_jwt_expiration_time")
cls, body: LoginBody, expiration_time_in_sec: int = conf.getint("api_auth", "jwt_expiration_time")
) -> LoginResponse:
"""
Authenticate user with given configuration.
Expand Down Expand Up @@ -67,7 +67,7 @@ def create_token(
)

return LoginResponse(
jwt_token=get_auth_manager().get_jwt_token(
jwt_token=get_auth_manager().generate_jwt(
user=user, expiration_time_in_seconds=expiration_time_in_sec
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ def get_url_login(self, **kwargs) -> str:
return AUTH_MANAGER_FASTAPI_APP_PREFIX + "/login"

def deserialize_user(self, token: dict[str, Any]) -> SimpleAuthManagerUser:
return SimpleAuthManagerUser(username=token["username"], role=token["role"])
return SimpleAuthManagerUser(username=token["sub"], role=token["role"])

def serialize_user(self, user: SimpleAuthManagerUser) -> dict[str, Any]:
return {"username": user.username, "role": user.role}
return {"sub": user.username, "role": user.role}

def is_authorized_configuration(
self,
Expand Down
Loading