[RLlib] Add "shuffle batch per epoch" option. (#47458)
sven1977 authored Sep 17, 2024
1 parent ee320aa commit ed5b382
Showing 113 changed files with 591 additions and 603 deletions.
36 changes: 36 additions & 0 deletions rllib/algorithms/algorithm_config.py
@@ -384,6 +384,13 @@ def __init__(self, algo_class: Optional[type] = None):
# Simple logic for now: If None, use `train_batch_size`.
self.train_batch_size_per_learner = None
self.train_batch_size = 32 # @OldAPIStack

# These settings have been adopted from the original PPO batch settings:
# num_sgd_iter, minibatch_size, and shuffle_sequences.
self.num_epochs = 1
self.minibatch_size = None
self.shuffle_batch_per_epoch = False

# TODO (sven): Unsolved problem with RLModules sometimes requiring settings from
# the main AlgorithmConfig. We should not require the user to provide those
# settings in both, the AlgorithmConfig (as property) AND the model config
@@ -2064,6 +2071,9 @@ def training(
grad_clip_by: Optional[str] = NotProvided,
train_batch_size: Optional[int] = NotProvided,
train_batch_size_per_learner: Optional[int] = NotProvided,
num_epochs: Optional[int] = NotProvided,
minibatch_size: Optional[int] = NotProvided,
shuffle_batch_per_epoch: Optional[bool] = NotProvided,
model: Optional[dict] = NotProvided,
optimizer: Optional[dict] = NotProvided,
max_requests_in_flight_per_sampler_worker: Optional[int] = NotProvided,
@@ -2073,6 +2083,8 @@
] = NotProvided,
add_default_connectors_to_learner_pipeline: Optional[bool] = NotProvided,
learner_config_dict: Optional[Dict[str, Any]] = NotProvided,
# Deprecated args.
num_sgd_iter=DEPRECATED_VALUE,
) -> "AlgorithmConfig":
"""Sets the training related configuration.
@@ -2122,6 +2134,15 @@
stack, this setting should no longer be used. Instead, use
`train_batch_size_per_learner` (in combination with
`num_learners`).
num_epochs: The number of complete passes over the entire train batch (per
Learner). Each pass might be further split into n minibatches (if
`minibatch_size` is provided).
minibatch_size: The size of the minibatches into which the train batch is
split.
shuffle_batch_per_epoch: Whether to shuffle the train batch once per epoch.
If the train batch has a time rank (axis=1), shuffling only takes place
along the batch axis so that intact (episode) trajectories are not
disturbed (see the sketch below).
model: Arguments passed into the policy model. See models/catalog.py for a
full list of the available model options.
TODO: Provide ModelConfig objects instead of dicts.
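The time-rank caveat for `shuffle_batch_per_epoch` is easiest to see in code. Below is a minimal sketch (not RLlib code; names and shapes are illustrative) of shuffling a `[B, T, ...]` batch along the batch axis only, so each row's trajectory stays intact:

```python
import numpy as np

def shuffle_batch_axis(batch: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    # Permute only axis 0 (the batch axis). For a [B, T, ...] batch this
    # reorders whole rows, keeping each T-step (episode) trajectory contiguous.
    perm = rng.permutation(batch.shape[0])
    return batch[perm]

rng = np.random.default_rng(0)
batch_bt = np.arange(12).reshape(3, 4)        # 3 trajectories, 4 timesteps each
shuffled = shuffle_batch_axis(batch_bt, rng)
# Every original row still appears as one contiguous trajectory, just reordered.
```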
@@ -2168,6 +2189,14 @@
Returns:
This updated AlgorithmConfig object.
"""
if num_sgd_iter != DEPRECATED_VALUE:
deprecation_warning(
old="config.training(num_sgd_iter=..)",
new="config.training(num_epochs=..)",
error=False,
)
num_epochs = num_sgd_iter

if gamma is not NotProvided:
self.gamma = gamma
if lr is not NotProvided:
@@ -2185,6 +2214,13 @@
self.train_batch_size_per_learner = train_batch_size_per_learner
if train_batch_size is not NotProvided:
self.train_batch_size = train_batch_size
if num_epochs is not NotProvided:
self.num_epochs = num_epochs
if minibatch_size is not NotProvided:
self.minibatch_size = minibatch_size
if shuffle_batch_per_epoch is not NotProvided:
self.shuffle_batch_per_epoch = shuffle_batch_per_epoch

if model is not NotProvided:
self.model.update(model)
if (
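As a usage sketch for the new options on the new API stack (the algorithm, environment, and values below are illustrative assumptions, not part of this commit):

```python
from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    .environment("CartPole-v1")
    .training(
        num_epochs=10,                 # complete passes over each train batch
        minibatch_size=128,            # split each train batch into minibatches
        shuffle_batch_per_epoch=True,  # reshuffle the train batch every epoch
    )
)

# The deprecated kwarg still works; it emits a deprecation warning and is
# mapped onto `num_epochs` internally.
config.training(num_sgd_iter=10)
```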
32 changes: 16 additions & 16 deletions rllib/algorithms/appo/appo.py
@@ -98,22 +98,19 @@ def __init__(self, algo_class=None):
self.use_kl_loss = False
self.kl_coeff = 1.0
self.kl_target = 0.01
# TODO (sven): Activate once v-trace sequences in non-RNN batch are solved.
# If we switch this on right now, the shuffling would destroy the rollout
# sequences (non-zero-padded!) needed in the batch for v-trace.
# self.shuffle_batch_per_epoch = True

# Override some of IMPALAConfig's default values with APPO-specific values.
self.num_env_runners = 2
self.min_time_s_per_iteration = 10
self.num_gpus = 0
self.num_multi_gpu_tower_stacks = 1
self.minibatch_buffer_size = 1
self.num_sgd_iter = 1
self.target_network_update_freq = 1
self.replay_proportion = 0.0
self.replay_buffer_num_slots = 100
self.learner_queue_size = 16
self.learner_queue_timeout = 300
self.max_sample_requests_in_flight_per_worker = 2
self.broadcast_interval = 1

self.grad_clip = 40.0
# Note: Only when using enable_rl_module_and_learner=True can the clipping mode
# be configured by the user. On the old API stack, RLlib will always clip by
@@ -140,6 +137,12 @@ def __init__(self, algo_class=None):
# Add constructor kwargs here (if any).
}

self.num_gpus = 0 # @OldAPIStack
self.num_multi_gpu_tower_stacks = 1 # @OldAPIStack
self.minibatch_buffer_size = 1 # @OldAPIStack
self.replay_proportion = 0.0 # @OldAPIStack
self.replay_buffer_num_slots = 100 # @OldAPIStack

# __sphinx_doc_end__
# fmt: on

@@ -185,13 +188,10 @@ def training(
target_network_update_freq: The frequency to update the target policy and
tune the kl loss coefficients that are used during training. After
setting this parameter, the algorithm waits for at least
`target_network_update_freq * minibatch_size * num_sgd_iter` number of
samples to be trained on by the learner group before updating the target
networks and tuned the kl loss coefficients that are used during
training.
NOTE: This parameter is only applicable when using the Learner API
(enable_rl_module_and_learner=True).
`target_network_update_freq` environment samples to be trained on
before updating the target networks and tuning the KL loss
coefficients. NOTE: This parameter is only applicable when using the
Learner API (enable_rl_module_and_learner=True).
Returns:
This updated AlgorithmConfig object.
@@ -292,7 +292,7 @@ def training_step(self) -> ResultDict:

# Update the target network and the KL coefficient for the APPO-loss.
# The target network update frequency is calculated automatically by the product
# of `num_sgd_iter` setting (usually 1 for APPO) and `minibatch_buffer_size`.
# of `num_epochs` setting (usually 1 for APPO) and `minibatch_buffer_size`.
if self.config.enable_rl_module_and_learner:
if NUM_TARGET_UPDATES in train_results:
self._counters[NUM_TARGET_UPDATES] += train_results[NUM_TARGET_UPDATES]
@@ -309,7 +309,7 @@ def training_step(self) -> ResultDict:
)
]
target_update_freq = (
self.config.num_sgd_iter * self.config.minibatch_buffer_size
self.config.num_epochs * self.config.minibatch_buffer_size
)
if cur_ts - last_update > target_update_freq:
self._counters[NUM_TARGET_UPDATES] += 1
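The old-API-stack branch of `training_step()` above gates target-network updates with a simple counter comparison. A hedged sketch of that arithmetic, plugging in APPO's defaults (`num_epochs=1`, `minibatch_buffer_size=1`); the function name is illustrative:

```python
# Mirrors the gating logic in the hunk above (illustrative, not RLlib code).
num_epochs = 1               # APPO default
minibatch_buffer_size = 1    # APPO default (old API stack)
target_update_freq = num_epochs * minibatch_buffer_size  # -> 1

def should_update_target(cur_ts: int, last_update: int) -> bool:
    # Update the target network (and adapt the KL coefficient) only once more
    # than `target_update_freq` steps have been trained since the last update.
    return cur_ts - last_update > target_update_freq
```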
6 changes: 3 additions & 3 deletions rllib/algorithms/appo/appo_learner.py
@@ -90,14 +90,14 @@ def after_gradient_based_update(self, *, timesteps: Dict[str, Any]) -> None:
# TODO (avnish) Using steps trained here instead of sampled ... I'm not sure
# why the other implementation uses sampled.
# The difference in steps sampled/trained is pretty
# much always going to be larger than self.config.num_sgd_iter *
# much always going to be larger than self.config.num_epochs *
# self.config.minibatch_buffer_size unless the number of steps collected
# is really small. The thing is that the default rollout fragment length
# is 50, so the minibatch buffer size * num_sgd_iter is going to be
# is 50, so the minibatch buffer size * num_epochs is going to be
# have to be 50 to even meet the threshold of having delayed target
# updates.
# We should instead have the target / kl threshold update be based off
# of the train_batch_size * some target update frequency * num_sgd_iter.
# of the train_batch_size * some target update frequency * num_epochs.

last_update_ts_key = (module_id, LAST_TARGET_UPDATE_TS)
if timestep - self.metrics.peek(
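To make the TODO's point concrete, a small hedged calculation using the defaults the comment mentions (a rollout fragment length of 50 and the APPO defaults `num_epochs=1`, `minibatch_buffer_size=1`):

```python
# Rough illustration of the mismatch described in the comment above
# (assumed defaults; not RLlib code).
rollout_fragment_length = 50
num_epochs = 1
minibatch_buffer_size = 1

steps_trained_per_iteration = rollout_fragment_length           # roughly 50
delayed_update_threshold = num_epochs * minibatch_buffer_size   # 1

# The threshold is exceeded almost immediately, so the intended delay of the
# target/KL updates rarely delays anything -- hence the suggestion to base it
# on `train_batch_size` times some update frequency instead.
assert steps_trained_per_iteration > delayed_update_threshold
```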
8 changes: 4 additions & 4 deletions rllib/algorithms/cql/cql.py
@@ -306,7 +306,7 @@ def _training_step_new_api_stack(self) -> ResultDict:
# Sampling from offline data.
with self.metrics.log_time((TIMERS, OFFLINE_SAMPLING_TIMER)):
# Return an iterator in case we are using remote learners.
batch = self.offline_data.sample(
batch_or_iterator = self.offline_data.sample(
num_samples=self.config.train_batch_size_per_learner,
num_shards=self.config.num_learners,
return_iterator=self.config.num_learners > 1,
Expand All @@ -315,9 +315,9 @@ def _training_step_new_api_stack(self) -> ResultDict:
# Updating the policy.
with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)):
# TODO (simon, sven): Check, if we should execute directly s.th. like
# update_from_iterator.
learner_results = self.learner_group.update_from_batch(
batch,
# `LearnerGroup.update_from_iterator()`.
learner_results = self.learner_group._update(
batch=batch_or_iterator,
minibatch_size=self.config.train_batch_size_per_learner,
num_iters=self.config.dataset_num_iters_per_learner,
)
51 changes: 14 additions & 37 deletions rllib/algorithms/impala/impala.py
@@ -134,7 +134,6 @@ def __init__(self, algo_class=None):
self.vtrace_clip_pg_rho_threshold = 1.0
self.num_multi_gpu_tower_stacks = 1 # @OldAPIstack
self.minibatch_buffer_size = 1 # @OldAPIstack
self.num_sgd_iter = 1
self.replay_proportion = 0.0 # @OldAPIstack
self.replay_buffer_num_slots = 0 # @OldAPIstack
self.learner_queue_size = 3
@@ -168,10 +167,10 @@ def __init__(self, algo_class=None):
self._lr_vf = 0.0005 # @OldAPIstack

# Override some of AlgorithmConfig's default values with IMPALA-specific values.
self.num_learners = 1
self.rollout_fragment_length = 50
self.train_batch_size = 500 # @OldAPIstack
self.train_batch_size_per_learner = 500
self._minibatch_size = "auto"
self.num_env_runners = 2
self.num_gpus = 1 # @OldAPIstack
self.lr = 0.0005
@@ -200,8 +199,6 @@ def training(
num_gpu_loader_threads: Optional[int] = NotProvided,
num_multi_gpu_tower_stacks: Optional[int] = NotProvided,
minibatch_buffer_size: Optional[int] = NotProvided,
minibatch_size: Optional[Union[int, str]] = NotProvided,
num_sgd_iter: Optional[int] = NotProvided,
replay_proportion: Optional[float] = NotProvided,
replay_buffer_num_slots: Optional[int] = NotProvided,
learner_queue_size: Optional[int] = NotProvided,
@@ -252,15 +249,7 @@ def training(
- This enables us to preload data into these stacks while another stack
is performing gradient calculations.
minibatch_buffer_size: How many train batches should be retained for
minibatching. This conf only has an effect if `num_sgd_iter > 1`.
minibatch_size: The size of minibatches that are trained over during
each SGD iteration. If "auto", will use the same value as
`train_batch_size`.
Note that this setting only has an effect if
`enable_rl_module_and_learner=True` and it must be a multiple of
`rollout_fragment_length` or `sequence_length` and smaller than or equal
to `train_batch_size`.
num_sgd_iter: Number of passes to make over each train batch.
minibatching. This conf only has an effect if `num_epochs > 1`.
replay_proportion: Set >0 to enable experience replay. Saved samples will
be replayed with a p:1 proportion to new data samples.
replay_buffer_num_slots: Number of sample batches to store for replay.
@@ -330,8 +319,6 @@ def training(
self.num_multi_gpu_tower_stacks = num_multi_gpu_tower_stacks
if minibatch_buffer_size is not NotProvided:
self.minibatch_buffer_size = minibatch_buffer_size
if num_sgd_iter is not NotProvided:
self.num_sgd_iter = num_sgd_iter
if replay_proportion is not NotProvided:
self.replay_proportion = replay_proportion
if replay_buffer_num_slots is not NotProvided:
@@ -374,8 +361,6 @@ def training(
self._separate_vf_optimizer = _separate_vf_optimizer
if _lr_vf is not NotProvided:
self._lr_vf = _lr_vf
if minibatch_size is not NotProvided:
self._minibatch_size = minibatch_size

return self

@@ -452,14 +437,14 @@ def validate(self) -> None:
# Learner API specific checks.
if (
self.enable_rl_module_and_learner
and self._minibatch_size != "auto"
and self.minibatch_size is not None
and not (
(self.minibatch_size % self.rollout_fragment_length == 0)
and self.minibatch_size <= self.total_train_batch_size
)
):
raise ValueError(
f"`minibatch_size` ({self._minibatch_size}) must either be 'auto' "
f"`minibatch_size` ({self.minibatch_size}) must either be None "
"or a multiple of `rollout_fragment_length` "
f"({self.rollout_fragment_length}) while at the same time smaller "
"than or equal to `total_train_batch_size` "
@@ -474,20 +459,6 @@ def replay_ratio(self) -> float:
"""
return (1 / self.replay_proportion) if self.replay_proportion > 0 else 0.0

@property
def minibatch_size(self):
# If 'auto', use the train_batch_size (meaning each SGD iter is a single pass
# through the entire train batch). Otherwise, use user provided setting.
return (
(
self.train_batch_size_per_learner
if self.enable_env_runner_and_connector_v2
else self.train_batch_size
)
if self._minibatch_size == "auto"
else self._minibatch_size
)
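With the IMPALA-specific property (and its "auto" default) removed, `minibatch_size` now lives on `AlgorithmConfig` with a default of `None`, and the `validate()` check above constrains any explicit value. A hedged, concrete restatement of that rule using the IMPALA defaults shown earlier (`rollout_fragment_length=50`, `train_batch_size_per_learner=500`, a single Learner); the helper is illustrative:

```python
# Illustrative restatement of the validate() rule above (not RLlib code).
rollout_fragment_length = 50
total_train_batch_size = 500   # train_batch_size_per_learner * 1 Learner

def minibatch_size_ok(minibatch_size) -> bool:
    # None is always accepted; an explicit value must be a multiple of the
    # rollout fragment length and no larger than the total train batch.
    return minibatch_size is None or (
        minibatch_size % rollout_fragment_length == 0
        and minibatch_size <= total_train_batch_size
    )

assert minibatch_size_ok(None)
assert minibatch_size_ok(250)       # 5 fragments, fits into 500
assert not minibatch_size_ok(128)   # not a multiple of 50
assert not minibatch_size_ok(600)   # exceeds the total train batch size
```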

@override(AlgorithmConfig)
def get_default_learner_class(self):
if self.framework_str == "torch":
@@ -539,7 +510,7 @@ class IMPALA(Algorithm):
2. If enabled, the replay buffer stores and produces batches of size
`rollout_fragment_length * num_envs_per_env_runner`.
3. If enabled, the minibatch ring buffer stores and replays batches of
size `train_batch_size` up to `num_sgd_iter` times per batch.
size `train_batch_size` up to `num_epochs` times per batch.
4. The learner thread executes data parallel SGD across `num_gpus` GPUs
on batches of size `train_batch_size`.
"""
@@ -734,6 +705,9 @@ def training_step(self) -> ResultDict:
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
},
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
else:
learner_results = self.learner_group.update_from_episodes(
@@ -745,6 +719,9 @@
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
},
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
if not do_async_updates:
learner_results = [learner_results]
@@ -1292,7 +1269,7 @@ def _learn_on_processed_samples(self) -> ResultDict:
),
},
async_update=async_update,
num_iters=self.config.num_sgd_iter,
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
)
if not async_update:
Expand Down Expand Up @@ -1531,7 +1508,7 @@ def make_learner_thread(local_worker, config):
lr=config["lr"],
train_batch_size=config["train_batch_size"],
num_multi_gpu_tower_stacks=config["num_multi_gpu_tower_stacks"],
num_sgd_iter=config["num_sgd_iter"],
num_sgd_iter=config["num_epochs"],
learner_queue_size=config["learner_queue_size"],
learner_queue_timeout=config["learner_queue_timeout"],
num_data_load_threads=config["num_gpu_loader_threads"],
Expand All @@ -1540,7 +1517,7 @@ def make_learner_thread(local_worker, config):
learner_thread = LearnerThread(
local_worker,
minibatch_buffer_size=config["minibatch_buffer_size"],
num_sgd_iter=config["num_sgd_iter"],
num_sgd_iter=config["num_epochs"],
learner_queue_size=config["learner_queue_size"],
learner_queue_timeout=config["learner_queue_timeout"],
)
