Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 86 additions & 25 deletions app/routers/authentication.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,119 @@
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import Annotated

import jwt
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from sqlmodel.ext.asyncio.session import AsyncSession

from app.schemas import Community, Token, TokenPayload
from app.services import auth
from app.schemas import Token, TokenPayload, Community
from app.services.database.models import Community as DBCommunity
from app.services.database.orm.community import get_community_by_username

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/authentication/token")


def setup():
router = APIRouter(prefix='/authentication', tags=['authentication'])
async def authenticate_community( request: Request , username: str, password: str):
# Valida se o usuário existe e se a senha está correta
session: AsyncSession = request.app.db_session_factory
found_community = await get_community_by_username(
username=username,
session= session
)
if not found_community or not auth.verify_password(password, found_community.password):
router = APIRouter(prefix="/authentication", tags=["authentication"])

async def authenticate_community(
request: Request, username: str, password: str
):
# Valida se o usuário existe e se a senha está correta
session: AsyncSession = request.app.db_session_factory
found_community = await get_community_by_username(
username=username, session=session
)
if not found_community or not auth.verify_password(
password, found_community.password
):
return None
return found_community
return found_community

async def get_current_community(
request: Request,
token: Annotated[str, Depends(oauth2_scheme)],
) -> DBCommunity:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)

#### Teste
try:
payload = jwt.decode(
token, auth.SECRET_KEY, algorithms=[auth.ALGORITHM]
)
username = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenPayload(username=username)
except InvalidTokenError:
raise credentials_exception
session: AsyncSession = request.app.db_session_factory
community = await get_community_by_username(
session=session, username=token_data.username
)
if community is None:
raise credentials_exception

return community

async def get_current_active_community(
current_user: Annotated[DBCommunity, Depends(get_current_community)],
) -> DBCommunity:
# A função simplesmente retorna o usuário.
# Pode ser estendido futuramente para verificar um status "ativo".
return current_user

# Teste

@router.post("/create_commumity")
async def create_community(request: Request ):
async def create_community(request: Request):
password = "123Asd!@#"
hashed_password=auth.hash_password(password)
community = DBCommunity(username="username", email="username@test.com", password=hashed_password)
hashed_password = auth.hash_password(password)
community = DBCommunity(
username="username",
email="username@test.com",
password=hashed_password,
)
session: AsyncSession = request.app.db_session_factory
session.add(community)
await session.commit()
await session.refresh(community)
return {'msg':'succes? '}
#### Teste
return {"msg": "succes? "}

# Teste

@router.post("/token", response_model=Token)
async def login_for_access_token(request: Request , form_data: OAuth2PasswordRequestForm = Depends() ) :
async def login_for_access_token(
request: Request, form_data: OAuth2PasswordRequestForm = Depends()
):
# Rota de login: valida credenciais e retorna token JWT
community = await authenticate_community( request, form_data.username, form_data.password)
community = await authenticate_community(
request, form_data.username, form_data.password
)
if not community:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Credenciais inválidas"
detail="Credenciais inválidas",
)
payload = TokenPayload(username=community.username)
token, expires_in = auth.create_access_token(data=payload)
return {
"access_token": token,
"token_type": "Bearer",
"expires_in": expires_in
"expires_in": expires_in,
}
return router # Retorna o router configurado com as rotas de autenticação

@router.get("/me", response_model=Community)
async def read_community_me(
current_community: Annotated[
DBCommunity, Depends(get_current_active_community)
],
):
# Rota para obter informações do usuário autenticado
return current_community

return router # Retorna o router configurado com as rotas de autenticação
2 changes: 1 addition & 1 deletion app/routers/libraries/routes.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from fastapi import APIRouter, Request, status
from pydantic import BaseModel
from services.database.orm.library import insert_library

from app.schemas import Library as LibrarySchema
from app.services.database.models.libraries import Library
from app.services.database.orm.library import insert_library


class LibraryResponse(BaseModel):
Expand Down
31 changes: 13 additions & 18 deletions app/services/auth.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,36 @@
#from passlib.context import CryptContext
import bcrypt
# from passlib.context import CryptContext
import os
from datetime import datetime, timedelta, timezone
from app.schemas import TokenPayload

import bcrypt
import jwt
import os

from app.schemas import TokenPayload

SECRET_KEY = os.getenv("SECRET_KEY", "default_fallback_key")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 20))

def verify_password(plain, hashed):

def verify_password(plain, hashed):
# Verifica se a senha passada bate com a hash da comunidade
return bcrypt.checkpw(
bytes(plain, encoding="utf-8"),
hashed,
)

def hash_password(password):

def hash_password(password):
# Retorna a senha em hash para salvar no banco de dados
return bcrypt.hashpw(
bytes(password, encoding="utf-8"),
bcrypt.gensalt(),
)



#pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

#def verify_password(plain, hashed):
# # Verifica se a senha passada bate com a hash da comunidade
# return pwd_context.verify(plain, hashed)
#
#def hash_password(password):
# # Retorna a senha em hash para salvar no banco de dados
# return pwd_context.hash(password)

def create_access_token(data: TokenPayload, expires_delta: timedelta | None = None):
def create_access_token(
data: TokenPayload, expires_delta: timedelta | None = None
):
"""
Gera um token JWT contendo os dados do usuário (payload) e uma data de expiração.
JWT specification says that there's a key sub (subject) that should be used to identify the user.
Expand Down
11 changes: 7 additions & 4 deletions app/services/database/orm/community.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
from typing import Optional

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from app.services.database.models import Community


async def get_community_by_username(
username: str,
session: AsyncSession,) -> Optional[Community]:
session: AsyncSession,
) -> Optional[Community]:
"""
Busca e retorna um membro da comunidade pelo nome de usuário.
Retorna None se o usuário não for encontrado.
"""
# Cria a declaração SQL para buscar a comunidade pelo nome de usuário
statement = select(Community).where(Community.username == username)

# Executa a declaração na sessão e retorna o primeiro resultado
result = await session.exec(statement)
community = result.first()
return community

return community
96 changes: 77 additions & 19 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import pytest
import pytest_asyncio
from fastapi import status
from httpx import AsyncClient
from services.database.models import Community
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from httpx import AsyncClient
from fastapi import status
from typing import Mapping
from app.services.auth import hash_password

password = "123Asd!@#"

## gerar usuario para autenticação

# gerar usuario para autenticação
@pytest_asyncio.fixture
async def community(session: AsyncSession):
hashed_password=hash_password(password)
community = Community(username="username", email="username@test.com", password=hashed_password)
hashed_password = hash_password(password)
community = Community(
username="username", email="username@test.com", password=hashed_password
)
session.add(community)
await session.commit()
await session.refresh(community)
Expand All @@ -24,23 +25,20 @@ async def community(session: AsyncSession):

@pytest.mark.asyncio
async def test_authentication_token_endpoint(
async_client: AsyncClient,
community: Community # Adicionando a comunidade do fixture
async_client: AsyncClient,
community: Community, # Adicionando a comunidade do fixture
):
"""
Testa o endpoint de login (/token) com credenciais válidas e inválidas.
"""
# 1. Teste de login com credenciais válidas
# O OAuth2PasswordRequestForm espera 'username' e 'password'
form_data = {
"username": community.username,
"password": password
}

form_data = {"username": community.username, "password": password}

response = await async_client.post(
"/api/authentication/token",
data=form_data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

# Validar a resposta
Expand All @@ -51,17 +49,77 @@ async def test_authentication_token_endpoint(

# 2. Teste de login com credenciais inválidas
invalid_form_data = {
"username": "wrong_username",
"password": "wrong_password"
"username": "wrong_username",
"password": "wrong_password",
}

response_invalid = await async_client.post(
"/api/authentication/token",
data=invalid_form_data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

# Validar que o status é 401 Unauthorized
assert response_invalid.status_code == status.HTTP_401_UNAUTHORIZED
assert response_invalid.json()["detail"] == "Credenciais inválidas"


@pytest.mark.asyncio
async def test_community_me_with_valid_token(
async_client: AsyncClient, community: Community
):
"""
Testa se o endpoint protegido /authenticate/me/ retorna os dados do usuário com um token válido.
"""
# 1. Obter um token de acesso primeiro
form_data = {
"grant_type": "password",
"username": community.username,
"password": password,
}
token_response = await async_client.post(
"/api/authentication/token",
data=form_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert token_response.status_code == status.HTTP_200_OK
token = token_response.json()["access_token"]

# 2. Acessar o endpoint protegido com o token
headers = {"Authorization": f"Bearer {token}"}
response = await async_client.get("/api/authentication/me", headers=headers)

# Validar a resposta
assert response.status_code == status.HTTP_200_OK
user_data = response.json()
assert user_data["username"] == community.username
assert user_data["email"] == community.email
# Assegurar que a senha não é retornada na resposta
assert "password" not in user_data


@pytest.mark.asyncio
async def test_community_me_without_token(async_client: AsyncClient):
"""
Testa se o endpoint protegido authentication/me/ retorna um erro 401 sem um token de acesso.
"""
response = await async_client.get("/api/authentication/me")

# Validar a resposta
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert "detail" in response.json()
assert response.json()["detail"] == "Not authenticated"


@pytest.mark.asyncio
async def test_community_me_with_bad_token(async_client: AsyncClient):
"""
Testa se o endpoint protegido authentication/me/ retorna um erro 401 sem um token de acesso.
"""
headers = {"Authorization": "Bearer WrongToken"}
response = await async_client.get("/api/authentication/me", headers=headers)

# Validar a resposta
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert "detail" in response.json()
assert response.json()["detail"] == "Could not validate credentials"
Loading