Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions providers/fab/docs/auth-manager/api-authentication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ command as in the example below.
.. versionchanged:: 3.0.0

In Airflow, the default setting is using token based authentication.
This approach is independent from which ``auth_backend`` is used.
The default setting is using Airflow public API to create a token (JWT) first and use this token in the requests to access the API.
Airflow now uses token-based authentication for the public API.
This mechanism is independent of the configured ``auth_backend``.
Clients must first obtain a JWT token using :doc:`token`, then include that token in subsequent API requests.

Kerberos authentication
'''''''''''''''''''''''
Expand Down
118 changes: 112 additions & 6 deletions providers/fab/docs/auth-manager/token.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ Generate JWT token with FAB auth manager
.. note::
This guide only applies if your environment is configured with FAB auth manager.

In order to use the :doc:`Airflow public API <apache-airflow:stable-rest-api-ref>`, you need a JWT token for authentication.
You can then include this token in your Airflow public API requests.
To generate a JWT token, use the ``Create Token`` API in :doc:`/api-ref/fab-token-api-ref`.
To use the :doc:`Airflow public API <apache-airflow:stable-rest-api-ref>`, you first need to obtain a JWT Token for
authentication.
Once you have the token, include it in the ``Authorization`` header when making requests to the public API.

You can generate a JWT token using the ``Create Token`` API endpoint,
documented in :doc:`/api-ref/fab-token-api-ref`.

Example
'''''''

Use the following example to generate a token via username and password.

.. code-block:: bash
ENDPOINT_URL="http://localhost:8080"
Expand All @@ -39,7 +44,108 @@ Example
"password": "<password>"
}'
This process will return a token that you can use in the Airflow public API requests.
If successful, this request returns a JWT token that you can use for subsequent Airflow public API calls.

Only users authenticated via the database (``AUTH_TYPE = AUTH_DB``) or LDAP
(``AUTH_TYPE = AUTH_LDAP``) can generate tokens using this method.
For more details, see :doc:`webserver-authentication`.

If you need to generate a token using a different authentication mechanism, see the next section.

Custom authentication implementation
------------------------------------

By default, JWT tokens for the Airflow public API can only be generated using
basic authentication (username and password) for database or LDAP users.

If you want to support another authentication mechanism, such as oauth, you can do so by overriding the
``create_token`` method in the FAB auth manager.

Example
'''''''

.. code-block:: python
class MyAuthManager(FabAuthManager):
def create_token(self, headers: dict[str, str], body: dict[str, Any]) -> User:
"""
Return the authenticated user for a given payload.
Implement your own custom token creation logic here.
"""
...
Oauth example
'''''''''''''

Below is an example implementation that uses OAuth to allow users to obtain a JWT token.
This custom logic overrides the default ``create_token`` method from the FAB authentication manager.

.. warning::
The example shown below disables signature verification (``verify_signature=False``).
This is **insecure** and should only be used for testing. Always validate tokens properly in production.

.. code-block:: python
class MyAuthManager(FabAuthManager):
def create_token(self, headers: dict[str, str], body: dict[str, Any]) -> User:
"""
Return the authenticated user derived from an OAuth access token.
Implement your own custom token validation and user mapping logic here.
"""
user = None
# Handle OAuth-based authentication
if self.security_manager.auth_type == AUTH_OAUTH:
# Require a Bearer token
auth_header = headers.get("Authorization")
if not auth_header:
return None
token = auth_header.replace("Bearer ", "")
# Example token decoding
#
# With signature validation (recommended):
# me = jwt.decode(
# token,
# public_key,
# algorithms=['HS256', 'RS256'],
# audience=CLIENT_ID
# )
#
# Without signature validation (not recommended):
me = jwt.decode(token, options={"verify_signature": False})
# Extract groups/roles (example schema — adjust to your provider)
groups = me["resource_access"]["airflow"]["roles"] # requires validation
if not groups:
groups = ["airflow_public"]
else:
groups = [g for g in groups if "airflow" in g]
# Build user info payload for FAB
userinfo = {
"username": me.get("preferred_username"),
"email": me.get("email"),
"first_name": me.get("given_name"),
"last_name": me.get("family_name"),
"role_keys": groups,
}
user = self.security_manager.auth_user_oauth(userinfo)
# Fall back to the default implementation
else:
user = super().create_token(headers=headers, body=body)
log.info("User: %s", user)
# Log user into the session
if user is not None:
login_user(user, remember=False)
Only users from database (`AUTH_TYPE = AUTH_DB`) or from LDAP (`AUTH_TYPE = AUTH_LDAP`) can be used to generate a token.
See :doc:`Airflow public API <webserver-authentication>` for more details.
return user
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,3 @@ class LoginResponse(BaseModel):
"""API Token serializer for responses."""

access_token: str


class LoginBody(BaseModel):
"""API Token serializer for requests."""

username: str
password: str
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/LoginBody'
additionalProperties: true
type: object
title: Body
required: true
responses:
'201':
Expand Down Expand Up @@ -55,7 +57,9 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/LoginBody'
additionalProperties: true
type: object
title: Body
required: true
responses:
'201':
Expand Down Expand Up @@ -440,20 +444,6 @@ components:
title: Detail
type: object
title: HTTPValidationError
LoginBody:
properties:
username:
type: string
title: Username
password:
type: string
title: Password
type: object
required:
- username
- password
title: LoginBody
description: API Token serializer for requests.
LoginResponse:
properties:
access_token:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# under the License.
from __future__ import annotations

from typing import Any

from fastapi import Body
from starlette import status
from starlette.requests import Request # noqa: TC002
from starlette.responses import RedirectResponse
Expand All @@ -25,7 +28,7 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.configuration import conf
from airflow.providers.fab.auth_manager.api_fastapi.datamodels.login import LoginBody, LoginResponse
from airflow.providers.fab.auth_manager.api_fastapi.datamodels.login import LoginResponse
from airflow.providers.fab.auth_manager.api_fastapi.services.login import FABAuthManagerLogin
from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder

Expand All @@ -38,10 +41,10 @@
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_401_UNAUTHORIZED]),
)
def create_token(body: LoginBody) -> LoginResponse:
def create_token(request: Request, body: dict[str, Any] = Body(...)) -> LoginResponse:
"""Generate a new API token."""
with get_application_builder():
return FABAuthManagerLogin.create_token(body=body)
return FABAuthManagerLogin.create_token(headers=dict(request.headers), body=body)


@login_router.post(
Expand All @@ -50,11 +53,13 @@ def create_token(body: LoginBody) -> LoginResponse:
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_401_UNAUTHORIZED]),
)
def create_token_cli(body: LoginBody) -> LoginResponse:
def create_token_cli(request: Request, body: dict[str, Any] = Body(...)) -> LoginResponse:
"""Generate a new CLI API token."""
with get_application_builder():
return FABAuthManagerLogin.create_token(
body=body, expiration_time_in_seconds=conf.getint("api_auth", "jwt_cli_expiration_time")
headers=dict(request.headers),
body=body,
expiration_time_in_seconds=conf.getint("api_auth", "jwt_cli_expiration_time"),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,32 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, cast
from typing import Any

from flask_appbuilder.const import AUTH_LDAP
from starlette import status
from starlette.exceptions import HTTPException

from airflow.api_fastapi.app import get_auth_manager
from airflow.configuration import conf
from airflow.providers.fab.auth_manager.api_fastapi.datamodels.login import LoginBody, LoginResponse

if TYPE_CHECKING:
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.fab.auth_manager.models import User
from airflow.providers.fab.auth_manager.api_fastapi.datamodels.login import LoginResponse
from airflow.providers.fab.www.utils import get_fab_auth_manager


class FABAuthManagerLogin:
"""Login Service for FABAuthManager."""

@classmethod
def create_token(
cls, body: LoginBody, expiration_time_in_seconds: int = conf.getint("api_auth", "jwt_expiration_time")
cls,
headers: dict[str, str],
body: dict[str, Any],
expiration_time_in_seconds: int = conf.getint("api_auth", "jwt_expiration_time"),
) -> LoginResponse:
"""Create a new token."""
if not body.username or not body.password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Username and password must be provided"
)

auth_manager = cast("FabAuthManager", get_auth_manager())
user: User | None = None

if auth_manager.security_manager.auth_type == AUTH_LDAP:
user = auth_manager.security_manager.auth_user_ldap(
body.username, body.password, rotate_session_id=False
)
if user is None:
user = auth_manager.security_manager.auth_user_db(
body.username, body.password, rotate_session_id=False
)
auth_manager = get_fab_auth_manager()
try:
user = auth_manager.create_token(headers=headers, body=body)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))

if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from connexion import FlaskApi
from fastapi import FastAPI
from flask import Blueprint, current_app, g
from flask_appbuilder.const import AUTH_LDAP
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload
from starlette.middleware.wsgi import WSGIMiddleware
Expand Down Expand Up @@ -299,6 +300,32 @@ def is_logged_in(self) -> bool:
or (not user.is_anonymous and user.is_active)
)

def create_token(self, headers: dict[str, str], body: dict[str, Any]) -> User:
"""
Create a new token from a payload.
By default, it uses basic authentication (username and password).
Override this method to use a different authentication method (e.g. oauth).
:param headers: request headers
:param body: request body
"""
if not body.get("username") or not body.get("password"):
raise ValueError("Username and password must be provided")

user: User | None = None

if self.security_manager.auth_type == AUTH_LDAP:
user = self.security_manager.auth_user_ldap(
body["username"], body["password"], rotate_session_id=False
)
if user is None:
user = self.security_manager.auth_user_db(
body["username"], body["password"], rotate_session_id=False
)

return user

def is_authorized_configuration(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import pytest

from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN
from airflow.providers.fab.auth_manager.api_fastapi.datamodels.login import LoginBody, LoginResponse
from airflow.providers.fab.auth_manager.api_fastapi.datamodels.login import LoginResponse


@pytest.mark.db_test
class TestLogin:
dummy_login_body = LoginBody(username="dummy", password="dummy")
dummy_login_body = {"username": "dummy", "password": "dummy"}
dummy_token = LoginResponse(access_token="DUMMY_TOKEN")

@patch("airflow.providers.fab.auth_manager.api_fastapi.routes.login.FABAuthManagerLogin")
Expand All @@ -36,7 +36,7 @@ def test_create_token(self, mock_fab_auth_manager_login, test_client):

response = test_client.post(
"/token",
json=self.dummy_login_body.model_dump(),
json=self.dummy_login_body,
)
assert response.status_code == 201
assert response.json()["access_token"] == self.dummy_token.access_token
Expand All @@ -47,7 +47,7 @@ def test_create_token_cli(self, mock_fab_auth_manager_login, test_client):

response = test_client.post(
"/token/cli",
json=self.dummy_login_body.model_dump(),
json=self.dummy_login_body,
)
assert response.status_code == 201
assert response.json()["access_token"] == self.dummy_token.access_token
Expand Down
Loading