From cc87e69a4a7d1e356aa840fa39e761dcc5111b9a Mon Sep 17 00:00:00 2001 From: jlewitt1 Date: Mon, 29 Jul 2024 19:55:28 +0300 Subject: [PATCH] convert cluster rsync to public method --- runhouse/resources/folders/folder.py | 6 ++-- runhouse/resources/folders/gcs_folder.py | 2 +- runhouse/resources/folders/s3_folder.py | 2 +- runhouse/resources/hardware/cluster.py | 30 ++++++++++++++----- .../resources/hardware/on_demand_cluster.py | 2 +- .../hardware/sagemaker/sagemaker_cluster.py | 6 ++-- tests/fixtures/folder_fixtures.py | 4 ++- .../test_clusters/test_multinode_cluster.py | 2 +- tests/test_resources/test_data/test_folder.py | 24 +++++++++++++++ 9 files changed, 59 insertions(+), 19 deletions(-) diff --git a/runhouse/resources/folders/folder.py b/runhouse/resources/folders/folder.py index 5650ba1ad..a5fe0f94f 100644 --- a/runhouse/resources/folders/folder.py +++ b/runhouse/resources/folders/folder.py @@ -307,9 +307,7 @@ def _to_cluster(self, dest_cluster, path=None): if self._fs_str == self.DEFAULT_FS and dest_cluster.name is not None: # Includes case where we're on the cluster itself # And the destination is a cluster, not rh.here - dest_cluster._rsync( - source=self.path, dest=dest_path, up=True, contents=True - ) + dest_cluster.rsync(source=self.path, dest=dest_path, up=True, contents=True) elif isinstance(self.system, Resource): if self.system.endpoint(external=False) == dest_cluster.endpoint( @@ -371,7 +369,7 @@ def _cluster_to_local(self, cluster, dest_path): if not cluster.address: raise ValueError("Cluster must be started before copying data from it.") Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True) - cluster._rsync( + cluster.rsync( source=self.path, dest=str(Path(dest_path).expanduser()), up=False, diff --git a/runhouse/resources/folders/gcs_folder.py b/runhouse/resources/folders/gcs_folder.py index 78d1ca180..332b09adf 100644 --- a/runhouse/resources/folders/gcs_folder.py +++ b/runhouse/resources/folders/gcs_folder.py @@ -57,7 +57,7 @@ def 
_cluster_to_local(self, cluster, dest_path): raise ValueError("Cluster must be started before copying data from it.") Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True) - cluster._rsync( + cluster.rsync( source=self.path, dest=str(Path(dest_path).expanduser()), up=False, diff --git a/runhouse/resources/folders/s3_folder.py b/runhouse/resources/folders/s3_folder.py index 37ed33d0b..7baa3f8d8 100644 --- a/runhouse/resources/folders/s3_folder.py +++ b/runhouse/resources/folders/s3_folder.py @@ -63,7 +63,7 @@ def _cluster_to_local(self, cluster, dest_path): raise ValueError("Cluster must be started before copying data from it.") Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True) - cluster._rsync( + cluster.rsync( source=self.path, dest=str(Path(dest_path).expanduser()), up=False, diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py index 9ca0159d6..05d343df8 100644 --- a/runhouse/resources/hardware/cluster.py +++ b/runhouse/resources/hardware/cluster.py @@ -459,7 +459,7 @@ def _sync_runhouse_to_cluster( local_rh_package_path = local_rh_package_path.parent dest_path = f"~/{local_rh_package_path.name}" - self._rsync( + self.rsync( source=str(local_rh_package_path), dest=dest_path, node="all", @@ -1099,7 +1099,7 @@ def __getstate__(self): return state # ----------------- SSH Methods ----------------- # - def _rsync( + def rsync( self, source: str, dest: str, @@ -1112,8 +1112,24 @@ def _rsync( """ Sync the contents of the source directory into the destination. - .. note: - Ending `source` with a slash will copy the contents of the directory into dest, + Args: + source (str): The source path. + dest (str): The target path. + up (bool): The direction of the sync. If ``True``, will rsync from local to cluster. If ``False`` + will rsync from cluster to local. + node (Optional[str]): Specific cluster node to rsync to. If not specified will use the address of the + cluster's head node. 
+ contents (Optional[bool]): Whether the contents of the source directory or the directory itself should + be copied to destination. + If ``True`` the contents of the source directory are copied to the destination, and the source + directory itself is not created at the destination. + If ``False`` the source directory along with its contents is copied to the destination, creating + an additional directory layer at the destination. (Default: ``False``). + filter_options (Optional[str]): The filter options for rsync. + stream_logs (Optional[bool]): Whether to stream logs to the stdout/stderr. (Default: ``False``). + + .. note:: + Ending ``source`` with a slash will copy the contents of the directory into dest, while omitting it will copy the directory itself (adding a directory layer). """ # Theoretically we could reuse this logic from SkyPilot which Rsyncs to all nodes in parallel: @@ -1122,7 +1138,7 @@ def _rsync( # If we need to change it to be greedier we can. if up and (node == "all" or (len(self.ips) > 1 and not node)): for node in self.ips: - self._rsync( + self.rsync( source, dest, up=up, @@ -1269,7 +1285,7 @@ def _copy_certs_to_cluster(self): # Copy to the home directory by default source = str(Path(self.cert_config.key_path).parent) dest = self.cert_config.DEFAULT_CLUSTER_DIR - self._rsync(source, dest, up=True) + self.rsync(source, dest, up=True) if self._use_caddy: # Move to the Caddy directory to ensure the daemon has access to the certs @@ -1581,7 +1597,7 @@ def notebook( if sync_package_on_close == "./": sync_package_on_close = locate_working_dir() pkg = Package.from_string("local:" + sync_package_on_close) - self._rsync(source=f"~/{pkg.name}", dest=pkg.local_path, up=False) + self.rsync(source=f"~/{pkg.name}", dest=pkg.local_path, up=False) if not persist: tunnel.terminate() kill_jupyter_cmd = f"jupyter notebook stop {port_fwd}" diff --git a/runhouse/resources/hardware/on_demand_cluster.py b/runhouse/resources/hardware/on_demand_cluster.py index
0f5f307c6..df8159c33 100644 --- a/runhouse/resources/hardware/on_demand_cluster.py +++ b/runhouse/resources/hardware/on_demand_cluster.py @@ -200,7 +200,7 @@ def endpoint(self, external=False): def _copy_sky_yaml_from_cluster(self, abs_yaml_path: str): if not Path(abs_yaml_path).exists(): Path(abs_yaml_path).parent.mkdir(parents=True, exist_ok=True) - self._rsync("~/.sky/sky_ray.yml", abs_yaml_path, up=False) + self.rsync("~/.sky/sky_ray.yml", abs_yaml_path, up=False) # Save SSH info to the ~/.ssh/config ray_yaml = yaml.safe_load(open(abs_yaml_path, "r")) diff --git a/runhouse/resources/hardware/sagemaker/sagemaker_cluster.py b/runhouse/resources/hardware/sagemaker/sagemaker_cluster.py index cf4ad720f..371e8223e 100644 --- a/runhouse/resources/hardware/sagemaker/sagemaker_cluster.py +++ b/runhouse/resources/hardware/sagemaker/sagemaker_cluster.py @@ -1147,7 +1147,7 @@ def _load_authorized_keys(self, bucket, auth_keys_file) -> Union[str, None]: # ------------------------------------------------------- # Cluster Helpers # ------------------------------------------------------- - def _rsync(self, source: str, dest: str, up: bool, contents: bool = False): + def rsync(self, source: str, dest: str, up: bool, contents: bool = False): source = source + "/" if not source.endswith("/") else source dest = dest + "/" if not dest.endswith("/") else dest @@ -1308,7 +1308,7 @@ def _sync_runhouse_to_cluster(self, node: str = None, _install_url=None, env=Non self.connect_server_client() # Sync the local ~/.rh directory to the cluster - self._rsync( + self.rsync( source=str(Path("~/.rh").expanduser()), dest="~/.rh", up=True, @@ -1332,7 +1332,7 @@ def _sync_runhouse_to_cluster(self, node: str = None, _install_url=None, env=Non local_rh_package_path = local_rh_package_path.parent dest_path = f"~/{local_rh_package_path.name}" - self._rsync( + self.rsync( source=str(local_rh_package_path), dest=dest_path, up=True, diff --git a/tests/fixtures/folder_fixtures.py 
b/tests/fixtures/folder_fixtures.py index d8c33e4c7..5dd100b92 100644 --- a/tests/fixtures/folder_fixtures.py +++ b/tests/fixtures/folder_fixtures.py @@ -67,7 +67,9 @@ def cluster_folder(ondemand_aws_cluster): cluster_folder = rh.folder(**args).to(system=ondemand_aws_cluster) init_args[id(cluster_folder)] = args - cluster_folder.put({f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)}) + cluster_folder.put( + {f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)}, overwrite=True + ) return cluster_folder diff --git a/tests/test_resources/test_clusters/test_multinode_cluster.py b/tests/test_resources/test_clusters/test_multinode_cluster.py index bdd77333a..47496442b 100644 --- a/tests/test_resources/test_clusters/test_multinode_cluster.py +++ b/tests/test_resources/test_clusters/test_multinode_cluster.py @@ -17,7 +17,7 @@ def test_rsync_and_ssh_onto_worker_node(self, multinode_cpu_cluster): dest_path = f"~/{local_rh_package_path.name}" # Rsync Runhouse package onto the worker node - multinode_cpu_cluster._rsync( + multinode_cpu_cluster.rsync( source=str(local_rh_package_path), dest=dest_path, up=True, diff --git a/tests/test_resources/test_data/test_folder.py b/tests/test_resources/test_data/test_folder.py index 9427f2913..0f138e6cf 100644 --- a/tests/test_resources/test_data/test_folder.py +++ b/tests/test_resources/test_data/test_folder.py @@ -111,6 +111,30 @@ def test_send_folder_to_dest(self, folder, dest): new_folder.rm() + @pytest.mark.level("minimal") + def test_cluster_folder_sync_upload(self, local_folder, cluster): + cluster_dest = rh.Folder.DEFAULT_CACHE_FOLDER + cluster.rsync(source=local_folder.path, dest=cluster_dest, up=True) + res = cluster.run([f"ls -l {cluster_dest}"]) + + assert all(f"sample_file_{i}.txt" in res[0][1] for i in range(3)) + + @pytest.mark.level("minimal") + def test_cluster_folder_sync_download(self, cluster_folder): + import subprocess + + cluster = cluster_folder.system + local_dest = 
rh.Folder.DEFAULT_CACHE_FOLDER + + cluster.rsync( + source=cluster_folder.path, dest=local_dest, contents=True, up=False + ) + res = subprocess.run( + f"ls -l {local_dest}", shell=True, check=True, capture_output=True + ) + + assert all(f"sample_file_{i}.txt" in res.stdout.decode() for i in range(3)) + @pytest.mark.level("minimal") def test_send_folder_to_cluster(self, cluster): path = Path.cwd()