[RLlib] Change config.fault_tolerance default behavior (from `recreate_failed_env_runners=False` to `True`). (#48286)
sven1977 authored Oct 29, 2024
1 parent 43c3663 commit 44237c6
Showing 19 changed files with 91 additions and 70 deletions.
2 changes: 1 addition & 1 deletion doc/source/rllib/rllib-advanced-api.rst
@@ -362,7 +362,7 @@ of such environment behavior:

Note that with or without parallel evaluation, all
:ref:`fault tolerance settings <rllib-scaling-guide>`, such as
``ignore_env_runner_failures`` or ``recreate_failed_env_runners`` are respected and applied
``ignore_env_runner_failures`` or ``restart_failed_env_runners`` are respected and applied
to the failed evaluation workers.

Here's an example:
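A minimal sketch along these lines (illustrative values, not the exact snippet from the docs):

    from ray.rllib.algorithms.ppo import PPOConfig

    config = (
        PPOConfig()
        .environment("CartPole-v1")
        # Evaluate on two dedicated EnvRunners, in parallel to training.
        .evaluation(
            evaluation_num_env_runners=2,
            evaluation_interval=1,
            evaluation_parallel_to_training=True,
        )
        # The same fault tolerance settings also cover the evaluation EnvRunners.
        .fault_tolerance(restart_failed_env_runners=True)
    )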
4 changes: 2 additions & 2 deletions doc/source/rllib/rllib-fault-tolerance.rst
@@ -31,7 +31,7 @@ The two properties that RLlib supports here are self-recovery and elasticity:
* **Self-Recovery**: When possible, RLlib will attempt to restore any :py:class:`~ray.rllib.env.env_runner.EnvRunner` that was previously removed. During restoration, RLlib syncs the latest state over to the restored :py:class:`~ray.rllib.env.env_runner.EnvRunner` before new episodes can be sampled.


Worker fault tolerance can be turned on by setting config ``recreate_failed_env_runners`` to True.
Worker fault tolerance can be turned on by setting ``config.fault_tolerance(restart_failed_env_runners=True)``.

RLlib achieves this by utilizing a
`state-aware and fault-tolerant actor manager <https://github.com/ray-project/ray/blob/master/rllib/utils/actor_manager.py>`__. Under the hood, RLlib relies on Ray Core :ref:`actor fault tolerance <actor-fault-tolerance>` to automatically recover failed worker actors.
@@ -57,7 +57,7 @@ errors to higher level components. You can do that easily by turning on config
So for on-policy algorithms, it may be better to recover at the worker level to make sure
training progresses with an elastic worker set while the environments are being reconstructed.
More specifically, use the config settings ``num_envs_per_env_runner=1``, ``restart_failed_sub_environments=False``,
and ``recreate_failed_env_runners=True``.
and ``restart_failed_env_runners=True``.
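As a rough sketch of that recommendation (PPO serves only as an example of an on-policy algorithm):

    from ray.rllib.algorithms.ppo import PPOConfig

    config = (
        PPOConfig()
        .environment("CartPole-v1")
        # One sub-environment per EnvRunner, so a crashing env only takes down
        # its own EnvRunner, which is then restarted with freshly synced state.
        .env_runners(num_env_runners=4, num_envs_per_env_runner=1)
        .fault_tolerance(
            restart_failed_env_runners=True,
            restart_failed_sub_environments=False,
        )
    )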


Fault Tolerance and Recovery Provided by Ray Tune
14 changes: 7 additions & 7 deletions doc/source/rllib/rllib-training.rst
@@ -412,16 +412,16 @@ inference. Make sure to set ``num_gpus: 1`` if you want to use a GPU. If the lea
4. Finally, if both model and environment are compute intensive, then enable `remote worker envs <rllib-env.html#vectorized>`__ with `async batching <rllib-env.html#vectorized>`__ by setting ``remote_worker_envs: True`` and optionally ``remote_env_batch_wait_ms``. This batches inference on GPUs in the rollout workers while letting envs run asynchronously in separate actors, similar to the `SEED <https://ai.googleblog.com/2020/03/massively-scaling-reinforcement.html>`__ architecture. The number of workers and number of envs per worker should be tuned to maximize GPU utilization.
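A rough sketch of such a setup (the environment name is hypothetical and the wait time is only a starting point to tune):

    from ray.rllib.algorithms.ppo import PPOConfig

    config = (
        PPOConfig()
        .environment("MyComputeHeavyEnv-v0")  # hypothetical, compute-intensive env
        .env_runners(
            num_env_runners=8,
            num_envs_per_env_runner=8,
            # Step each sub-environment in its own remote actor ...
            remote_worker_envs=True,
            # ... and batch inference over whichever envs respond within 10 ms.
            remote_env_batch_wait_ms=10,
        )
    )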

If you are using many workers (``num_env_runners >> 10``) and you observe worker failures for whatever reason (failures that would normally interrupt your RLlib training runs), consider using
the config settings ``ignore_env_runner_failures=True``, ``recreate_failed_env_runners=True``, or ``restart_failed_sub_environments=True``:
the config settings ``ignore_env_runner_failures=True``, ``restart_failed_env_runners=True``, or ``restart_failed_sub_environments=True``:

``ignore_env_runner_failures``: When set to True, your Algorithm will not crash due to a single worker error but continue for as long as there is at least one functional worker remaining.
``recreate_failed_env_runners``: When set to True, your Algorithm will attempt to replace/recreate any failed worker(s) with newly created one(s). This way, your number of workers will never decrease, even if some of them fail from time to time.
``restart_failed_sub_environments``: When set to True and there is a failure in one of the vectorized sub-environments in one of your workers, the worker will try to recreate only the failed sub-environment and re-integrate the newly created one into your vectorized env stack on that worker.
``restart_failed_env_runners``: When set to True (default), your Algorithm will attempt to restart any failed EnvRunner and replace it with a newly created one. This way, your number of workers will never decrease, even if some of them fail from time to time.
``ignore_env_runner_failures``: When set to True, your Algorithm will not crash due to an EnvRunner error, but continue for as long as there is at least one functional worker remaining. This setting is ignored when ``restart_failed_env_runners=True``.
``restart_failed_sub_environments``: When set to True and there is a failure in one of the vectorized sub-environments in one of your EnvRunners, RLlib tries to recreate only the failed sub-environment and re-integrate the newly created one into your vectorized env stack on that EnvRunner.

Note that only one of ``ignore_env_runner_failures`` or ``recreate_failed_env_runners`` may be set to True (they are mutually exclusive settings). However,
Note that only one of ``ignore_env_runner_failures`` or ``restart_failed_env_runners`` should be set to True (they are mutually exclusive settings). However,
you can combine each of these with the ``restart_failed_sub_environments=True`` setting.
Using these options will make your training runs much more stable and more robust against occasional OOM or other similar "once in a while" errors on your workers
themselves or inside your environments.
Using these options will make your training runs much more stable and more robust against occasional OOM or other similar "once in a while" errors on the EnvRunners
themselves or inside your custom environments.
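Putting these settings together, a typical robust configuration might look as follows (a sketch with illustrative values):

    from ray.rllib.algorithms.ppo import PPOConfig

    config = (
        PPOConfig()
        .environment("CartPole-v1")
        .env_runners(num_env_runners=50)
        .fault_tolerance(
            # Replace crashed EnvRunners so the worker count never shrinks.
            restart_failed_env_runners=True,
            # Also restart individual crashed sub-environments in place.
            restart_failed_sub_environments=True,
            # Optional knobs: cap the number of restarts and space them out.
            max_num_env_runner_restarts=1000,
            delay_between_env_runner_restarts_s=10.0,
        )
    )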


Debugging RLlib Experiments
4 changes: 2 additions & 2 deletions rllib/algorithms/algorithm.py
@@ -1362,7 +1362,7 @@ def _env_runner_remote(worker, num, round, iter):
" too unstable, b) you have enough evaluation workers "
"(`config.evaluation(evaluation_num_env_runners=...)`) to cover for "
"occasional losses, and c) you use the `config.fault_tolerance("
"recreate_failed_env_runners=True)` setting."
"restart_failed_env_runners=True)` setting."
)

if not self.config.enable_env_runner_and_connector_v2:
@@ -1552,7 +1552,7 @@ def _env_runner_remote(worker, num, round, iter):
" too unstable, b) you have enough evaluation workers "
"(`config.evaluation(evaluation_num_env_runners=...)`) to cover for "
"occasional losses, and c) you use the `config.fault_tolerance("
"recreate_failed_env_runners=True)` setting."
"restart_failed_env_runners=True)` setting."
)

if not self.config.enable_env_runner_and_connector_v2:
73 changes: 46 additions & 27 deletions rllib/algorithms/algorithm_config.py
@@ -528,26 +528,22 @@ def __init__(self, algo_class: Optional[type] = None):
self._evaluation_parallel_to_training_wo_thread = False

# `self.fault_tolerance()`
# TODO (sven): Rename to `restart_..` to match other attributes AND ray's
# `ray.remote(max_num_restarts=..)([class])` setting.
self.recreate_failed_env_runners = False
self.restart_failed_env_runners = True
self.ignore_env_runner_failures = False
# By default, restart failed worker a thousand times.
# This should be enough to handle normal transient failures.
# This also prevents infinite number of restarts in case
# the worker or env has a bug.
# This also prevents infinite number of restarts in case the worker or env has
# a bug.
self.max_num_env_runner_restarts = 1000
# Small delay between worker restarts. In case rollout or
# evaluation workers have remote dependencies, this delay can be
# adjusted to make sure we don't flood them with re-connection
# requests, and allow them enough time to recover.
# This delay also gives Ray time to stream back error logging
# and exceptions.
# Small delay between worker restarts. In case EnvRunners or eval EnvRunners
# have remote dependencies, this delay can be adjusted to make sure we don't
# flood them with re-connection requests, and allow them enough time to recover.
# This delay also gives Ray time to stream back error logging and exceptions.
self.delay_between_env_runner_restarts_s = 60.0
self.restart_failed_sub_environments = False
self.num_consecutive_env_runner_failures_tolerance = 100
self.env_runner_health_probe_timeout_s = 30
self.env_runner_restore_timeout_s = 1800
self.env_runner_health_probe_timeout_s = 30.0
self.env_runner_restore_timeout_s = 1800.0

# `self.rl_module()`
self._model_config = {}
@@ -3230,7 +3226,7 @@ def debugging(
def fault_tolerance(
self,
*,
recreate_failed_env_runners: Optional[bool] = NotProvided,
restart_failed_env_runners: Optional[bool] = NotProvided,
ignore_env_runner_failures: Optional[bool] = NotProvided,
max_num_env_runner_restarts: Optional[int] = NotProvided,
delay_between_env_runner_restarts_s: Optional[float] = NotProvided,
@@ -3239,6 +3235,7 @@ def fault_tolerance(
env_runner_health_probe_timeout_s: Optional[float] = NotProvided,
env_runner_restore_timeout_s: Optional[float] = NotProvided,
# Deprecated args.
recreate_failed_env_runners=DEPRECATED_VALUE,
ignore_worker_failures=DEPRECATED_VALUE,
recreate_failed_workers=DEPRECATED_VALUE,
max_num_worker_restarts=DEPRECATED_VALUE,
@@ -3250,8 +3247,8 @@ def fault_tolerance(
"""Sets the config's fault tolerance settings.
Args:
recreate_failed_env_runners: Whether - upon an EnvRunner failure - RLlib
tries to recreate the lost EnvRunner(s) as an identical copy of the
restart_failed_env_runners: Whether - upon an EnvRunner failure - RLlib
tries to restart the lost EnvRunner(s) as an identical copy of the
failed one(s). You should set this to True when training on SPOT
instances that may preempt any time. The new, recreated EnvRunner(s)
only differ from the failed one in their `self.recreated_worker=True`
@@ -3260,11 +3257,11 @@ def fault_tolerance(
setting is ignored.
ignore_env_runner_failures: Whether to ignore any EnvRunner failures
and continue running with the remaining EnvRunners. This setting is
ignored, if `recreate_failed_env_runners=True`.
ignored, if `restart_failed_env_runners=True`.
max_num_env_runner_restarts: The maximum number of times any EnvRunner
is allowed to be restarted (if `recreate_failed_env_runners` is True).
is allowed to be restarted (if `restart_failed_env_runners` is True).
delay_between_env_runner_restarts_s: The delay (in seconds) between two
consecutive EnvRunner restarts (if `recreate_failed_env_runners` is
consecutive EnvRunner restarts (if `restart_failed_env_runners` is
True).
restart_failed_sub_environments: If True and any sub-environment (within
a vectorized env) throws any error during env stepping, the
@@ -3274,7 +3271,7 @@ def fault_tolerance(
num_consecutive_env_runner_failures_tolerance: The number of consecutive
times an EnvRunner failure (also for evaluation) is tolerated before
finally crashing the Algorithm. Only useful if either
`ignore_env_runner_failures` or `recreate_failed_env_runners` is True.
`ignore_env_runner_failures` or `restart_failed_env_runners` is True.
Note that for `restart_failed_sub_environments` and sub-environment
failures, the EnvRunner itself is NOT affected and won't throw any
errors as the flawed sub-environment is silently restarted under the
@@ -3290,6 +3287,12 @@ def fault_tolerance(
Returns:
This updated AlgorithmConfig object.
"""
if recreate_failed_env_runners != DEPRECATED_VALUE:
deprecation_warning(
old="AlgorithmConfig.fault_tolerance(recreate_failed_env_runners)",
new="AlgorithmConfig.fault_tolerance(restart_failed_env_runners)",
error=True,
)
if ignore_worker_failures != DEPRECATED_VALUE:
deprecation_warning(
old="AlgorithmConfig.fault_tolerance(ignore_worker_failures)",
@@ -3299,7 +3302,7 @@ def fault_tolerance(
if recreate_failed_workers != DEPRECATED_VALUE:
deprecation_warning(
old="AlgorithmConfig.fault_tolerance(recreate_failed_workers)",
new="AlgorithmConfig.fault_tolerance(recreate_failed_env_runners)",
new="AlgorithmConfig.fault_tolerance(restart_failed_env_runners)",
error=True,
)
if max_num_worker_restarts != DEPRECATED_VALUE:
@@ -3339,8 +3342,8 @@ def fault_tolerance(

if ignore_env_runner_failures is not NotProvided:
self.ignore_env_runner_failures = ignore_env_runner_failures
if recreate_failed_env_runners is not NotProvided:
self.recreate_failed_env_runners = recreate_failed_env_runners
if restart_failed_env_runners is not NotProvided:
self.restart_failed_env_runners = restart_failed_env_runners
if max_num_env_runner_restarts is not NotProvided:
self.max_num_env_runner_restarts = max_num_env_runner_restarts
if delay_between_env_runner_restarts_s is not NotProvided:
@@ -5150,6 +5153,22 @@ def rollouts(self, *args, **kwargs):
def exploration(self, *args, **kwargs):
return self.env_runners(*args, **kwargs)

@property
@Deprecated(
new="AlgorithmConfig.fault_tolerance(restart_failed_env_runners=..)",
error=False,
)
def recreate_failed_env_runners(self):
return self.restart_failed_env_runners

@recreate_failed_env_runners.setter
def recreate_failed_env_runners(self, value):
deprecation_warning(
old="AlgorithmConfig.recreate_failed_env_runners",
new="AlgorithmConfig.restart_failed_env_runners",
error=True,
)

@property
@Deprecated(new="AlgorithmConfig._enable_new_api_stack", error=False)
def _enable_new_api_stack(self):
@@ -5225,18 +5244,18 @@ def ignore_worker_failures(self, value):
self.ignore_env_runner_failures = value

@property
@Deprecated(new="AlgorithmConfig.recreate_failed_env_runners", error=False)
@Deprecated(new="AlgorithmConfig.restart_failed_env_runners", error=False)
def recreate_failed_workers(self):
return self.recreate_failed_env_runners
return self.restart_failed_env_runners

@recreate_failed_workers.setter
def recreate_failed_workers(self, value):
deprecation_warning(
old="AlgorithmConfig.recreate_failed_workers",
new="AlgorithmConfig.recreate_failed_env_runners",
new="AlgorithmConfig.restart_failed_env_runners",
error=False,
)
self.recreate_failed_env_runners = value
self.restart_failed_env_runners = value

@property
@Deprecated(new="AlgorithmConfig.max_num_env_runner_restarts", error=False)
6 changes: 3 additions & 3 deletions rllib/algorithms/impala/impala.py
@@ -913,7 +913,7 @@ def _process_data(_actor, _episodes):
waiting_processed_sample_batches,
ignore_ray_errors=(
self.config.ignore_env_runner_failures
or self.config.recreate_failed_env_runners
or self.config.restart_failed_env_runners
),
)

@@ -1153,7 +1153,7 @@ def _process_episodes(actor, batch):
waiting_processed_sample_batches,
ignore_ray_errors=(
self.config.ignore_env_runner_failures
or self.config.recreate_failed_env_runners
or self.config.restart_failed_env_runners
),
)

@@ -1223,7 +1223,7 @@ def aggregate_into_larger_batch():
if (
self.config.batch_mode == "truncate_episodes"
and self.config.enable_connectors
and self.config.recreate_failed_env_runners
and self.config.restart_failed_env_runners
):
if any(
SampleBatch.VF_PREDS in pb
2 changes: 1 addition & 1 deletion rllib/algorithms/tests/test_callbacks_on_algorithm.py
@@ -59,7 +59,7 @@ def test_on_workers_recreated_callback(self):
.callbacks(OnWorkersRecreatedCallbacks)
.env_runners(num_env_runners=3)
.fault_tolerance(
recreate_failed_env_runners=True,
restart_failed_env_runners=True,
delay_between_env_runner_restarts_s=0,
)
)