From 783f67d5c8501fc20afafcc5b1fdfc5b732266fc Mon Sep 17 00:00:00 2001
From: Sangkug Lym
Date: Tue, 11 Feb 2025 18:23:48 -0800
Subject: [PATCH] linting fix

Signed-off-by: Sangkug Lym
---
 .../language_modeling/megatron_base_model.py | 40 +++++----
 .../language_modeling/megatron_gpt_model.py  | 84 +++++++++++--------
 2 files changed, 72 insertions(+), 52 deletions(-)

diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py
index e0ca5d9315af..3df28212c899 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py
@@ -101,15 +101,17 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True):

         if not HAVE_MEGATRON_CORE:
             raise ImportError(
-                "megatron-core was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt."
+                "megatron-core was not found. Please see the NeMo README for installation instructions: "
+                "https://github.com/NVIDIA/NeMo#megatron-gpt."
             )

         if trainer is None:
-            raise ValueError(f"Trainer cannot be None for Megatron-based models. Please provide a PTL trainer object.")
+            raise ValueError("Trainer cannot be None for Megatron-based models. Please provide a PTL trainer object.")

         if cfg.get('use_flash_attention', False) and not HAVE_FLASH_ATTENTION:
             raise ImportError(
-                "flash_attn was not found. Please see the installation instructions: https://github.com/HazyResearch/flash-attention."
+                "flash_attn was not found. Please see the installation instructions: "
+                "https://github.com/HazyResearch/flash-attention. "
                 "If you use flash_attn with triton. Please install triton==2.0.0.dev20221202."
             )

@@ -256,7 +258,7 @@ def setup_transformer_engine_tp_groups(self):
         """
         for module in self.get_model_module_list():
             """Set TP group
-            Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398
+            Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398 # pylint: disable=line-too-long
             """
             # Deep iterate but skip self to avoid infinite recursion.
             for index, child in enumerate(module.modules()):
@@ -274,7 +276,7 @@ def setup_transformer_engine_cp_groups(self):

         for module in self.get_model_module_list():
             """Set context parallel running
-            Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py
+            Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py # pylint: disable=line-too-long
             """
             # Deep iterate but skip self to avoid infinite recursion.
             for index, child in enumerate(module.modules()):
@@ -349,7 +351,8 @@ def _reconfigure_limit_batches(self, limit_batches, dataloader, mode):
         """
         Reconfigure trainer.limit_val_batches for pretraining
         """
-        # Override limit_batches in terms of num microbatches and so there are limit_batches//num_micro_batches num of global batches
+        # Override limit_batches in terms of num microbatches
+        # and so there are limit_batches//num_micro_batches num of global batches
         if isinstance(limit_batches, int):
             limit_batches *= get_num_microbatches()
         else:
@@ -618,7 +621,8 @@ def _vocab_size_with_padding(self, orig_vocab_size, make_vocab_size_divisible_by
         multiple = make_vocab_size_divisible_by * tensor_model_parallel_size
         after = ((after + multiple - 1) // multiple) * multiple
         logging.info(
-            f'Padded vocab_size: {after}, original vocab_size: {orig_vocab_size}, dummy tokens: {after - orig_vocab_size}.'
+            f"Padded vocab_size: {after}, original vocab_size: {orig_vocab_size}, "
+            f"dummy tokens: {after - orig_vocab_size}."
         )
         return after

@@ -673,7 +677,7 @@ def configure_gradient_clipping(self, *args, **kwargs):

     def allreduce_gradients(self):
         """Reduce gradients across data parallel ranks.
-        Modified from megatron-lm: https://github.com/NVIDIA/Megatron-LM/blob/d41696840ed0a7edb7e0499eb82a48ae112d9bb3/megatron/model/distributed.py#L188
+        Modified from megatron-lm: https://github.com/NVIDIA/Megatron-LM/blob/d41696840ed0a7edb7e0499eb82a48ae112d9bb3/megatron/model/distributed.py#L188 # pylint: disable=line-too-long
         """
         # Bucketize and all-reduce
         buckets = {}
@@ -845,7 +849,8 @@ def configure_optimizers(self):
                 # TODO: contiguous grad bucket for fp16 is also planned to be supported
                 contiguous_grad_bucket = False
                 raise ValueError(
-                    "fp16 training is not yet supported with O2. Please set megatron_amp_O2 to False in the model config."
+                    "fp16 training is not yet supported with O2. "
+                    "Please set megatron_amp_O2 to False in the model config."
                 )

             # if using tensor parallel only, we automatically use async grad all-reduce
@@ -983,7 +988,8 @@ def _validate_and_override_config(self):

         if self.cfg.get('sequence_parallel', False) and self.cfg.get('tensor_model_parallel_size', 1) == 1:
             logging.info(
-                "Sequence parallel should only be used with tensor parallel size > 1. Setting sequence parallel to False"
+                "Sequence parallel should only be used with tensor parallel size > 1. "
+                "Setting sequence parallel to False"
             )
             with open_dict(self.cfg):
                 self.cfg.sequence_parallel = False
@@ -1002,7 +1008,8 @@ def _validate_and_override_config(self):
         if self.cfg.get('gradient_accumulation_fusion', False):
             if data_parallel_size > 1 and pipeline_model_parallel_size == 1 and not distributed_fused_adam:
                 logging.info(
-                    "When not using pipeline model parallel, gradient accumulation fusion can only be used with distributed_fused_adam."
+                    "When not using pipeline model parallel, "
+                    "gradient accumulation fusion can only be used with distributed_fused_adam."
                 )
                 with open_dict(self.cfg):
                     self.cfg.gradient_accumulation_fusion = False
@@ -1123,7 +1130,8 @@ def _get_total_params_across_model_parallel_groups_enc_dec(self, model):
                 parallel_state.get_pipeline_model_parallel_rank() == self.cfg.get('pipeline_model_parallel_split_rank', 0)
                 or parallel_state.is_pipeline_last_stage()
            ):
-                # If the current rank is the in the decoder first stage (decoder emb) or last rank (output layer), subtract those weights since it is already accounted for in the encoder first stage.
+                # If the current rank is in the decoder first stage (decoder emb) or last rank (output layer),
+                # subtract those weights since they are already accounted for in the encoder first stage.
                 # TODO: If we support embedding untying with PP > 1, we will need to update this.
                 num_word_embedding_parameters = sum([p.nelement() for p in model.word_embeddings_weight()])
                 num_parameters_on_device -= num_word_embedding_parameters
@@ -1180,7 +1188,7 @@ def build_model_parallel_config(self) -> ModelParallelConfig:
         config_mapping = {
             "perform_initialization": True,  # initailize weights when constructing the module
             "fp16": self.torch_dtype == torch.float16
-            and megatron_amp_O2,  # NeMo does not currently support fp16 training with megatron amp O2, eval and inference is supported
+            and megatron_amp_O2,  # fp16 training with megatron amp O2 not supported, eval and inference are supported
             "bf16": self.torch_dtype == torch.bfloat16 and megatron_amp_O2,
             "params_dtype": self.params_dtype,
             "timers": self.megatron_timers,
@@ -1229,7 +1237,8 @@ def build_model_parallel_config(self) -> ModelParallelConfig:
                 setattr(model_parallel_config, 'hidden_size', self.cfg.hidden_size)
             except AttributeError:
                 logging.warning(
-                    f'hidden_size not found in {self.cfg}. Set this in model_parallel_config if using pipeline parallelism.'
+                    f'hidden_size not found in {self.cfg}. '
+                    'Set this in model_parallel_config if using pipeline parallelism.'
                 )

         return model_parallel_config
@@ -1312,7 +1321,8 @@ def find_frozen_submodules(model):
                     logging.debug(f"Ignoring state {submodule} in FSDP.")
                 self.trainer.strategy.kwargs['ignored_states'] = frozen_submodules
             # FSDP requires uniform status of require_grads
-            # Diffusion models like SD has frozen parts and needs to be added to 'ignored_states' from sharding for FSDP to work
+            # Diffusion models like SD have frozen parts and need to be added to 'ignored_states'
+            # from sharding for FSDP to work
             self.model = self.trainer.strategy._setup_model(self.model)
             # Move the CPU-initialized model (with `use_cpu_initialization=True`) to GPU, which is to avoid
             # out-of-memory carash before sharding. In case of GPU-initialized model, this is no-op.
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py
index 71941c706d33..6b3da1f35493 100644
--- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py
+++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py
@@ -17,7 +17,6 @@
 import queue
 import warnings
 from contextlib import nullcontext
-from dataclasses import fields
 from functools import cache, partial
 from importlib.metadata import version
 from typing import Any, Dict, Iterator, List, Optional, Union
@@ -25,9 +24,7 @@
 import packaging
 import torch
 from lightning.pytorch.accelerators import CPUAccelerator
-from lightning.pytorch.loops.fetchers import _DataFetcherWrapper
 from lightning.pytorch.trainer.trainer import Trainer
-from omegaconf import OmegaConf
 from omegaconf.dictconfig import DictConfig

 from nemo.collections.common.parts.utils import apply_rope_scaling, extend_instance
@@ -69,7 +66,7 @@
     TextGeneration,
 )
 from nemo.collections.nlp.parts import utils_funcs
-from nemo.collections.nlp.parts.utils_funcs import activation_to_func, get_last_rank
+from nemo.collections.nlp.parts.utils_funcs import get_last_rank
 from nemo.core.classes import Exportable
 from nemo.core.classes.common import PretrainedModelInfo
 from nemo.core.neural_types import ChannelType, NeuralType
@@ -78,8 +75,7 @@
 from nemo.utils.te_utils import is_float8tensor, te_version

 try:
-    import megatron.core as core
-    from megatron.core import InferenceParams, parallel_state, tensor_parallel
+    from megatron.core import InferenceParams, parallel_state
     from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder
     from megatron.core.datasets.gpt_dataset import GPTDataset, GPTDatasetConfig, MockGPTDataset
     from megatron.core.datasets.utils import get_blend_from_list
@@ -101,9 +97,7 @@
     from megatron.core.utils import (
         drain_embedding_wgrad_compute,
         get_model_config,
-        init_method_normal,
         is_te_min_version,
-        scaled_init_method_normal,
     )

     HAVE_MEGATRON_CORE = True
@@ -143,14 +137,14 @@ def mcore_supports_moe() -> bool:
     if not HAVE_MEGATRON_CORE:
         return False
     try:
-        from megatron.core.transformer.moe.router import TopKRouter
+        from megatron.core.transformer.moe.router import TopKRouter  # noqa: F401

         return True
     except ImportError:
         return False


-## TODO: This function will not work if TE is not installed
+# TODO: This function will not work if TE is not installed
 def get_specs(spec_name, transformer_config=None, use_te=True, hyena_cfg: Dict = None, fp8=False):

     from nemo.collections.nlp.models.language_modeling.megatron.gemma2.gemma2_spec import get_gemma2_layer_spec
@@ -331,7 +325,8 @@ class MegatronGPTModel(MegatronBaseModel, TextGeneration):
     def __init__(self, cfg: DictConfig, trainer: Trainer):
         if not HAVE_MEGATRON_CORE:
             logging.warning(
-                "megatron-core was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt."
+                "megatron-core was not found. Please see the NeMo README for installation instructions: "
+                "https://github.com/NVIDIA/NeMo#megatron-gpt."
             )
         # this prevents base constructor from initializing tokenizer
         self.tokenizer = None
@@ -371,7 +366,8 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
         if self.cfg.get('expert_model_parallel_size', 1) > 1 and self.with_distributed_adam:
             if not self.use_mcore_dist_optim:
                 raise ValueError(
-                    'Expert parallelism is currently not supporting Apex distributed optimizer, use Mcore distributed optimizer instead'
+                    'Expert parallelism is not currently supported with the Apex distributed optimizer; '
+                    'use the Mcore distributed optimizer instead'
                 )

         if self.cfg.optim.get('overlap_param_gather_with_optimizer_step', False):
@@ -424,7 +420,8 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):

         if self.megatron_amp_O2:
             if not self.with_distributed_adam and not self.cfg.get("use_cpu_initialization", False):
-                # Pre-allocate the model on GPU to have master parameters allocated on the same device with matching data type
+                # Pre-allocate the model on GPU to have master parameters allocated
+                # on the same device with matching data type
                 if isinstance(self.model, list):
                     for module in self.model:
                         module.cuda(torch.cuda.current_device())
@@ -471,7 +468,8 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
         self.reset_lr_steps = self.cfg.get('reset_lr_steps', False)
         if self.reset_lr and (not self.with_distributed_adam or not self.megatron_amp_O2):
             raise ValueError(
-                'Learning rate reset feature is only supported with the distributed optmizer and megatron_amp_O2 for now.'
+                'Learning rate reset feature is only supported with the distributed optimizer '
+                'and megatron_amp_O2 for now.'
             )

         # default to false since this doesn't work with sequence parallelism currently
@@ -805,7 +803,8 @@ def initialize_ub_func(self):
         ub_cfgs = self.cfg.get('ub_tp_comm_overlap_cfg', None)
         if ub_cfgs is None:
             warnings.warn(
-                "Couldn't find TP config. Please check the path correctness. Initializing TP comm overlap with the default config."
+                "Couldn't find TP config. Please check the path correctness. "
+                "Initializing TP comm overlap with the default config."
             )

         input_shape = [
@@ -1002,7 +1001,7 @@ def training_step(self, dataloader_iter):
             batch_size=1,
         )

-        ## logging
+        # logging
         if self.log_train_loss:
             # When using pipeline parallelism, loss is calculated only in the last pipeline stage and
             # it should be casted to other pipeline stages for logging.
@@ -1043,11 +1042,11 @@ def training_step(self, dataloader_iter):
         if self.rampup_batch_size:
             self.prev_global_batch_size = current_global_batch_size
             self.prev_consumed_samples = consumed_samples
-            num_microbatch_calculator.update(
+            num_microbatch_calculator.update(  # noqa: F821
                 consumed_samples=consumed_samples,
                 consistency_check=False,
             )
-            current_global_batch_size = num_microbatch_calculator.current_global_batch_size
+            current_global_batch_size = num_microbatch_calculator.current_global_batch_size  # noqa: F821
             self.log('global_batch_size', current_global_batch_size, prog_bar=True, rank_zero_only=True, batch_size=1)
             self.if_first_step = 1

@@ -1120,7 +1119,7 @@ def allreduce_fsdp_sharding_omitted_gradients(self):

     def allreduce_first_last_embeddings(self):

-        # Modified from megatron-lm: https://github.com/NVIDIA/Megatron-LM/blob/d41696840ed0a7edb7e0499eb82a48ae112d9bb3/megatron/training.py#L407
+        # Modified from megatron-lm: https://github.com/NVIDIA/Megatron-LM/blob/d41696840ed0a7edb7e0499eb82a48ae112d9bb3/megatron/training.py#L407 # pylint: disable=line-too-long
         # All-reduce word_embeddings' grad across first and last stages to ensure
         # that word_embeddings parameters stay in sync.
         # This should only run for models that support pipelined model parallelism
@@ -1141,7 +1140,8 @@ def allreduce_first_last_embeddings(self):
                 word_embeddings_weight = (
                     module.shared_embedding_or_output_weight() if self.mcore_gpt else module.word_embeddings_weight()
                 )
-                # (@adithyare) adapter training now extends MegatronGPTModel so we have to add this check here to ensure we do not perform all_reduce when grad is None.
+                # (@adithyare) adapter training now extends MegatronGPTModel so we have to add this
+                # check here to ensure we do not perform all_reduce when grad is None.
                 # grad can be None when performing PeFT training.
                 if word_embeddings_weight.requires_grad:
                     if self.megatron_amp_O2:
@@ -1351,7 +1351,8 @@ def fwd_output_and_loss_func(dataloader_iter, model, checkpoint_activations_all_
                         import transformer_engine_torch as tex
                     except ModuleNotFoundError as e:
                         logging.error(
-                            "Please update Transformer Engine to >= 1.10 to use Context Parallel with THD format data"
+                            "Please update Transformer Engine to >= 1.10 "
+                            "to use Context Parallel with THD format data"
                         )
                         raise e
                     cp_rank = parallel_state.get_context_parallel_rank()
@@ -1402,7 +1403,8 @@ def loss_func(output_tensor):
             loss_for_ub = self.loss_func(batch['loss_mask'], batch['num_valid_tokens_in_ub'], output_tensor)
             cp_size = parallel_state.get_context_parallel_world_size()
             if isinstance(loss_for_ub, dict):
-                # TODO: need a better way to check if loss_func is returning more stuff than just loss... (@adithyare)
+                # TODO: need a better way to check if loss_func is returning
+                # more stuff than just loss... (@adithyare)

                 if set(loss_for_ub.keys()) == set(
                     ["loss", "query_hs", "pos_doc_hs", "pos_cs", "neg_cs", "diff_cs"]
@@ -1459,7 +1461,8 @@ def loss_func(output_tensor):
                            torch.tensor([num_valid_tokens_in_ub]).cuda().clone().detach(),
                        ]
                    )
-                    # Could potentially reduce num_valid_samples_in_microbatch and use that to aggregate instead of len(self._validation_ds)
+                    # Could potentially reduce num_valid_samples_in_microbatch and use that to
+                    # aggregate instead of len(self._validation_ds)
                     torch.distributed.all_reduce(
                         loss_sum_and_ub_size_all_gpu, group=parallel_state.get_data_parallel_group()
                     )
@@ -1641,10 +1644,12 @@ def build_train_valid_test_datasets(self):
            test_iters * global_batch_size,
        ]

-        # The line below exploits a quirk in mcore dataset construction, to make number of epochs for validation and test equal to 1
-        # The mcore dataset implementation uses the number N we provide via train_valid_test_num_samples to derive parameter E such that
+        # The line below exploits a quirk in mcore dataset construction, to make number of epochs
+        # for validation and test equal to 1. The mcore dataset implementation uses the number N we
+        # provide via train_valid_test_num_samples to derive parameter E such that
         # E = argmin_e e * N_d >= N, or equivalently E = ceildiv(N, N_d)
-        # Where N_d is the total number of samples in a dataset (files), and N is the requested number of samples (provided for every split in the list below).
+        # Where N_d is the total number of samples in a dataset (files), and N is the requested
+        # number of samples (provided for every split in the list below).
         # Setting N = 1 we force E to be 1 as well
         legacy_dataset = self.cfg.data.get("legacy_dataset", False)
         if self.trainer.limit_val_batches <= 1.0 and isinstance(self.trainer.limit_val_batches, float):
@@ -1723,7 +1728,7 @@ def build_train_valid_test_datasets(self):
             logging.info(f'Length of val dataset: {len(self._validation_ds)}')
         if self._test_ds is not None:
             logging.info(f'Length of test dataset: {len(self._test_ds)}')
-        logging.info(f'Finished building GPT datasets.')
+        logging.info('Finished building GPT datasets.')

         return self._train_ds, self._validation_ds, self._test_ds

@@ -1815,7 +1820,8 @@ def setup(self, stage=None):
             self.setup_test_data(self.cfg.data)
             # Override limit_train_batches in terms of num of microbatches
             self._reconfigure_limit_batches(self.trainer.limit_train_batches, self._train_dl, 'train')
-            # Override limit_val_batches to be a multiple of num microbatches to prevent val_step from exiting in between a step
+            # Override limit_val_batches to be a multiple of num microbatches to prevent
+            # val_step from exiting in between a step
             self._reconfigure_limit_batches(self.trainer.limit_val_batches, self._validation_dl, 'val')

         # Data cache generation only
@@ -1835,7 +1841,8 @@ def setup_training_data(self, cfg):
         if hasattr(self, '_train_ds'):
             consumed_samples = self.compute_consumed_samples(0)
             logging.info(
-                f'Setting up train dataloader with len(len(self._train_ds)): {len(self._train_ds)} and consumed samples: {consumed_samples}'
+                'Setting up train dataloader with len(self._train_ds): '
+                f'{len(self._train_ds)} and consumed samples: {consumed_samples}'
             )
             self._train_dl = self.build_pretraining_data_loader(self._train_ds, consumed_samples)

@@ -1843,12 +1850,13 @@ def setup_validation_data(self, cfg):
         if hasattr(self, '_validation_ds'):
             consumed_samples = 0
             logging.info(
-                f'Setting up validation dataloader with len(len(self._validation_ds)): {len(self._validation_ds)} and consumed samples: {consumed_samples}'
+                'Setting up validation dataloader with len(self._validation_ds): '
+                f'{len(self._validation_ds)} and consumed samples: {consumed_samples}'
             )

             drop_last = True
             if not self.validation_drop_last:
-                logging.info(f'Drop last in validation dataset is set to False')
+                logging.info('Drop last in validation dataset is set to False')
                 drop_last = False
             pad_samples_to_global_batch_size = False
             if self.cfg.data.get('pad_samples_to_global_batch_size', False):
@@ -1864,7 +1872,8 @@ def setup_test_data(self, cfg):
         if self._test_ds is not None:
             consumed_samples = 0
             logging.info(
-                f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}'
+                'Setting up test dataloader with len(self._test_ds): '
+                f'{len(self._test_ds)} and consumed samples: {consumed_samples}'
             )
             self._test_dl = self.build_pretraining_data_loader(self._test_ds, consumed_samples)
         else:
@@ -1934,7 +1943,7 @@ def list_available_models(self):
         return None

     def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any:
-        """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device
+        """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device # pylint: disable=line-too-long
         When using pipeline parallelism, we need the global batch to remain on the CPU,
         since the memory overhead will be too high when using a large number of microbatches.
         Microbatches are transferred from CPU to GPU inside the pipeline.
@@ -1947,7 +1956,7 @@ def _validate_trainer(self):
         """
         if self.trainer.accumulate_grad_batches > 1:
             raise ValueError(
-                f'Gradient accumulation is done within training_step. trainer.accumulate_grad_batches must equal 1'
+                'Gradient accumulation is done within training_step. trainer.accumulate_grad_batches must equal 1'
             )

     @classmethod
@@ -1961,7 +1970,7 @@ def list_available_models(cls) -> Optional[PretrainedModelInfo]:
         result.append(
             PretrainedModelInfo(
                 pretrained_model_name="megatron_gpt_345m",
-                location="https://api.ngc.nvidia.com/v2/models/nvidia/nemo/megatron_gpt_345m/versions/1/files/megatron_gpt_345m.nemo",
+                location="https://api.ngc.nvidia.com/v2/models/nvidia/nemo/megatron_gpt_345m/versions/1/files/megatron_gpt_345m.nemo",  # pylint: disable=line-too-long
                 description="345M parameter GPT generative Megatron model.",
             )
         )
@@ -2010,7 +2019,8 @@ def on_load_checkpoint(self, checkpoint) -> None:
                     missing_keys, expected_keys = module.load_state_dict(checkpoint_state_dict, strict=False)
                     if all(s.endswith('_extra_state') for s in missing_keys):
                         logging.warning(
-                            f'Loding checkpoint created with Transformer Engine version lower than 1.13. Missing layers {missing_keys} will be ignored.'
+                            'Loading checkpoint created with Transformer Engine version lower than 1.13. '
+                            f'Missing layers {missing_keys} will be ignored.'
                         )
                     else:
                         raise e
@@ -2235,7 +2245,7 @@ def build_transformer_config(self) -> TransformerConfig:
         elif self.cfg.get('fp8_hybrid', False):
             fp8 = 'hybrid'
         else:
-            raise ValueError(f"fp8 enabled but fp8_format (fp8_e4m3 | fp8_hybrid) is not set.")
+            raise ValueError("fp8 enabled but fp8_format (fp8_e4m3 | fp8_hybrid) is not set.")

         if self.cfg.get('enable_cuda_graph', False):
             assert HAVE_TE, "Transformer Engine is required for cudagraphs."