-
Notifications
You must be signed in to change notification settings - Fork 1
Fix redirects #27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix redirects #27
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| """ | ||
| import secrets | ||
| import logging | ||
| from urllib.parse import urlparse | ||
| from fastapi import APIRouter, Request, HTTPException | ||
| from fastapi.responses import RedirectResponse | ||
|
|
||
|
|
@@ -12,23 +13,80 @@ | |
| router = APIRouter() | ||
|
|
||
|
|
||
| def validate_redirect_url(url: str, request: Request) -> str: | ||
| """ | ||
| Validate and normalize redirect URL for security. | ||
| Only allows internal paths (starting with /) to prevent open redirect attacks. | ||
|
|
||
| Args: | ||
| url: URL to validate | ||
| request: Request object to get base URL | ||
|
|
||
| Returns: | ||
| Normalized internal path (relative URL) | ||
|
|
||
| Raises: | ||
| HTTPException: If URL is invalid or external | ||
| """ | ||
| if not url: | ||
| return "/" | ||
|
|
||
| # Parse URL | ||
| parsed = urlparse(url) | ||
|
|
||
| # If URL has scheme (http/https), check if it's internal | ||
| if parsed.scheme: | ||
| # Get base URL from request | ||
| base_url = str(request.base_url).rstrip('/') | ||
| request_host = request.url.hostname | ||
|
|
||
| # Allow only same host | ||
| if parsed.netloc and parsed.netloc != request_host: | ||
| logger.warning(f"External redirect URL blocked: {url}") | ||
| return "/" | ||
|
|
||
| # Extract path and query | ||
| path = parsed.path or "/" | ||
| query = parsed.query | ||
| if query: | ||
| path = f"{path}?{query}" | ||
| return path | ||
|
|
||
| # If no scheme, treat as relative path | ||
| # Ensure it starts with / | ||
| if not url.startswith('/'): | ||
| url = '/' + url | ||
|
Comment on lines
+57
to
+58
|
||
|
|
||
| return url | ||
|
|
||
|
|
||
| @router.get("/github") | ||
| async def github_login(request: Request, redirect_after: str = None): | ||
| """Initiate GitHub OAuth login""" | ||
| """ | ||
| Initiate GitHub OAuth login | ||
|
|
||
| Saves redirect URL for post-authentication redirect. | ||
| Only saves if explicitly provided via parameter or query string. | ||
| """ | ||
| # Generate state for CSRF protection | ||
| state = secrets.token_urlsafe(32) | ||
| request.session["oauth_state"] = state | ||
|
|
||
| # Save redirect URL if provided | ||
| if redirect_after: | ||
| request.session["oauth_redirect_after"] = redirect_after | ||
| elif "oauth_redirect_after" not in request.session: | ||
| # Get redirect_after from query parameter if not in session | ||
| # Get redirect_after from parameter or query string | ||
| if not redirect_after: | ||
| redirect_after = request.query_params.get("redirect_after") | ||
| if redirect_after: | ||
| request.session["oauth_redirect_after"] = redirect_after | ||
|
|
||
| logger.info(f"OAuth login initiated, state generated: {state[:10]}..., redirect_after: {request.session.get('oauth_redirect_after', 'not set')}") | ||
| # Validate and save redirect URL if provided | ||
| if redirect_after: | ||
| try: | ||
| validated_url = validate_redirect_url(redirect_after, request) | ||
| request.session["oauth_redirect_after"] = validated_url | ||
| logger.info(f"OAuth login initiated, state: {state[:10]}..., redirect_after: {validated_url}") | ||
| except Exception as e: | ||
| logger.warning(f"Invalid redirect URL provided: {redirect_after}, error: {e}") | ||
| # Continue without redirect_after, will redirect to / after auth | ||
| else: | ||
| logger.info(f"OAuth login initiated, state: {state[:10]}..., no redirect_after (will redirect to /)") | ||
|
|
||
| # Redirect to GitHub OAuth | ||
| oauth_url = get_oauth_url(state=state) | ||
|
|
@@ -85,8 +143,15 @@ async def github_callback(request: Request, code: str = None, state: str = None) | |
| request.session.pop("oauth_state", None) | ||
| logger.info(f"Session updated for user: {user_info['login']}") | ||
|
|
||
| # Redirect to saved URL or main page | ||
| # Get and validate redirect URL | ||
| redirect_url = request.session.pop("oauth_redirect_after", "/") | ||
| try: | ||
| # Validate redirect URL for security (prevent open redirect attacks) | ||
| redirect_url = validate_redirect_url(redirect_url, request) | ||
| except Exception as e: | ||
| logger.warning(f"Invalid redirect URL in session: {redirect_url}, error: {e}, redirecting to /") | ||
| redirect_url = "/" | ||
|
|
||
| logger.info(f"Redirecting after OAuth to: {redirect_url}") | ||
| return RedirectResponse(url=redirect_url, status_code=303) | ||
| except HTTPException: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The early return for empty URL in
validate_redirect_url()lacks test coverage. Consider adding a test case intest_auth_redirect.pythat explicitly tests empty string input to ensure this branch is covered.