From 97181c96e5246784b4130139efacfdf5b1fe8ddd Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 10:47:18 -0800 Subject: [PATCH 01/23] Remove duplicate local_dir + rename to be the same as trial runner Signed-off-by: Justin Yu Fix constructor args Signed-off-by: Justin Yu Fix all references to _checkpoint_dir Signed-off-by: Justin Yu --- python/ray/tune/execution/trial_runner.py | 33 +++++++++++------------ 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index aa7817dee87e..5ab4e19bb9dc 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -154,17 +154,16 @@ class _ExperimentCheckpointManager: def __init__( self, - checkpoint_dir: str, + local_checkpoint_dir: str, checkpoint_period: Union[int, float, str], start_time: float, session_str: str, syncer: Syncer, sync_trial_checkpoints: bool, - local_dir: str, - remote_dir: str, + remote_checkpoint_dir: str, sync_every_n_trial_checkpoints: Optional[int] = None, ): - self._checkpoint_dir = checkpoint_dir + self._local_checkpoint_dir = local_checkpoint_dir self._auto_checkpoint_enabled = checkpoint_period == "auto" if self._auto_checkpoint_enabled: self._checkpoint_period = 10.0 # Initial value @@ -176,8 +175,7 @@ def __init__( self._syncer = syncer self._sync_trial_checkpoints = sync_trial_checkpoints - self._local_dir = local_dir - self._remote_dir = remote_dir + self._remote_checkpoint_dir = remote_checkpoint_dir self._last_checkpoint_time = 0.0 self._last_sync_time = 0.0 @@ -225,7 +223,7 @@ def checkpoint( Args: force: Forces a checkpoint despite checkpoint_period. """ - if not self._checkpoint_dir: + if not self._local_checkpoint_dir: return force = force or self._should_force_cloud_sync @@ -243,12 +241,14 @@ def _serialize_and_write(): "timestamp": self._last_checkpoint_time, }, } - tmp_file_name = os.path.join(self._checkpoint_dir, ".tmp_checkpoint") + tmp_file_name = os.path.join(self._local_checkpoint_dir, ".tmp_checkpoint") with open(tmp_file_name, "w") as f: json.dump(runner_state, f, indent=2, cls=TuneFunctionEncoder) os.replace(tmp_file_name, checkpoint_file) - search_alg.save_to_dir(self._checkpoint_dir, session_str=self._session_str) + search_alg.save_to_dir( + self._local_checkpoint_dir, session_str=self._session_str + ) checkpoint_time_start = time.monotonic() with out_of_band_serialize_dataset(): @@ -274,14 +274,14 @@ def _serialize_and_write(): "`sync_timeout` in `SyncConfig`." ) synced = self._syncer.sync_up( - local_dir=self._local_dir, - remote_dir=self._remote_dir, + local_dir=self._local_checkpoint_dir, + remote_dir=self._remote_checkpoint_dir, exclude=exclude, ) else: synced = self._syncer.sync_up_if_needed( - local_dir=self._local_dir, - remote_dir=self._remote_dir, + local_dir=self._local_checkpoint_dir, + remote_dir=self._remote_checkpoint_dir, exclude=exclude, ) @@ -320,7 +320,7 @@ def _serialize_and_write(): ) self._last_checkpoint_time = time.time() - return self._checkpoint_dir + return self._local_checkpoint_dir @DeveloperAPI @@ -562,14 +562,13 @@ def end_experiment_callbacks(self) -> None: def _create_checkpoint_manager(self, sync_trial_checkpoints: bool = True): return _ExperimentCheckpointManager( - checkpoint_dir=self._local_checkpoint_dir, + local_checkpoint_dir=self._local_checkpoint_dir, checkpoint_period=self._checkpoint_period, start_time=self._start_time, session_str=self._session_str, syncer=self._syncer, sync_trial_checkpoints=sync_trial_checkpoints, - local_dir=self._local_checkpoint_dir, - remote_dir=self._remote_checkpoint_dir, + remote_checkpoint_dir=self._remote_checkpoint_dir, sync_every_n_trial_checkpoints=self._trial_checkpoint_config.num_to_keep, ) From 9f38a3495ea1af43b57609883f2e6fb201f82fe0 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 10:49:41 -0800 Subject: [PATCH 02/23] Add Trial.from_json_state alternate constructor Signed-off-by: Justin Yu --- python/ray/tune/experiment/trial.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index fa71ae26f4b4..9a2c502ce2d1 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -40,7 +40,7 @@ PlacementGroupFactory, resource_dict_to_pg_factory, ) -from ray.tune.utils.serialization import TuneFunctionEncoder +from ray.tune.utils.serialization import TuneFunctionDecoder, TuneFunctionEncoder from ray.tune.trainable.util import TrainableUtil from ray.tune.utils import date_str, flatten_dict from ray.util.annotations import DeveloperAPI @@ -972,3 +972,20 @@ def __setstate__(self, state): # TODO(ekl) this is kind of a hack. if not ray.util.client.ray.is_connected(): self.init_logdir() # Create logdir if it does not exist + @classmethod + def from_json_state(cls, json_state, stub: bool = False) -> "Trial": + trial_state = ( + json.loads(json_state, cls=TuneFunctionDecoder) + if isinstance(json_state, str) + else json_state + ) + + new_trial = Trial( + trial_state["trainable_name"], + stub=stub, + _setup_default_resource=False, + ) + + new_trial.__setstate__(trial_state) + + return new_trial From af2b1899219503f71405f160c3b83fac2e9dfba6 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 10:57:10 -0800 Subject: [PATCH 03/23] Only save around sync config + experiment dir name to construct remote ckpt dir Signed-off-by: Justin Yu --- python/ray/tune/execution/trial_runner.py | 15 ++++++++++++--- python/ray/tune/tune.py | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 5ab4e19bb9dc..5e5be01da5dd 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -388,8 +388,8 @@ def __init__( search_alg: Optional[SearchAlgorithm] = None, scheduler: Optional[TrialScheduler] = None, local_checkpoint_dir: Optional[str] = None, - remote_checkpoint_dir: Optional[str] = None, sync_config: Optional[SyncConfig] = None, + experiment_dir_name: Optional[str] = None, stopper: Optional[Stopper] = None, resume: Union[str, bool] = False, server_port: Optional[int] = None, @@ -436,7 +436,7 @@ def __init__( # Manual override self._max_pending_trials = int(max_pending_trials) - sync_config = sync_config or SyncConfig() + self._sync_config = sync_config or SyncConfig() self.trial_executor.setup( max_pending_trials=self._max_pending_trials, @@ -485,7 +485,7 @@ def __init__( if self._local_checkpoint_dir: os.makedirs(self._local_checkpoint_dir, exist_ok=True) - self._remote_checkpoint_dir = remote_checkpoint_dir + self._experiment_dir_name = experiment_dir_name self._syncer = get_node_to_storage_syncer(sync_config) self._stopper = stopper or NoopStopper() @@ -584,6 +584,12 @@ def search_alg(self): def scheduler_alg(self): return self._scheduler_alg + @property + def _remote_checkpoint_dir(self): + if self._sync_config.upload_dir and self._experiment_dir_name: + return os.path.join(self._sync_config.upload_dir, self._experiment_dir_name) + return None + def _validate_resume( self, resume_type: Union[str, bool], driver_sync_trial_checkpoints=True ) -> Tuple[bool, Optional[_ResumeConfig]]: @@ -1622,6 +1628,9 @@ def __getstate__(self): "_syncer", "_callbacks", "_checkpoint_manager", + "_local_checkpoint_dir", + "_sync_config", + "_experiment_dir_name", ]: del state[k] state["launch_web_server"] = bool(self._server) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index cdcaa6a7fdb3..51cc38ef17ce 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -689,7 +689,7 @@ class and registered trainables. search_alg=search_alg, scheduler=scheduler, local_checkpoint_dir=experiments[0].checkpoint_dir, - remote_checkpoint_dir=experiments[0].remote_checkpoint_dir, + experiment_dir_name=experiments[0].dir_name, sync_config=sync_config, stopper=experiments[0].stopper, resume=resume, From d225599de6ab379a304a5d2c47dd79fa52edf594 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 10:58:06 -0800 Subject: [PATCH 04/23] Update upload dir and experiment name upon URI restore Signed-off-by: Justin Yu --- python/ray/tune/impl/tuner_internal.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/ray/tune/impl/tuner_internal.py b/python/ray/tune/impl/tuner_internal.py index b47f3321e474..675f41852e18 100644 --- a/python/ray/tune/impl/tuner_internal.py +++ b/python/ray/tune/impl/tuner_internal.py @@ -7,6 +7,7 @@ import tempfile from pathlib import Path from typing import Any, Callable, Dict, Optional, Type, Union, TYPE_CHECKING, Tuple +import urllib.parse import ray import ray.cloudpickle as pickle @@ -334,6 +335,14 @@ def _restore_from_path_or_uri( self._run_config.local_dir = str(experiment_path.parent) self._run_config.name = experiment_path.name else: + # Set the experiment `name` and `upload_dir` according to the URI + parsed_uri = urllib.parse.urlparse(path_or_uri) + remote_path = Path(parsed_uri.netloc + parsed_uri.path) + self._run_config.name = remote_path.name + self._run_config.sync_config.upload_dir = ( + parsed_uri.scheme + "://" + str(remote_path.parent) + ) + # If we synced, `experiment_checkpoint_dir` will contain a temporary # directory. Create an experiment checkpoint dir instead and move # our data there. From e0267a8630bb908dd0735549408088d1985692f9 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 11:03:31 -0800 Subject: [PATCH 05/23] Refactor relative checkpoint dir logic to be in local_dir setter instead Signed-off-by: Justin Yu --- python/ray/tune/experiment/trial.py | 60 +++++++++++++++-------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index 9a2c502ce2d1..658b785abcd7 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -293,7 +293,7 @@ def __init__( self.trainable_name = trainable_name self.trial_id = Trial.generate_id() if trial_id is None else trial_id self.config = config or {} - self.local_dir = local_dir # This remains unexpanded for syncing. + self._local_dir = local_dir # This remains unexpanded for syncing. # Parameters that Tune varies across searches. self.evaluated_params = evaluated_params or {} @@ -472,9 +472,39 @@ def get_runner_ip(self) -> Optional[str]: self.location = _Location(hostname, pid) return self.location.hostname + @property + def local_dir(self): + return self._local_dir + + @local_dir.setter + def local_dir(self, local_dir): + relative_checkpoint_dirs = [] + if self.logdir: + # Save the relative paths of persistent trial checkpoints, which are saved + # relative to the old `local_dir`/`logdir` + for checkpoint in self.get_trial_checkpoints(): + checkpoint_dir = checkpoint.dir_or_data + assert isinstance(checkpoint_dir, str) + relative_checkpoint_dirs.append( + os.path.relpath(checkpoint_dir, self.logdir) + ) + + # Update the underlying `_local_dir`, which also updates the trial `logdir` + self._local_dir = local_dir + + if self.logdir: + for checkpoint, relative_checkpoint_dir in zip( + self.get_trial_checkpoints(), relative_checkpoint_dirs + ): + # Reconstruct the checkpoint dir using the (possibly updated) + # trial logdir and the relative checkpoint directory. + checkpoint.dir_or_data = os.path.join( + self.logdir, relative_checkpoint_dir + ) + @property def logdir(self): - if not self.relative_logdir: + if not self.local_dir or not self.relative_logdir: return None return str(Path(self.local_dir).joinpath(self.relative_logdir)) @@ -922,46 +952,20 @@ def __getstate__(self): state["_state_valid"] = False state["_default_result_or_future"] = None - # Save the relative paths of persistent trial checkpoints - # When loading this trial state, the paths should be constructed again - # relative to the trial `logdir`, which may have been updated. - relative_checkpoint_dirs = [] - for checkpoint in self.get_trial_checkpoints(): - checkpoint_dir = checkpoint.dir_or_data - assert isinstance(checkpoint_dir, str) - relative_checkpoint_dirs.append( - os.path.relpath(checkpoint_dir, self.logdir) - ) - state["__relative_checkpoint_dirs"] = relative_checkpoint_dirs - return copy.deepcopy(state) def __setstate__(self, state): - if state["status"] == Trial.RUNNING: state["status"] = Trial.PENDING for key in self._nonjson_fields: if key in state: state[key] = cloudpickle.loads(hex_to_binary(state[key])) - # Retrieve the relative checkpoint dirs - relative_checkpoint_dirs = state.pop("__relative_checkpoint_dirs", None) - # Ensure that stub doesn't get overriden stub = state.pop("stub", True) self.__dict__.update(state) self.stub = stub or getattr(self, "stub", False) - if relative_checkpoint_dirs: - for checkpoint, relative_checkpoint_dir in zip( - self.get_trial_checkpoints(), relative_checkpoint_dirs - ): - # Reconstruct the checkpoint dir using the (possibly updated) - # trial logdir and the relative checkpoint directory. - checkpoint.dir_or_data = os.path.join( - self.logdir, relative_checkpoint_dir - ) - if not self.stub: validate_trainable(self.trainable_name) From 87c5452751f73850f774c92b58a93a3e5da683e5 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 11:15:58 -0800 Subject: [PATCH 06/23] Refactor trial loading logic to use Trial.from_json_state Signed-off-by: Justin Yu --- python/ray/tune/execution/trial_runner.py | 33 ++++++++++++++++------- python/ray/tune/experiment/trial.py | 5 ---- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 5e5be01da5dd..3388652551b3 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -850,19 +850,34 @@ def resume( ) ) - trial_runner_data = runner_state["runner_data"] - # Don't overwrite the current `_local_checkpoint_dir` - # The current directory could be different from the checkpointed - # directory, if the experiment directory has changed. - trial_runner_data.pop("_local_checkpoint_dir", None) + # 1. Restore trial runner state + self.__setstate__(runner_state["runner_data"]) - self.__setstate__(trial_runner_data) + # 2. Restore search algorithm state if self._search_alg.has_checkpoint(self._local_checkpoint_dir): self._search_alg.restore_from_dir(self._local_checkpoint_dir) - trials = _load_trials_from_experiment_checkpoint( - runner_state, new_local_dir=self._local_checkpoint_dir - ) + # 3. Load trial table from experiment checkpoint + trials = [] + for trial_json_state in runner_state["checkpoints"]: + trial = Trial.from_json_state(trial_json_state) + + # The following properties may be updated on restoration + # Ex: moved local/cloud experiment directory + trial.local_dir = self._local_checkpoint_dir + trial.sync_config = self._sync_config + trial.experiment_dir_name = self._experiment_dir_name + + # Avoid creating logdir in client mode for returned trial results, + # since the dir might not be creatable locally. + # TODO(ekl) this is kind of a hack. + if not ray.util.client.ray.is_connected(): + trial.init_logdir() # Create logdir if it does not exist + + trial.refresh_default_resource_request() + trials.append(trial) + + # 4. Set trial statuses according to the resume configuration for trial in sorted(trials, key=lambda t: t.last_update_time, reverse=True): trial_to_add = trial if trial.status == Trial.ERROR: diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index 658b785abcd7..ed9123d9d2aa 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -971,11 +971,6 @@ def __setstate__(self, state): assert self.placement_group_factory - # Avoid creating logdir in client mode for returned trial results, - # since the dir might not be creatable locally. - # TODO(ekl) this is kind of a hack. - if not ray.util.client.ray.is_connected(): - self.init_logdir() # Create logdir if it does not exist @classmethod def from_json_state(cls, json_state, stub: bool = False) -> "Trial": trial_state = ( From e202dc00da78c7bb88cc280d9f02fb8a8422edfe Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 11:19:35 -0800 Subject: [PATCH 07/23] Switch ExperimentAnalysis trial loading to from_json_state as well Signed-off-by: Justin Yu --- python/ray/tune/analysis/experiment_analysis.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index 2026107a9c43..eeb8b1fc20a3 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -143,13 +143,10 @@ def _load_checkpoints_from_latest(self, latest_checkpoint: List[str]) -> None: if "checkpoints" not in experiment_state: raise TuneError("Experiment state invalid; no checkpoints found.") + self._checkpoints_and_paths += [ - (_decode_checkpoint_from_experiment_state(cp), Path(path).parent) - for cp in experiment_state["checkpoints"] + (cp, Path(path).parent) for cp in experiment_state["checkpoints"] ] - self._checkpoints_and_paths = sorted( - self._checkpoints_and_paths, key=lambda tup: tup[0]["trial_id"] - ) def _get_latest_checkpoint(self, experiment_checkpoint_path: Path) -> List[str]: # Case 1: Dir specified, find latest checkpoint. @@ -799,11 +796,10 @@ def _get_trial_paths(self) -> List[str]: ) self.trials = [] _trial_paths = [] - for checkpoint, path in self._checkpoints_and_paths: + for trial_json_state, path in self._checkpoints_and_paths: try: - trial = _load_trial_from_checkpoint( - checkpoint, stub=True, new_local_dir=str(path) - ) + trial = Trial.from_json_state(trial_json_state, stub=True) + trial.local_dir = str(path) except Exception: logger.warning( f"Could not load trials from experiment checkpoint. " From dde5d875839eb09dd18fbeddcab1cc2fb0f85288 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 11:19:59 -0800 Subject: [PATCH 08/23] Add restore test from moved URI Signed-off-by: Justin Yu --- python/ray/tune/tests/test_tuner_restore.py | 66 ++++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py index b528ca80590a..9c52d441d731 100644 --- a/python/ray/tune/tests/test_tuner_restore.py +++ b/python/ray/tune/tests/test_tuner_restore.py @@ -18,7 +18,12 @@ ScalingConfig, session, ) -from ray.air._internal.remote_storage import delete_at_uri, download_from_uri +from ray.air._internal.remote_storage import ( + delete_at_uri, + download_from_uri, + upload_to_uri, + list_at_uri, +) from ray.train.data_parallel_trainer import DataParallelTrainer from ray.tune import Callback, Trainable from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint @@ -746,6 +751,63 @@ def test_tuner_restore_from_moved_experiment_path( assert not old_local_dir.exists() +def test_tuner_restore_from_moved_cloud_uri(ray_start_2_cpus, tmp_path): + """Test that moving a""" + + def failing_fn(config): + data = {"score": 1} + session.report(data, checkpoint=Checkpoint.from_dict(data)) + raise RuntimeError("Failing!") + + tuner = Tuner( + failing_fn, + run_config=RunConfig( + name="exp_dir", + local_dir=str(tmp_path / "ray_results"), + sync_config=tune.SyncConfig(upload_dir="memory:///original"), + ), + tune_config=TuneConfig(trial_dirname_creator=lambda _: "test"), + ) + tuner.fit() + + # mv memory:///original/exp_dir memory:///moved/new_exp_dir + download_from_uri( + "memory:///original/exp_dir", str(tmp_path / "moved" / "new_exp_dir") + ) + delete_at_uri("memory:///original") + upload_to_uri(str(tmp_path / "moved"), "memory:///moved") + + tuner = Tuner.restore("memory:///moved/new_exp_dir", resume_errored=True) + # This is needed because the mock memory:/// filesystem doesn't + # sync checkpoints properly, so this just copies the original checkpoint + # to the new local directory. + # NOTE: A new local directory is used since the experiment name got modified. + shutil.move( + tmp_path / "ray_results/exp_dir/test/checkpoint_000000", + tmp_path / "ray_results/new_exp_dir/test/checkpoint_000000", + ) + results = tuner.fit() + + assert list_at_uri("memory:///") == ["moved"] + num_experiment_checkpoints = len( + [ + path + for path in list_at_uri("memory:///moved/new_exp_dir") + if path.startswith("experiment_state") + ] + ) + assert num_experiment_checkpoints == 2 + + num_trial_checkpoints = len( + [ + path + for path in os.listdir(results[0].log_dir) + if path.startswith("checkpoint_") + ] + ) + assert num_trial_checkpoints == 2 + + def test_restore_from_relative_path(ray_start_4_cpus, chdir_tmpdir): tuner = Tuner( lambda config: session.report({"score": 1}), @@ -812,7 +874,7 @@ def on_trial_result(self, runner, trial, result): @pytest.mark.parametrize("use_air_trainer", [True, False]) -def test_checkpoints_saved_after_resume(tmp_path, use_air_trainer): +def test_checkpoints_saved_after_resume(ray_start_2_cpus, tmp_path, use_air_trainer): """Checkpoints saved after experiment restore should pick up at the correct iteration and should not overwrite the checkpoints from the original run. Old checkpoints should still be deleted if the total number of checkpoints From f6299a3f71fdcbcf2f35eb7a923424f7bc4aeaf0 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 11:21:51 -0800 Subject: [PATCH 09/23] Remove load trial utilities Signed-off-by: Justin Yu --- .../ray/tune/analysis/experiment_analysis.py | 5 +- python/ray/tune/execution/trial_runner.py | 61 +------------------ 2 files changed, 2 insertions(+), 64 deletions(-) diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index eeb8b1fc20a3..e3d33239ff7e 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -30,10 +30,7 @@ TRAINING_ITERATION, ) from ray.tune.experiment import Trial -from ray.tune.execution.trial_runner import ( - _find_newest_experiment_checkpoint, - _load_trial_from_checkpoint, -) +from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint from ray.tune.trainable.util import TrainableUtil from ray.tune.utils.util import unflattened_lookup diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 3388652551b3..565094ffe327 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -1,6 +1,6 @@ from collections import defaultdict from dataclasses import dataclass -from typing import Any, DefaultDict, List, Mapping, Optional, Union, Tuple, Set +from typing import DefaultDict, List, Optional, Union, Tuple, Set import click from datetime import datetime @@ -67,65 +67,6 @@ def _find_newest_experiment_checkpoint(ckpt_dir) -> Optional[str]: return max(full_paths) -def _load_trial_from_checkpoint( - trial_cp: dict, stub: bool = False, new_local_dir: Optional[str] = None -) -> Trial: - """Create a Trial from the state stored in the experiment checkpoint. - - Args: - trial_cp: Trial state from the experiment checkpoint, which is loaded - from the trial's `Trial.get_json_state`. - stub: Whether or not to validate the trainable name when creating the Trial. - Used for testing purposes for creating mocks. - new_local_dir: If set, this `local_dir` will overwrite what's saved in the - `trial_cp` state. Used in the case that the trial directory has moved. - The Trial `logdir` and the persistent trial checkpoints will have their - paths updated relative to this new directory. - - Returns: - new_trial: New trial with state loaded from experiment checkpoint - """ - new_trial = Trial( - trial_cp["trainable_name"], - stub=stub, - _setup_default_resource=False, - ) - if new_local_dir: - trial_cp["local_dir"] = new_local_dir - new_trial.__setstate__(trial_cp) - new_trial.refresh_default_resource_request() - return new_trial - - -def _load_trials_from_experiment_checkpoint( - experiment_checkpoint: Mapping[str, Any], - stub: bool = False, - new_local_dir: Optional[str] = None, -) -> List[Trial]: - """Create trial objects from experiment checkpoint. - - Given an experiment checkpoint (TrialRunner state dict), return - list of trials. See `_ExperimentCheckpointManager.checkpoint` for - what's saved in the TrialRunner state dict. - """ - checkpoints = [ - json.loads(cp, cls=TuneFunctionDecoder) if isinstance(cp, str) else cp - for cp in experiment_checkpoint["checkpoints"] - ] - - trials = [] - for trial_cp in checkpoints: - trials.append( - _load_trial_from_checkpoint( - trial_cp, - stub=stub, - new_local_dir=new_local_dir, - ) - ) - - return trials - - @dataclass class _ResumeConfig: resume_unfinished: bool = True From 7f810bdde00a38a426fa94638e96cc39185e73f6 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 16:21:43 -0800 Subject: [PATCH 10/23] Fix sync_config=None case Signed-off-by: Justin Yu --- python/ray/tune/execution/trial_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 565094ffe327..ec91112608d0 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -381,7 +381,7 @@ def __init__( self.trial_executor.setup( max_pending_trials=self._max_pending_trials, - trainable_kwargs={"sync_timeout": sync_config.sync_timeout}, + trainable_kwargs={"sync_timeout": self._sync_config.sync_timeout}, ) self._metric = metric @@ -428,7 +428,7 @@ def __init__( self._experiment_dir_name = experiment_dir_name - self._syncer = get_node_to_storage_syncer(sync_config) + self._syncer = get_node_to_storage_syncer(self._sync_config) self._stopper = stopper or NoopStopper() self._resumed = False From 936174c30dfdb7ec97f67adcd6eea63787f817fb Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 13 Jan 2023 16:52:16 -0800 Subject: [PATCH 11/23] Fix upload dir parsing to include query string Signed-off-by: Justin Yu --- python/ray/tune/impl/tuner_internal.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/ray/tune/impl/tuner_internal.py b/python/ray/tune/impl/tuner_internal.py index 675f41852e18..38dd889d8912 100644 --- a/python/ray/tune/impl/tuner_internal.py +++ b/python/ray/tune/impl/tuner_internal.py @@ -337,11 +337,13 @@ def _restore_from_path_or_uri( else: # Set the experiment `name` and `upload_dir` according to the URI parsed_uri = urllib.parse.urlparse(path_or_uri) - remote_path = Path(parsed_uri.netloc + parsed_uri.path) + remote_path = Path(os.path.normpath(parsed_uri.netloc + parsed_uri.path)) + upload_dir = parsed_uri._replace( + netloc="", path=str(remote_path.parent) + ).geturl() + self._run_config.name = remote_path.name - self._run_config.sync_config.upload_dir = ( - parsed_uri.scheme + "://" + str(remote_path.parent) - ) + self._run_config.sync_config.upload_dir = upload_dir # If we synced, `experiment_checkpoint_dir` will contain a temporary # directory. Create an experiment checkpoint dir instead and move From 20104dc03f6843aa62f0da89aa64776a7bd874e7 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 16:40:30 -0800 Subject: [PATCH 12/23] Sort trials in results by trial id Signed-off-by: Justin Yu --- python/ray/tune/analysis/experiment_analysis.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index e3d33239ff7e..241d7d2d9189 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -792,7 +792,6 @@ def _get_trial_paths(self) -> List[str]: "out of sync, as checkpointing is periodic." ) self.trials = [] - _trial_paths = [] for trial_json_state, path in self._checkpoints_and_paths: try: trial = Trial.from_json_state(trial_json_state, stub=True) @@ -807,7 +806,9 @@ def _get_trial_paths(self) -> List[str]: ) continue self.trials.append(trial) - _trial_paths.append(str(trial.logdir)) + + self.trials.sort(key=lambda trial: trial.trial_id) + _trial_paths = [str(trial.logdir) for trial in self.trials] if not _trial_paths: raise TuneError("No trials found.") From bce61b4f7cc9e5253bae606a170cf6187aadeb01 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 16:40:53 -0800 Subject: [PATCH 13/23] Strict input type for from_json_state Signed-off-by: Justin Yu --- python/ray/tune/experiment/trial.py | 32 +++++++++++++---------------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py index ed9123d9d2aa..88f37b8a0a48 100644 --- a/python/ray/tune/experiment/trial.py +++ b/python/ray/tune/experiment/trial.py @@ -931,6 +931,20 @@ def get_json_state(self) -> str: self._state_valid = True return self._state_json + @classmethod + def from_json_state(cls, json_state: str, stub: bool = False) -> "Trial": + trial_state = json.loads(json_state, cls=TuneFunctionDecoder) + + new_trial = Trial( + trial_state["trainable_name"], + stub=stub, + _setup_default_resource=False, + ) + + new_trial.__setstate__(trial_state) + + return new_trial + def __getstate__(self): """Memento generator for Trial. @@ -970,21 +984,3 @@ def __setstate__(self, state): validate_trainable(self.trainable_name) assert self.placement_group_factory - - @classmethod - def from_json_state(cls, json_state, stub: bool = False) -> "Trial": - trial_state = ( - json.loads(json_state, cls=TuneFunctionDecoder) - if isinstance(json_state, str) - else json_state - ) - - new_trial = Trial( - trial_state["trainable_name"], - stub=stub, - _setup_default_resource=False, - ) - - new_trial.__setstate__(trial_state) - - return new_trial From 3ca887d9bae2327f7fb28dda2d9d504af230d9e6 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 16:53:25 -0800 Subject: [PATCH 14/23] Clear memory filesys fixture for tests that use it Signed-off-by: Justin Yu --- python/ray/tune/tests/test_tuner_restore.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py index 9c52d441d731..4a119d38da84 100644 --- a/python/ray/tune/tests/test_tuner_restore.py +++ b/python/ray/tune/tests/test_tuner_restore.py @@ -59,6 +59,12 @@ def chdir_tmpdir(tmpdir): os.chdir(old_cwd) +@pytest.fixture +def clear_memory_filesys(): + yield + delete_at_uri("memory:///") + + def _train_fn_sometimes_failing(config): # Fails if failing is set and marker file exists. # Hangs if hanging is set and marker file exists. @@ -370,7 +376,7 @@ def test_tuner_resume_errored_only(ray_start_2_cpus, tmpdir): assert sorted([r.metrics.get("it", 0) for r in results]) == sorted([2, 1, 3, 0]) -def test_tuner_restore_from_cloud(ray_start_2_cpus, tmpdir): +def test_tuner_restore_from_cloud(ray_start_2_cpus, tmpdir, clear_memory_filesys): """Check that restoring Tuner() objects from cloud storage works""" tuner = Tuner( lambda config: 1, @@ -424,7 +430,7 @@ def test_tuner_restore_from_cloud(ray_start_2_cpus, tmpdir): [None, "memory:///test/test_tuner_restore_latest_available_checkpoint"], ) def test_tuner_restore_latest_available_checkpoint( - ray_start_4_cpus, tmpdir, upload_uri + ray_start_4_cpus, tmpdir, upload_uri, clear_memory_filesys ): """Resuming errored trials should pick up from previous state""" fail_marker = tmpdir / "fail_marker" @@ -751,8 +757,11 @@ def test_tuner_restore_from_moved_experiment_path( assert not old_local_dir.exists() -def test_tuner_restore_from_moved_cloud_uri(ray_start_2_cpus, tmp_path): - """Test that moving a""" +def test_tuner_restore_from_moved_cloud_uri( + ray_start_2_cpus, tmp_path, clear_memory_filesys +): + """Test that that restoring an experiment that was moved to a new remote URI + resumes and continues saving new results at that URI.""" def failing_fn(config): data = {"score": 1} From 12068a89561b3e315dcf306b5cd9bf1a3c8b8fde Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 16:56:57 -0800 Subject: [PATCH 15/23] Remove useless check (trial logdir is loaded from the checkpoint) Signed-off-by: Justin Yu --- python/ray/tune/tests/test_experiment_analysis.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/python/ray/tune/tests/test_experiment_analysis.py b/python/ray/tune/tests/test_experiment_analysis.py index dc5e32ec9f33..c89202b50d2b 100644 --- a/python/ray/tune/tests/test_experiment_analysis.py +++ b/python/ray/tune/tests/test_experiment_analysis.py @@ -91,14 +91,8 @@ def testStats(self): def testTrialDataframe(self): checkpoints = self.ea._checkpoints_and_paths idx = random.randint(0, len(checkpoints) - 1) - logdir_from_checkpoint = str( - checkpoints[idx][1].joinpath(checkpoints[idx][0]["relative_logdir"]) - ) logdir_from_trial = self.ea.trials[idx].logdir - - self.assertEqual(logdir_from_checkpoint, logdir_from_trial) - - trial_df = self.ea.trial_dataframes[logdir_from_checkpoint] + trial_df = self.ea.trial_dataframes[logdir_from_trial] self.assertTrue(isinstance(trial_df, pd.DataFrame)) self.assertEqual(trial_df.shape[0], 1) From 687ea5e823702f3153596b92a441f6cfd6ff4272 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 17:07:48 -0800 Subject: [PATCH 16/23] Fix tests related to from_json_state Signed-off-by: Justin Yu --- .../tune/tests/test_trial_relative_logdir.py | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/python/ray/tune/tests/test_trial_relative_logdir.py b/python/ray/tune/tests/test_trial_relative_logdir.py index 41f7c6c46610..da05ded1f1fb 100644 --- a/python/ray/tune/tests/test_trial_relative_logdir.py +++ b/python/ray/tune/tests/test_trial_relative_logdir.py @@ -13,9 +13,7 @@ import ray from ray import tune from ray.air._internal.checkpoint_manager import CheckpointStorage, _TrackedCheckpoint -from ray.tune.execution.trial_runner import _load_trial_from_checkpoint from ray.tune.experiment import Trial -from ray.tune.utils.serialization import TuneFunctionDecoder def train(config): @@ -259,8 +257,9 @@ def testRelativeLogdirWithJson(self): def test_load_trial_from_json_state(tmpdir): - """Check that `Trial.get_json_state` and `_load_trial_from_checkpoint` - for saving and loading a Trial is done correctly.""" + """Check that serializing a trial to a JSON string with `Trial.get_json_state` + and then creating a new trial using the `Trial.from_json_state` alternate + constructor loads the trial with equivalent state.""" trial = Trial( "MockTrainable", stub=True, trial_id="abcd1234", local_dir=str(tmpdir) ) @@ -276,22 +275,37 @@ def test_load_trial_from_json_state(tmpdir): ) ) - json_cp = trial.get_json_state() - trial_cp = json.loads(json_cp, cls=TuneFunctionDecoder) # After loading, the trial state should be the same - new_trial = _load_trial_from_checkpoint(trial_cp.copy(), stub=True) - assert new_trial.get_json_state() == json_cp + json_state = trial.get_json_state() + new_trial = Trial.from_json_state(json_state, stub=True) + assert new_trial.get_json_state() == json_state + + +def test_change_trial_local_dir(tmpdir): + trial = Trial( + "MockTrainable", stub=True, trial_id="abcd1234", local_dir=str(tmpdir) + ) + trial.init_logdir() + trial.status = Trial.TERMINATED + + checkpoint_logdir = os.path.join(trial.logdir, "checkpoint_00000") + trial.checkpoint_manager.on_checkpoint( + _TrackedCheckpoint( + dir_or_data=checkpoint_logdir, + storage_mode=CheckpointStorage.PERSISTENT, + metrics={"training_iteration": 1}, + ) + ) + + assert trial.logdir.startswith(str(tmpdir)) + assert trial.get_trial_checkpoints()[0].dir_or_data.startswith(str(tmpdir)) # Specify a new local dir, and the logdir/checkpoint path should be updated with tempfile.TemporaryDirectory() as new_local_dir: - new_trial = _load_trial_from_checkpoint( - trial_cp.copy(), stub=True, new_local_dir=new_local_dir - ) + trial.local_dir = new_local_dir - assert new_trial.logdir.startswith(new_local_dir) - assert new_trial.get_trial_checkpoints()[0].dir_or_data.startswith( - new_local_dir - ) + assert trial.logdir.startswith(new_local_dir) + assert trial.get_trial_checkpoints()[0].dir_or_data.startswith(new_local_dir) if __name__ == "__main__": From c65999fd6881a52df9396e85c3e3070ae3b7e0c2 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 17:18:20 -0800 Subject: [PATCH 17/23] Update TrialRunner docstring, remove remote_checkpoint_dir kwargs from trial runner tests Signed-off-by: Justin Yu --- python/ray/tune/execution/trial_runner.py | 17 +++++++++-------- python/ray/tune/tests/test_trial_runner_3.py | 4 ---- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index ec91112608d0..0479dd6eeb4d 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -291,14 +291,15 @@ class TrialRunner: search_alg: SearchAlgorithm for generating Trial objects. scheduler: Defaults to FIFOScheduler. - local_checkpoint_dir: Path where - global checkpoints are stored and restored from. - remote_checkpoint_dir: Remote path where - global checkpoints are stored and restored from. Used - if `resume` == REMOTE. - sync_config: See `tune.py:run`. - stopper: Custom class for stopping whole experiments. See - ``Stopper``. + local_checkpoint_dir: Path where global experiment state checkpoints + are saved and restored from. + sync_config: See :class:`~ray.tune.syncer.SyncConfig`. + Within sync config, the `upload_dir` specifies cloud storage, and + experiment state checkpoints will be synced to the `remote_checkpoint_dir`: + `{sync_config.upload_dir}/{experiment_name}`. + experiment_dir_name: Experiment directory name. + See :class:`~ray.tune.experiment.Experiment`. + stopper: Custom class for stopping whole experiments. See ``Stopper``. resume: see `tune.py:run`. server_port: Port number for launching TuneServer. fail_fast: Finishes as soon as a trial fails if True. diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py index 9b0c10f6db4b..a50b714553de 100644 --- a/python/ray/tune/tests/test_trial_runner_3.py +++ b/python/ray/tune/tests/test_trial_runner_3.py @@ -905,7 +905,6 @@ def delete(self, remote_dir: str) -> bool: sync_config=SyncConfig( upload_dir="fake", syncer=CustomSyncer(), sync_period=0 ), - remote_checkpoint_dir="fake", trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()), ) runner.add_trial(Trial("__fake", config={"user_checkpoint_freq": 1})) @@ -951,7 +950,6 @@ def delete(self, remote_dir: str) -> bool: runner = TrialRunner( local_checkpoint_dir=self.tmpdir, sync_config=SyncConfig(upload_dir="fake", syncer=syncer), - remote_checkpoint_dir="fake", trial_checkpoint_config=checkpoint_config, checkpoint_period=100, # Only rely on forced syncing trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()), @@ -1019,7 +1017,6 @@ def testForcedCloudCheckpointSyncTimeout(self): runner = TrialRunner( local_checkpoint_dir=self.tmpdir, sync_config=SyncConfig(upload_dir="fake", syncer=syncer), - remote_checkpoint_dir="fake", ) # Checkpoint for the first time starts the first sync in the background runner.checkpoint(force=True) @@ -1047,7 +1044,6 @@ def testPeriodicCloudCheckpointSyncTimeout(self): runner = TrialRunner( local_checkpoint_dir=self.tmpdir, sync_config=SyncConfig(upload_dir="fake", syncer=syncer), - remote_checkpoint_dir="fake", ) with freeze_time() as frozen: From f16e5137f1989d37d6f0486d1968e402d5ce9411 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 17:34:41 -0800 Subject: [PATCH 18/23] Simplify mocks in test_api Signed-off-by: Justin Yu --- python/ray/tune/tests/test_api.py | 75 ++----------------- python/ray/tune/tests/test_trial_scheduler.py | 2 + 2 files changed, 9 insertions(+), 68 deletions(-) diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index b9085eeccc6f..d109366d57fb 100644 --- a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -1806,55 +1806,22 @@ def train(config, reporter): self.assertEqual(trial.last_result["mean_accuracy"], float("inf")) def testSearcherSchedulerStr(self): - def train(config): - tune.report(metric=1) - capture = {} class MockTrialRunner(TrialRunner): - def __init__( - self, - search_alg=None, - scheduler=None, - local_checkpoint_dir=None, - remote_checkpoint_dir=None, - sync_config=None, - stopper=None, - resume=False, - server_port=None, - fail_fast=False, - checkpoint_period=None, - trial_executor=None, - callbacks=None, - metric=None, - trial_checkpoint_config=None, - driver_sync_trial_checkpoints=True, - ): - # should be converted from strings at this case - # and not None + def __init__(self, search_alg=None, scheduler=None, **kwargs): + # should be converted from strings at this case and not None capture["search_alg"] = search_alg capture["scheduler"] = scheduler super().__init__( search_alg=search_alg, scheduler=scheduler, - local_checkpoint_dir=local_checkpoint_dir, - remote_checkpoint_dir=remote_checkpoint_dir, - sync_config=sync_config, - stopper=stopper, - resume=resume, - server_port=server_port, - fail_fast=fail_fast, - checkpoint_period=checkpoint_period, - trial_executor=trial_executor, - callbacks=callbacks, - metric=metric, - trial_checkpoint_config=trial_checkpoint_config, - driver_sync_trial_checkpoints=True, + **kwargs, ) with patch("ray.tune.tune.TrialRunner", MockTrialRunner): tune.run( - train, + lambda config: tune.report(metric=1), search_alg="random", scheduler="async_hyperband", metric="metric", @@ -1889,42 +1856,14 @@ def train(config): capture = {} class MockTrialRunner(TrialRunner): - def __init__( - self, - search_alg=None, - scheduler=None, - local_checkpoint_dir=None, - remote_checkpoint_dir=None, - sync_config=None, - stopper=None, - resume=False, - server_port=None, - fail_fast=False, - checkpoint_period=None, - trial_executor=None, - callbacks=None, - metric=None, - trial_checkpoint_config=None, - driver_sync_trial_checkpoints=True, - ): + def __init__(self, search_alg=None, scheduler=None, **kwargs): + # should be converted from strings at this case and not None capture["search_alg"] = search_alg capture["scheduler"] = scheduler super().__init__( search_alg=search_alg, scheduler=scheduler, - local_checkpoint_dir=local_checkpoint_dir, - remote_checkpoint_dir=remote_checkpoint_dir, - sync_config=sync_config, - stopper=stopper, - resume=resume, - server_port=server_port, - fail_fast=fail_fast, - checkpoint_period=checkpoint_period, - trial_executor=trial_executor, - callbacks=callbacks, - metric=metric, - trial_checkpoint_config=trial_checkpoint_config, - driver_sync_trial_checkpoints=driver_sync_trial_checkpoints, + **kwargs, ) with patch("ray.tune.tune.TrialRunner", MockTrialRunner): diff --git a/python/ray/tune/tests/test_trial_scheduler.py b/python/ray/tune/tests/test_trial_scheduler.py index 11a7aa37a670..e77585c78313 100644 --- a/python/ray/tune/tests/test_trial_scheduler.py +++ b/python/ray/tune/tests/test_trial_scheduler.py @@ -855,6 +855,8 @@ def __init__(self, i, config): self.resources = Resources(1, 0) self.custom_trial_name = None self.custom_dirname = None + self._local_dir = None + self.relative_logdir = None self._default_result_or_future = None self.checkpoint_manager = _CheckpointManager( checkpoint_config=CheckpointConfig( From 69a7221728f0a93874824355360b9e149204c027 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 17:35:29 -0800 Subject: [PATCH 19/23] Remove unnecessary import Signed-off-by: Justin Yu --- python/ray/tune/tests/test_trial_relative_logdir.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/tune/tests/test_trial_relative_logdir.py b/python/ray/tune/tests/test_trial_relative_logdir.py index da05ded1f1fb..f2f388ec6858 100644 --- a/python/ray/tune/tests/test_trial_relative_logdir.py +++ b/python/ray/tune/tests/test_trial_relative_logdir.py @@ -1,4 +1,3 @@ -import json import os import shutil import sys From fdeaa1967cfebfae06d9bc31370bd6e916467912 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 23:01:49 -0800 Subject: [PATCH 20/23] Remove really old backwards compatibility test Signed-off-by: Justin Yu --- .../tests/test_experiment_analysis_mem.py | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/python/ray/tune/tests/test_experiment_analysis_mem.py b/python/ray/tune/tests/test_experiment_analysis_mem.py index 315ceaca1fec..bb77ce055709 100644 --- a/python/ray/tune/tests/test_experiment_analysis_mem.py +++ b/python/ray/tune/tests/test_experiment_analysis_mem.py @@ -56,30 +56,6 @@ def load_checkpoint(self, checkpoint_path): def tearDown(self): shutil.rmtree(self.test_dir, ignore_errors=True) - def testInitLegacy(self): - """Should still work if checkpoints are not json strings""" - experiment_checkpoint_path = os.path.join( - self.test_dir, "experiment_state.json" - ) - checkpoint_data = { - "checkpoints": [ - { - "trial_id": "abcd1234", - "status": Trial.TERMINATED, - "trainable_name": "MockTrainable", - "local_dir": self.test_dir, - "relative_logdir": "MockTrainable_0_id=3_2020-07-12", - } - ] - } - - with open(experiment_checkpoint_path, "w") as f: - f.write(json.dumps(checkpoint_data)) - - experiment_analysis = ExperimentAnalysis(experiment_checkpoint_path) - self.assertEqual(len(experiment_analysis._checkpoints_and_paths), 1) - self.assertTrue(experiment_analysis.trials) - def testInit(self): trial = Trial( "MockTrainable", stub=True, trial_id="abcd1234", local_dir=self.test_dir From b54c30cbe7f9603a46ea67c41bb1928288c6173d Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 17 Jan 2023 23:02:51 -0800 Subject: [PATCH 21/23] Remove unnecessary import Signed-off-by: Justin Yu --- python/ray/tune/tests/test_experiment_analysis_mem.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/tune/tests/test_experiment_analysis_mem.py b/python/ray/tune/tests/test_experiment_analysis_mem.py index bb77ce055709..1033f7ff38af 100644 --- a/python/ray/tune/tests/test_experiment_analysis_mem.py +++ b/python/ray/tune/tests/test_experiment_analysis_mem.py @@ -1,4 +1,3 @@ -import json import unittest import shutil import tempfile From 2132cfb186d70d420b3780a245217a3d982b745b Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 23 Jan 2023 22:25:15 -0800 Subject: [PATCH 22/23] Remove unused decode fn Signed-off-by: Justin Yu --- python/ray/tune/analysis/experiment_analysis.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index 241d7d2d9189..da57fe46c949 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -876,7 +876,3 @@ def make_stub_if_needed(trial: Trial) -> Trial: state["trials"] = [make_stub_if_needed(t) for t in state["trials"]] return state - - -def _decode_checkpoint_from_experiment_state(cp: Union[str, dict]) -> dict: - return json.loads(cp, cls=TuneFunctionDecoder) if isinstance(cp, str) else cp From 3257b9a9be9c52673ab5817ab9bf688f398b1500 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 23 Jan 2023 22:37:46 -0800 Subject: [PATCH 23/23] Clarify why the checkpoint gets moved Signed-off-by: Justin Yu --- python/ray/tune/tests/test_tuner_restore.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py index 4a119d38da84..edca293e1dec 100644 --- a/python/ray/tune/tests/test_tuner_restore.py +++ b/python/ray/tune/tests/test_tuner_restore.py @@ -760,7 +760,7 @@ def test_tuner_restore_from_moved_experiment_path( def test_tuner_restore_from_moved_cloud_uri( ray_start_2_cpus, tmp_path, clear_memory_filesys ): - """Test that that restoring an experiment that was moved to a new remote URI + """Test that restoring an experiment that was moved to a new remote URI resumes and continues saving new results at that URI.""" def failing_fn(config): @@ -787,9 +787,11 @@ def failing_fn(config): upload_to_uri(str(tmp_path / "moved"), "memory:///moved") tuner = Tuner.restore("memory:///moved/new_exp_dir", resume_errored=True) - # This is needed because the mock memory:/// filesystem doesn't - # sync checkpoints properly, so this just copies the original checkpoint - # to the new local directory. + # Just for the test, since we're using `memory://` to mock a remote filesystem, + # the checkpoint needs to be copied to the new local directory. + # This is because the trainable actor uploads its checkpoints to a + # different `memory://` filesystem than the driver and is not + # downloaded along with the other parts of the experiment dir. # NOTE: A new local directory is used since the experiment name got modified. shutil.move( tmp_path / "ray_results/exp_dir/test/checkpoint_000000",