Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions backend/app/config/settings.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Security Settings
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production")
API_KEY = os.getenv("API_KEY", "dev-api-key-change-in-production")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))

Comment on lines +8 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid shipping dev secrets in production

SECRET_KEY/API_KEY have permissive dev defaults. Enforce env presence in prod (fail-fast or warn) to prevent weak secrets in deployments.

Example:

+APP_ENV = os.getenv("APP_ENV", "development").lower()
 SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production")
 API_KEY = os.getenv("API_KEY", "dev-api-key-change-in-production")
@@
-ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
+ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
+if APP_ENV == "production" and (
+    SECRET_KEY == "dev-secret-key-change-in-production"
+    or API_KEY == "dev-api-key-change-in-production"
+):
+    raise RuntimeError("SECRET_KEY/API_KEY must be set in production")
πŸ“ Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production")
API_KEY = os.getenv("API_KEY", "dev-api-key-change-in-production")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
APP_ENV = os.getenv("APP_ENV", "development").lower()
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production")
API_KEY = os.getenv("API_KEY", "dev-api-key-change-in-production")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
if APP_ENV == "production" and (
SECRET_KEY == "dev-secret-key-change-in-production"
or API_KEY == "dev-api-key-change-in-production"
):
raise RuntimeError("SECRET_KEY/API_KEY must be set in production")
πŸ€– Prompt for AI Agents
In backend/app/config/settings.py around lines 8 to 12, SECRET_KEY and API_KEY
currently fall back to permissive dev defaults; remove or avoid silent defaults
and enforce environment presence in production by either reading them without a
default (os.environ["SECRET_KEY"], os.environ["API_KEY"]) or validating after
load and raising an error when running in production (e.g., when ENV/DEBUG
indicates prod). Implement a fail-fast behavior: if running in production and
either SECRET_KEY or API_KEY is missing or empty, raise a clear exception on
startup (or exit) with a message instructing to set the env var; for local/dev
runs keep acknowleged dev-only defaults or explicit debug branch that logs a
warning rather than silently using weak keys.

# CORS Settings
ALLOWED_ORIGINS = os.getenv(
"ALLOWED_ORIGINS", "http://localhost:1420,tauri://localhost"
).split(",")

# Rate Limiting
RATE_LIMIT_PER_MINUTE = int(os.getenv("RATE_LIMIT_PER_MINUTE", "60"))
RATE_LIMIT_PER_HOUR = int(os.getenv("RATE_LIMIT_PER_HOUR", "1000"))

# Model Exports Path
MODEL_EXPORTS_PATH = "app/models/ONNX_Exports"

# Microservice URLs
SYNC_MICROSERVICE_URL = "http://localhost:8001/api/v1"
SYNC_MICROSERVICE_URL = os.getenv(
"SYNC_MICROSERVICE_URL", "http://localhost:8001/api/v1"
)

CONFIDENCE_PERCENT = 0.6
# Object Detection Models:
Expand All @@ -20,6 +43,6 @@

TEST_INPUT_PATH = "tests/inputs"
TEST_OUTPUT_PATH = "tests/outputs"
DATABASE_PATH = "app/database/PictoPy.db"
DATABASE_PATH = os.getenv("DATABASE_PATH", "app/database/PictoPy.db")
THUMBNAIL_IMAGES_PATH = "./images/thumbnails"
IMAGES_PATH = "./images"
19 changes: 19 additions & 0 deletions backend/app/middleware/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
Middleware package for PictoPy API.
"""

from .auth import (
create_access_token,
verify_token,
verify_api_key,
get_current_user,
get_current_user_optional,
)

__all__ = [
"create_access_token",
"verify_token",
"verify_api_key",
"get_current_user",
"get_current_user_optional",
]
155 changes: 155 additions & 0 deletions backend/app/middleware/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""
Authentication middleware for PictoPy API.
Supports both JWT tokens and API key authentication.
"""

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, HTTPException, status, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt

from app.config.settings import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, API_KEY

# Security schemes
security = HTTPBearer(auto_error=False)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a JWT access token.

Args:
data: Data to encode in the token
expires_delta: Token expiration time

Returns:
Encoded JWT token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt


def verify_token(token: str) -> dict:
"""
Verify and decode a JWT token.

Args:
token: JWT token to verify

Returns:
Decoded token payload

Raises:
HTTPException: If token is invalid or expired
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)


async def verify_api_key(x_api_key: Optional[str] = Header(None)) -> bool:
"""
Verify API key from header for Tauri application.

Args:
x_api_key: API key from X-API-Key header

Returns:
True if API key is valid

Raises:
HTTPException: If API key is invalid or missing
"""
if not x_api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key is missing",
headers={"WWW-Authenticate": "ApiKey"},
)

if x_api_key != API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API key",
)

return True


async def get_current_user_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = Header(None),
) -> Optional[dict]:
"""
Get current user from JWT token or API key (optional authentication).
Used for endpoints that work with or without authentication.

Args:
credentials: HTTP Bearer credentials
x_api_key: API key from header

Returns:
User data if authenticated, None otherwise
"""
# Check API key first (for Tauri app)
if x_api_key and x_api_key == API_KEY:
return {"authenticated_via": "api_key", "client": "tauri"}

# Check JWT token
if credentials and credentials.credentials:
try:
payload = verify_token(credentials.credentials)
return payload
except HTTPException:
return None

return None


async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = Header(None),
) -> dict:
"""
Get current user from JWT token or API key (required authentication).
Used for protected endpoints that require authentication.

Args:
credentials: HTTP Bearer credentials
x_api_key: API key from header

Returns:
User data

Raises:
HTTPException: If authentication fails
"""
# Check API key first (for Tauri app)
if x_api_key and x_api_key == API_KEY:
return {"authenticated_via": "api_key", "client": "tauri"}

# Check JWT token
if credentials and credentials.credentials:
payload = verify_token(credentials.credentials)
return payload

raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
104 changes: 104 additions & 0 deletions backend/app/routes/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
Authentication routes for PictoPy API.
"""

from datetime import timedelta
from typing import Optional

from fastapi import APIRouter, HTTPException, status, Header
from pydantic import BaseModel

from app.middleware.auth import create_access_token, verify_api_key
from app.config.settings import ACCESS_TOKEN_EXPIRE_MINUTES, API_KEY

router = APIRouter()


class TokenRequest(BaseModel):
"""Request model for token generation."""

client_id: str
api_key: str


class TokenResponse(BaseModel):
"""Response model for token generation."""

access_token: str
token_type: str
expires_in: int


class AuthStatusResponse(BaseModel):
"""Response model for auth status check."""

authenticated: bool
auth_method: Optional[str] = None
message: str


@router.post(
"/token",
response_model=TokenResponse,
summary="Generate JWT Token",
description="Generate a JWT access token using API key authentication. Used for testing or future web interface.",
)
async def generate_token(request: TokenRequest):
"""
Generate a JWT access token.

Args:
request: Token request containing client_id and api_key

Returns:
Access token and expiration info

Raises:
HTTPException: If API key is invalid
"""
# Verify API key
if request.api_key != API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API key",
)

# Create access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": request.client_id, "client": "web"}, expires_delta=access_token_expires
)

return TokenResponse(
access_token=access_token,
token_type="bearer",
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60, # in seconds
)


@router.get(
"/status",
response_model=AuthStatusResponse,
summary="Check Authentication Status",
description="Check if the provided API key is valid.",
)
async def check_auth_status(x_api_key: Optional[str] = Header(None)):
"""
Check authentication status.

Args:
x_api_key: API key from X-API-Key header

Returns:
Authentication status
"""
if x_api_key and x_api_key == API_KEY:
return AuthStatusResponse(
authenticated=True,
auth_method="api_key",
message="Successfully authenticated via API key",
)

return AuthStatusResponse(
authenticated=False, auth_method=None, message="Not authenticated"
)
Loading