From db92504ec341128f0952b1bef3b3b2c6b0f89550 Mon Sep 17 00:00:00 2001
From: ashors1
Date: Mon, 15 Jul 2024 15:51:31 +0000
Subject: [PATCH] Apply isort and black reformatting

Signed-off-by: ashors1
---
 .../language_modeling/megatron_bert_model.py | 79 ++++++++++---------
 1 file changed, 43 insertions(+), 36 deletions(-)

diff --git a/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py b/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py
index 701a3acdf8cc..701d24d5b942 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py
@@ -206,8 +206,8 @@ def model_provider_func(self, pre_process, post_process):
         return model
 
     def _validate_trainer(self):
-        """ Certain trainer configurations can break training.
-            Here we try to catch them and raise an error.
+        """Certain trainer configurations can break training.
+        Here we try to catch them and raise an error.
         """
         if self.trainer.accumulate_grad_batches > 1:
             raise ValueError(
@@ -300,7 +300,11 @@ def forward(
         model = self.model
 
         if self.mcore_bert:
-            output_tensor = model(input_ids, attention_mask, tokentype_ids=token_type_ids,)
+            output_tensor = model(
+                input_ids,
+                attention_mask,
+                tokentype_ids=token_type_ids,
+            )
         else:
             output_tensor = model(
                 input_ids,
@@ -415,21 +419,24 @@ def training_step(self, dataloader_iter):
         self.log('lr', lr, batch_size=1)
         self.log('global_step', self.trainer.global_step, prog_bar=True, batch_size=1)
         self.log(
-            'consumed_samples', self._compute_consumed_samples_after_training_step(), prog_bar=True, batch_size=1,
+            'consumed_samples',
+            self._compute_consumed_samples_after_training_step(),
+            prog_bar=True,
+            batch_size=1,
         )
 
         return loss_mean[0]
 
     def _make_data_iterator_list(self, data_iterator: Iterator) -> List[Iterator]:
-        """ Convert data iterator into form expected by Megatron
-            With interleaved pipeline parallelism, Megatron expects a
-            list of one data iterator per model chunk. Each model
-            chunk independently gets data from its data iterator, so
-            we need to interact with the data iterator multiple times
-            for each microbatch step. Instead of incorporating this
-            logic into the data loader, we cache the iterator's output
-            to the first model chunk and reuse it in the other model
-            chunks.
+        """Convert data iterator into form expected by Megatron
+        With interleaved pipeline parallelism, Megatron expects a
+        list of one data iterator per model chunk. Each model
+        chunk independently gets data from its data iterator, so
+        we need to interact with the data iterator multiple times
+        for each microbatch step. Instead of incorporating this
+        logic into the data loader, we cache the iterator's output
+        to the first model chunk and reuse it in the other model
+        chunks.
         """
 
         if not isinstance(self.model, list) or len(self.model) == 1:
@@ -695,9 +702,9 @@ def build_train_valid_test_datasets(self):
         ]
 
         if self.trainer.limit_val_batches <= 1.0 and isinstance(self.trainer.limit_val_batches, float):
-            train_valid_test_num_samples[
-                1
-            ] = 1  # This is to make sure we only have one epoch on every validation iteration
+            train_valid_test_num_samples[1] = (
+                1  # This is to make sure we only have one epoch on every validation iteration
+            )
 
         self._train_ds, self._validation_ds, self._test_ds = dataset_utils.build_train_valid_test_datasets(
             cfg=self.cfg,
@@ -731,20 +738,20 @@ def build_train_valid_test_datasets(self):
         return self._train_ds, self._validation_ds, self._test_ds
 
     def backward(self, *args, **kwargs):
-        """ LightningModule hook to do backward.
-            We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core.
-            No need to call it here.
+        """LightningModule hook to do backward.
+        We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core.
+        No need to call it here.
         """
         return
 
     def optimizer_zero_grad(self, *args, **kwargs):
-        """ LightningModule hook to zero grad.
-            We want this to do nothing as we are zeroing grads during the training_step.
+        """LightningModule hook to zero grad.
+        We want this to do nothing as we are zeroing grads during the training_step.
         """
         return
 
     def _append_sequence_parallel_module_grads(self, module, grads):
-        """ Helper method for allreduce_sequence_parallel_gradients"""
+        """Helper method for allreduce_sequence_parallel_gradients"""
 
         for param in module.parameters():
             sequence_parallel_param = getattr(param, 'sequence_parallel', False)
@@ -814,12 +821,12 @@ def setup(self, stage=None):
             self.setup_transformer_engine_tp_groups()
 
     def setup_transformer_engine_tp_groups(self):
-        """ This should be called after model parallel groups have been initialized
-            and only needs to be called when using Transformer Engine.
+        """This should be called after model parallel groups have been initialized
+        and only needs to be called when using Transformer Engine.
         """
         for module in self.get_bert_module_list():
             """Set TP group
-               Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398
+            Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398
             """
             # Deep iterate but skip self to avoid infinite recursion.
             for index, child in enumerate(module.modules()):
@@ -841,9 +848,9 @@ def get_bert_module_list(self):
             return [self.model]
 
     def allreduce_sequence_parallel_gradients(self):
-        """ All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used.
-            Modified from megatron-lm:
-            https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425
+        """All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used.
+        Modified from megatron-lm:
+        https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425
         """
 
         grads = []
@@ -923,10 +930,10 @@ def setup_test_data(self, cfg):
             self._test_dl = self.build_pretraining_data_loader(self._test_ds, consumed_samples)
 
     def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any:
-        """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device
-            When using pipeline parallelism, we need the global batch to remain on the CPU,
-            since the memory overhead will be too high when using a large number of microbatches.
-            Microbatches are transferred from CPU to GPU inside the pipeline.
+        """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device
+        When using pipeline parallelism, we need the global batch to remain on the CPU,
+        since the memory overhead will be too high when using a large number of microbatches.
+        Microbatches are transferred from CPU to GPU inside the pipeline.
         """
         return batch
 
@@ -1146,10 +1153,10 @@ def on_load_checkpoint(self, checkpoint) -> None:
                 parallel_state.set_virtual_pipeline_model_parallel_rank(0)
 
     def build_transformer_config(self) -> TransformerConfig:
-        """ Builds the megatron core gpt transformer config for the model.
-            For attributes in the nemo model config that are the same
-            as the megatron core TransformerConfig, we will use the value from the nemo model config.
-            For attributes in TransformerConfig that are not in the nemo model config, we add custom logic.
+        """Builds the megatron core gpt transformer config for the model.
+        For attributes in the nemo model config that are the same
+        as the megatron core TransformerConfig, we will use the value from the nemo model config.
+        For attributes in TransformerConfig that are not in the nemo model config, we add custom logic.
         """
         activation = self.cfg.get('activation', 'gelu')
         assert activation == 'gelu', "Only gelu activation is support for BERT at the moment."