From f097c4a0b0379713a0391267f4a26fb9e68e2919 Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Wed, 14 Dec 2022 16:54:26 +0900 Subject: [PATCH 01/18] per-micro-batch input loader --- .../megatron/data_samplers.py | 133 +++++++++++++----- .../language_modeling/megatron_base_model.py | 15 +- .../language_modeling/megatron_gpt_model.py | 109 +++++++------- nemo/utils/exp_manager.py | 3 +- 4 files changed, 157 insertions(+), 103 deletions(-) diff --git a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py index fd6120ff47fa..a2e7145adb52 100644 --- a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py +++ b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py @@ -14,16 +14,54 @@ """Dataloaders.""" +import abc +from typing import Optional import torch from nemo.utils import logging -class MegatronPretrainingSampler: +class BaseMegatronSampler: def __init__( - self, total_samples, consumed_samples, micro_batch_size, data_parallel_rank, data_parallel_size, drop_last=True - ): + self, + total_samples: int, + consumed_samples: int, + micro_batch_size: int, + data_parallel_rank: int, + data_parallel_size: int, + drop_last: bool, + global_batch_size: Optional[int] = None, + pad_samples_to_global_batch_size: Optional[bool] = False, + ) -> None: + # Sanity checks. + if total_samples <= 0: + raise RuntimeError("no sample to consume: {}".format(total_samples)) + if consumed_samples >= total_samples: + raise RuntimeError("no samples left to consume: {}, {}".format(consumed_samples, total_samples)) + if micro_batch_size <= 0: + raise RuntimeError(f"micro_batch_size size must be greater than 0, but {micro_batch_size}") + if data_parallel_size <= 0: + raise RuntimeError(f"data parallel size must be greater than 0, but {data_parallel_size}") + if data_parallel_rank >= data_parallel_size: + raise RuntimeError( + "data_parallel_rank should be smaller than data size, but {} >= {}".format( + data_parallel_rank, data_parallel_size + ) + ) + if global_batch_size is not None: + if global_batch_size % (self.micro_batch_size * data_parallel_size) != 0: + raise RuntimeError( + f"`global_batch_size` ({self._global_batch_size}) is not divisible by " + f"`micro_batch_size ({self.micro_batch_size}) x data_parallel_size " + f"({data_parallel_size})`" + ) + if self.pad_samples_to_global_batch_size and self.global_batch_size is None: + raise RuntimeError( + f"`pad_samples_to_global_batch_size` can be `True` only when " + f"`global_batch_size` is set to an integer value" + ) + # Keep a copy of input params for later use. self.total_samples = total_samples self.consumed_samples = consumed_samples @@ -31,26 +69,29 @@ def __init__( self.data_parallel_rank = data_parallel_rank self.micro_batch_times_data_parallel_size = self.micro_batch_size * data_parallel_size self.drop_last = drop_last + self.global_batch_size = global_batch_size + self.pad_samples_to_global_batch_size = pad_samples_to_global_batch_size logging.info( f'Instantiating MegatronPretrainingSampler with total_samples: {total_samples} and consumed_samples: {consumed_samples}' ) - # Sanity checks. 
- assert self.total_samples > 0, 'no sample to consume: {}'.format(self.total_samples) - assert self.consumed_samples < self.total_samples, 'no samples left to consume: {}, {}'.format( - self.consumed_samples, self.total_samples - ) - assert self.micro_batch_size > 0 - assert data_parallel_size > 0 - assert self.data_parallel_rank < data_parallel_size, ( - 'data_parallel_rank should be smaller than data size: {}, ' - '{}'.format(self.data_parallel_rank, data_parallel_size) - ) - def __len__(self): - return (self.total_samples - self.consumed_samples - 1) // self.micro_batch_times_data_parallel_size + 1 + num_available_samples: int = self.total_samples - self.consumed_samples + if self.global_batch_size is not None: + if self.drop_last: + return num_available_samples // self.global_batch_size + else: + return (num_available_samples + self.global_batch_size - 1) // self.global_batch_size + else: + return (num_available_samples - 1) // self.micro_batch_times_data_parallel_size + 1 + + @abc.abstractmethod + def __iter__(self): + ... + +class MegatronPretrainingSampler(BaseMegatronSampler): def get_start_end_idx(self): start_idx = self.data_parallel_rank * self.micro_batch_size end_idx = start_idx + self.micro_batch_size @@ -68,32 +109,42 @@ def __iter__(self): # Check the last partial batch and see drop_last is set if len(batch) > 0 and not self.drop_last: - start_idx, end_idx = self.get_start_end_idx() - yield batch[start_idx:end_idx] - + if self.pad_samples_to_global_batch_size: + for i in range(self.data_parallel_rank, self.global_batch_size, self.micro_batch_times_data_parallel_size): + indices = [batch[j] for j in range(i, max(len(batch), i + self.micro_batch_size))] + num_pad = self.micro_batch_size - len(indices) + indices = indices + [-1] * num_pad + yield indices + else: + start_idx, end_idx = self.get_start_end_idx() + yield batch[start_idx:end_idx] -class MegatronPretrainingRandomSampler: - def __init__(self, total_samples, consumed_samples, micro_batch_size, data_parallel_rank, data_parallel_size): - # Keep a copy of input params for later use. - self.total_samples = total_samples - self.consumed_samples = consumed_samples - self.micro_batch_size = micro_batch_size - self.data_parallel_rank = data_parallel_rank - self.data_parallel_size = data_parallel_size - self.micro_batch_times_data_parallel_size = self.micro_batch_size * data_parallel_size - self.last_batch_size = self.total_samples % self.micro_batch_times_data_parallel_size - # Sanity checks. 
- assert self.total_samples > 0, 'no sample to consume: {}'.format(self.total_samples) - assert self.micro_batch_size > 0 - assert data_parallel_size > 0 - assert self.data_parallel_rank < data_parallel_size, ( - 'data_parallel_rank should be smaller than data size: {}, ' - '{}'.format(self.data_parallel_rank, data_parallel_size) +class MegatronPretrainingRandomSampler(BaseMegatronSampler): + def __init__( + self, + total_samples: int, + consumed_samples: int, + micro_batch_size: int, + data_parallel_rank: int, + data_parallel_size: int, + drop_last: bool, + global_batch_size: Optional[int] = None, + pad_samples_to_global_batch_size: Optional[bool] = False, + ) -> None: + super().__init__( + total_samples=total_samples, + consumed_samples=consumed_samples, + micro_batch_size=micro_batch_size, + data_parallel_rank=data_parallel_rank, + data_parallel_size=data_parallel_size, + drop_last=drop_last, + global_batch_size=global_batch_size, + pad_samples_to_global_batch_size=pad_samples_to_global_batch_size, ) - - def __len__(self): - return self.total_samples + assert self.pad_samples_to_global_batch_size == False, \ + "`MegatronPretrainingRandomSampler` does not support sample padding" + self.last_batch_size = self.total_samples % self.micro_batch_times_data_parallel_size def __iter__(self): active_total_samples = self.total_samples - self.last_batch_size @@ -119,3 +170,7 @@ def __iter__(self): self.consumed_samples += self.micro_batch_times_data_parallel_size yield batch batch = [] + + # Check the last partial batch and see drop_last is set + if len(batch) > 0 and not self.drop_last: + yield batch diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 7f2c0befce6b..e68ff9b74069 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -122,6 +122,15 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True): "default_on_epoch": False, } + # Convert the global-batch-based profile index to micro-batch index + if hasattr(self, '_nsys_profile_enabled'): + grad_accum_steps = ( + cfg.get('global_batch_size') // + (cfg.get('micro_batch_size') * parallel_state.get_data_parallel_world_size()) + ) + self._nsys_profile_start_step *= grad_accum_steps + self._nsys_profile_end_step *= grad_accum_steps + def _enable_nvidia_optimizations(self): "These optimizations are present in NVIDIA NGC PyTorch Containers" @@ -242,7 +251,7 @@ def configure_gradient_clipping(self, *args, **kwargs): parameters = self._get_parameters() grad_norm = clip_grad_norm_fp32(parameters=parameters, max_norm=clip_val) - self.log('grad_norm', grad_norm, rank_zero_only=True) + self.log('grad_norm', grad_norm, rank_zero_only=True, batch_size=self.cfg.micro_batch_size) def allreduce_gradients(self): """Reduce gradients across data parallel ranks. 
@@ -282,8 +291,8 @@ def reduce_overlap_gradients(self): p for p in self._optimizer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) ) - def on_train_batch_end(self, outputs, batch, batch_idx: int, unused: Optional[int] = 0) -> None: - super().on_train_batch_end(outputs, batch, batch_idx) + def on_train_batch_end(self, outputs, dataloader_iter: Any, batch_idx: int, unused: Optional[int] = 0) -> None: + super().on_train_batch_end(outputs, dataloader_iter, batch_idx) # TODO: Replace with newer override for scheduler.step() instead of # search for plugins for fp16 GradScalar diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 4215b32e9ed3..dfc201580eb6 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -21,9 +21,9 @@ from pytorch_lightning.trainer.trainer import Trainer from nemo.collections.nlp.data.language_modeling.megatron.gpt_dataset import build_train_valid_test_datasets -from nemo.collections.nlp.data.language_modeling.megatron.megatron_batch_samplers import ( - MegatronPretrainingBatchSampler, - MegatronPretrainingRandomBatchSampler, +from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import ( + MegatronPretrainingSampler, + MegatronPretrainingRandomSampler, ) from nemo.collections.nlp.models.language_modeling.megatron.gpt_model import GPTModel from nemo.collections.nlp.models.language_modeling.megatron_base_model import MegatronBaseModel @@ -302,7 +302,7 @@ def _get_fwd_bwd_function(self): fwd_bwd_function = forward_backward_no_pipelining return fwd_bwd_function - def training_step(self, batch, batch_idx): + def training_step(self, dataloader_iter, batch_idx): """ Our dataloaders produce a micro-batch and then we fetch a number of microbatches depending on the global batch size and model parallel size @@ -333,16 +333,6 @@ def training_step(self, batch, batch_idx): for param in module.embedding.parameters(): param.data_ptr() - if parallel_state.is_pipeline_first_stage(ignore_virtual=True) or parallel_state.is_pipeline_last_stage( - ignore_virtual=True - ): - # we prepare the micro batches for the apex fwd/bwd function - batch_for_pipeline = self.process_global_batch(batch) - else: - # The intermediate pipeline stages do not need any inputs from data loader - # GPT3 uses decoder with AttnMask:causal, thus doesn't need attention_mask - batch_for_pipeline = None - tensor_shape = [self.cfg.encoder_seq_length, self.cfg.micro_batch_size, self.cfg.hidden_size] # handle asynchronous grad reduction @@ -369,7 +359,7 @@ def training_step(self, batch, batch_idx): losses_reduced_per_micro_batch = fwd_bwd_function( forward_step_func=self.get_forward_output_and_loss_func(), - batch=batch_for_pipeline, + batch=dataloader_iter, model=self.model, forward_only=False, tensor_shape=tensor_shape, @@ -424,18 +414,32 @@ def training_step(self, batch, batch_idx): if self.cfg.precision == 16: loss_scale = self.trainer.precision_plugin.scaler._scale if loss_scale is not None: - self.log('loss_scale', loss_scale) + self.log('loss_scale', loss_scale, batch_size=self.cfg.micro_batch_size) - self.log('reduced_train_loss', loss_mean, prog_bar=True, rank_zero_only=True) + self.log( + 'reduced_train_loss', + loss_mean, + prog_bar=True, + rank_zero_only=True, + batch_size=self.cfg.micro_batch_size + ) lr = self._optimizer.param_groups[0]['lr'] - self.log('lr', lr, 
rank_zero_only=True) - self.log('global_step', self.trainer.global_step, prog_bar=True, rank_zero_only=True) + self.log('lr', lr, rank_zero_only=True, batch_size=self.cfg.micro_batch_size) + self.log( + 'global_step', + self.trainer.global_step, + prog_bar=True, + rank_zero_only=True, + batch_size=self.cfg.micro_batch_size + ) + # TODO: make sure compute_consumed_samples works for pipeline parallelism self.log( 'consumed_samples', self.compute_consumed_samples(self.trainer.global_step - self.init_global_step), prog_bar=True, rank_zero_only=True, + batch_size=self.cfg.micro_batch_size, ) return loss_mean @@ -516,39 +520,39 @@ def allreduce_first_last_embeddings(self): grad = word_embeddings_weight.grad torch.distributed.all_reduce(grad, group=parallel_state.get_embedding_group()) - def get_forward_output_and_loss_func(self, validation_step=False): - def fwd_output_and_loss_func(batch, model, checkpoint_activations_all_layers=None): + def get_forward_output_and_loss_func(self): + def fwd_output_and_loss_func(dataloader_iter, model, checkpoint_activations_all_layers=None): + # GPT3 uses only causal mask, which doesn't need attention mask if parallel_state.get_pipeline_model_parallel_world_size() == 1: - batch = [x.cuda(non_blocking=True) for x in batch] - tokens, labels, loss_mask, attention_mask, position_ids = batch - attention_mask = attention_mask[0:1] + batch = next(dataloader_iter) + for k in batch.keys(): + batch[k] = batch[k].cuda(non_blocking=True) if k not in ['attention_mask'] else None else: - # GPT3 uses only causal mask, which doesn't need attention mask if parallel_state.is_pipeline_first_stage(): - # Fist pipeline stage needs only the tokens and position_ids - tokens = batch[0].cuda(non_blocking=True) - position_ids = batch[4].cuda(non_blocking=True) - labels, loss_mask, attention_mask = None, None, None + batch = next(dataloader_iter) + # First pipeline stage needs only the tokens and position_ids + for k in batch.keys(): + batch[k] = batch[k].cuda(non_blocking=True) if k in ['tokens', 'position_ids'] else None elif parallel_state.is_pipeline_last_stage(): + batch = next(dataloader_iter) # Last pipeline stage needs only the labels and loss_mask - labels = batch[1].cuda(non_blocking=True) - loss_mask = batch[2].cuda(non_blocking=True) - tokens, attention_mask, position_ids = None, None, None + for k in batch.keys(): + batch[k] = batch[k].cuda(non_blocking=True) if k in ['labels', 'loss_mask'] else None else: # Intermediate pipeline stage doesn't need any inputs - tokens, labels, loss_mask, attention_mask, position_ids = None, None, None, None, None + batch = {k:None for k in ['tokens', 'position_ids', 'attention_mask', 'labels']} output_tensor = model( - tokens, - position_ids, - attention_mask, - labels, - checkpoint_activations_all_layers=checkpoint_activations_all_layers, + batch['tokens'], + batch['position_ids'], + batch['attention_mask'], + batch['labels'], + checkpoint_activations_all_layers=checkpoint_activations_all_layers ) def loss_func(output_tensor): # Loss for a micro-batch (ub) - loss_for_ub = self.loss_func(loss_mask, output_tensor) + loss_for_mb = self.loss_func(batch['loss_mask'], output_tensor) if validation_step and not self.cfg.data.get('validation_drop_last', True): num_valid_tokens_in_ub = loss_mask.sum() if loss_for_ub.isnan(): @@ -606,7 +610,7 @@ def id_func(output_tensor): return fwd_output_only_func - def validation_step(self, batch, batch_idx): + def validation_step(self, dataloader_iter, batch_idx): """ Our dataloaders produce a micro-batch and 
then we fetch a number of microbatches depending on the global batch size and model parallel size @@ -614,7 +618,6 @@ def validation_step(self, batch, batch_idx): The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ - batch_for_pipeline = self.process_global_batch(batch) tensor_shape = [self.cfg.encoder_seq_length, self.cfg.micro_batch_size, self.cfg.hidden_size] # run forward passes for an entire global batch @@ -623,7 +626,7 @@ def validation_step(self, batch, batch_idx): losses_reduced_per_micro_batch = fwd_bwd_function( forward_step_func=self.get_forward_output_and_loss_func(validation_step=True), - batch=batch_for_pipeline, + batch=dataloader_iter, model=self.model, forward_only=True, tensor_shape=tensor_shape, @@ -671,7 +674,7 @@ def validation_epoch_end(self, outputs): # we can only log on one rank if it is rank zero so we broadcast from last rank torch.distributed.broadcast(averaged_loss, get_last_rank()) - self.log('val_loss', averaged_loss, prog_bar=True, rank_zero_only=True) + self.log('val_loss', averaged_loss, prog_bar=True, rank_zero_only=True, batch_size=self.cfg.micro_batch_size) def test_step(self, batch, batch_idx): return self.validation_step(batch, batch_idx) @@ -687,18 +690,6 @@ def loss_func(self, loss_mask, output_tensor): loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum() # sequence level nll return loss - def process_global_batch(self, global_batch, global_batch_size=None): - """ Prepares the global batch for apex fwd/bwd functions. - Global batch is a list of micro batches. - """ - return [ - global_batch["tokens"], - global_batch["labels"], - global_batch["loss_mask"], - global_batch["attention_mask"], - global_batch["position_ids"], - ] - def build_train_valid_test_datasets(self): logging.info('Building GPT datasets.') if self.trainer.limit_val_batches > 1.0 and isinstance(self.trainer.limit_val_batches, float): @@ -750,26 +741,24 @@ def build_pretraining_data_loader( # Megatron sampler if hasattr(self.cfg.data, 'dataloader_type') and self.cfg.data.dataloader_type is not None: if self.cfg.data.dataloader_type == 'single': - batch_sampler = MegatronPretrainingBatchSampler( + batch_sampler = MegatronPretrainingSampler( total_samples=len(dataset), consumed_samples=consumed_samples, micro_batch_size=self.cfg.micro_batch_size, - global_batch_size=self.cfg.global_batch_size, data_parallel_rank=parallel_state.get_data_parallel_rank(), data_parallel_size=parallel_state.get_data_parallel_world_size(), drop_last=drop_last, + global_batch_size=self.cfg.global_batch_size, pad_samples_to_global_batch_size=pad_samples_to_global_batch_size, ) elif self.cfg.data.dataloader_type == 'cyclic': - batch_sampler = MegatronPretrainingRandomBatchSampler( + batch_sampler = MegatronPretrainingRandomSampler( total_samples=len(dataset), consumed_samples=consumed_samples, micro_batch_size=self.cfg.micro_batch_size, - global_batch_size=self.cfg.global_batch_size, data_parallel_rank=parallel_state.get_data_parallel_rank(), data_parallel_size=parallel_state.get_data_parallel_world_size(), drop_last=self.cfg.get('drop_last', True), - pad_samples_to_global_batch_size=pad_samples_to_global_batch_size, ) else: raise ValueError('cfg.data.dataloader_type must be "single" or "cyclic"') diff --git a/nemo/utils/exp_manager.py b/nemo/utils/exp_manager.py index 4b1668fc319d..769fb24680ea 100644 --- a/nemo/utils/exp_manager.py +++ b/nemo/utils/exp_manager.py @@ -185,7 +185,8 @@ def _on_batch_start(self, name): def _on_batch_end(self, name, pl_module): 
self.timer.stop(name) - pl_module.log(name, self.timer[name], on_step=True, on_epoch=False) + # Set the `batch_size=1` as WAR for `dataloader_iter`, which is not used for any metric + pl_module.log(name, self.timer[name], on_step=True, on_epoch=False, batch_size=1) def on_train_batch_start(self, trainer, pl_module, batch, batch_idx): self._on_batch_start("train_step_timing") From 14cdcd45e0e130228aceabc9c189a752e7ad495f Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Wed, 14 Dec 2022 16:54:26 +0900 Subject: [PATCH 02/18] per-micro-batch input loader set arg default val --- .../nlp/data/language_modeling/megatron/data_samplers.py | 2 +- .../nlp/models/language_modeling/megatron_base_model.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py index a2e7145adb52..0a1b71223e7b 100644 --- a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py +++ b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py @@ -128,7 +128,7 @@ def __init__( micro_batch_size: int, data_parallel_rank: int, data_parallel_size: int, - drop_last: bool, + drop_last: bool = True, global_batch_size: Optional[int] = None, pad_samples_to_global_batch_size: Optional[bool] = False, ) -> None: diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index e68ff9b74069..b02f65f4f15c 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -124,9 +124,10 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True): # Convert the global-batch-based profile index to micro-batch index if hasattr(self, '_nsys_profile_enabled'): + mp_size = cfg.get('tensor_model_parallel_size', 1) * cfg.get('pipeline_model_parallel_size', 1) + data_parallel_world_size = trainer.world_size // mp_size grad_accum_steps = ( - cfg.get('global_batch_size') // - (cfg.get('micro_batch_size') * parallel_state.get_data_parallel_world_size()) + cfg.get('global_batch_size') // (cfg.get('micro_batch_size') * data_parallel_world_size) ) self._nsys_profile_start_step *= grad_accum_steps self._nsys_profile_end_step *= grad_accum_steps From 3574676f0fac9408c4b2aa5f63b723b63a9e8c98 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 3 Jan 2023 23:41:07 +0000 Subject: [PATCH 03/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../language_modeling/megatron/data_samplers.py | 9 ++++++--- .../language_modeling/megatron_base_model.py | 4 +--- .../language_modeling/megatron_gpt_model.py | 16 ++++++---------- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py index 0a1b71223e7b..427bef238602 100644 --- a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py +++ b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py @@ -110,7 +110,9 @@ def __iter__(self): # Check the last partial batch and see drop_last is set if len(batch) > 0 and not self.drop_last: if self.pad_samples_to_global_batch_size: - for i in range(self.data_parallel_rank, self.global_batch_size, 
self.micro_batch_times_data_parallel_size): + for i in range( + self.data_parallel_rank, self.global_batch_size, self.micro_batch_times_data_parallel_size + ): indices = [batch[j] for j in range(i, max(len(batch), i + self.micro_batch_size))] num_pad = self.micro_batch_size - len(indices) indices = indices + [-1] * num_pad @@ -142,8 +144,9 @@ def __init__( global_batch_size=global_batch_size, pad_samples_to_global_batch_size=pad_samples_to_global_batch_size, ) - assert self.pad_samples_to_global_batch_size == False, \ - "`MegatronPretrainingRandomSampler` does not support sample padding" + assert ( + self.pad_samples_to_global_batch_size == False + ), "`MegatronPretrainingRandomSampler` does not support sample padding" self.last_batch_size = self.total_samples % self.micro_batch_times_data_parallel_size def __iter__(self): diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index b02f65f4f15c..c9c175f62f2f 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -126,9 +126,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True): if hasattr(self, '_nsys_profile_enabled'): mp_size = cfg.get('tensor_model_parallel_size', 1) * cfg.get('pipeline_model_parallel_size', 1) data_parallel_world_size = trainer.world_size // mp_size - grad_accum_steps = ( - cfg.get('global_batch_size') // (cfg.get('micro_batch_size') * data_parallel_world_size) - ) + grad_accum_steps = cfg.get('global_batch_size') // (cfg.get('micro_batch_size') * data_parallel_world_size) self._nsys_profile_start_step *= grad_accum_steps self._nsys_profile_end_step *= grad_accum_steps diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index dfc201580eb6..d23a3f7cddaf 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -20,11 +20,11 @@ from omegaconf.dictconfig import DictConfig from pytorch_lightning.trainer.trainer import Trainer -from nemo.collections.nlp.data.language_modeling.megatron.gpt_dataset import build_train_valid_test_datasets from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import ( - MegatronPretrainingSampler, MegatronPretrainingRandomSampler, + MegatronPretrainingSampler, ) +from nemo.collections.nlp.data.language_modeling.megatron.gpt_dataset import build_train_valid_test_datasets from nemo.collections.nlp.models.language_modeling.megatron.gpt_model import GPTModel from nemo.collections.nlp.models.language_modeling.megatron_base_model import MegatronBaseModel from nemo.collections.nlp.modules.common.megatron.module import Float16Module @@ -417,11 +417,7 @@ def training_step(self, dataloader_iter, batch_idx): self.log('loss_scale', loss_scale, batch_size=self.cfg.micro_batch_size) self.log( - 'reduced_train_loss', - loss_mean, - prog_bar=True, - rank_zero_only=True, - batch_size=self.cfg.micro_batch_size + 'reduced_train_loss', loss_mean, prog_bar=True, rank_zero_only=True, batch_size=self.cfg.micro_batch_size ) lr = self._optimizer.param_groups[0]['lr'] self.log('lr', lr, rank_zero_only=True, batch_size=self.cfg.micro_batch_size) @@ -430,7 +426,7 @@ def training_step(self, dataloader_iter, batch_idx): self.trainer.global_step, prog_bar=True, rank_zero_only=True, - 
batch_size=self.cfg.micro_batch_size + batch_size=self.cfg.micro_batch_size, ) # TODO: make sure compute_consumed_samples works for pipeline parallelism @@ -540,14 +536,14 @@ def fwd_output_and_loss_func(dataloader_iter, model, checkpoint_activations_all_ batch[k] = batch[k].cuda(non_blocking=True) if k in ['labels', 'loss_mask'] else None else: # Intermediate pipeline stage doesn't need any inputs - batch = {k:None for k in ['tokens', 'position_ids', 'attention_mask', 'labels']} + batch = {k: None for k in ['tokens', 'position_ids', 'attention_mask', 'labels']} output_tensor = model( batch['tokens'], batch['position_ids'], batch['attention_mask'], batch['labels'], - checkpoint_activations_all_layers=checkpoint_activations_all_layers + checkpoint_activations_all_layers=checkpoint_activations_all_layers, ) def loss_func(output_tensor): From dab2118f48b308b6b02bc158471b49c9aa616e5a Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Wed, 18 Jan 2023 17:37:53 -0800 Subject: [PATCH 04/18] minor fix --- .../data/language_modeling/megatron/data_samplers.py | 10 +++++----- .../nlp/models/language_modeling/megatron_gpt_model.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py index 427bef238602..5bc386875a36 100644 --- a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py +++ b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py @@ -50,13 +50,13 @@ def __init__( ) ) if global_batch_size is not None: - if global_batch_size % (self.micro_batch_size * data_parallel_size) != 0: + if global_batch_size % (micro_batch_size * data_parallel_size) != 0: raise RuntimeError( - f"`global_batch_size` ({self._global_batch_size}) is not divisible by " - f"`micro_batch_size ({self.micro_batch_size}) x data_parallel_size " + f"`global_batch_size` ({global_batch_size}) is not divisible by " + f"`micro_batch_size ({micro_batch_size}) x data_parallel_size " f"({data_parallel_size})`" ) - if self.pad_samples_to_global_batch_size and self.global_batch_size is None: + if pad_samples_to_global_batch_size and global_batch_size is None: raise RuntimeError( f"`pad_samples_to_global_batch_size` can be `True` only when " f"`global_batch_size` is set to an integer value" @@ -145,7 +145,7 @@ def __init__( pad_samples_to_global_batch_size=pad_samples_to_global_batch_size, ) assert ( - self.pad_samples_to_global_batch_size == False + pad_samples_to_global_batch_size == False ), "`MegatronPretrainingRandomSampler` does not support sample padding" self.last_batch_size = self.total_samples % self.micro_batch_times_data_parallel_size diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index d23a3f7cddaf..73bdcb34461a 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -516,7 +516,7 @@ def allreduce_first_last_embeddings(self): grad = word_embeddings_weight.grad torch.distributed.all_reduce(grad, group=parallel_state.get_embedding_group()) - def get_forward_output_and_loss_func(self): + def get_forward_output_and_loss_func(self, validation_step=False): def fwd_output_and_loss_func(dataloader_iter, model, checkpoint_activations_all_layers=None): # GPT3 uses only causal mask, which doesn't need attention mask if 
parallel_state.get_pipeline_model_parallel_world_size() == 1: From 4979f7ec8fe2c2dd5c87582f93c4a739ab3f23c2 Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Thu, 19 Jan 2023 10:47:17 -0800 Subject: [PATCH 05/18] apply per-microbatch-loader to only GPT --- .../language_modeling/megatron_base_model.py | 12 +---- .../language_modeling/megatron_gpt_model.py | 49 +++++++++++++++++++ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index c9c175f62f2f..65cfc2eb3d9d 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -122,14 +122,6 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True): "default_on_epoch": False, } - # Convert the global-batch-based profile index to micro-batch index - if hasattr(self, '_nsys_profile_enabled'): - mp_size = cfg.get('tensor_model_parallel_size', 1) * cfg.get('pipeline_model_parallel_size', 1) - data_parallel_world_size = trainer.world_size // mp_size - grad_accum_steps = cfg.get('global_batch_size') // (cfg.get('micro_batch_size') * data_parallel_world_size) - self._nsys_profile_start_step *= grad_accum_steps - self._nsys_profile_end_step *= grad_accum_steps - def _enable_nvidia_optimizations(self): "These optimizations are present in NVIDIA NGC PyTorch Containers" @@ -290,8 +282,8 @@ def reduce_overlap_gradients(self): p for p in self._optimizer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) ) - def on_train_batch_end(self, outputs, dataloader_iter: Any, batch_idx: int, unused: Optional[int] = 0) -> None: - super().on_train_batch_end(outputs, dataloader_iter, batch_idx) + def on_train_batch_end(self, outputs, batch, batch_idx: int, unused: Optional[int] = 0) -> None: + super().on_train_batch_end(outputs, batch, batch_idx) # TODO: Replace with newer override for scheduler.step() instead of # search for plugins for fp16 GradScalar diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 73bdcb34461a..3ec582184847 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -19,6 +19,7 @@ import torch from omegaconf.dictconfig import DictConfig from pytorch_lightning.trainer.trainer import Trainer +from pytorch_lightning.plugins.precision.native_amp import NativeMixedPrecisionPlugin from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import ( MegatronPretrainingRandomSampler, @@ -139,6 +140,16 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): # configuration used for inference self._inference_config = None + # Convert the global-batch-based profile index to micro-batch index + if hasattr(self, '_nsys_profile_enabled'): + mp_size = cfg.get('tensor_model_parallel_size', 1) * cfg.get('pipeline_model_parallel_size', 1) + data_parallel_world_size = trainer.world_size // mp_size + grad_accum_steps = ( + cfg.get('global_batch_size') // (cfg.get('micro_batch_size') * data_parallel_world_size) + ) + self._nsys_profile_start_step *= grad_accum_steps + self._nsys_profile_end_step *= grad_accum_steps + def set_inference_config(self, inference_config): self._inference_config = inference_config @@ -991,3 +1002,41 @@ def parameters(self): return 
itertools.chain.from_iterable(module.parameters() for module in self.model) else: return self.model.parameters() + + def on_train_batch_end(self, outputs, dataloader_iter: Any, batch_idx: int, unused: Optional[int] = 0) -> None: + super().on_train_batch_end(outputs, dataloader_iter, batch_idx) + + # TODO: Replace with newer override for scheduler.step() instead of + # search for plugins for fp16 GradScalar + if self.trainer.precision_plugin is not None and isinstance( + self.trainer.precision_plugin, NativeMixedPrecisionPlugin + ): + precision_plugin = self.trainer.precision_plugin + + if ( + hasattr(precision_plugin, 'scaler') + and precision_plugin.scaler is not None + and isinstance(precision_plugin.scaler, GradScaler) + ): + grad_scaler = precision_plugin.scaler + + # If the grad scaler skipped its optimizer step due to infs/nans, + # decrement the step of all schedulers. + if grad_scaler.optimizer_update_skipped is not None and grad_scaler.optimizer_update_skipped is True: + scheduler_cfgs = self.trainer.lr_scheduler_configs + + if not scheduler_cfgs or not self.trainer.lightning_module.automatic_optimization: + return + + for scheduler_cfg in scheduler_cfgs: + # Decrement the counter by 2, then perform a scheduler.step() to perform a no-up + # as well as update the optimizer lr in all param groups + scheduler_cfg.scheduler.last_epoch -= 2 + scheduler_cfg.scheduler.step() + + # Removing the line below because it messes up train_valid_test_num_samples calculation. + # self.trainer.fit_loop.max_steps = self.trainer.fit_loop.max_steps + 1 + + # Reset the optimizer update skipped to `None` - this is to prevent scheduler no-ops during + # accumulated gradient updates. + grad_scaler.optimizer_update_skipped = None From be04e8cf5e947f06434c25e58719d63ec348a156 Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Fri, 3 Feb 2023 16:21:23 -0800 Subject: [PATCH 06/18] update docstring on micro-batch input loader --- .../nlp/models/language_modeling/megatron_gpt_model.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 3ec582184847..45c77b99e446 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -315,12 +315,9 @@ def _get_fwd_bwd_function(self): def training_step(self, dataloader_iter, batch_idx): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + We pass the dataloader iterator function to the micro-batch scheduler. + The input batch to each micro-batch is fetched using the dataloader function + in the micro-batch fwd function. 
""" # we zero grads here because we also call backward in the apex fwd/bwd functions From 8acf409bbdf191a010d08e8f710081ecf1134aca Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 3 Feb 2023 19:00:09 +0000 Subject: [PATCH 07/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../nlp/models/language_modeling/megatron_gpt_model.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 45c77b99e446..cbfb85a834e4 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -18,8 +18,8 @@ import numpy as np import torch from omegaconf.dictconfig import DictConfig -from pytorch_lightning.trainer.trainer import Trainer from pytorch_lightning.plugins.precision.native_amp import NativeMixedPrecisionPlugin +from pytorch_lightning.trainer.trainer import Trainer from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import ( MegatronPretrainingRandomSampler, @@ -144,9 +144,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): if hasattr(self, '_nsys_profile_enabled'): mp_size = cfg.get('tensor_model_parallel_size', 1) * cfg.get('pipeline_model_parallel_size', 1) data_parallel_world_size = trainer.world_size // mp_size - grad_accum_steps = ( - cfg.get('global_batch_size') // (cfg.get('micro_batch_size') * data_parallel_world_size) - ) + grad_accum_steps = cfg.get('global_batch_size') // (cfg.get('micro_batch_size') * data_parallel_world_size) self._nsys_profile_start_step *= grad_accum_steps self._nsys_profile_end_step *= grad_accum_steps From cf4d85f506533c728bfa60fbe3e0faf5e7a0422e Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Fri, 3 Feb 2023 16:28:34 -0800 Subject: [PATCH 08/18] fixed the default arg val --- .../nlp/data/language_modeling/megatron/data_samplers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py index 5bc386875a36..e809c55dbb8a 100644 --- a/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py +++ b/nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py @@ -30,7 +30,7 @@ def __init__( micro_batch_size: int, data_parallel_rank: int, data_parallel_size: int, - drop_last: bool, + drop_last: bool = True, global_batch_size: Optional[int] = None, pad_samples_to_global_batch_size: Optional[bool] = False, ) -> None: From 6561d3b5090186acee04f147991fca464c4bc8b7 Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Fri, 3 Feb 2023 17:34:23 -0800 Subject: [PATCH 09/18] fix batch size to 1 at log stat registration --- .../language_modeling/megatron_base_model.py | 2 +- .../models/language_modeling/megatron_gpt_model.py | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 65cfc2eb3d9d..f3ca604a0dbc 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -242,7 +242,7 @@ def configure_gradient_clipping(self, *args, **kwargs): parameters 
= self._get_parameters() grad_norm = clip_grad_norm_fp32(parameters=parameters, max_norm=clip_val) - self.log('grad_norm', grad_norm, rank_zero_only=True, batch_size=self.cfg.micro_batch_size) + self.log('grad_norm', grad_norm, rank_zero_only=True, batch_size=1) def allreduce_gradients(self): """Reduce gradients across data parallel ranks. diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index cbfb85a834e4..d519b462a165 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -420,19 +420,17 @@ def training_step(self, dataloader_iter, batch_idx): if self.cfg.precision == 16: loss_scale = self.trainer.precision_plugin.scaler._scale if loss_scale is not None: - self.log('loss_scale', loss_scale, batch_size=self.cfg.micro_batch_size) + self.log('loss_scale', loss_scale, batch_size=1) - self.log( - 'reduced_train_loss', loss_mean, prog_bar=True, rank_zero_only=True, batch_size=self.cfg.micro_batch_size - ) + self.log('reduced_train_loss', loss_mean, prog_bar=True, rank_zero_only=True, batch_size=1) lr = self._optimizer.param_groups[0]['lr'] - self.log('lr', lr, rank_zero_only=True, batch_size=self.cfg.micro_batch_size) + self.log('lr', lr, rank_zero_only=True, batch_size=1) self.log( 'global_step', self.trainer.global_step, prog_bar=True, rank_zero_only=True, - batch_size=self.cfg.micro_batch_size, + batch_size=1, ) # TODO: make sure compute_consumed_samples works for pipeline parallelism @@ -441,7 +439,7 @@ def training_step(self, dataloader_iter, batch_idx): self.compute_consumed_samples(self.trainer.global_step - self.init_global_step), prog_bar=True, rank_zero_only=True, - batch_size=self.cfg.micro_batch_size, + batch_size=1, ) return loss_mean @@ -676,7 +674,7 @@ def validation_epoch_end(self, outputs): # we can only log on one rank if it is rank zero so we broadcast from last rank torch.distributed.broadcast(averaged_loss, get_last_rank()) - self.log('val_loss', averaged_loss, prog_bar=True, rank_zero_only=True, batch_size=self.cfg.micro_batch_size) + self.log('val_loss', averaged_loss, prog_bar=True, rank_zero_only=True, batch_size=1) def test_step(self, batch, batch_idx): return self.validation_step(batch, batch_idx) From f3eb580df73fcaaa8ea267683f84c9f5343d358b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 4 Feb 2023 01:35:26 +0000 Subject: [PATCH 10/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../nlp/models/language_modeling/megatron_gpt_model.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index d519b462a165..0fd6b4529048 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -426,11 +426,7 @@ def training_step(self, dataloader_iter, batch_idx): lr = self._optimizer.param_groups[0]['lr'] self.log('lr', lr, rank_zero_only=True, batch_size=1) self.log( - 'global_step', - self.trainer.global_step, - prog_bar=True, - rank_zero_only=True, - batch_size=1, + 'global_step', self.trainer.global_step, prog_bar=True, rank_zero_only=True, batch_size=1, ) # TODO: make sure compute_consumed_samples 
works for pipeline parallelism From 4efc72e31fe2a0cd2395f918e156059fbe0d9603 Mon Sep 17 00:00:00 2001 From: ericharper Date: Tue, 7 Feb 2023 12:16:42 -0700 Subject: [PATCH 11/18] update container for CI Signed-off-by: ericharper --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 9dcaecc4359b..9ee99c9a5f0b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -4509,4 +4509,4 @@ assert_frame_equal(training_curve, gt_curve, rtol=1e-3, atol=1e-3)"''' cleanWs() } } -} \ No newline at end of file +} From 8d82f15748961a276ca5ad4603c7944da0b6e038 Mon Sep 17 00:00:00 2001 From: ericharper Date: Tue, 7 Feb 2023 15:52:41 -0700 Subject: [PATCH 12/18] update container in jenkinsfile Signed-off-by: ericharper --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 9ee99c9a5f0b..f36839598941 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,7 +1,7 @@ pipeline { agent { docker { - image 'nvcr.io/nvidia/pytorch:23.01-py3' + image 'nemo_containers:23.01_apex_c3d575f2478cd379b3c2d81f41edde39791b5d92' args '--device=/dev/nvidia0 --gpus all --user 0:128 -v /home/TestData:/home/TestData -v $HOME/.cache:/root/.cache --shm-size=8g' } } From 817cc89627d79134fd430e1aca6dd0e28d3beef4 Mon Sep 17 00:00:00 2001 From: ericharper Date: Tue, 7 Feb 2023 12:16:42 -0700 Subject: [PATCH 13/18] update container for CI Signed-off-by: ericharper fix merge conflict --- Jenkinsfile | 2 +- .../nlp/models/language_modeling/megatron_gpt_model.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 9dcaecc4359b..9ee99c9a5f0b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -4509,4 +4509,4 @@ assert_frame_equal(training_curve, gt_curve, rtol=1e-3, atol=1e-3)"''' cleanWs() } } -} \ No newline at end of file +} diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 0fd6b4529048..1829a405c6b2 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -550,9 +550,9 @@ def loss_func(output_tensor): # Loss for a micro-batch (ub) loss_for_mb = self.loss_func(batch['loss_mask'], output_tensor) if validation_step and not self.cfg.data.get('validation_drop_last', True): - num_valid_tokens_in_ub = loss_mask.sum() + num_valid_tokens_in_ub = batch['loss_mask'].sum() if loss_for_ub.isnan(): - assert loss_mask.count_nonzero() == 0, 'Got NaN loss with non-empty input' + assert batch['loss_mask'].count_nonzero() == 0, 'Got NaN loss with non-empty input' loss_sum_for_ub = torch.zeros_like(num_valid_tokens_in_ub) else: loss_sum_for_ub = num_valid_tokens_in_ub * loss_for_ub From d23b7757e0f935dacde2840f234193c632a2b3be Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Tue, 7 Feb 2023 16:04:40 -0800 Subject: [PATCH 14/18] revert Jenkinsfile --- Jenkinsfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index f36839598941..37679ac338e7 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,7 +1,7 @@ pipeline { agent { docker { - image 'nemo_containers:23.01_apex_c3d575f2478cd379b3c2d81f41edde39791b5d92' + image 'nvcr.io/nvidia/pytorch:23.01-py3' args '--device=/dev/nvidia0 --gpus all --user 0:128 -v /home/TestData:/home/TestData -v $HOME/.cache:/root/.cache --shm-size=8g' } } @@ -4510,3 +4510,4 @@ assert_frame_equal(training_curve, gt_curve, rtol=1e-3, 
atol=1e-3)"''' } } } + From 8b5ac9a7230f4f8a67d50ac7ba06782a75e2d53d Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Wed, 8 Feb 2023 09:51:56 -0800 Subject: [PATCH 15/18] Revert "revert Jenkinsfile" This reverts commit d23b7757e0f935dacde2840f234193c632a2b3be. --- Jenkinsfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 37679ac338e7..f36839598941 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,7 +1,7 @@ pipeline { agent { docker { - image 'nvcr.io/nvidia/pytorch:23.01-py3' + image 'nemo_containers:23.01_apex_c3d575f2478cd379b3c2d81f41edde39791b5d92' args '--device=/dev/nvidia0 --gpus all --user 0:128 -v /home/TestData:/home/TestData -v $HOME/.cache:/root/.cache --shm-size=8g' } } @@ -4510,4 +4510,3 @@ assert_frame_equal(training_curve, gt_curve, rtol=1e-3, atol=1e-3)"''' } } } - From 8068ad7cd10663210ed12ef1189dd83c648d33a5 Mon Sep 17 00:00:00 2001 From: Tim Moon <4406448+timmoon10@users.noreply.github.com> Date: Wed, 8 Feb 2023 13:01:48 -0800 Subject: [PATCH 16/18] Update nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py Signed-off-by: Tim Moon <4406448+timmoon10@users.noreply.github.com> --- .../nlp/models/language_modeling/megatron_gpt_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 1829a405c6b2..62a8846701e4 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -548,7 +548,7 @@ def fwd_output_and_loss_func(dataloader_iter, model, checkpoint_activations_all_ def loss_func(output_tensor): # Loss for a micro-batch (ub) - loss_for_mb = self.loss_func(batch['loss_mask'], output_tensor) + loss_for_ub = self.loss_func(batch['loss_mask'], output_tensor) if validation_step and not self.cfg.data.get('validation_drop_last', True): num_valid_tokens_in_ub = batch['loss_mask'].sum() if loss_for_ub.isnan(): From 001a4678b590f8cb5e7c12e47c94fb68c5eba7c9 Mon Sep 17 00:00:00 2001 From: Sangkug Lym Date: Thu, 9 Feb 2023 11:03:32 -0800 Subject: [PATCH 17/18] add GradScaler --- .../nlp/models/language_modeling/megatron_gpt_model.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 62a8846701e4..21eb0f1e1602 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -48,6 +48,7 @@ TextGeneration, ) from nemo.collections.nlp.parts.utils_funcs import get_last_rank +from nemo.collections.nlp.parts.nlp_overrides import GradScaler from nemo.core.classes.common import PretrainedModelInfo from nemo.utils import logging From c2a17630c29dce33c50bf7e4093eb8c333fa3491 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 9 Feb 2023 19:06:44 +0000 Subject: [PATCH 18/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../nlp/models/language_modeling/megatron_gpt_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 21eb0f1e1602..bdd3374a2c16 100644 --- 
a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -47,8 +47,8 @@ SamplingParam, TextGeneration, ) -from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.collections.nlp.parts.nlp_overrides import GradScaler +from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.core.classes.common import PretrainedModelInfo from nemo.utils import logging
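Usage sketch: after this series, the data samplers yield one micro-batch of indices per iteration, so the DataLoader itself becomes the per-micro-batch input loader that training_step/validation_step consume through `dataloader_iter`. Below is a minimal sketch of wiring MegatronPretrainingSampler into a DataLoader, mirroring the build_pretraining_data_loader change in megatron_gpt_model.py; the dataset, the batch sizes, the single-rank data-parallel values, and the DataLoader keyword arguments are placeholder assumptions (in NeMo they come from self.cfg and parallel_state), not part of the patches themselves.

    import torch

    from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import (
        MegatronPretrainingSampler,
    )

    # Placeholder dataset; NeMo builds the GPT dataset in build_train_valid_test_datasets.
    dataset = torch.utils.data.TensorDataset(torch.arange(10_000))

    batch_sampler = MegatronPretrainingSampler(
        total_samples=len(dataset),
        consumed_samples=0,                      # resume offset; 0 for a fresh run
        micro_batch_size=4,                      # assumed value for cfg.micro_batch_size
        data_parallel_rank=0,                    # assumed single data-parallel rank
        data_parallel_size=1,
        drop_last=True,
        global_batch_size=8,                     # must be divisible by micro_batch_size * data_parallel_size
        pad_samples_to_global_batch_size=False,  # padding is only valid with a global_batch_size set
    )

    # Each DataLoader element is now a single micro-batch; the Apex fwd/bwd
    # function pulls micro-batches from dataloader_iter inside the step
    # functions instead of receiving a pre-split global batch.
    dataloader = torch.utils.data.DataLoader(
        dataset, batch_sampler=batch_sampler, num_workers=2, pin_memory=True,
    )

    # __len__ counts global batches (optimizer steps), while iteration yields
    # micro-batches: 10_000 samples -> 1250 global batches, each made of
    # 2 micro-batches of 4 samples with the values assumed above.
    print(len(batch_sampler))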