Skip to content

Commit

Permalink
folder as resource fixes (#1147)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 authored Aug 15, 2024
1 parent c4bc995 commit bc54e31
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 73 deletions.
40 changes: 20 additions & 20 deletions runhouse/resources/folders/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@

from runhouse.logger import logger
from runhouse.resources.hardware import _current_cluster, _get_cluster_from, Cluster
from runhouse.resources.module import Module
from runhouse.resources.resource import Resource
from runhouse.rns.utils.api import generate_uuid, relative_file_path
from runhouse.utils import locate_working_dir


class Folder(Module):
class Folder(Resource):
RESOURCE_TYPE = "folder"
DEFAULT_FS = "file"
CLUSTER_FS = "ssh"
Expand All @@ -37,17 +36,21 @@ def __init__(
.. note::
To build a folder, please use the factory method :func:`folder`.
"""
super().__init__(name=name, dryrun=dryrun, system=system)
super().__init__(name=name, dryrun=dryrun)

# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.gui.FileSelector.urlpath
# Note: no longer needed as part of previous fsspec usage, but still used by some s3 / gsutil commands
self._urlpath = None

self._system = _get_cluster_from(
system or _current_cluster(key="config"), dryrun=dryrun
)

# TODO [DG] Should we ever be allowing this to be None?
if path is None:
self._path = Folder.default_path(self.rns_address, system)
self._path = Folder.default_path(self.rns_address, self._system)
else:
if system != self.DEFAULT_FS:
if self._system != self.DEFAULT_FS:
self._path = path
else:
self._path = self._path_absolute_to_rh_workdir(path)
Expand Down Expand Up @@ -115,9 +118,17 @@ def path(self):
def path(self, path):
self._path = path

@property
def system(self):
return self._system

@system.setter
def system(self, new_system: Union[str, Cluster]):
self._system = _get_cluster_from(new_system)

@property
def _fs_str(self):
if isinstance(self.system, Resource): # if system is a cluster
if isinstance(self.system, Cluster):
if self.system.on_this_cluster():
return self.DEFAULT_FS
return self.CLUSTER_FS
Expand Down Expand Up @@ -217,17 +228,7 @@ def to(

# rsync the folder contents to the cluster's destination path
logger.debug(f"Syncing folder contents to cluster in path: {dest_path}")
self._to_cluster(system, path=dest_path)

# update the folder's system + path to the relative path on the cluster, since we'll return a
# new folder module which points to the cluster's file system
self.system = system
self.path = dest_path

# Note: setting `force_install` to ensure the module gets installed the cluster
# the folder's system may already be set to a cluster, which would skip the install
logger.debug("Sending folder module to cluster")
return super().to(system=system, force_install=True)
return self._to_cluster(system, path=dest_path)

path = str(
path or Folder.default_path(self.rns_address, system)
Expand Down Expand Up @@ -530,7 +531,7 @@ def ls(self, full_paths: bool = True, sort: bool = False) -> List:
Defaults to ``False``.
"""
if self._use_http_endpoint:
self.system._folder_ls(self.path)
return self.system._folder_ls(self.path, full_paths=full_paths, sort=sort)
else:
path = Path(self.path).expanduser()
paths = [p for p in path.iterdir()]
Expand Down Expand Up @@ -766,8 +767,7 @@ def put(self, contents, overwrite=False, mode: str = "wb"):
contents (Dict[str, Any] or Resource or List[Resource]): Contents to put in folder.
Must be a dict with keys being the file names (without full paths) and values being the file-like
objects to write, or a Resource object, or a list of Resources.
overwrite (bool): Whether to dump the file contents as json. By default expects data to be encoded.
Defaults to ``False``.
overwrite (bool): Whether to overwrite the existing file if it exists. Defaults to ``False``.
mode (Optional(str)): Write mode to use. Defaults to ``wb``.
Example:
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ def to(
>>> local_module = rh.module(my_class)
>>> cluster_module = local_module.to("my_cluster")
"""
if system == self.system and env == self.env and not force_install:
if system == self.system and env == self.env:
if name and not self.name == name:
# TODO return duplicate object under new name, don't rename
self.rename(name)
Expand Down
2 changes: 1 addition & 1 deletion runhouse/servers/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def folder_put(
):
folder_params = FolderPutParams(
path=path,
contents=contents,
contents=serialize_data(contents, serialization),
mode=mode,
overwrite=overwrite,
serialization=serialization,
Expand Down
9 changes: 7 additions & 2 deletions runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from runhouse.servers.http.http_utils import (
CallParams,
DeleteObjectParams,
deserialize_data,
folder_exists,
folder_get,
folder_ls,
Expand Down Expand Up @@ -677,12 +678,16 @@ async def folder_get_cmd(request: Request, get_params: FolderGetParams):
async def folder_put_cmd(request: Request, put_params: FolderPutParams):
try:
path = resolve_folder_path(put_params.path)
serialization = put_params.serialization
serialized_contents = put_params.contents
contents = deserialize_data(serialized_contents, serialization)

return folder_put(
path,
contents=put_params.contents,
contents=contents,
overwrite=put_params.overwrite,
mode=put_params.mode,
serialization=put_params.serialization,
serialization=serialization,
)

except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion runhouse/servers/http/http_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class FolderGetParams(FolderParams):


class FolderPutParams(FolderParams):
contents: Optional[Any]
contents: Any
overwrite: Optional[bool] = False
mode: Optional[str] = None
serialization: Optional[str] = None
Expand Down
26 changes: 8 additions & 18 deletions tests/fixtures/folder_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from pathlib import Path

import pytest

import runhouse as rh
from runhouse.constants import TEST_ORG

from tests.conftest import init_args

Expand Down Expand Up @@ -37,25 +34,18 @@ def local_folder():

@pytest.fixture
def docker_cluster_folder(docker_cluster_pk_ssh_no_auth):
local_path = Path.cwd()
dest_path = "rh-folder"

args = {
"name": f"/{TEST_ORG}/test_docker_folder",
"path": local_path,
"name": "test_docker_folder",
"system": docker_cluster_pk_ssh_no_auth,
"path": "rh-folder",
}

# Create a local folder based on the current working dir, then send it to the docker cluster as a module
docker_folder = rh.folder(**args).to(
system=docker_cluster_pk_ssh_no_auth, path=dest_path
)
assert docker_folder.system == docker_cluster_pk_ssh_no_auth

init_args[id(docker_folder)] = args
docker_folder.put(
{f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)}, overwrite=True
local_folder_docker = rh.folder(**args)
init_args[id(local_folder_docker)] = args
local_folder_docker.put(
{f"sample_file_{i}.txt": f"file{i}" for i in range(3)}, overwrite=True
)
return docker_folder
return local_folder_docker


@pytest.fixture
Expand Down
43 changes: 30 additions & 13 deletions tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,34 +916,51 @@ def test_cluster_put_and_get(self, cluster):

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_put_and_get_serialized_object(self, cluster):
from runhouse.servers.http.http_utils import deserialize_data, serialize_data
def test_cluster_put_and_get_serialization_methods(self, cluster):
from runhouse.servers.http.http_utils import deserialize_data

raw_data = [1, 2, 3]
serialization = "pickle"
serialized_data = serialize_data(raw_data, serialization)
pickle_serialization = "pickle"

cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.pickle": serialized_data},
contents={"sample.pickle": raw_data},
overwrite=True,
serialization=pickle_serialization,
)

file_contents = cluster._folder_get(path="~/.rh/new-folder/sample.pickle")
assert deserialize_data(file_contents, serialization) == raw_data
assert deserialize_data(file_contents, pickle_serialization) == raw_data

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_put_and_rm_with_contents(self, cluster):
raw_data = "Hello World!"
json_serialization = "json"
cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.text": raw_data},
overwrite=True,
serialization=json_serialization,
)

file_contents = cluster._folder_get(path="~/.rh/new-folder/sample.text")
assert deserialize_data(file_contents, json_serialization) == raw_data

with pytest.raises(AttributeError):
# No serialization specified, default mode of "wb" used which is not supported for a list
cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.pickle": raw_data},
overwrite=True,
)

# with no serialization specified, but with "w" mode
cluster._folder_put(
path="~/.rh/new-folder",
contents={"sample.txt": raw_data},
overwrite=True,
mode="w",
)

cluster._folder_rm(path="~/.rh/new-folder", contents=["sample.txt"])
folder_contents = cluster._folder_ls(path="~/.rh/new-folder")
assert "sample.txt" not in [os.path.basename(f) for f in folder_contents]
file_contents = cluster._folder_get(path="~/.rh/new-folder/sample.text")
assert deserialize_data(file_contents, json_serialization) == raw_data

@pytest.mark.level("local")
@pytest.mark.clustertest
Expand Down
46 changes: 46 additions & 0 deletions tests/test_resources/test_data/test_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,52 @@ class TestFolder(tests.test_resources.test_resource.TestResource):
"dest": _all_folder_fixtures,
}

@pytest.mark.level("local")
def test_cluster_folder_put_and_get_and_ls(self, docker_cluster_folder):
docker_cluster_folder.put({"some_file.txt": "Hello world"})
file_contents = docker_cluster_folder.get(name="some_file.txt")
assert file_contents == "Hello world"

folder_contents = docker_cluster_folder.ls(full_paths=False)
assert "some_file.txt" in folder_contents

@pytest.mark.level("local")
def test_cluster_folder_exists_and_mkdir_and_rm(self, docker_cluster_folder):
assert docker_cluster_folder.exists_in_system()
docker_cluster_folder.rm()

assert not docker_cluster_folder.exists_in_system()

docker_cluster_folder.mkdir()
assert docker_cluster_folder.exists_in_system()

@pytest.mark.level("local")
def test_cluster_folder_put_and_get_serialization_methods(
self, docker_cluster_folder
):
from runhouse.servers.http.http_utils import deserialize_data, serialize_data

pickle_serialization = "pickle"
raw_data = [1, 2, 3]
docker_cluster_folder.put(
{"some_file.pickle": serialize_data(raw_data, pickle_serialization)}
)

file_contents = docker_cluster_folder.get(name="some_file.pickle")
assert deserialize_data(file_contents, pickle_serialization) == raw_data

with pytest.raises(Exception):
docker_cluster_folder.put({"some_file.pickle": pickle.dumps(raw_data)})

json_serialization = "json"
raw_data = {"name": "Runhouse"}
docker_cluster_folder.put(
{"some_file.text": serialize_data(raw_data, json_serialization)}
)

file_contents = docker_cluster_folder.get(name="some_file.text")
assert deserialize_data(file_contents, json_serialization) == raw_data

@pytest.mark.level("minimal")
def test_send_folder_to_dest(self, folder, dest):
_check_skip_test(folder, dest)
Expand Down
Loading

0 comments on commit bc54e31

Please sign in to comment.