Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,26 @@ def get_fastapi_app(self) -> FastAPI | None:
app.include_router(roles_router)
app.include_router(users_router)

# Session cleanup middleware to prevent PendingRollbackError.
# FAB's Flask views (e.g., /users/list/, /roles/list/) are mounted below via
# WSGIMiddleware. These views use settings.Session (SQLAlchemy scoped_session),
# but unlike a native Flask app where teardown_appcontext calls Session.remove(),
# the WSGI wrapper does not trigger Flask's teardown hooks.
# Without explicit cleanup, sessions remain in "idle in transaction" state.
# When the database connection times out (e.g., PostgreSQL's
# idle_in_transaction_session_timeout), subsequent requests reusing the
# invalidated session raise PendingRollbackError.
@app.middleware("http")
async def cleanup_session_middleware(request, call_next):
try:
response = await call_next(request)
return response
finally:
from airflow import settings

if settings.Session:
settings.Session.remove()

app.mount("/", WSGIMiddleware(flask_app))

return app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,3 +957,85 @@ def test_resetdb(
mock_init.assert_not_called()
else:
mock_init.assert_called_once()


class TestFabAuthManagerSessionCleanup:
"""Test session cleanup middleware in FAB auth manager FastAPI app.
Background:
FAB auth manager's FastAPI app has the following route structure:
- /token, /logout: FastAPI routes (login_router)
- /users/*, /roles/*: FastAPI API routes
- /*: WSGIMiddleware -> Flask App (FAB views like /users/list/, /roles/list/)
Problem:
FAB's Flask views (e.g., /users/list/, /roles/list/) use settings.Session
(SQLAlchemy scoped_session). In a normal Flask app, teardown_appcontext
automatically calls Session.remove() after each request. However, when Flask
is mounted via WSGIMiddleware in FastAPI, teardown_appcontext does NOT trigger.
This leaves database sessions in "idle in transaction" state. When the database
connection times out (e.g., PostgreSQL's idle_in_transaction_session_timeout),
subsequent requests reusing the invalidated session raise PendingRollbackError.
Solution:
Add a FastAPI middleware that calls Session.remove() in the finally block,
ensuring session cleanup for ALL requests including those forwarded to Flask via WSGI.
"""

@mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app")
def test_session_cleanup_middleware_on_wsgi_route(self, mock_create_app):
"""Test Session.remove() is called after requests to WSGI-mounted Flask routes.
This is the critical scenario: requests to Flask AppBuilder views like
/users/list/ and /roles/list/ go through WSGIMiddleware. Without the
cleanup middleware, these requests leave sessions in "idle in transaction"
state, eventually causing PendingRollbackError.
"""
from unittest.mock import patch

from fastapi.testclient import TestClient

# Setup mock Flask app (simulates FAB's Flask app)
mock_flask_app = MagicMock()
mock_create_app.return_value = mock_flask_app

auth_manager = FabAuthManager()
fastapi_app = auth_manager.get_fastapi_app()

client = TestClient(fastapi_app, raise_server_exceptions=False)

with patch("airflow.settings.Session") as mock_session:
# Request to a path not handled by FastAPI routers goes to WSGIMiddleware -> Flask
# This simulates accessing /users/list/ or /roles/list/ which caused the original bug
client.get("/users/list/")

# Verify Session.remove() was called by the cleanup middleware
mock_session.remove.assert_called()

@mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app")
def test_session_cleanup_middleware_on_fastapi_route(self, mock_create_app):
"""Test Session.remove() is also called after FastAPI route requests.
Even though FastAPI routes may not directly use settings.Session,
the middleware should clean up any session that might have been
used during request processing (e.g., by dependencies or nested calls).
"""
from unittest.mock import patch

from fastapi.testclient import TestClient

mock_flask_app = MagicMock()
mock_create_app.return_value = mock_flask_app

auth_manager = FabAuthManager()
fastapi_app = auth_manager.get_fastapi_app()

client = TestClient(fastapi_app, raise_server_exceptions=False)

with patch("airflow.settings.Session") as mock_session:
# Request to a FastAPI route (login endpoint)
client.post("/token", json={"username": "test", "password": "test"})

# Verify Session.remove() was called
mock_session.remove.assert_called()