Skip to content

Commit

Permalink
convert cluster rsync to public method (#1082)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 authored Aug 4, 2024
1 parent 17500c8 commit 394cf06
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 19 deletions.
6 changes: 2 additions & 4 deletions runhouse/resources/folders/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,7 @@ def _to_cluster(self, dest_cluster, path=None):
if self._fs_str == self.DEFAULT_FS and dest_cluster.name is not None:
# Includes case where we're on the cluster itself
# And the destination is a cluster, not rh.here
dest_cluster._rsync(
source=self.path, dest=dest_path, up=True, contents=True
)
dest_cluster.rsync(source=self.path, dest=dest_path, up=True, contents=True)

elif isinstance(self.system, Resource):
if self.system.endpoint(external=False) == dest_cluster.endpoint(
Expand Down Expand Up @@ -371,7 +369,7 @@ def _cluster_to_local(self, cluster, dest_path):
if not cluster.address:
raise ValueError("Cluster must be started before copying data from it.")
Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)
cluster._rsync(
cluster.rsync(
source=self.path,
dest=str(Path(dest_path).expanduser()),
up=False,
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/folders/gcs_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _cluster_to_local(self, cluster, dest_path):
raise ValueError("Cluster must be started before copying data from it.")
Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)

cluster._rsync(
cluster.rsync(
source=self.path,
dest=str(Path(dest_path).expanduser()),
up=False,
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/folders/s3_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _cluster_to_local(self, cluster, dest_path):
raise ValueError("Cluster must be started before copying data from it.")

Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)
cluster._rsync(
cluster.rsync(
source=self.path,
dest=str(Path(dest_path).expanduser()),
up=False,
Expand Down
30 changes: 23 additions & 7 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def _sync_runhouse_to_cluster(
local_rh_package_path = local_rh_package_path.parent
dest_path = f"~/{local_rh_package_path.name}"

self._rsync(
self.rsync(
source=str(local_rh_package_path),
dest=dest_path,
node="all",
Expand Down Expand Up @@ -1099,7 +1099,7 @@ def __getstate__(self):
return state

# ----------------- SSH Methods ----------------- #
def _rsync(
def rsync(
self,
source: str,
dest: str,
Expand All @@ -1112,8 +1112,24 @@ def _rsync(
"""
Sync the contents of the source directory into the destination.
.. note:
Ending `source` with a slash will copy the contents of the directory into dest,
Args:
source (str): The source path.
dest (str): The target path.
up (bool): The direction of the sync. If ``True``, will rsync from local to cluster. If ``False``
will rsync from cluster to local.
node (Optional[str]): Specific cluster node to rsync to. If not specified will use the address of the
cluster's head node.
contents (Optional[bool]): Whether the contents of the source directory or the directory itself should
be copied to destination.
If ``True`` the contents of the source directory are copied to the destination, and the source
directory itself is not created at the destination.
If ``False`` the source directory along with its contents are copied ot the destination, creating
an additional directory layer at the destination. (Default: ``False``).
filter_options (Optional[str]): The filter options for rsync.
stream_logs (Optional[bool]): Whether to stream logs to the stdout/stderr. (Default: ``False``).
.. note::
Ending ``source`` with a slash will copy the contents of the directory into dest,
while omitting it will copy the directory itself (adding a directory layer).
"""
# Theoretically we could reuse this logic from SkyPilot which Rsyncs to all nodes in parallel:
Expand All @@ -1122,7 +1138,7 @@ def _rsync(
# If we need to change it to be greedier we can.
if up and (node == "all" or (len(self.ips) > 1 and not node)):
for node in self.ips:
self._rsync(
self.rsync(
source,
dest,
up=up,
Expand Down Expand Up @@ -1269,7 +1285,7 @@ def _copy_certs_to_cluster(self):
# Copy to the home directory by default
source = str(Path(self.cert_config.key_path).parent)
dest = self.cert_config.DEFAULT_CLUSTER_DIR
self._rsync(source, dest, up=True)
self.rsync(source, dest, up=True)

if self._use_caddy:
# Move to the Caddy directory to ensure the daemon has access to the certs
Expand Down Expand Up @@ -1581,7 +1597,7 @@ def notebook(
if sync_package_on_close == "./":
sync_package_on_close = locate_working_dir()
pkg = Package.from_string("local:" + sync_package_on_close)
self._rsync(source=f"~/{pkg.name}", dest=pkg.local_path, up=False)
self.rsync(source=f"~/{pkg.name}", dest=pkg.local_path, up=False)
if not persist:
tunnel.terminate()
kill_jupyter_cmd = f"jupyter notebook stop {port_fwd}"
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def endpoint(self, external=False):
def _copy_sky_yaml_from_cluster(self, abs_yaml_path: str):
if not Path(abs_yaml_path).exists():
Path(abs_yaml_path).parent.mkdir(parents=True, exist_ok=True)
self._rsync("~/.sky/sky_ray.yml", abs_yaml_path, up=False)
self.rsync("~/.sky/sky_ray.yml", abs_yaml_path, up=False)

# Save SSH info to the ~/.ssh/config
ray_yaml = yaml.safe_load(open(abs_yaml_path, "r"))
Expand Down
6 changes: 3 additions & 3 deletions runhouse/resources/hardware/sagemaker/sagemaker_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ def _load_authorized_keys(self, bucket, auth_keys_file) -> Union[str, None]:
# -------------------------------------------------------
# Cluster Helpers
# -------------------------------------------------------
def _rsync(self, source: str, dest: str, up: bool, contents: bool = False):
def rsync(self, source: str, dest: str, up: bool, contents: bool = False):
source = source + "/" if not source.endswith("/") else source
dest = dest + "/" if not dest.endswith("/") else dest

Expand Down Expand Up @@ -1308,7 +1308,7 @@ def _sync_runhouse_to_cluster(self, node: str = None, _install_url=None, env=Non
self.connect_server_client()

# Sync the local ~/.rh directory to the cluster
self._rsync(
self.rsync(
source=str(Path("~/.rh").expanduser()),
dest="~/.rh",
up=True,
Expand All @@ -1332,7 +1332,7 @@ def _sync_runhouse_to_cluster(self, node: str = None, _install_url=None, env=Non
local_rh_package_path = local_rh_package_path.parent
dest_path = f"~/{local_rh_package_path.name}"

self._rsync(
self.rsync(
source=str(local_rh_package_path),
dest=dest_path,
up=True,
Expand Down
4 changes: 3 additions & 1 deletion tests/fixtures/folder_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ def cluster_folder(ondemand_aws_cluster):

cluster_folder = rh.folder(**args).to(system=ondemand_aws_cluster)
init_args[id(cluster_folder)] = args
cluster_folder.put({f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)})
cluster_folder.put(
{f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)}, overwrite=True
)
return cluster_folder


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_rsync_and_ssh_onto_worker_node(self, multinode_cpu_cluster):
dest_path = f"~/{local_rh_package_path.name}"

# Rsync Runhouse package onto the worker node
multinode_cpu_cluster._rsync(
multinode_cpu_cluster.rsync(
source=str(local_rh_package_path),
dest=dest_path,
up=True,
Expand Down
24 changes: 24 additions & 0 deletions tests/test_resources/test_data/test_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ def test_send_folder_to_dest(self, folder, dest):

new_folder.rm()

@pytest.mark.level("minimal")
def test_cluster_folder_sync_upload(self, local_folder, cluster):
cluster_dest = rh.Folder.DEFAULT_CACHE_FOLDER
cluster.rsync(source=local_folder.path, dest=cluster_dest, up=True)
res = cluster.run([f"ls -l {cluster_dest}"])

assert all(f"sample_file_{i}.txt" in res[0][1] for i in range(3))

@pytest.mark.level("minimal")
def test_cluster_folder_sync_download(self, cluster_folder):
import subprocess

cluster = cluster_folder.system
local_dest = rh.Folder.DEFAULT_CACHE_FOLDER

cluster.rsync(
source=cluster_folder.path, dest=local_dest, contents=True, up=False
)
res = subprocess.run(
f"ls -l {local_dest}", shell=True, check=True, capture_output=True
)

assert all(f"sample_file_{i}.txt" in res.stdout.decode() for i in range(3))

@pytest.mark.level("minimal")
def test_send_folder_to_cluster(self, cluster):
path = Path.cwd()
Expand Down

0 comments on commit 394cf06

Please sign in to comment.