
Commit

Apply isort and black reformatting
Signed-off-by: ashors1 <ashors1@users.noreply.github.com>
ashors1 committed Jul 15, 2024
1 parent 81302ba commit db92504
Showing 1 changed file with 43 additions and 36 deletions.
@@ -206,8 +206,8 @@ def model_provider_func(self, pre_process, post_process):
         return model
 
     def _validate_trainer(self):
-        """ Certain trainer configurations can break training.
-            Here we try to catch them and raise an error.
+        """Certain trainer configurations can break training.
+        Here we try to catch them and raise an error.
         """
         if self.trainer.accumulate_grad_batches > 1:
             raise ValueError(
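The body of the ValueError above is truncated by the diff view. For orientation, a minimal, hypothetical sketch of the guard pattern the docstring describes (the wording of the real message is not shown here):

    def validate_trainer(trainer) -> None:
        # Megatron-core performs gradient accumulation inside training_step,
        # so Lightning-level accumulation would conflict with it.
        if trainer.accumulate_grad_batches > 1:
            raise ValueError("accumulate_grad_batches must be 1 for Megatron-based models")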
@@ -300,7 +300,11 @@ def forward(
         model = self.model
 
         if self.mcore_bert:
-            output_tensor = model(input_ids, attention_mask, tokentype_ids=token_type_ids,)
+            output_tensor = model(
+                input_ids,
+                attention_mask,
+                tokentype_ids=token_type_ids,
+            )
         else:
             output_tensor = model(
                 input_ids,
@@ -415,21 +419,24 @@ def training_step(self, dataloader_iter):
         self.log('lr', lr, batch_size=1)
         self.log('global_step', self.trainer.global_step, prog_bar=True, batch_size=1)
         self.log(
-            'consumed_samples', self._compute_consumed_samples_after_training_step(), prog_bar=True, batch_size=1,
+            'consumed_samples',
+            self._compute_consumed_samples_after_training_step(),
+            prog_bar=True,
+            batch_size=1,
         )
 
         return loss_mean[0]
 
     def _make_data_iterator_list(self, data_iterator: Iterator) -> List[Iterator]:
-        """ Convert data iterator into form expected by Megatron
-            With interleaved pipeline parallelism, Megatron expects a
-            list of one data iterator per model chunk. Each model
-            chunk independently gets data from its data iterator, so
-            we need to interact with the data iterator multiple times
-            for each microbatch step. Instead of incorporating this
-            logic into the data loader, we cache the iterator's output
-            to the first model chunk and reuse it in the other model
-            chunks.
+        """Convert data iterator into form expected by Megatron
+        With interleaved pipeline parallelism, Megatron expects a
+        list of one data iterator per model chunk. Each model
+        chunk independently gets data from its data iterator, so
+        we need to interact with the data iterator multiple times
+        for each microbatch step. Instead of incorporating this
+        logic into the data loader, we cache the iterator's output
+        to the first model chunk and reuse it in the other model
+        chunks.
         """
 
         if not isinstance(self.model, list) or len(self.model) == 1:
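The caching scheme this docstring describes, drawing each batch once and replaying it to every model chunk, matches the contract that itertools.tee provides. A minimal sketch under that assumption (not the code in this file):

    import itertools
    from typing import Iterator, List

    def make_data_iterator_list_sketch(data_iterator: Iterator, num_model_chunks: int) -> List[Iterator]:
        # tee() buffers items pulled by the first clone so the remaining
        # clones can replay them, giving each model chunk an identical stream.
        return list(itertools.tee(data_iterator, num_model_chunks))

A hand-rolled caching iterator gives finer control over buffering, but the fan-out behavior is the same.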
@@ -695,9 +702,9 @@ def build_train_valid_test_datasets(self):
         ]
 
         if self.trainer.limit_val_batches <= 1.0 and isinstance(self.trainer.limit_val_batches, float):
-            train_valid_test_num_samples[
-                1
-            ] = 1  # This is to make sure we only have one epoch on every validation iteration
+            train_valid_test_num_samples[1] = (
+                1  # This is to make sure we only have one epoch on every validation iteration
+            )
 
         self._train_ds, self._validation_ds, self._test_ds = dataset_utils.build_train_valid_test_datasets(
             cfg=self.cfg,
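To unpack the inline comment: train_valid_test_num_samples is ordered [train, valid, test], and a float limit_val_batches means a fraction of the validation set rather than a batch count. A hypothetical illustration with made-up numbers:

    train_valid_test_num_samples = [1_000_000, 10_000, 10_000]  # [train, valid, test]
    limit_val_batches = 0.25  # float => fraction of the validation set, not a batch count
    if limit_val_batches <= 1.0 and isinstance(limit_val_batches, float):
        # Per the source comment, this keeps every validation run within one epoch.
        train_valid_test_num_samples[1] = 1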
@@ -731,20 +738,20 @@ def build_train_valid_test_datasets(self):
         return self._train_ds, self._validation_ds, self._test_ds
 
     def backward(self, *args, **kwargs):
-        """ LightningModule hook to do backward.
-            We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core.
-            No need to call it here.
+        """LightningModule hook to do backward.
+        We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core.
+        No need to call it here.
         """
         return
 
     def optimizer_zero_grad(self, *args, **kwargs):
-        """ LightningModule hook to zero grad.
-            We want this to do nothing as we are zeroing grads during the training_step.
+        """LightningModule hook to zero grad.
+        We want this to do nothing as we are zeroing grads during the training_step.
         """
         return
 
     def _append_sequence_parallel_module_grads(self, module, grads):
-        """ Helper method for allreduce_sequence_parallel_gradients"""
+        """Helper method for allreduce_sequence_parallel_gradients"""
 
         for param in module.parameters():
             sequence_parallel_param = getattr(param, 'sequence_parallel', False)
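The helper's loop body is cut off above. A plausible completion following the usual megatron-lm pattern (main_grad when a fused/O2 optimizer owns the master gradients, param.grad otherwise); an assumption, not the verbatim file:

    def append_sequence_parallel_module_grads_sketch(module, grads):
        # Collect grads of parameters tagged `sequence_parallel` for a later all-reduce.
        for param in module.parameters():
            sequence_parallel_param = getattr(param, 'sequence_parallel', False)
            if sequence_parallel_param and param.requires_grad:
                # Fused/O2 optimizers keep an fp32 master gradient on `main_grad`.
                grad = param.main_grad if hasattr(param, 'main_grad') else param.grad
                grads.append(grad.data)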
@@ -814,12 +821,12 @@ def setup(self, stage=None):
             self.setup_transformer_engine_tp_groups()
 
     def setup_transformer_engine_tp_groups(self):
-        """ This should be called after model parallel groups have been initialized
-            and only needs to be called when using Transformer Engine.
+        """This should be called after model parallel groups have been initialized
+        and only needs to be called when using Transformer Engine.
         """
         for module in self.get_bert_module_list():
             """Set TP group
-               Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398
+            Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398
             """
             # Deep iterate but skip self to avoid infinite recursion.
             for index, child in enumerate(module.modules()):
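The inner loop above builds toward the pattern in the linked TransformerEngine file: pass the tensor-parallel process group to every child module that accepts one. A sketch assuming TE's set_tensor_parallel_group API:

    from megatron.core import parallel_state

    tp_group = parallel_state.get_tensor_model_parallel_group()
    for index, child in enumerate(module.modules()):
        if index == 0:
            continue  # module.modules() yields `module` itself first; skip it
        if hasattr(child, "set_tensor_parallel_group"):
            child.set_tensor_parallel_group(tp_group)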
@@ -841,9 +848,9 @@ def get_bert_module_list(self):
             return [self.model]
 
     def allreduce_sequence_parallel_gradients(self):
-        """ All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used.
-            Modified from megatron-lm:
-            https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425
+        """All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used.
+        Modified from megatron-lm:
+        https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425
         """
 
         grads = []
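After grads is filled (the diff truncates here), the megatron-lm recipe cited in the docstring typically coalesces the tensors, all-reduces once across the tensor-parallel group, and copies the results back; a sketch under that assumption:

    import torch
    from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
    from megatron.core import parallel_state

    if grads:
        coalesced = _flatten_dense_tensors(grads)
        torch.distributed.all_reduce(
            coalesced, group=parallel_state.get_tensor_model_parallel_group()
        )
        # Write the reduced values back into each original gradient tensor.
        for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
            buf.copy_(synced)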
@@ -923,10 +930,10 @@ def setup_test_data(self, cfg):
             self._test_dl = self.build_pretraining_data_loader(self._test_ds, consumed_samples)
 
     def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any:
-        """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device
-            When using pipeline parallelism, we need the global batch to remain on the CPU,
-            since the memory overhead will be too high when using a large number of microbatches.
-            Microbatches are transferred from CPU to GPU inside the pipeline.
+        """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device
+        When using pipeline parallelism, we need the global batch to remain on the CPU,
+        since the memory overhead will be too high when using a large number of microbatches.
+        Microbatches are transferred from CPU to GPU inside the pipeline.
         """
         return batch

@@ -1146,10 +1153,10 @@ def on_load_checkpoint(self, checkpoint) -> None:
             parallel_state.set_virtual_pipeline_model_parallel_rank(0)
 
     def build_transformer_config(self) -> TransformerConfig:
-        """ Builds the megatron core gpt transformer config for the model.
-            For attributes in the nemo model config that are the same
-            as the megatron core TransformerConfig, we will use the value from the nemo model config.
-            For attributes in TransformerConfig that are not in the nemo model config, we add custom logic.
+        """Builds the megatron core gpt transformer config for the model.
+        For attributes in the nemo model config that are the same
+        as the megatron core TransformerConfig, we will use the value from the nemo model config.
+        For attributes in TransformerConfig that are not in the nemo model config, we add custom logic.
         """
         activation = self.cfg.get('activation', 'gelu')
         assert activation == 'gelu', "Only gelu activation is support for BERT at the moment."
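The docstring's two mapping rules can be condensed into a few lines. A minimal sketch (hypothetical helper name; the method's custom per-field logic is truncated above):

    from dataclasses import fields
    from megatron.core.transformer.transformer_config import TransformerConfig

    def build_transformer_config_sketch(cfg) -> TransformerConfig:
        # Rule 1: TransformerConfig fields that also appear in the NeMo model
        # config take their values from the NeMo config.
        matching = {f.name: cfg[f.name] for f in fields(TransformerConfig) if f.name in cfg}
        # Rule 2: fields absent from the NeMo config keep TransformerConfig
        # defaults, with custom handling for special cases (elided here).
        return TransformerConfig(**matching)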
