Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert cluster rsync to public method #1082

Merged
merged 1 commit into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions runhouse/resources/folders/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,7 @@ def _to_cluster(self, dest_cluster, path=None):
if self._fs_str == self.DEFAULT_FS and dest_cluster.name is not None:
# Includes case where we're on the cluster itself
# And the destination is a cluster, not rh.here
dest_cluster._rsync(
source=self.path, dest=dest_path, up=True, contents=True
)
dest_cluster.rsync(source=self.path, dest=dest_path, up=True, contents=True)

elif isinstance(self.system, Resource):
if self.system.endpoint(external=False) == dest_cluster.endpoint(
Expand Down Expand Up @@ -371,7 +369,7 @@ def _cluster_to_local(self, cluster, dest_path):
if not cluster.address:
raise ValueError("Cluster must be started before copying data from it.")
Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)
cluster._rsync(
cluster.rsync(
source=self.path,
dest=str(Path(dest_path).expanduser()),
up=False,
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/folders/gcs_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _cluster_to_local(self, cluster, dest_path):
raise ValueError("Cluster must be started before copying data from it.")
Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)

cluster._rsync(
cluster.rsync(
source=self.path,
dest=str(Path(dest_path).expanduser()),
up=False,
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/folders/s3_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _cluster_to_local(self, cluster, dest_path):
raise ValueError("Cluster must be started before copying data from it.")

Path(dest_path).expanduser().mkdir(parents=True, exist_ok=True)
cluster._rsync(
cluster.rsync(
source=self.path,
dest=str(Path(dest_path).expanduser()),
up=False,
Expand Down
30 changes: 23 additions & 7 deletions runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def _sync_runhouse_to_cluster(
local_rh_package_path = local_rh_package_path.parent
dest_path = f"~/{local_rh_package_path.name}"

self._rsync(
self.rsync(
source=str(local_rh_package_path),
dest=dest_path,
node="all",
Expand Down Expand Up @@ -1099,7 +1099,7 @@ def __getstate__(self):
return state

# ----------------- SSH Methods ----------------- #
def _rsync(
def rsync(
self,
source: str,
dest: str,
Expand All @@ -1112,8 +1112,24 @@ def _rsync(
"""
Sync the contents of the source directory into the destination.

.. note:
Ending `source` with a slash will copy the contents of the directory into dest,
Args:
source (str): The source path.
dest (str): The target path.
up (bool): The direction of the sync. If ``True``, will rsync from local to cluster. If ``False``
will rsync from cluster to local.
node (Optional[str]): Specific cluster node to rsync to. If not specified will use the address of the
cluster's head node.
contents (Optional[bool]): Whether the contents of the source directory or the directory itself should
be copied to destination.
If ``True`` the contents of the source directory are copied to the destination, and the source
directory itself is not created at the destination.
If ``False`` the source directory along with its contents are copied ot the destination, creating
an additional directory layer at the destination. (Default: ``False``).
filter_options (Optional[str]): The filter options for rsync.
stream_logs (Optional[bool]): Whether to stream logs to the stdout/stderr. (Default: ``False``).

.. note::
Ending ``source`` with a slash will copy the contents of the directory into dest,
while omitting it will copy the directory itself (adding a directory layer).
"""
# Theoretically we could reuse this logic from SkyPilot which Rsyncs to all nodes in parallel:
Expand All @@ -1122,7 +1138,7 @@ def _rsync(
# If we need to change it to be greedier we can.
if up and (node == "all" or (len(self.ips) > 1 and not node)):
for node in self.ips:
self._rsync(
self.rsync(
source,
dest,
up=up,
Expand Down Expand Up @@ -1269,7 +1285,7 @@ def _copy_certs_to_cluster(self):
# Copy to the home directory by default
source = str(Path(self.cert_config.key_path).parent)
dest = self.cert_config.DEFAULT_CLUSTER_DIR
self._rsync(source, dest, up=True)
self.rsync(source, dest, up=True)

if self._use_caddy:
# Move to the Caddy directory to ensure the daemon has access to the certs
Expand Down Expand Up @@ -1581,7 +1597,7 @@ def notebook(
if sync_package_on_close == "./":
sync_package_on_close = locate_working_dir()
pkg = Package.from_string("local:" + sync_package_on_close)
self._rsync(source=f"~/{pkg.name}", dest=pkg.local_path, up=False)
self.rsync(source=f"~/{pkg.name}", dest=pkg.local_path, up=False)
if not persist:
tunnel.terminate()
kill_jupyter_cmd = f"jupyter notebook stop {port_fwd}"
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/on_demand_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def endpoint(self, external=False):
def _copy_sky_yaml_from_cluster(self, abs_yaml_path: str):
if not Path(abs_yaml_path).exists():
Path(abs_yaml_path).parent.mkdir(parents=True, exist_ok=True)
self._rsync("~/.sky/sky_ray.yml", abs_yaml_path, up=False)
self.rsync("~/.sky/sky_ray.yml", abs_yaml_path, up=False)

# Save SSH info to the ~/.ssh/config
ray_yaml = yaml.safe_load(open(abs_yaml_path, "r"))
Expand Down
6 changes: 3 additions & 3 deletions runhouse/resources/hardware/sagemaker/sagemaker_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ def _load_authorized_keys(self, bucket, auth_keys_file) -> Union[str, None]:
# -------------------------------------------------------
# Cluster Helpers
# -------------------------------------------------------
def _rsync(self, source: str, dest: str, up: bool, contents: bool = False):
def rsync(self, source: str, dest: str, up: bool, contents: bool = False):
source = source + "/" if not source.endswith("/") else source
dest = dest + "/" if not dest.endswith("/") else dest

Expand Down Expand Up @@ -1308,7 +1308,7 @@ def _sync_runhouse_to_cluster(self, node: str = None, _install_url=None, env=Non
self.connect_server_client()

# Sync the local ~/.rh directory to the cluster
self._rsync(
self.rsync(
source=str(Path("~/.rh").expanduser()),
dest="~/.rh",
up=True,
Expand All @@ -1332,7 +1332,7 @@ def _sync_runhouse_to_cluster(self, node: str = None, _install_url=None, env=Non
local_rh_package_path = local_rh_package_path.parent
dest_path = f"~/{local_rh_package_path.name}"

self._rsync(
self.rsync(
source=str(local_rh_package_path),
dest=dest_path,
up=True,
Expand Down
4 changes: 3 additions & 1 deletion tests/fixtures/folder_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ def cluster_folder(ondemand_aws_cluster):

cluster_folder = rh.folder(**args).to(system=ondemand_aws_cluster)
init_args[id(cluster_folder)] = args
cluster_folder.put({f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)})
cluster_folder.put(
{f"sample_file_{i}.txt": f"file{i}".encode() for i in range(3)}, overwrite=True
)
return cluster_folder


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_rsync_and_ssh_onto_worker_node(self, multinode_cpu_cluster):
dest_path = f"~/{local_rh_package_path.name}"

# Rsync Runhouse package onto the worker node
multinode_cpu_cluster._rsync(
multinode_cpu_cluster.rsync(
source=str(local_rh_package_path),
dest=dest_path,
up=True,
Expand Down
24 changes: 24 additions & 0 deletions tests/test_resources/test_data/test_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ def test_send_folder_to_dest(self, folder, dest):

new_folder.rm()

@pytest.mark.level("minimal")
def test_cluster_folder_sync_upload(self, local_folder, cluster):
cluster_dest = rh.Folder.DEFAULT_CACHE_FOLDER
cluster.rsync(source=local_folder.path, dest=cluster_dest, up=True)
res = cluster.run([f"ls -l {cluster_dest}"])

assert all(f"sample_file_{i}.txt" in res[0][1] for i in range(3))

@pytest.mark.level("minimal")
def test_cluster_folder_sync_download(self, cluster_folder):
import subprocess

cluster = cluster_folder.system
local_dest = rh.Folder.DEFAULT_CACHE_FOLDER

cluster.rsync(
source=cluster_folder.path, dest=local_dest, contents=True, up=False
)
res = subprocess.run(
f"ls -l {local_dest}", shell=True, check=True, capture_output=True
)

assert all(f"sample_file_{i}.txt" in res.stdout.decode() for i in range(3))

@pytest.mark.level("minimal")
def test_send_folder_to_cluster(self, cluster):
path = Path.cwd()
Expand Down
Loading