diff --git a/runhouse/resources/folders/folder.py b/runhouse/resources/folders/folder.py index 63bc0c19e..6c99fa7bf 100644 --- a/runhouse/resources/folders/folder.py +++ b/runhouse/resources/folders/folder.py @@ -467,6 +467,20 @@ def _path_absolute_to_rh_workdir(path): else str(Path(locate_working_dir()) / path) ) + @staticmethod + def _delete_contents(contents: List, folder_path: Path, recursive: bool): + for content in contents: + content_path = folder_path / content + if content_path.exists(): + if content_path.is_file(): + content_path.unlink() + elif content_path.is_dir() and recursive: + shutil.rmtree(content_path) + else: + raise ValueError( + f"Path {content_path} is a directory and recursive is set to False" + ) + @property def fsspec_url(self): """Generate the FSSpec style URL using the file system and path of the folder""" @@ -685,17 +699,7 @@ def rm(self, contents: list = None, recursive: bool = True): folder_path = Path(self.path).expanduser() if contents: - for content in contents: - content_path = folder_path / content - if content_path.exists(): - if content_path.is_file(): - content_path.unlink() - elif content_path.is_dir() and recursive: - shutil.rmtree(content_path) - else: - raise ValueError( - f"Path {content_path} is a directory and recursive is set to False" - ) + Folder._delete_contents(contents, folder_path, recursive) else: if recursive: shutil.rmtree(folder_path) diff --git a/runhouse/servers/http/http_server.py b/runhouse/servers/http/http_server.py index 69c28c6ae..27796ade5 100644 --- a/runhouse/servers/http/http_server.py +++ b/runhouse/servers/http/http_server.py @@ -36,12 +36,25 @@ from runhouse.servers.http.http_utils import ( CallParams, DeleteObjectParams, + folder_get, + folder_ls, + folder_mkdir, + folder_mv, + folder_put, + folder_rm, + FolderGetParams, + FolderLsParams, + FolderMvParams, + FolderParams, + FolderPutParams, + FolderRmParams, get_token_from_request, handle_exception_response, OutputType, PutObjectParams, PutResourceParams, RenameObjectParams, + resolve_folder_path, Response, serialize_data, ServerSettings, @@ -53,7 +66,6 @@ ) from runhouse.utils import sync_function - app = FastAPI(docs_url=None, redoc_url=None) @@ -612,6 +624,96 @@ async def rename_object(request: Request, params: RenameObjectParams): e, traceback.format_exc(), from_http_server=True ) + @staticmethod + @app.post("/folder/method/ls") + @validate_cluster_access + async def folder_ls_cmd(request: Request, ls_params: FolderLsParams): + try: + path = resolve_folder_path(ls_params.path) + return folder_ls(path, full_paths=ls_params.full_paths, sort=ls_params.sort) + + except Exception as e: + return handle_exception_response( + e, traceback.format_exc(), from_http_server=True + ) + + @staticmethod + @app.post("/folder/method/mkdir") + @validate_cluster_access + async def folder_mkdir_cmd(request: Request, folder_params: FolderParams): + try: + path = resolve_folder_path(folder_params.path) + return folder_mkdir(path) + + except Exception as e: + return handle_exception_response( + e, traceback.format_exc(), from_http_server=True + ) + + @staticmethod + @app.post("/folder/method/get") + @validate_cluster_access + async def folder_get_cmd(request: Request, get_params: FolderGetParams): + try: + path = resolve_folder_path(get_params.path) + return folder_get(path, mode=get_params.mode, encoding=get_params.encoding) + + except Exception as e: + return handle_exception_response( + e, traceback.format_exc(), from_http_server=True + ) + + @staticmethod + @app.post("/folder/method/put") + @validate_cluster_access + async def folder_put_cmd(request: Request, put_params: FolderPutParams): + try: + path = resolve_folder_path(put_params.path) + return folder_put( + path, + overwrite=put_params.overwrite, + mode=put_params.mode, + serialization=put_params.serialization, + contents=put_params.contents, + ) + + except Exception as e: + return handle_exception_response( + e, traceback.format_exc(), from_http_server=True + ) + + @staticmethod + @app.post("/folder/method/rm") + @validate_cluster_access + async def folder_rm_cmd(request: Request, rm_params: FolderRmParams): + try: + path = resolve_folder_path(rm_params.path) + return folder_rm( + path, contents=rm_params.contents, recursive=rm_params.recursive + ) + + except Exception as e: + return handle_exception_response( + e, traceback.format_exc(), from_http_server=True + ) + + @staticmethod + @app.post("/folder/method/mv") + @validate_cluster_access + async def folder_mv_cmd(request: Request, mv_params: FolderMvParams): + try: + path = resolve_folder_path(mv_params.path) + return folder_mv( + src_path=path, + dest_path=mv_params.dest_path, + overwrite=mv_params.overwrite, + ) + + except Exception as e: + return handle_exception_response( + e, traceback.format_exc(), from_http_server=True + ) + @staticmethod @app.post("/delete_object") @validate_cluster_access diff --git a/runhouse/servers/http/http_utils.py b/runhouse/servers/http/http_utils.py index af4dde9fd..14e5be944 100644 --- a/runhouse/servers/http/http_utils.py +++ b/runhouse/servers/http/http_utils.py @@ -1,13 +1,15 @@ import codecs import json import re +import shutil import sys +from pathlib import Path from typing import Any, Dict, List, Optional, Union import requests from fastapi import HTTPException -from pydantic import BaseModel +from pydantic import BaseModel, validator from ray import cloudpickle as pickle from ray.exceptions import RayTaskError @@ -86,6 +88,45 @@ class OutputType: CONFIG = "config" +class FolderParams(BaseModel): + path: str + + @validator("path", pre=True, always=True) + def convert_path_to_string(cls, v): + return str(v) if v is not None else v + + +class FolderLsParams(FolderParams): + full_paths: Optional[bool] = True + sort: Optional[bool] = False + + +class FolderGetParams(FolderParams): + encoding: Optional[str] = None + mode: Optional[str] = None + + +class FolderPutParams(FolderParams): + contents: Optional[Any] + mode: Optional[str] = None + overwrite: Optional[bool] = False + serialization: Optional[str] = None + + +class FolderRmParams(FolderParams): + contents: Optional[List] = None + recursive: Optional[bool] = False + + +class FolderMvParams(FolderParams): + dest_path: str + overwrite: Optional[bool] = False + + @validator("dest_path", pre=True, always=True) + def convert_path_to_string(cls, v): + return str(v) if v is not None else v + + def pickle_b64(picklable): return codecs.encode(pickle.dumps(picklable), "base64").decode() @@ -281,3 +322,204 @@ def handle_response( elif output_type == OutputType.STDERR: res = response_data["data"] print(system_color + res + reset_color, file=sys.stderr) + + +########################### +#### Folder Operations #### +########################### + + +def resolve_folder_path(path: str): + return ( + None + if path is None + else Path(path).expanduser() + if path.startswith("~") + else Path(path).resolve() + ) + + +def folder_mkdir(path: Path): + if not path.parent.is_dir(): + raise ValueError( + f"Parent path {path.parent} does not exist or is not a directory" + ) + + path.mkdir(parents=True, exist_ok=True) + + return Response(output_type=OutputType.SUCCESS) + + +def folder_get(path: Path, mode: str = None, encoding: str = None): + mode = mode or "rb" + binary_mode = "b" in mode + + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path {path} does not exist") + + try: + with open(path, mode=mode, encoding=encoding) as f: + file_contents = f.read() + + except FileNotFoundError: + raise HTTPException(status_code=404, detail=f"File {path} not found") + + except PermissionError: + raise HTTPException( + status_code=403, detail=f"Permission denied for file in path {path}" + ) + + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error reading file {path}: {str(e)}" + ) + + if binary_mode and isinstance(file_contents, bytes): + file_contents = file_contents.decode() + + return Response( + data=file_contents, + output_type=OutputType.RESULT_SERIALIZED, + serialization=None, + ) + + +def folder_put( + path: Path, + overwrite: bool, + mode: str = None, + serialization: str = None, + contents: Dict[str, Any] = None, +): + mode = mode or "wb" + + if contents and not isinstance(contents, dict): + raise HTTPException( + status_code=422, + detail="`contents` argument must be a dict mapping filenames to file-like objects", + ) + + path.mkdir(parents=True, exist_ok=True) + + if overwrite is False: + existing_files = {str(item.name) for item in path.iterdir()} + intersection = existing_files.intersection(set(contents.keys())) + if intersection: + raise HTTPException( + status_code=409, + detail=f"File(s) {intersection} already exist(s) at path: {path}", + ) + + for filename, file_obj in contents.items(): + binary_mode = "b" in mode + + if serialization: + file_obj = serialize_data(file_obj, serialization) + + if binary_mode and not isinstance(file_obj, bytes): + file_obj = file_obj.encode() + + file_path = path / filename + if not overwrite and file_path.exists(): + raise HTTPException( + status_code=409, detail=f"File {file_path} already exists" + ) + + try: + with open(file_path, mode) as f: + f.write(file_obj) + except Exception as e: + HTTPException(status_code=500, detail=f"Failed to write file: {str(e)}") + + return Response(output_type=OutputType.SUCCESS) + + +def folder_ls(path: Path, full_paths: bool, sort: bool): + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path {path} does not exist") + + if not path.is_dir(): + raise HTTPException(status_code=400, detail=f"Path {path} is not a directory") + + paths = [p for p in path.iterdir()] + + # Sort the paths by modification time if sort is True + if sort: + paths.sort(key=lambda p: p.stat().st_mtime, reverse=True) + + # Convert paths to strings and format them based on full_paths + if full_paths: + files = [str(p.resolve()) for p in paths] + else: + files = [p.name for p in paths] + + return Response( + data=files, + output_type=OutputType.RESULT_SERIALIZED, + serialization=None, + ) + + +def folder_rm(path: Path, contents: List[str], recursive: bool): + if contents: + from runhouse import Folder + + try: + Folder._delete_contents(contents, path, recursive) + except ValueError as e: + raise HTTPException( + status_code=400, + detail=str(e), + ) + + return Response(output_type=OutputType.SUCCESS) + + if not path.is_dir(): + path.unlink() + return Response(output_type=OutputType.SUCCESS) + + if recursive: + shutil.rmtree(path) + return Response(output_type=OutputType.SUCCESS) + + items = list(path.iterdir()) + if not items: + # Remove the empty directory + path.rmdir() + return Response(output_type=OutputType.SUCCESS) + + # Remove file contents, but not the directory itself (since recursive not set to `True`) + for item in items: + if item.is_file(): + item.unlink() + else: + raise HTTPException( + status_code=400, + detail=f"Folder {item} found in {path}, recursive is set to `False`", + ) + + return Response(output_type=OutputType.SUCCESS) + + +def folder_mv(src_path: Path, dest_path: str, overwrite: bool): + dest_path = resolve_folder_path(dest_path) + + if not src_path.exists(): + raise HTTPException( + status_code=404, detail=f"The source path {src_path} does not exist" + ) + + if not overwrite and dest_path.exists(): + raise HTTPException( + status_code=409, + detail=f"The destination path {dest_path} already exists. Set `overwrite` to `True` to " + f"overwrite the destination path", + ) + + # Create the destination directory if it doesn't exist and overwrite is set to `True` + dest_path.parent.mkdir(parents=True, exist_ok=overwrite) + + # Move the directory + shutil.move(str(src_path), str(dest_path)) + + return Response(output_type=OutputType.SUCCESS)