diff --git a/python/packages/autogen-ext/src/autogen_ext/agents/file_surfer/_markdown_file_browser.py b/python/packages/autogen-ext/src/autogen_ext/agents/file_surfer/_markdown_file_browser.py index e904e2be9f37..93d693274ca6 100644 --- a/python/packages/autogen-ext/src/autogen_ext/agents/file_surfer/_markdown_file_browser.py +++ b/python/packages/autogen-ext/src/autogen_ext/agents/file_surfer/_markdown_file_browser.py @@ -17,14 +17,18 @@ class MarkdownFileBrowser: # TODO: Fix unfollowed import def __init__( # type: ignore - self, viewport_size: Union[int, None] = 1024 * 8, base_path: str = os.getcwd() + self, + viewport_size: Union[int, None] = 1024 * 8, + base_path: str | None = os.getcwd(), + cwd: str | None = None, ): """ Instantiate a new MarkdownFileBrowser. Arguments: viewport_size: Approximately how many *characters* fit in the viewport. Viewport dimensions are adjusted dynamically to avoid cutting off words (default: 8192). - base_path: The base path to use for the file browser. Defaults to the current working directory. + base_path: The base path to use for the file browser. Files outside this path cannot be accessed. Defaults to the current working directory. + cwd: The browser's current working directory. Defaults to the system's current working directory. """ self.viewport_size = viewport_size # Applies only to the standard uri types self.history: List[Tuple[str, float]] = list() @@ -32,19 +36,54 @@ def __init__( # type: ignore self.viewport_current_page = 0 self.viewport_pages: List[Tuple[int, int]] = list() self._markdown_converter = MarkItDown() - self._base_path = base_path + self._base_path = None if base_path is None else os.path.realpath(base_path) self._page_content: str = "" self._find_on_page_query: Union[str, None] = None self._find_on_page_last_result: Union[int, None] = None # Location of the last result - self.set_path(self._base_path) + + # Set the working directory + if cwd is None: + if self._validate_path(os.getcwd()): + # Use the current working directory if it's in the base path + cwd = os.path.realpath(os.getcwd()) + elif self._base_path is not None: + # Otherwise, use the base path + cwd = os.path.realpath(self._base_path) + else: + raise ValueError("No valid working directory (cwd) provided.") + elif not self._validate_path(cwd): + # A cwd was provided, but it is not valid + raise ValueError(f"Working directory (cwd) '{cwd}' is not valid. It must be within the base path.") + + # Populate the history with the current working directory + self.set_path(os.path.realpath(cwd)) @property def path(self) -> str: """Return the path of the current page.""" - if len(self.history) == 0: - return self._base_path - else: - return self.history[-1][0] + assert len(self.history) > 0 + return self.history[-1][0] + + def _validate_path(self, path: str) -> bool: + """Validates the path to ensure it is within the base path. + + Arguments: + path: The path to validate. + Returns: + True if the path is valid, False otherwise. + """ + if self._base_path is None: + return True + + # Normalize the paths + path = os.path.realpath(path) + base = os.path.realpath(self._base_path) + + # Check if the path is within the base path + if os.path.commonpath([path, base]) != base: + return False + + return True def set_path(self, path: str) -> None: """Sets the path of the current page. @@ -63,6 +102,9 @@ def set_path(self, path: str) -> None: path = os.path.abspath(os.path.join(self.path, path)) # If neither a file or a directory, take it verbatim + # Validating the path wrt. the base path is done in _open_path + path = os.path.realpath(path) + self.history.append((path, time.time())) self._open_path(path) self.viewport_current_page = 0 @@ -204,26 +246,35 @@ def _open_path( Arguments: path: The path of the file or directory to open. """ - try: - if os.path.isdir(path): # TODO: Fix markdown_converter types - res = self._markdown_converter.convert_stream( # type: ignore - io.BytesIO(self._fetch_local_dir(path).encode("utf-8")), file_extension=".txt" - ) - self.page_title = res.title - self._set_page_content(res.text_content, split_pages=False) - else: - res = self._markdown_converter.convert_local(path) - self.page_title = res.title - self._set_page_content(res.text_content) - except UnsupportedFormatException: - self.page_title = "UnsupportedFormatException" - self._set_page_content(f"# Cannot preview '{path}' as Markdown.") - except FileConversionException: - self.page_title = "FileConversionException." - self._set_page_content(f"# Error converting '{path}' to Markdown.") - except FileNotFoundError: + + if not self._validate_path(path): + # Not robust to TOCTOU issues. + # Mitigate by running with limited permissions, or use a sandbox. self.page_title = "FileNotFoundError" - self._set_page_content(f"# File not found: {path}") + self._set_page_content(f"# FileNotFoundError\n\nFile not found: {path}") + else: + try: + if os.path.isdir(path): # TODO: Fix markdown_converter types + res = self._markdown_converter.convert_stream( # type: ignore + io.BytesIO(self._fetch_local_dir(path).encode("utf-8")), file_extension=".txt" + ) + assert self._validate_path(path) + self.page_title = res.title + self._set_page_content(res.text_content, split_pages=False) + else: + res = self._markdown_converter.convert_local(path) + assert self._validate_path(path) + self.page_title = res.title + self._set_page_content(res.text_content) + except UnsupportedFormatException: + self.page_title = "UnsupportedFormatException" + self._set_page_content(f"# UnsupportedFormatException\n\nCannot preview '{path}' as Markdown.") + except FileConversionException: + self.page_title = "FileConversionException." + self._set_page_content(f"# FileConversionException\n\nError converting '{path}' to Markdown.") + except FileNotFoundError: + self.page_title = "FileNotFoundError" + self._set_page_content(f"# FileNotFoundError\n\nFile not found: {path}") def _fetch_local_dir(self, local_path: str) -> str: """Render a local directory listing in HTML to assist with local file browsing via the "file://" protocol.