From a8c2f6d801f8b5ffae974d325e5f4b0e1f6c66d9 Mon Sep 17 00:00:00 2001 From: IkeSalmonson Date: Tue, 26 Aug 2025 16:00:25 -0300 Subject: [PATCH 1/2] feature: add get_current_community() refactor: reorganize imports and improve authentication logic in routers --- app/routers/authentication.py | 111 +++++++++++++++++++------ app/routers/libraries/routes.py | 2 +- app/services/auth.py | 31 +++---- app/services/database/orm/community.py | 11 ++- tests/test_subscriptions.py | 8 +- 5 files changed, 111 insertions(+), 52 deletions(-) diff --git a/app/routers/authentication.py b/app/routers/authentication.py index 400dc50..d1cf368 100755 --- a/app/routers/authentication.py +++ b/app/routers/authentication.py @@ -1,58 +1,119 @@ -from fastapi import APIRouter, Depends, HTTPException, status, Request -from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm -from sqlmodel.ext.asyncio.session import AsyncSession +from typing import Annotated + import jwt +from fastapi import APIRouter, Depends, HTTPException, Request, status +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jwt.exceptions import InvalidTokenError +from sqlmodel.ext.asyncio.session import AsyncSession +from app.schemas import Token, TokenPayload from app.services import auth -from app.schemas import Token, TokenPayload, Community from app.services.database.models import Community as DBCommunity from app.services.database.orm.community import get_community_by_username oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/authentication/token") + def setup(): - router = APIRouter(prefix='/authentication', tags=['authentication']) - async def authenticate_community( request: Request , username: str, password: str): - # Valida se o usuário existe e se a senha está correta - session: AsyncSession = request.app.db_session_factory - found_community = await get_community_by_username( - username=username, - session= session - ) - if not found_community or not auth.verify_password(password, found_community.password): + router = APIRouter(prefix="/authentication", tags=["authentication"]) + + async def authenticate_community( + request: Request, username: str, password: str + ): + # Valida se o usuário existe e se a senha está correta + session: AsyncSession = request.app.db_session_factory + found_community = await get_community_by_username( + username=username, session=session + ) + if not found_community or not auth.verify_password( + password, found_community.password + ): return None - return found_community + return found_community + async def get_current_community( + request: Request, + token: Annotated[str, Depends(oauth2_scheme)], + ) -> DBCommunity: + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) - #### Teste + try: + payload = jwt.decode( + token, auth.SECRET_KEY, algorithms=[auth.ALGORITHM] + ) + username = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenPayload(username=username) + except InvalidTokenError: + raise credentials_exception + session: AsyncSession = request.app.db_session_factory + community = await get_community_by_username( + session=session, username=token_data.username + ) + if community is None: + raise credentials_exception + + return community + + async def get_current_active_community( + current_user: Annotated[DBCommunity, Depends(get_current_community)], + ) -> DBCommunity: + # A função simplesmente retorna o usuário. + # Pode ser estendido futuramente para verificar um status "ativo". + return current_user + + # Teste @router.post("/create_commumity") - async def create_community(request: Request ): + async def create_community(request: Request): password = "123Asd!@#" - hashed_password=auth.hash_password(password) - community = DBCommunity(username="username", email="username@test.com", password=hashed_password) + hashed_password = auth.hash_password(password) + community = DBCommunity( + username="username", + email="username@test.com", + password=hashed_password, + ) session: AsyncSession = request.app.db_session_factory session.add(community) await session.commit() await session.refresh(community) - return {'msg':'succes? '} - #### Teste + return {"msg": "succes? "} + + # Teste @router.post("/token", response_model=Token) - async def login_for_access_token(request: Request , form_data: OAuth2PasswordRequestForm = Depends() ) : + async def login_for_access_token( + request: Request, form_data: OAuth2PasswordRequestForm = Depends() + ): # Rota de login: valida credenciais e retorna token JWT - community = await authenticate_community( request, form_data.username, form_data.password) + community = await authenticate_community( + request, form_data.username, form_data.password + ) if not community: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Credenciais inválidas" + detail="Credenciais inválidas", ) payload = TokenPayload(username=community.username) token, expires_in = auth.create_access_token(data=payload) return { "access_token": token, "token_type": "Bearer", - "expires_in": expires_in + "expires_in": expires_in, } - return router # Retorna o router configurado com as rotas de autenticação + + @router.get("/me", response_model=DBCommunity) + async def read_community_me( + current_community: Annotated[ + DBCommunity, Depends(get_current_active_community) + ], + ): + # Rota para obter informações do usuário autenticado + return current_community + + return router # Retorna o router configurado com as rotas de autenticação diff --git a/app/routers/libraries/routes.py b/app/routers/libraries/routes.py index 125ca68..38bf7ec 100644 --- a/app/routers/libraries/routes.py +++ b/app/routers/libraries/routes.py @@ -1,9 +1,9 @@ from fastapi import APIRouter, Request, status from pydantic import BaseModel -from services.database.orm.library import insert_library from app.schemas import Library as LibrarySchema from app.services.database.models.libraries import Library +from app.services.database.orm.library import insert_library class LibraryResponse(BaseModel): diff --git a/app/services/auth.py b/app/services/auth.py index 6f15a14..c68d1ae 100755 --- a/app/services/auth.py +++ b/app/services/auth.py @@ -1,22 +1,26 @@ -#from passlib.context import CryptContext -import bcrypt +# from passlib.context import CryptContext +import os from datetime import datetime, timedelta, timezone -from app.schemas import TokenPayload + +import bcrypt import jwt -import os + +from app.schemas import TokenPayload SECRET_KEY = os.getenv("SECRET_KEY", "default_fallback_key") ALGORITHM = os.getenv("ALGORITHM", "HS256") ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 20)) -def verify_password(plain, hashed): + +def verify_password(plain, hashed): # Verifica se a senha passada bate com a hash da comunidade return bcrypt.checkpw( bytes(plain, encoding="utf-8"), hashed, ) -def hash_password(password): + +def hash_password(password): # Retorna a senha em hash para salvar no banco de dados return bcrypt.hashpw( bytes(password, encoding="utf-8"), @@ -24,18 +28,9 @@ def hash_password(password): ) - -#pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") - -#def verify_password(plain, hashed): -# # Verifica se a senha passada bate com a hash da comunidade -# return pwd_context.verify(plain, hashed) -# -#def hash_password(password): -# # Retorna a senha em hash para salvar no banco de dados -# return pwd_context.hash(password) - -def create_access_token(data: TokenPayload, expires_delta: timedelta | None = None): +def create_access_token( + data: TokenPayload, expires_delta: timedelta | None = None +): """ Gera um token JWT contendo os dados do usuário (payload) e uma data de expiração. JWT specification says that there's a key sub (subject) that should be used to identify the user. diff --git a/app/services/database/orm/community.py b/app/services/database/orm/community.py index 1e41933..d69e572 100644 --- a/app/services/database/orm/community.py +++ b/app/services/database/orm/community.py @@ -1,21 +1,24 @@ from typing import Optional + from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession + from app.services.database.models import Community async def get_community_by_username( username: str, - session: AsyncSession,) -> Optional[Community]: + session: AsyncSession, +) -> Optional[Community]: """ Busca e retorna um membro da comunidade pelo nome de usuário. Retorna None se o usuário não for encontrado. """ # Cria a declaração SQL para buscar a comunidade pelo nome de usuário statement = select(Community).where(Community.username == username) - + # Executa a declaração na sessão e retorna o primeiro resultado result = await session.exec(statement) community = result.first() - - return community \ No newline at end of file + + return community diff --git a/tests/test_subscriptions.py b/tests/test_subscriptions.py index 1b8b40c..ff2ed3e 100755 --- a/tests/test_subscriptions.py +++ b/tests/test_subscriptions.py @@ -1,11 +1,9 @@ import pytest import pytest_asyncio - +from services.database.models import Community, Subscription from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession -from services.database.models import Community, Subscription - @pytest_asyncio.fixture async def community(session: AsyncSession): @@ -26,7 +24,9 @@ async def test_insert_subscription(session: AsyncSession, community: Community): session.add(subscription) await session.commit() - statement = select(Subscription).where(Subscription.email == "teste@teste.com") + statement = select(Subscription).where( + Subscription.email == "teste@teste.com" + ) result = await session.exec(statement) found = result.first() From 16bcbb75484165261a64274080c9d2b98c73d2f2 Mon Sep 17 00:00:00 2001 From: IkeSalmonson Date: Tue, 26 Aug 2025 16:19:51 -0300 Subject: [PATCH 2/2] Feature: Testes adicionados --- app/routers/authentication.py | 4 +- tests/test_auth.py | 96 ++++++++++++++++++++++++++++------- 2 files changed, 79 insertions(+), 21 deletions(-) diff --git a/app/routers/authentication.py b/app/routers/authentication.py index d1cf368..052b5ae 100755 --- a/app/routers/authentication.py +++ b/app/routers/authentication.py @@ -6,7 +6,7 @@ from jwt.exceptions import InvalidTokenError from sqlmodel.ext.asyncio.session import AsyncSession -from app.schemas import Token, TokenPayload +from app.schemas import Community, Token, TokenPayload from app.services import auth from app.services.database.models import Community as DBCommunity from app.services.database.orm.community import get_community_by_username @@ -107,7 +107,7 @@ async def login_for_access_token( "expires_in": expires_in, } - @router.get("/me", response_model=DBCommunity) + @router.get("/me", response_model=Community) async def read_community_me( current_community: Annotated[ DBCommunity, Depends(get_current_active_community) diff --git a/tests/test_auth.py b/tests/test_auth.py index 5650044..660b939 100755 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,21 +1,22 @@ import pytest import pytest_asyncio +from fastapi import status +from httpx import AsyncClient from services.database.models import Community -from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession -from httpx import AsyncClient -from fastapi import status -from typing import Mapping from app.services.auth import hash_password password = "123Asd!@#" -## gerar usuario para autenticação + +# gerar usuario para autenticação @pytest_asyncio.fixture async def community(session: AsyncSession): - hashed_password=hash_password(password) - community = Community(username="username", email="username@test.com", password=hashed_password) + hashed_password = hash_password(password) + community = Community( + username="username", email="username@test.com", password=hashed_password + ) session.add(community) await session.commit() await session.refresh(community) @@ -24,23 +25,20 @@ async def community(session: AsyncSession): @pytest.mark.asyncio async def test_authentication_token_endpoint( - async_client: AsyncClient, - community: Community # Adicionando a comunidade do fixture + async_client: AsyncClient, + community: Community, # Adicionando a comunidade do fixture ): """ Testa o endpoint de login (/token) com credenciais válidas e inválidas. """ # 1. Teste de login com credenciais válidas # O OAuth2PasswordRequestForm espera 'username' e 'password' - form_data = { - "username": community.username, - "password": password - } - + form_data = {"username": community.username, "password": password} + response = await async_client.post( "/api/authentication/token", data=form_data, - headers={"Content-Type": "application/x-www-form-urlencoded"} + headers={"Content-Type": "application/x-www-form-urlencoded"}, ) # Validar a resposta @@ -51,17 +49,77 @@ async def test_authentication_token_endpoint( # 2. Teste de login com credenciais inválidas invalid_form_data = { - "username": "wrong_username", - "password": "wrong_password" + "username": "wrong_username", + "password": "wrong_password", } response_invalid = await async_client.post( "/api/authentication/token", data=invalid_form_data, - headers={"Content-Type": "application/x-www-form-urlencoded"} + headers={"Content-Type": "application/x-www-form-urlencoded"}, ) - + # Validar que o status é 401 Unauthorized assert response_invalid.status_code == status.HTTP_401_UNAUTHORIZED assert response_invalid.json()["detail"] == "Credenciais inválidas" + +@pytest.mark.asyncio +async def test_community_me_with_valid_token( + async_client: AsyncClient, community: Community +): + """ + Testa se o endpoint protegido /authenticate/me/ retorna os dados do usuário com um token válido. + """ + # 1. Obter um token de acesso primeiro + form_data = { + "grant_type": "password", + "username": community.username, + "password": password, + } + token_response = await async_client.post( + "/api/authentication/token", + data=form_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + assert token_response.status_code == status.HTTP_200_OK + token = token_response.json()["access_token"] + + # 2. Acessar o endpoint protegido com o token + headers = {"Authorization": f"Bearer {token}"} + response = await async_client.get("/api/authentication/me", headers=headers) + + # Validar a resposta + assert response.status_code == status.HTTP_200_OK + user_data = response.json() + assert user_data["username"] == community.username + assert user_data["email"] == community.email + # Assegurar que a senha não é retornada na resposta + assert "password" not in user_data + + +@pytest.mark.asyncio +async def test_community_me_without_token(async_client: AsyncClient): + """ + Testa se o endpoint protegido authentication/me/ retorna um erro 401 sem um token de acesso. + """ + response = await async_client.get("/api/authentication/me") + + # Validar a resposta + assert response.status_code == status.HTTP_401_UNAUTHORIZED + assert "detail" in response.json() + assert response.json()["detail"] == "Not authenticated" + + +@pytest.mark.asyncio +async def test_community_me_with_bad_token(async_client: AsyncClient): + """ + Testa se o endpoint protegido authentication/me/ retorna um erro 401 sem um token de acesso. + """ + headers = {"Authorization": "Bearer WrongToken"} + response = await async_client.get("/api/authentication/me", headers=headers) + + # Validar a resposta + assert response.status_code == status.HTTP_401_UNAUTHORIZED + assert "detail" in response.json() + assert response.json()["detail"] == "Could not validate credentials"