From ef241e7d8dff5a77801fa2eda1a3f59af5341fb0 Mon Sep 17 00:00:00 2001 From: anon Date: Thu, 12 Dec 2024 16:56:54 +0100 Subject: [PATCH 1/5] Add missing requirement --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 7dfe2bae..d7845d58 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ fastapi[standard] uvicorn fastapi-analytics -slowapi \ No newline at end of file +slowapi +tokencost \ No newline at end of file From a203a9257ef75daa3b73fbdb5c571981f830465d Mon Sep 17 00:00:00 2001 From: nol_tech Date: Thu, 12 Dec 2024 17:10:54 +0100 Subject: [PATCH 2/5] Prevent following symlinks outside of cloned repository --- src/ingest.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/ingest.py b/src/ingest.py index 650c202b..e57cfb7a 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -15,6 +15,19 @@ def should_ignore(path: str, base_path: str, ignore_patterns: List[str]) -> bool return True return False +def is_safe_symlink(symlink_path: str, base_path: str) -> bool: + """Check if a symlink points to a location within the base directory.""" + try: + # Get the absolute path of the symlink target + target_path = os.path.realpath(symlink_path) + # Get the absolute path of the base directory + base_path = os.path.realpath(base_path) + # Check if the target path starts with the base path + return os.path.commonpath([target_path]) == os.path.commonpath([target_path, base_path]) + except (OSError, ValueError): + # If there's any error resolving the paths, consider it unsafe + return False + def is_text_file(file_path: str) -> bool: """Determines if a file is likely a text file based on its content.""" try: @@ -51,6 +64,41 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic if should_ignore(item_path, base_path, ignore_patterns): continue + # Handle symlinks + if os.path.islink(item_path): + if not is_safe_symlink(item_path, base_path): + print(f"Skipping symlink that points outside base directory: {item_path}") + continue + # Get the real path for further checks + real_path = os.path.realpath(item_path) + # Use the real path for file operations but keep original path for display + if os.path.isfile(real_path): + file_size = os.path.getsize(real_path) + is_text = is_text_file(real_path) + content = read_file_content(real_path) if is_text else "[Non-text file]" + + child = { + "name": item, + "type": "file", + "size": file_size, + "content": content, + "path": item_path # Keep the original path + } + result["children"].append(child) + result["size"] += file_size + result["file_count"] += 1 + + elif os.path.isdir(real_path): + subdir = scan_directory(real_path, ignore_patterns, base_path) + if subdir: + subdir["name"] = item # Keep the original name + subdir["path"] = item_path # Keep the original path + result["children"].append(subdir) + result["size"] += subdir["size"] + result["file_count"] += subdir["file_count"] + result["dir_count"] += 1 + subdir["dir_count"] + continue + if os.path.isfile(item_path): file_size = os.path.getsize(item_path) is_text = is_text_file(item_path) From 6e4ecb41e02188672891595c7f76ecba29c3decb Mon Sep 17 00:00:00 2001 From: nol_tech Date: Thu, 12 Dec 2024 17:10:54 +0100 Subject: [PATCH 3/5] Launch as non root user in Dockerfile --- Dockerfile | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Dockerfile b/Dockerfile index bd475989..95147895 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,9 +2,18 @@ FROM python:3.12 WORKDIR /app +# Create a non-root user +RUN useradd -m -u 1000 appuser + COPY src/ ./ COPY requirements.txt ./ RUN pip install -r requirements.txt +# Change ownership of the application files +RUN chown -R appuser:appuser /app + +# Switch to non-root user +USER appuser + CMD ["uvicorn", "main:app", "--reload"] From f568a957fbd955cb3428b37618095bcc56c18261 Mon Sep 17 00:00:00 2001 From: nol_tech Date: Thu, 12 Dec 2024 17:10:54 +0100 Subject: [PATCH 4/5] Prevent circular links --- src/ingest.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/ingest.py b/src/ingest.py index e57cfb7a..14574828 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -45,8 +45,17 @@ def read_file_content(file_path: str) -> str: except Exception as e: return f"Error reading file: {str(e)}" -def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dict: +def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_paths: set = None) -> Dict: """Recursively analyzes a directory and its contents.""" + if seen_paths is None: + seen_paths = set() + + real_path = os.path.realpath(path) + if real_path in seen_paths: + print(f"Skipping already visited path: {path}") + return None + seen_paths.add(real_path) + result = { "name": os.path.basename(path), "type": "directory", @@ -71,6 +80,9 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic continue # Get the real path for further checks real_path = os.path.realpath(item_path) + if real_path in seen_paths: + print(f"Skipping already visited symlink target: {item_path}") + continue # Use the real path for file operations but keep original path for display if os.path.isfile(real_path): file_size = os.path.getsize(real_path) @@ -89,7 +101,7 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic result["file_count"] += 1 elif os.path.isdir(real_path): - subdir = scan_directory(real_path, ignore_patterns, base_path) + subdir = scan_directory(real_path, ignore_patterns, base_path, seen_paths) if subdir: subdir["name"] = item # Keep the original name subdir["path"] = item_path # Keep the original path @@ -116,7 +128,7 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic result["file_count"] += 1 elif os.path.isdir(item_path): - subdir = scan_directory(item_path, ignore_patterns, base_path) + subdir = scan_directory(item_path, ignore_patterns, base_path, seen_paths) if subdir: result["children"].append(subdir) result["size"] += subdir["size"] From 6a0b5d86c70bc9485cd2a64c79376e629462cdd2 Mon Sep 17 00:00:00 2001 From: nol_tech Date: Thu, 12 Dec 2024 17:10:54 +0100 Subject: [PATCH 5/5] Add depth and size limits to avoid DOS --- src/ingest.py | 62 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 9 deletions(-) diff --git a/src/ingest.py b/src/ingest.py index 14574828..82ee0a77 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -4,6 +4,10 @@ from tokencost import count_string_tokens from typing import Dict, List, Union +MAX_DIRECTORY_DEPTH = 10 # Maximum depth of directory traversal +MAX_FILES = 10000 # Maximum number of files to process +MAX_TOTAL_SIZE_BYTES = 100 * 1024 * 1024 # 100MB total size limit + def should_ignore(path: str, base_path: str, ignore_patterns: List[str]) -> bool: """Checks if a file or directory should be ignored based on patterns.""" name = os.path.basename(path) @@ -45,10 +49,27 @@ def read_file_content(file_path: str) -> str: except Exception as e: return f"Error reading file: {str(e)}" -def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_paths: set = None) -> Dict: - """Recursively analyzes a directory and its contents.""" +def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_paths: set = None, depth: int = 0, stats: Dict = None) -> Dict: + """Recursively analyzes a directory and its contents with safety limits.""" if seen_paths is None: seen_paths = set() + if stats is None: + stats = {"total_files": 0, "total_size": 0} + + # Check depth limit + if depth > MAX_DIRECTORY_DEPTH: + print(f"Skipping deep directory: {path} (max depth {MAX_DIRECTORY_DEPTH} reached)") + return None + + # Check total files limit + if stats["total_files"] >= MAX_FILES: + print(f"Skipping further processing: maximum file limit ({MAX_FILES}) reached") + return None + + # Check total size limit + if stats["total_size"] >= MAX_TOTAL_SIZE_BYTES: + print(f"Skipping further processing: maximum total size ({MAX_TOTAL_SIZE_BYTES/1024/1024:.1f}MB) reached") + return None real_path = os.path.realpath(path) if real_path in seen_paths: @@ -78,14 +99,25 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_p if not is_safe_symlink(item_path, base_path): print(f"Skipping symlink that points outside base directory: {item_path}") continue - # Get the real path for further checks real_path = os.path.realpath(item_path) if real_path in seen_paths: print(f"Skipping already visited symlink target: {item_path}") continue - # Use the real path for file operations but keep original path for display + if os.path.isfile(real_path): file_size = os.path.getsize(real_path) + # Check if adding this file would exceed total size limit + if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES: + print(f"Skipping file {item_path}: would exceed total size limit") + continue + + stats["total_files"] += 1 + stats["total_size"] += file_size + + if stats["total_files"] > MAX_FILES: + print(f"Maximum file limit ({MAX_FILES}) reached") + return result + is_text = is_text_file(real_path) content = read_file_content(real_path) if is_text else "[Non-text file]" @@ -94,17 +126,17 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_p "type": "file", "size": file_size, "content": content, - "path": item_path # Keep the original path + "path": item_path } result["children"].append(child) result["size"] += file_size result["file_count"] += 1 elif os.path.isdir(real_path): - subdir = scan_directory(real_path, ignore_patterns, base_path, seen_paths) + subdir = scan_directory(real_path, ignore_patterns, base_path, seen_paths, depth + 1, stats) if subdir: - subdir["name"] = item # Keep the original name - subdir["path"] = item_path # Keep the original path + subdir["name"] = item + subdir["path"] = item_path result["children"].append(subdir) result["size"] += subdir["size"] result["file_count"] += subdir["file_count"] @@ -113,6 +145,18 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_p if os.path.isfile(item_path): file_size = os.path.getsize(item_path) + # Check if adding this file would exceed total size limit + if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES: + print(f"Skipping file {item_path}: would exceed total size limit") + continue + + stats["total_files"] += 1 + stats["total_size"] += file_size + + if stats["total_files"] > MAX_FILES: + print(f"Maximum file limit ({MAX_FILES}) reached") + return result + is_text = is_text_file(item_path) content = read_file_content(item_path) if is_text else "[Non-text file]" @@ -128,7 +172,7 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_p result["file_count"] += 1 elif os.path.isdir(item_path): - subdir = scan_directory(item_path, ignore_patterns, base_path, seen_paths) + subdir = scan_directory(item_path, ignore_patterns, base_path, seen_paths, depth + 1, stats) if subdir: result["children"].append(subdir) result["size"] += subdir["size"]