Skip to content

Commit

Permalink
Update and reuse rsync method
Browse files Browse the repository at this point in the history
  • Loading branch information
carolineechen committed Aug 29, 2024
1 parent 039cc71 commit 2c70e61
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 82 deletions.
12 changes: 10 additions & 2 deletions runhouse/resources/hardware/sky/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ def _rsync(
stream_logs: bool = True,
max_retry: int = 1,
prefix_command: Optional[str] = None,
get_remote_home_dir: Callable[[], str] = lambda: '~') -> None:
get_remote_home_dir: Callable[[], str] = lambda: '~',
filter_options: Optional[str] = None, # RH MODIFIED,
return_cmd: bool = False, # RH MODIFIED,
) -> None:
"""Builds the rsync command."""
# Build command.
rsync_command = []
Expand All @@ -239,7 +242,8 @@ def _rsync(
rsync_command += ['rsync', RSYNC_DISPLAY_OPTION]

# --filter
rsync_command.append(RSYNC_FILTER_OPTION)
addtl_filter_options = f" --filter='{filter_options}'" if filter_options else "" # RH MODIFIED
rsync_command.append(RSYNC_FILTER_OPTION + addtl_filter_options)

if up:
# Build --exclude-from argument.
Expand Down Expand Up @@ -281,6 +285,10 @@ def _rsync(
command = ' '.join(rsync_command)
logger.debug(f'Running rsync command: {command}')

# RH MODIFIED: return command instead of running it
if return_cmd:
return command

backoff = common_utils.Backoff(initial_backoff=5, max_backoff_factor=5)
assert max_retry > 0, f'max_retry {max_retry} must be positive.'
while max_retry >= 0:
Expand Down
97 changes: 17 additions & 80 deletions runhouse/resources/hardware/sky_ssh_runner.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
import logging
import os
import pathlib
import shlex
import time
from typing import Dict, List, Optional, Tuple, Union

from runhouse.constants import DEFAULT_DOCKER_CONTAINER_NAME

from runhouse.logger import get_logger

from runhouse.resources.hardware.sky import common_utils, log_lib, subprocess_utils
from runhouse.resources.hardware.sky import log_lib, subprocess_utils

from runhouse.resources.hardware.sky.command_runner import (
_DEFAULT_CONNECT_TIMEOUT,
GIT_EXCLUDE,
KubernetesCommandRunner,
RSYNC_DISPLAY_OPTION,
RSYNC_EXCLUDE_OPTION,
RSYNC_FILTER_OPTION,
ssh_options_list,
SSHCommandRunner,
SshMode,
Expand Down Expand Up @@ -257,29 +251,6 @@ def rsync(
Raises:
exceptions.CommandError: rsync command failed.
"""
# Build command.
# TODO(zhwu): This will print a per-file progress bar (with -P),
# shooting a lot of messages to the output. --info=progress2 is used
# to get a total progress bar, but it requires rsync>=3.1.0 and Mac
# OS has a default rsync==2.6.9 (16 years old).
rsync_command = ["rsync", RSYNC_DISPLAY_OPTION]

# RH MODIFIED: add --filter option
addtl_filter_options = f" --filter='{filter_options}'" if filter_options else ""
rsync_command.append(RSYNC_FILTER_OPTION + addtl_filter_options)

if up:
# The source is a local path, so we need to resolve it.
# --exclude-from
resolved_source = pathlib.Path(source).expanduser().resolve()
if (resolved_source / GIT_EXCLUDE).exists():
# Ensure file exists; otherwise, rsync will error out.
rsync_command.append(
RSYNC_EXCLUDE_OPTION.format(
shlex.quote(str(resolved_source / GIT_EXCLUDE))
)
)

if self._docker_ssh_proxy_command is not None:
docker_ssh_proxy_command = self._docker_ssh_proxy_command(["ssh"])
else:
Expand All @@ -294,54 +265,18 @@ def rsync(
disable_control_master=self.disable_control_master,
)
)
rsync_command.append(f'-e "ssh {ssh_options}"')
# To support spaces in the path, we need to quote source and target.
# rsync doesn't support '~' in a quoted local path, but it is ok to
# have '~' in a quoted remote path.
if up:
full_source_str = str(resolved_source)
if resolved_source.is_dir():
full_source_str = os.path.join(full_source_str, "")
rsync_command.extend(
[
f"{full_source_str!r}",
f"{self.ssh_user}@{self.ip}:{target!r}",
]
)
else:
rsync_command.extend(
[
f"{self.ssh_user}@{self.ip}:{source!r}",
f"{os.path.expanduser(target)!r}",
]
)
command = " ".join(rsync_command)

# RH MODIFIED: return command instead of running it
if return_cmd:
return command

backoff = common_utils.Backoff(initial_backoff=5, max_backoff_factor=5)
while max_retry >= 0:
returncode, _, stderr = log_lib.run_with_log(
command,
log_path=log_path,
stream_logs=stream_logs,
shell=True,
require_outputs=True,
)
if returncode == 0:
break
max_retry -= 1
time.sleep(backoff.current_backoff())

direction = "up" if up else "down"
error_msg = (
f"Failed to rsync {direction}: {source} -> {target}. "
"Ensure that the network is stable, then retry."
)
subprocess_utils.handle_returncode(
returncode, command, error_msg, stderr=stderr, stream_logs=stream_logs
rsh_option = f"ssh {ssh_options}"
return self._rsync(
source,
target,
node_destination=f"{self.ssh_user}@{self.ip}",
up=up,
rsh_option=rsh_option,
log_path=log_path,
stream_logs=stream_logs,
max_retry=max_retry,
filter_options=filter_options,
return_cmd=return_cmd,
)


Expand Down Expand Up @@ -492,8 +427,8 @@ def rsync(
log_path: str = os.devnull,
stream_logs: bool = True,
max_retry: int = 1,
filter_options: bool = False, # RH MODIFIED # TODO
return_cmd: bool = False, # RH MODIFIED # TODO
filter_options: bool = False, # RH MODIFIED
return_cmd: bool = False, # RH MODIFIED
) -> None:
"""Uses 'rsync' to sync 'source' to 'target'.
Args:
Expand Down Expand Up @@ -542,4 +477,6 @@ def get_remote_home_dir() -> str:
# /~/xx, so we need to replace ~ with the remote home directory. We
# only need to do this when ~ is at the beginning of the path.
get_remote_home_dir=get_remote_home_dir,
filter_options=filter_options,
return_cmd=return_cmd,
)

0 comments on commit 2c70e61

Please sign in to comment.