[RLlib] Fix IMPALA/APPO learning behavior: Fix EnvRunner sync bug, GPU loader thread, enable local learner w/ GPU. (ray-project#48314)

Signed-off-by: mohitjain2504 <mohit.jain@dream11.com>
sven1977 authored and mohitjain2504 committed Nov 15, 2024
1 parent 71e190c commit c6bbda9
Showing 60 changed files with 540 additions and 450 deletions.
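
Note: the headline behavioral change in this commit is that IMPALA/APPO can now train with a purely local Learner on a GPU (`num_learners=0` plus `num_gpus_per_learner=1`); the `validate()` warning against `num_learners=0` is dropped further down. A minimal config sketch of that setup (not part of this diff; assumes the Ray 2.x new API stack):

```python
from ray.rllib.algorithms.impala import IMPALAConfig

# Sketch: a single *local* Learner (num_learners=0) that still trains on a GPU.
config = (
    IMPALAConfig()
    .environment("ale_py:ALE/Pong-v5")
    .env_runners(num_env_runners=8)
    .learners(
        num_learners=0,          # run the Learner in the main Algorithm process
        num_gpus_per_learner=1,  # now also honored when num_learners == 0
    )
)
algo = config.build()
```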
2 changes: 1 addition & 1 deletion doc/source/rllib/rllib-examples.rst
@@ -349,7 +349,7 @@ in roughly 5 minutes. It can be run as follows on a single g5.24xlarge (or g6.24
.. code-block:: bash
$ cd ray/rllib/tuned_examples/ppo
-$ python atari_ppo.py --env=ale_py:ALE/Pong-v5 --num-gpus=4 --num-env-runners=95
+$ python atari_ppo.py --env=ale_py:ALE/Pong-v5 --num-learners=4 --num-env-runners=95
Note that some of the files in this folder are used for RLlib's daily or weekly release tests as well.

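Note: `--num-gpus` is superseded by `--num-learners` here and in the release tests below, matching the new API stack's resource model in which GPUs are owned by Learner workers rather than requested wholesale by the Algorithm. A rough config-level equivalent (a sketch; the one-GPU-per-Learner assumption depends on the example script's defaults):

```python
# Old stack:  config.resources(num_gpus=4)
# New stack: four Learner workers, each (typically) holding one GPU.
config = config.learners(num_learners=4, num_gpus_per_learner=1)
```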
6 changes: 3 additions & 3 deletions release/release_tests.yaml
@@ -2716,7 +2716,7 @@

run:
timeout: 43200 # 12h
-script: python learning_tests/tuned_examples/dreamerv3/atari_100k.py --framework=tf2 --env=ale_py:ALE/Pong-v5 --num-gpus=1 --stop-reward=15.0 --as-release-test
+script: python learning_tests/tuned_examples/dreamerv3/atari_100k.py --framework=tf2 --env=ale_py:ALE/Pong-v5 --num-learners=1 --stop-reward=15.0 --as-release-test

alert: default

@@ -2751,7 +2751,7 @@

run:
timeout: 1200
-script: python learning_tests/tuned_examples/ppo/atari_ppo.py --enable-new-api-stack --env=ale_py:ALE/Pong-v5 --num-gpus=4 --num-env-runners=95 --stop-reward=20.0 --as-release-test
+script: python learning_tests/tuned_examples/ppo/atari_ppo.py --enable-new-api-stack --env=ale_py:ALE/Pong-v5 --num-learners=4 --num-env-runners=95 --stop-reward=20.0 --as-release-test

alert: default

@@ -2786,7 +2786,7 @@

run:
timeout: 7200
-script: python learning_tests/tuned_examples/sac/halfcheetah_sac.py --enable-new-api-stack --num-gpus=4 --num-env-runners=8 --stop-reward=1000.0 --as-release-test
+script: python learning_tests/tuned_examples/sac/halfcheetah_sac.py --enable-new-api-stack --num-learners=4 --num-env-runners=8 --stop-reward=1000.0 --as-release-test

alert: default

238 changes: 108 additions & 130 deletions rllib/BUILD

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions rllib/algorithms/algorithm.py
@@ -1579,8 +1579,8 @@ def _env_runner_remote(worker, num, round, iter):
logger.warning(
"This evaluation iteration resulted in an empty set of episode summary "
"results! It's possible that your configured duration timesteps are not"
-" enough to finish even a single episode. Your have configured "
-f"{self.config.evaluation_duration}"
+" enough to finish even a single episode. You have configured "
+f"{self.config.evaluation_duration} "
f"{self.config.evaluation_duration_unit}. For 'timesteps', try "
"increasing this value via the `config.evaluation(evaluation_duration="
"...)` OR change the unit to 'episodes' via `config.evaluation("
@@ -3684,8 +3684,8 @@ def _run_one_training_iteration_and_evaluation_in_parallel_wo_thread(
logger.warning(
"This evaluation iteration resulted in an empty set of episode summary "
"results! It's possible that your configured duration timesteps are not"
-" enough to finish even a single episode. Your have configured "
-f"{self.config.evaluation_duration}"
+" enough to finish even a single episode. You have configured "
+f"{self.config.evaluation_duration} "
f"{self.config.evaluation_duration_unit}. For 'timesteps', try "
"increasing this value via the `config.evaluation(evaluation_duration="
"...)` OR change the unit to 'episodes' via `config.evaluation("
2 changes: 1 addition & 1 deletion rllib/algorithms/algorithm_config.py
@@ -348,7 +348,7 @@ def __init__(self, algo_class: Optional[type] = None):
self.num_gpus_per_env_runner = 0
self.custom_resources_per_env_runner = {}
self.validate_env_runners_after_construction = True
-self.max_requests_in_flight_per_env_runner = 2
+self.max_requests_in_flight_per_env_runner = 1
self.sample_timeout_s = 60.0
self.create_env_on_local_worker = False
self._env_to_module_connector = None
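Note: lowering the default `max_requests_in_flight_per_env_runner` from 2 to 1 means at most one outstanding `sample()` request per EnvRunner, so sampled episodes lag the latest weights less. If slightly staler samples are acceptable for extra throughput, the old default can be restored (a sketch; the setting is exposed via `AlgorithmConfig.env_runners()`):

```python
# Sketch: allow two overlapping sample() requests per EnvRunner again.
config = config.env_runners(max_requests_in_flight_per_env_runner=2)
```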
76 changes: 33 additions & 43 deletions rllib/algorithms/impala/impala.py
@@ -39,6 +39,7 @@
LEARNER_RESULTS,
LEARNER_UPDATE_TIMER,
MEAN_NUM_EPISODE_LISTS_RECEIVED,
+MEAN_NUM_LEARNER_GROUP_RESULTS_RECEIVED,
MEAN_NUM_LEARNER_GROUP_UPDATE_CALLED,
NUM_AGENT_STEPS_SAMPLED,
NUM_AGENT_STEPS_SAMPLED_LIFETIME,
@@ -142,7 +143,7 @@ def __init__(self, algo_class=None):
self.vtrace_clip_rho_threshold = 1.0
self.vtrace_clip_pg_rho_threshold = 1.0
self.learner_queue_size = 3
-self.max_requests_in_flight_per_env_runner = 2
+self.max_requests_in_flight_per_env_runner = 1
self.max_requests_in_flight_per_aggregator_worker = 2
self.timeout_s_sampler_manager = 0.0
self.timeout_s_aggregator_manager = 0.0
@@ -159,10 +160,8 @@ def __init__(self, algo_class=None):
# global_norm, no matter the value of `grad_clip_by`.
self.grad_clip_by = "global_norm"

-self.lr_schedule = None
self.vf_loss_coeff = 0.5
self.entropy_coeff = 0.01
-self.entropy_coeff_schedule = None

# Override some of AlgorithmConfig's default values with IMPALA-specific values.
self.num_learners = 1
@@ -178,6 +177,8 @@
# __sphinx_doc_end__
# fmt: on

+self.lr_schedule = None # @OldAPIStack
+self.entropy_coeff_schedule = None # @OldAPIStack
self.num_multi_gpu_tower_stacks = 1 # @OldAPIstack
self.minibatch_buffer_size = 1 # @OldAPIstack
self.replay_proportion = 0.0 # @OldAPIstack
@@ -413,15 +414,6 @@ def validate(self) -> None:
"than or equal to `total_train_batch_size` "
f"({self.total_train_batch_size})!"
)
-# Make sure we have >=1 Learner and warn if `num_learners=0` (should only be
-# used for debugging).
-if self.num_learners == 0:
-logger.warning(
-f"{self} should only be run with `num_learners` >= 1! A value of 0 "
-"(local learner) should only be used for debugging purposes as it "
-"makes the algorithm non-asynchronous. When running with "
-"`num_learners=0`, expect diminished learning capabilities."
-)

elif isinstance(self.entropy_coeff, float) and self.entropy_coeff < 0.0:
raise ValueError("`entropy_coeff` must be >= 0.0")
@@ -644,18 +636,6 @@ def training_step(self) -> ResultDict:

# Log the average number of sample results (list of episodes) received.
self.metrics.log_value(MEAN_NUM_EPISODE_LISTS_RECEIVED, len(episode_refs))
-self.metrics.log_value(
-"_mean_num_episode_ts_received",
-len(episode_refs)
-* self.config.num_envs_per_env_runner
-* self.config.get_rollout_fragment_length(),
-)
-self.metrics.log_value(
-"_mean_num_episode_ts_received_using_reduced_metrics",
-self.metrics.peek(
-(ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED), default=0
-),
-)

# Log lifetime counts for env- and agent steps.
if env_runner_metrics:
@@ -695,6 +675,7 @@
)
rl_module_state = None
last_good_learner_results = None
+num_learner_group_results_received = 0

for batch_ref_or_episode_list_ref in data_packages_for_learner_group:
if self.config.num_aggregation_workers:
@@ -727,7 +708,11 @@
)
if not do_async_updates:
learner_results = [learner_results]

for results_from_n_learners in learner_results:
+if not results_from_n_learners[0]:
+continue
+num_learner_group_results_received += 1
for r in results_from_n_learners:
rl_module_state = r.pop(
"_rl_module_state_after_update", rl_module_state
Expand All @@ -737,6 +722,10 @@ def training_step(self) -> ResultDict:
key=LEARNER_RESULTS,
)
last_good_learner_results = results_from_n_learners
+self.metrics.log_value(
+key=MEAN_NUM_LEARNER_GROUP_RESULTS_RECEIVED,
+value=num_learner_group_results_received,
+)

# Update LearnerGroup's own stats.
self.metrics.log_dict(self.learner_group.get_stats(), key=LEARNER_GROUP)
@@ -754,10 +743,12 @@
# Figure out, whether we should sync/broadcast the (remote) EnvRunner states.
# Note: `learner_results` is a List of n (num async calls) Lists of m
# (num Learner workers) ResultDicts each.
-self.metrics.log_value(
-NUM_TRAINING_STEP_CALLS_SINCE_LAST_SYNCH_WORKER_WEIGHTS, 1, reduce="sum"
-)
if last_good_learner_results:
+# TODO (sven): Rename this metric into a more fitting name: ex.
+# `NUM_LEARNER_UPDATED_SINCE_LAST_WEIGHTS_SYNC`
+self.metrics.log_value(
+NUM_TRAINING_STEP_CALLS_SINCE_LAST_SYNCH_WORKER_WEIGHTS, 1, reduce="sum"
+)
# Merge available EnvRunner states into local worker's EnvRunner state.
# Broadcast merged EnvRunner state AND new model weights back to all remote
# EnvRunners that - in this call - had returned samples.
@@ -774,7 +765,6 @@
with self.metrics.log_time((TIMERS, SYNCH_WORKER_WEIGHTS_TIMER)):
self.env_runner_group.sync_env_runner_states(
config=self.config,
-env_runner_indices_to_update=env_runner_indices_to_update,
env_steps_sampled=self.metrics.peek(
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
@@ -809,15 +799,15 @@ def _remote_sample_get_state_and_metrics(_worker):

# Perform asynchronous sampling on all (healthy) remote rollout workers.
if num_healthy_remote_workers > 0:
-self.env_runner_group.foreach_worker_async(
-_remote_sample_get_state_and_metrics
-)
async_results: List[
Tuple[int, ObjectRef]
] = self.env_runner_group.fetch_ready_async_reqs(
timeout_seconds=self.config.timeout_s_sampler_manager,
return_obj_refs=False,
)
+self.env_runner_group.foreach_worker_async(
+_remote_sample_get_state_and_metrics
+)
# Get results from the n different async calls and store those EnvRunner
# indices we should update.
results = []
@@ -847,7 +837,7 @@ def _remote_sample_get_state_and_metrics(_worker):
episode_refs,
connector_states,
env_runner_metrics,
-list(env_runner_indices_to_update),
+env_runner_indices_to_update,
)

def _pre_queue_episode_refs(
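
Note: the reordering above is the EnvRunner sync fix from the commit title: `fetch_ready_async_reqs()` now runs before `foreach_worker_async()`, so each `training_step` first drains results of `sample()` requests issued in earlier iterations and only then queues new ones. The corrected pattern as a standalone sketch (`manager` and `process` are hypothetical stand-ins for `self.env_runner_group` and the episode-handling code):

```python
# 1) Collect results of sample() requests issued in previous iterations.
async_results = manager.fetch_ready_async_reqs(
    timeout_seconds=0.0,  # don't block the training loop
    return_obj_refs=False,
)
# 2) Only now queue the next round of async sample() calls.
manager.foreach_worker_async(_remote_sample_get_state_and_metrics)
# 3) Process what came back in step 1.
process(async_results)
```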
@@ -948,18 +938,18 @@ def default_resource_request(
# from RolloutWorkers (n rollout workers map to m
# aggregation workers, where m < n) and always use 1 CPU
# each.
"CPU": max(
cf.num_cpus_for_main_process,
cf.num_cpus_per_learner if cf.num_learners == 0 else 0,
)
+ cf.num_aggregation_workers,
# Ignore `cf.num_gpus` on the new API stack.
"CPU": (
max(
cf.num_cpus_for_main_process,
cf.num_cpus_per_learner if cf.num_learners == 0 else 0,
)
+ cf.num_aggregation_workers
),
# Use n GPUs if we have a local Learner (num_learners=0).
"GPU": (
0
if cf._fake_gpus
else cf.num_gpus
if not cf.enable_rl_module_and_learner
else 0
(cf.num_gpus_per_learner if cf.num_learners == 0 else 0)
if cf.enable_rl_module_and_learner
else (0 if cf._fake_gpus else cf.num_gpus)
),
}
]
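Note: the rewritten `default_resource_request` is what lets a local Learner claim a GPU: on the new API stack, the driver bundle now reserves `num_gpus_per_learner` GPUs when `num_learners == 0` (remote Learners request theirs via their own bundles). A worked illustration of the new "GPU" expression, using hypothetical standalone variables that mirror the config fields:

```python
# New API stack, local Learner:
enable_rl_module_and_learner = True
num_learners = 0
num_gpus_per_learner = 1
_fake_gpus = False
num_gpus = 0  # old-stack setting; ignored on the new stack

driver_gpus = (
    (num_gpus_per_learner if num_learners == 0 else 0)
    if enable_rl_module_and_learner
    else (0 if _fake_gpus else num_gpus)
)
assert driver_gpus == 1  # with num_learners > 0 this evaluates to 0
```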
83 changes: 62 additions & 21 deletions rllib/algorithms/impala/impala_learner.py
@@ -11,6 +11,7 @@
from ray.rllib.algorithms.impala.impala import LEARNER_RESULTS_CURR_ENTROPY_COEFF_KEY
from ray.rllib.core.columns import Columns
from ray.rllib.core.learner.learner import Learner
+from ray.rllib.connectors.common import NumpyToTensor
from ray.rllib.connectors.learner import AddOneTsToEpisodesAndTruncate
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.utils.annotations import (
@@ -33,6 +34,7 @@
GPU_LOADER_QUEUE_WAIT_TIMER = "gpu_loader_queue_wait_timer"
GPU_LOADER_LOAD_TO_GPU_TIMER = "gpu_loader_load_to_gpu_timer"
LEARNER_THREAD_IN_QUEUE_WAIT_TIMER = "learner_thread_in_queue_wait_timer"
+LEARNER_THREAD_ENV_STEPS_DROPPED = "learner_thread_env_steps_dropped"
LEARNER_THREAD_UPDATE_TIMER = "learner_thread_update_timer"
RAY_GET_EPISODES_TIMER = "ray_get_episodes_timer"
EPISODES_TO_BATCH_TIMER = "episodes_to_batch_timer"
@@ -60,14 +62,18 @@ def build(self) -> None:
)
)

-# Extend all episodes by one artificual timestep to allow the value function net
+# Extend all episodes by one artificial timestep to allow the value function net
# to compute the bootstrap values (and add a mask to the batch to know, which
# slots to mask out).
if (
self._learner_connector is not None
and self.config.add_default_connectors_to_learner_pipeline
):
self._learner_connector.prepend(AddOneTsToEpisodesAndTruncate())
+# Leave all batches on the CPU (they'll be moved to the GPU, if applicable,
+# by the n GPU loader threads.
+numpy_to_tensor_connector = self._learner_connector[NumpyToTensor][0]
+numpy_to_tensor_connector._device = "cpu" # TODO (sven): Provide API?

# Create and start the GPU-loader thread. It picks up train-ready batches from
# the "GPU-loader queue" and loads them to the GPU, then places the GPU batches
@@ -78,17 +84,18 @@
self._learner_thread_out_queue = Queue()

# Create and start the GPU loader thread(s).
-self._gpu_loader_threads = [
-_GPULoaderThread(
-in_queue=self._gpu_loader_in_queue,
-out_queue=self._learner_thread_in_queue,
-device=self._device,
-metrics_logger=self.metrics,
-)
-for _ in range(self.config.num_gpu_loader_threads)
-]
-for t in self._gpu_loader_threads:
-t.start()
+if self.config.num_gpus_per_learner > 0:
+self._gpu_loader_threads = [
+_GPULoaderThread(
+in_queue=self._gpu_loader_in_queue,
+out_queue=self._learner_thread_in_queue,
+device=self._device,
+metrics_logger=self.metrics,
+)
+for _ in range(self.config.num_gpu_loader_threads)
+]
+for t in self._gpu_loader_threads:
+t.start()

# Create and start the Learner thread.
self._learner_thread = _LearnerThread(
@@ -144,11 +151,22 @@ def update_from_episodes(
)

# Queue the CPU batch to the GPU-loader thread.
-self._gpu_loader_in_queue.put((batch, env_steps))
-self.metrics.log_value(
-(ALL_MODULES, QUEUE_SIZE_GPU_LOADER_QUEUE),
-self._gpu_loader_in_queue.qsize(),
-)
+if self.config.num_gpus_per_learner > 0:
+self._gpu_loader_in_queue.put((batch, env_steps))
+self.metrics.log_value(
+(ALL_MODULES, QUEUE_SIZE_GPU_LOADER_QUEUE),
+self._gpu_loader_in_queue.qsize(),
+)
+else:
+# Enqueue to Learner thread's in-queue.
+_LearnerThread.enqueue(
+self._learner_thread_in_queue,
+MultiAgentBatch(
+{mid: SampleBatch(b) for mid, b in batch.items()},
+env_steps=env_steps,
+),
+self.metrics,
+)

# Return all queued result dicts thus far (after reducing over them).
results = {}
@@ -200,6 +218,7 @@ def __init__(

self._in_queue = in_queue
self._out_queue = out_queue
+self._ts_dropped = 0
self._device = device
self.metrics = metrics_logger

@@ -227,10 +246,8 @@ def _step(self) -> None:
policy_batches={mid: SampleBatch(b) for mid, b in batch_on_gpu.items()},
env_steps=env_steps,
)
-self._out_queue.append(ma_batch_on_gpu)
-self.metrics.log_value(
-(ALL_MODULES, QUEUE_SIZE_LEARNER_THREAD_QUEUE), len(self._out_queue)
-)
+# Enqueue to Learner thread's in-queue.
+_LearnerThread.enqueue(self._out_queue, ma_batch_on_gpu, self.metrics)


class _LearnerThread(threading.Thread):
@@ -296,3 +313,27 @@ def step(self):
(ALL_MODULES, QUEUE_SIZE_RESULTS_QUEUE),
self._out_queue.qsize(),
)

+@staticmethod
+def enqueue(learner_queue, batch, metrics_logger):
+# Right-append to learner queue (a deque). If full, drops the leftmost
+# (oldest) item in the deque. Note that we consume from the right
+# (newest first), which is why the queue size should probably always be 1,
+# otherwise we run into the danger of training with very old samples.
+# ts_dropped = 0
+# if len(learner_queue) == learner_queue.maxlen:
+# ts_dropped = learner_queue.popleft().env_steps()
+learner_queue.append(batch)
+# TODO (sven): This metric will not show correctly on the Algo side (main
+# logger), b/c of the bug in the metrics not properly "upstreaming" reduce=sum
+# metrics (similarly: ENV_RUNNERS/NUM_ENV_STEPS_SAMPLED grows exponentially
+# on the main algo's logger).
+# metrics_logger.log_value(
+# LEARNER_THREAD_ENV_STEPS_DROPPED, ts_dropped, reduce="sum"
+# )

+# Log current queue size.
+metrics_logger.log_value(
+(ALL_MODULES, QUEUE_SIZE_LEARNER_THREAD_QUEUE),
+len(learner_queue),
+)
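
Note: `enqueue` assumes the Learner in-queue is a bounded `collections.deque` rather than a `queue.Queue`: appending to a full deque silently discards the leftmost (oldest) batch, giving the keep-only-fresh-samples behavior the comment describes. A quick standalone sketch of that semantics:

```python
from collections import deque

q = deque(maxlen=1)   # queue size 1, as the comment above recommends
q.append("batch_1")
q.append("batch_2")   # deque is full: "batch_1" is dropped automatically
assert list(q) == ["batch_2"]
```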
