Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

folder as resource fixes #1147

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions runhouse/resources/folders/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@

from runhouse.logger import logger
from runhouse.resources.hardware import _current_cluster, _get_cluster_from, Cluster
from runhouse.resources.module import Module
from runhouse.resources.resource import Resource
from runhouse.rns.utils.api import generate_uuid, relative_file_path
from runhouse.utils import locate_working_dir


class Folder(Module):
class Folder(Resource):
RESOURCE_TYPE = "folder"
DEFAULT_FS = "file"
CLUSTER_FS = "ssh"
Expand All @@ -37,17 +36,21 @@ def __init__(
.. note::
To build a folder, please use the factory method :func:`folder`.
"""
super().__init__(name=name, dryrun=dryrun, system=system)
super().__init__(name=name, dryrun=dryrun)

# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.gui.FileSelector.urlpath
# Note: no longer needed as part of previous fsspec usage, but still used by some s3 / gsutil commands
self._urlpath = None

self._system = _get_cluster_from(
system or _current_cluster(key="config"), dryrun=dryrun
)

# TODO [DG] Should we ever be allowing this to be None?
if path is None:
self._path = Folder.default_path(self.rns_address, system)
self._path = Folder.default_path(self.rns_address, self._system)
else:
if system != self.DEFAULT_FS:
if self._system != self.DEFAULT_FS:
self._path = path
else:
self._path = self._path_absolute_to_rh_workdir(path)
Expand Down Expand Up @@ -115,9 +118,17 @@ def path(self):
def path(self, path):
self._path = path

@property
def system(self):
return self._system

@system.setter
def system(self, new_system: Union[str, Cluster]):
self._system = _get_cluster_from(new_system)

@property
def _fs_str(self):
if isinstance(self.system, Resource): # if system is a cluster
if isinstance(self.system, Cluster):
if self.system.on_this_cluster():
return self.DEFAULT_FS
return self.CLUSTER_FS
Expand Down Expand Up @@ -217,17 +228,7 @@ def to(

# rsync the folder contents to the cluster's destination path
logger.debug(f"Syncing folder contents to cluster in path: {dest_path}")
self._to_cluster(system, path=dest_path)

# update the folder's system + path to the relative path on the cluster, since we'll return a
# new folder module which points to the cluster's file system
self.system = system
self.path = dest_path

# Note: setting `force_install` to ensure the module gets installed the cluster
# the folder's system may already be set to a cluster, which would skip the install
logger.debug("Sending folder module to cluster")
return super().to(system=system, force_install=True)
return self._to_cluster(system, path=dest_path)

path = str(
path or Folder.default_path(self.rns_address, system)
Expand Down Expand Up @@ -530,7 +531,7 @@ def ls(self, full_paths: bool = True, sort: bool = False) -> List:
Defaults to ``False``.
"""
if self._use_http_endpoint:
self.system._folder_ls(self.path)
return self.system._folder_ls(self.path, full_paths=full_paths, sort=sort)
else:
path = Path(self.path).expanduser()
paths = [p for p in path.iterdir()]
Expand Down Expand Up @@ -766,8 +767,7 @@ def put(self, contents, overwrite=False, mode: str = "wb"):
contents (Dict[str, Any] or Resource or List[Resource]): Contents to put in folder.
Must be a dict with keys being the file names (without full paths) and values being the file-like
objects to write, or a Resource object, or a list of Resources.
overwrite (bool): Whether to dump the file contents as json. By default expects data to be encoded.
Defaults to ``False``.
overwrite (bool): Whether to overwrite the existing file if it exists. Defaults to ``False``.
mode (Optional(str)): Write mode to use. Defaults to ``wb``.

Example:
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ def to(
>>> local_module = rh.module(my_class)
>>> cluster_module = local_module.to("my_cluster")
"""
if system == self.system and env == self.env and not force_install:
if system == self.system and env == self.env:
if name and not self.name == name:
# TODO return duplicate object under new name, don't rename
self.rename(name)
Expand Down
2 changes: 1 addition & 1 deletion runhouse/servers/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def folder_put(
):
folder_params = FolderPutParams(
path=path,
contents=contents,
contents=serialize_data(contents, serialization),
mode=mode,
overwrite=overwrite,
serialization=serialization,
Expand Down
9 changes: 7 additions & 2 deletions runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from runhouse.servers.http.http_utils import (
CallParams,
DeleteObjectParams,
deserialize_data,
folder_exists,
folder_get,
folder_ls,
Expand Down Expand Up @@ -677,12 +678,16 @@ async def folder_get_cmd(request: Request, get_params: FolderGetParams):
async def folder_put_cmd(request: Request, put_params: FolderPutParams):
try:
path = resolve_folder_path(put_params.path)
serialization = put_params.serialization
serialized_contents = put_params.contents
contents = deserialize_data(serialized_contents, serialization)

return folder_put(
path,
contents=put_params.contents,
contents=contents,
overwrite=put_params.overwrite,
mode=put_params.mode,
serialization=put_params.serialization,
serialization=serialization,
)

except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion runhouse/servers/http/http_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class FolderGetParams(FolderParams):


class FolderPutParams(FolderParams):
contents: Optional[Any]
contents: Any
overwrite: Optional[bool] = False
mode: Optional[str] = None
serialization: Optional[str] = None
Expand Down
26 changes: 8 additions & 18 deletions tests/fixtures/folder_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from pathlib import Path

import pytest

import runhouse as rh
from runhouse.constants import TEST_ORG

from tests.conftest import init_args

Expand Down Expand Up @@ -37,25 +34,18 @@ def local_folder():

@pytest.fixture
def docker_cluster_folder(docker_cluster_pk_ssh_no_auth):
local_path = Path.cwd()
dest_path = "rh-folder"

args = {
"name": f"/{TEST_ORG}/test_docker_folder",
"path": local_path,
"name": "test_docker_folder",
"system": docker_cluster_pk_ssh_no_auth,
"path": "rh-folder",
}

# Create a local folder based on the current working dir, then send it to the docker cluster as a module
docker_folder = rh.folder(**args).to(
system=docker_cluster_pk_ssh_no_auth, path=dest_path
)
assert docker_folder.system == docker_cluster_pk_ssh_no_auth

init_args[id(docker_folder)] = args
docker_folder.put(
{f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)}, overwrite=True
local_folder_docker = rh.folder(**args)
init_args[id(local_folder_docker)] = args
local_folder_docker.put(
{f"sample_file_{i}.txt": f"file{i}" for i in range(3)}, overwrite=True
)
return docker_folder
return local_folder_docker


@pytest.fixture
Expand Down
43 changes: 30 additions & 13 deletions tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,34 +916,51 @@ def test_cluster_put_and_get(self, cluster):

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_put_and_get_serialized_object(self, cluster):
from runhouse.servers.http.http_utils import deserialize_data, serialize_data
def test_cluster_put_and_get_serialization_methods(self, cluster):
from runhouse.servers.http.http_utils import deserialize_data

raw_data = [1, 2, 3]
serialization = "pickle"
serialized_data = serialize_data(raw_data, serialization)
pickle_serialization = "pickle"

cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.pickle": serialized_data},
contents={"sample.pickle": raw_data},
overwrite=True,
serialization=pickle_serialization,
)

file_contents = cluster._folder_get(path="~/.rh/new-folder/sample.pickle")
assert deserialize_data(file_contents, serialization) == raw_data
assert deserialize_data(file_contents, pickle_serialization) == raw_data

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_put_and_rm_with_contents(self, cluster):
raw_data = "Hello World!"
json_serialization = "json"
cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.text": raw_data},
overwrite=True,
serialization=json_serialization,
)

file_contents = cluster._folder_get(path="~/.rh/new-folder/sample.text")
assert deserialize_data(file_contents, json_serialization) == raw_data

with pytest.raises(AttributeError):
# No serialization specified, default mode of "wb" used which is not supported for a list
cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.pickle": raw_data},
overwrite=True,
)

# with no serialization specified, but with "w" mode
cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.txt": raw_data},
overwrite=True,
mode="w",
)

cluster._folder_rm(path="~/.rh/new-folder", contents=["sample.txt"])
folder_contents = cluster._folder_ls(path="~/.rh/new-folder")
assert "sample.txt" not in [os.path.basename(f) for f in folder_contents]
file_contents = cluster._folder_get(path="~/.rh/new-folder/sample.text")
assert deserialize_data(file_contents, json_serialization) == raw_data

@pytest.mark.level("local")
@pytest.mark.clustertest
Expand Down
46 changes: 46 additions & 0 deletions tests/test_resources/test_data/test_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,52 @@ class TestFolder(tests.test_resources.test_resource.TestResource):
"dest": _all_folder_fixtures,
}

@pytest.mark.level("local")
def test_cluster_folder_put_and_get_and_ls(self, docker_cluster_folder):
docker_cluster_folder.put({"some_file.txt": "Hello world"})
file_contents = docker_cluster_folder.get(name="some_file.txt")
assert file_contents == "Hello world"

folder_contents = docker_cluster_folder.ls(full_paths=False)
assert "some_file.txt" in folder_contents

@pytest.mark.level("local")
def test_cluster_folder_exists_and_mkdir_and_rm(self, docker_cluster_folder):
assert docker_cluster_folder.exists_in_system()
docker_cluster_folder.rm()

assert not docker_cluster_folder.exists_in_system()

docker_cluster_folder.mkdir()
assert docker_cluster_folder.exists_in_system()

@pytest.mark.level("local")
def test_cluster_folder_put_and_get_serialization_methods(
self, docker_cluster_folder
):
from runhouse.servers.http.http_utils import deserialize_data, serialize_data

pickle_serialization = "pickle"
raw_data = [1, 2, 3]
docker_cluster_folder.put(
{"some_file.pickle": serialize_data(raw_data, pickle_serialization)}
)

file_contents = docker_cluster_folder.get(name="some_file.pickle")
assert deserialize_data(file_contents, pickle_serialization) == raw_data

with pytest.raises(Exception):
docker_cluster_folder.put({"some_file.pickle": pickle.dumps(raw_data)})

json_serialization = "json"
raw_data = {"name": "Runhouse"}
docker_cluster_folder.put(
{"some_file.text": serialize_data(raw_data, json_serialization)}
)

file_contents = docker_cluster_folder.get(name="some_file.text")
assert deserialize_data(file_contents, json_serialization) == raw_data

@pytest.mark.level("minimal")
def test_send_folder_to_dest(self, folder, dest):
_check_skip_test(folder, dest)
Expand Down
Loading
Loading