Add Mcore DistributedDataParallel and distributed optimizer into Nemo (NVIDIA#9034)

* merge mcore dist optim

Signed-off-by: Gao Deng <gdeng@nvidia.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* clean up

Signed-off-by: Gao Deng <gdeng@nvidia.com>

* address comments

Signed-off-by: Gao Deng <gdeng@nvidia.com>

* fix import and CodeQL comments

Signed-off-by: Gao Deng <gdeng@nvidia.com>

* remove two type check

Signed-off-by: Gao Deng <gdeng@nvidia.com>

---------

Signed-off-by: Gao Deng <gdeng@nvidia.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Eric Harper <complex451@gmail.com>
3 people authored May 7, 2024
1 parent f53b5d7 commit 451d459
Showing 7 changed files with 230 additions and 26 deletions.
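
Before the per-file diffs, a rough sketch of how a training config would opt into the new code path added by this commit. This fragment is not part of the commit; the option names are taken from the cfg.optim.get(...) calls in the hunks below, while the values shown are purely illustrative.

from omegaconf import OmegaConf

# Hypothetical model.optim fragment selecting the new Mcore distributed optimizer.
# Only 'name' is required to hit the new branch; the rest tune overlap and bucketing.
optim_cfg = OmegaConf.create(
    {
        "name": "mcore_distributed_optim",  # routes through McoreDDP + get_megatron_optimizer
        "lr": 1e-4,
        "weight_decay": 0.01,
        "betas": [0.9, 0.95],
        "overlap_grad_sync": True,    # becomes DistributedDataParallelConfig.overlap_grad_reduce
        "overlap_param_sync": True,   # becomes OptimizerConfig.overlap_param_gather
        "grad_sync_dtype": "fp32",    # fp32 keeps grad_reduce_in_fp32=True
        "ddp_bucket_size": None,      # Mcore bucket size counted in parameters, not MB
    }
)
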
@@ -59,6 +59,7 @@

try:
from megatron.core import ModelParallelConfig, parallel_state
from megatron.core.distributed import DistributedDataParallel as McoreDDP
from megatron.core.transformer.module import Float16Module as MCoreFloat16Module
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.utils import init_method_normal, scaled_init_method_normal
@@ -147,7 +148,8 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True):
# set the megatron core model parallel config
self.model_parallel_config: ModelParallelConfig = self.build_model_parallel_config()

self.with_distributed_adam = cfg.optim.get('name') == 'distributed_fused_adam'
self.use_mcore_dist_optim = cfg.optim.get('name') == 'mcore_distributed_optim'
self.with_distributed_adam = cfg.optim.get('name') == 'distributed_fused_adam' or self.use_mcore_dist_optim
self.with_megatron_fused_adam = cfg.optim.get('name') == 'megatron_fused_adam'

# used in NVIDIA NGC PyTorch containers
@@ -301,7 +303,6 @@ def _wrap_model_for_O2(self):
}

args = mcore_args if is_mcore_model else nemo_args

# Model wrapper to convert both model and inputs to half precision
if isinstance(self.model, list):
converted_model = []
@@ -312,13 +313,12 @@ def _wrap_model_for_O2(self):
else:
args['module'] = self.model
self.model = Float16Wrapper(**args)

args.pop('module')

def get_model_module_list(self):
if isinstance(self.model, list):
return [
model.module if isinstance(model, (Float16Module, MCoreFloat16Module)) else model
model.module if isinstance(model, (Float16Module, MCoreFloat16Module, McoreDDP)) else model
for model in self.model
]
elif isinstance(self.model, (Float16Module, MCoreFloat16Module)):
@@ -612,7 +612,7 @@ def configure_gradient_clipping(self, *args, **kwargs):
if clip_val <= 0:
return

if self.with_megatron_fused_adam:
if self.with_megatron_fused_adam or self.use_mcore_dist_optim:
# Gradient clipping is done in optimizer step
return

@@ -847,7 +847,7 @@ def configure_optimizers(self):
)

# Configure distributed optimizer
if self.with_distributed_adam:
if self.with_distributed_adam and not self.use_mcore_dist_optim:

# Initialize param buckets if explicitly provided
if getattr(self, 'distributed_adam_buckets', None) is not None:
@@ -930,7 +930,10 @@ def _validate_and_override_config(self):
# async grad allreduce. This should be fixed!
# For now we must disable it whenever using the baseline implementaion.
# The distributed adam from apex does work with gradient accumulation fusion.
distributed_fused_adam = self.cfg.optim.get('name', 'fused_adam') == 'distributed_fused_adam'
distributed_fused_adam = (
self.cfg.optim.get('name', 'fused_adam') == 'distributed_fused_adam'
or self.cfg.optim.get('name', 'fused_adam') == 'mcore_distributed_optim'
)
pipeline_model_parallel_size = self.cfg.get('pipeline_model_parallel_size', 1)
data_parallel_size = app_state.data_parallel_size

90 changes: 77 additions & 13 deletions nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py
@@ -92,6 +92,8 @@
from megatron.core.datasets.utils import get_blend_from_list
from megatron.core.dist_checkpointing.dict_utils import dict_list_map_inplace
from megatron.core.dist_checkpointing.mapping import LocalNonpersitentObject, ShardedObject
from megatron.core.distributed import DistributedDataParallel as McoreDDP
from megatron.core.distributed import DistributedDataParallelConfig, finalize_model_grads

# NeMo's implementation of the get_gpt_layer_ammo_spec function is temporarily used
# from megatron.core.inference.gpt.model_specs import get_gpt_layer_ammo_spec
@@ -103,7 +105,12 @@
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.transformer.module import Float16Module as MCoreFloat16Module
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.utils import drain_embedding_wgrad_compute, init_method_normal, scaled_init_method_normal
from megatron.core.utils import (
drain_embedding_wgrad_compute,
get_model_config,
init_method_normal,
scaled_init_method_normal,
)

HAVE_MEGATRON_CORE = True

@@ -303,9 +310,11 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
if not self.megatron_amp_O2 and self.cfg.get('expert_model_parallel_size', 1) > 1:
raise ValueError('Expert parallelism is only supported when using megatron_amp_O2')

# TODO(akoumparouli): this is temporary and will be removed in the future.
if self.cfg.get('expert_model_parallel_size', 1) > 1 and self.with_distributed_adam:
raise ValueError('Expert parallelism is currently not supporting distributed optimizer')
if not self.use_mcore_dist_optim:
raise ValueError(
'Expert parallelism is currently not supporting Apex distributed optimizer, use Mcore distributed optimizer instead'
)

self.transformer_engine = cfg.get('transformer_engine', False)
if self.megatron_amp_O2 and not self.transformer_engine:
@@ -332,7 +341,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
)

# if we're not using interleaved, then self.model is a module.
if self.cfg.get('virtual_pipeline_model_parallel_size', None) is None:
if self.cfg.get('virtual_pipeline_model_parallel_size', None) is None and (not self.use_mcore_dist_optim):
self.model = self.model[0]

if self.megatron_amp_O2:
@@ -495,9 +504,39 @@ def setup_optimizer_param_groups(self):
else:
self._optimizer_param_groups = get_params_for_weight_decay_optimization(self.model)

def setup_mcore_distributed_parallel(self):
"""Set up mcore distributed data parallel """
if self.with_distributed_adam and self.use_mcore_dist_optim:
config = get_model_config(self.model[0])
ddp_config = DistributedDataParallelConfig(
grad_reduce_in_fp32=(self.cfg.optim.get('grad_sync_dtype', 'fp32') == 'fp32'),
overlap_grad_reduce=self.cfg.optim.get('overlap_grad_sync', False),
use_distributed_optimizer=True,
check_for_nan_in_grad=self.cfg.optim.get('check_for_nan_in_grad', False),
# mcore bucket_size is based on num of parameters, therefore not
# using bucket_cap_mb to configure bucket_size here
bucket_size=self.cfg.optim.get('ddp_bucket_size', None),
)
self.model = [
McoreDDP(
config,
ddp_config,
model_chunk,
data_parallel_group=parallel_state.get_data_parallel_group(with_context_parallel=True),
expert_data_parallel_group=parallel_state.get_data_modulo_expert_parallel_group(),
# Turn off bucketing for model_chunk 2 onwards, since communication for these
# model chunks is overlapped with compute anyway.
disable_bucketing=(model_chunk_idx > 0),
)
for (model_chunk_idx, model_chunk) in enumerate(self.model)
]

# (TODO) Broadcast params from data parallel src rank to other data parallel ranks.
# by calling model_module.broadcast_params() if the model is randomly initialized.

def configure_optimizers(self):

if self.with_distributed_adam:
if self.with_distributed_adam and not self.use_mcore_dist_optim:

# Special handling for embedding grads
modules = self.get_model_module_list()
@@ -597,16 +636,32 @@ def fwd_bwd_step(self, dataloader_iter, forward_only, first_val_step=None):
if forward_only:
if self.validation_param_sync_overlap:
param_sync_func = self.sync_overlap_parameters
else:
elif not self.use_mcore_dist_optim:
no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,)
grad_sync_func = self.reduce_overlap_gradients
param_sync_func = self.sync_overlap_parameters
else:
if self.cfg.optim.get("overlap_grad_sync", False):
no_sync_func = [model_chunk.no_sync for model_chunk in self.model]
no_sync_func = no_sync_func[0] if len(self.model) == 1 else no_sync_func

if self.cfg.optim.get("delay_grad_reduce", True):
grad_sync_func = [model_chunk.start_grad_sync for model_chunk in self.model]
grad_sync_func = grad_sync_func[0] if len(self.model) == 1 else grad_sync_func
if self.cfg.optim.get("overlap_param_sync", False) and self.cfg.optim.get("delay_param_gather", False):
param_sync_func = [
lambda x, model_index=model_index: self._optimizer.finish_param_sync(model_index, x)
for model_index in range(len(self.model))
]
param_sync_func = param_sync_func[0] if len(self.model) == 1 else param_sync_func

# pipeline schedules will get these from self.model.config
for module in self.get_model_module_list():
module.config.no_sync_func = no_sync_func
module.config.grad_sync_func = grad_sync_func
module.config.param_sync_func = param_sync_func
if self.use_mcore_dist_optim:
module.config.finalize_model_grads_func = finalize_model_grads

# run forward and backwards passes for an entire global batch
# we do this inside training_step to support pipeline parallelism
@@ -700,10 +755,15 @@ def training_step(self, dataloader_iter):
if self.prev_global_batch_size != current_global_batch_size and self.prev_global_batch_size:
self.trainer.should_stop = True

# zero out the mcore grad buf
if self.use_mcore_dist_optim:
for model_chunk in self.model:
model_chunk.zero_grad_buffer()

# we zero grads here because we also call backward in the megatron-core fwd/bwd functions
self._optimizer.zero_grad()

if self.with_distributed_adam:
if self.with_distributed_adam and not self.use_mcore_dist_optim:
# hack to enable overlapping param sync and forward compute
# note: the distributed optimizer monkey-patches each
# parameter's __getattribute__ function so that it can
@@ -779,10 +839,12 @@ def training_step(self, dataloader_iter):
# Reduce the gradients omitted from FSDP-sharding
self.allreduce_fsdp_sharding_omitted_gradients()
elif self.with_distributed_adam:
# synchronize asynchronous grad reductions
# note: not necessary, but reduces performance degradation
# from multiple simultaneous NCCL calls
self._optimizer._finish_bucket_grad_sync()
if not self.use_mcore_dist_optim:
# synchronize asynchronous grad reductions
# note: not necessary, but reduces performance degradation
# from multiple simultaneous NCCL calls
self._optimizer._finish_bucket_grad_sync()
# else: Mcore distributed optim calls finalize_model_grads to finish grad sync
elif self.megatron_amp_O2:
# when using pipeline parallelism grads must be all-reduced after the pipeline (not asynchronously)
if (
@@ -798,8 +860,10 @@ def training_step(self, dataloader_iter):
self.allreduce_gradients() # @sangkug we think this is causing memory to blow up (hurts perf)
self.megatron_timer_stop('gradient_allreduce')

if self.cfg.get('pipeline_model_parallel_size', 1) > 1 and self.cfg.get(
'share_embeddings_and_output_weights', True
if (
not self.use_mcore_dist_optim
and self.cfg.get('pipeline_model_parallel_size', 1) > 1
and self.cfg.get('share_embeddings_and_output_weights', True)
):
self.megatron_timer_start('allreduce_first_last_embeddings', log_level=1)
# when using pipeline parallelism the first and last stage must keep embeddings in sync
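
Taken together, the megatron_gpt_model.py hunks above do three things: wrap every model chunk in Megatron-Core's DistributedDataParallel, zero the Mcore grad buffers each step, and hand the no_sync/grad_sync/param_sync hooks plus finalize_model_grads to the pipeline schedules. A condensed sketch of the wrapping step, using the API names exactly as they appear in setup_mcore_distributed_parallel() for the Megatron-Core version pinned at this commit; the surrounding NeMo plumbing and cfg handling are omitted, so treat it as illustration rather than the shipped code.

from megatron.core import parallel_state
from megatron.core.distributed import DistributedDataParallel as McoreDDP
from megatron.core.distributed import DistributedDataParallelConfig
from megatron.core.utils import get_model_config

def wrap_chunks_in_mcore_ddp(model_chunks, optim_cfg):
    """Condensed form of setup_mcore_distributed_parallel() above (sketch only)."""
    config = get_model_config(model_chunks[0])
    ddp_config = DistributedDataParallelConfig(
        grad_reduce_in_fp32=(optim_cfg.get("grad_sync_dtype", "fp32") == "fp32"),
        overlap_grad_reduce=optim_cfg.get("overlap_grad_sync", False),
        use_distributed_optimizer=True,
        check_for_nan_in_grad=optim_cfg.get("check_for_nan_in_grad", False),
        bucket_size=optim_cfg.get("ddp_bucket_size", None),  # counted in parameters, not MB
    )
    return [
        McoreDDP(
            config,
            ddp_config,
            chunk,
            data_parallel_group=parallel_state.get_data_parallel_group(with_context_parallel=True),
            expert_data_parallel_group=parallel_state.get_data_modulo_expert_parallel_group(),
            # bucketing only pays off for the first chunk; later chunks already overlap with compute
            disable_bucketing=(idx > 0),
        )
        for idx, chunk in enumerate(model_chunks)
    ]

# Per training step (see training_step above): each wrapped chunk calls zero_grad_buffer(),
# and module.config.finalize_model_grads_func = finalize_model_grads lets the pipeline
# schedule finish the data-parallel grad reduction instead of _finish_bucket_grad_sync().
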
8 changes: 7 additions & 1 deletion nemo/collections/nlp/parts/megatron_trainer_builder.py
@@ -56,6 +56,7 @@ def _training_strategy(self) -> Union[NLPDDPStrategy, NLPFSDPStrategy]:
if self.cfg.model.get('fsdp', False):
assert (
not self.cfg.model.optim.get('name') == 'distributed_fused_adam'
and not self.cfg.model.optim.get('name') == 'mcore_distributed_optim'
), 'Distributed optimizer cannot be used with FSDP.'
sharded_checkpoint = self.cfg.model.get('fsdp_sharded_checkpoint', False)
if self.cfg.model.get('tensor_model_parallel_size', 1) > 1:
@@ -100,7 +101,12 @@ def _plugins(self) -> list:
"""
megatron_amp_O2 = self.cfg.model.get('megatron_amp_O2', False)
with_distributed_adam = (
self.cfg.model.optim.get('name') == 'distributed_fused_adam' if self.cfg.model.get('optim') else False
(
self.cfg.model.optim.get('name') == 'distributed_fused_adam'
or self.cfg.model.optim.get('name') == 'mcore_distributed_optim'
)
if self.cfg.model.get('optim')
else False
)

plugins = []
1 change: 1 addition & 0 deletions nemo/collections/nlp/parts/nlp_overrides.py
@@ -239,6 +239,7 @@ def configure_ddp(self):
hasattr(self.model, 'with_distributed_adam') and self.model.with_distributed_adam
):
# do not use DDP if using megatron amp O2 or distributed optimizer
self.model.setup_mcore_distributed_parallel()
self._model = self.model
else:
app_state = AppState()
52 changes: 47 additions & 5 deletions nemo/core/classes/modelPT.py
@@ -24,6 +24,17 @@

import hydra
import torch

try:
from megatron.core.optimizer import OptimizerConfig, get_megatron_optimizer
from megatron.core.utils import get_model_config

HAVE_MEGATRON_CORE = True

except (ImportError, ModuleNotFoundError):

HAVE_MEGATRON_CORE = False

from omegaconf import DictConfig, OmegaConf, open_dict
from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.utilities import model_summary, rank_zero_only
@@ -32,7 +43,7 @@
from nemo.core import optim
from nemo.core.classes.common import Model
from nemo.core.connectors.save_restore_connector import SaveRestoreConnector
from nemo.core.optim import prepare_lr_scheduler
from nemo.core.optim import McoreDistributedOptimizer, prepare_lr_scheduler
from nemo.utils import logging, model_utils
from nemo.utils.app_state import AppState
from nemo.utils.debug_hook import register_debug_hooks
@@ -570,6 +581,31 @@ def setup_multiple_test_data(self, test_data_config: Union[DictConfig, Dict]):
if self._test_dl is not None and type(self._test_dl) in [list, tuple]:
self._test_names = ['test_{}_'.format(idx) for idx in range(len(self._test_dl))]

def setup_megatron_optimization(self, optim_config: Union[Dict[str, Any], DictConfig]):
"""
Setup mcore optimizer config.
Args:
optim_config: Nemo optim args used to set up Mcore optimizer options.
"""

config = get_model_config(self.model[0])

megatron_optim_config = OptimizerConfig(
fp16=config.fp16,
bf16=config.bf16,
params_dtype=config.params_dtype,
lr=optim_config['lr'],
weight_decay=optim_config['weight_decay'],
adam_beta1=optim_config['betas'][0],
adam_beta2=optim_config['betas'][1],
clip_grad=self.trainer.gradient_clip_val,
use_distributed_optimizer=self.use_mcore_dist_optim,
overlap_grad_reduce=self.cfg.optim.get('overlap_grad_sync', False),
overlap_param_gather=self.cfg.optim.get('overlap_param_sync', False),
)
return megatron_optim_config

def setup_optimization(
self, optim_config: Optional[Union[DictConfig, Dict]] = None, optim_kwargs: Optional[Dict[str, Any]] = None,
):
@@ -718,14 +754,20 @@ def setup_optimization(
raise e

else:
optimizer = optim.get_optimizer(optimizer_name)
optimizer = optimizer(self._optimizer_param_groups, **optimizer_args)
if optimizer_name == 'mcore_distributed_optim':
# setup megatron_optim_config and get Mcore based optimizer with the wrapper
megatron_optim_config = self.setup_megatron_optimization(optimizer_args)
_megatron_optimizer = get_megatron_optimizer(megatron_optim_config, self.model,)
optimizer = McoreDistributedOptimizer(_megatron_optimizer)

logging.info("Optimizer config = %s", str(optimizer))
else:
optimizer = optim.get_optimizer(optimizer_name)
optimizer = optimizer(self._optimizer_param_groups, **optimizer_args)

logging.info("Optimizer config = %s", str(optimizer))

self._optimizer = optimizer

# Try to instantiate scheduler for optimizer
self._scheduler = prepare_lr_scheduler(
optimizer=self._optimizer, scheduler_config=scheduler_config, train_dataloader=self._train_dl
)
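
The modelPT.py hunks above route optimizer name 'mcore_distributed_optim' through Megatron-Core's optimizer factory instead of NeMo's optimizer registry. A minimal sketch of that path, assuming the model chunks have already been wrapped in McoreDDP and that the McoreDistributedOptimizer wrapper (its implementation in nemo/core/optim/mcore_optim.py is not shown on this page) exposes the usual torch.optim-style interface; dtype and hyperparameter values here are illustrative.

import torch
from megatron.core.optimizer import OptimizerConfig, get_megatron_optimizer
from nemo.core.optim import McoreDistributedOptimizer

def build_mcore_dist_optimizer(ddp_wrapped_chunks, lr=1e-4, weight_decay=0.01,
                               betas=(0.9, 0.95), clip_grad=1.0):
    """Sketch of setup_megatron_optimization() plus the new branch in setup_optimization()."""
    megatron_optim_config = OptimizerConfig(
        bf16=True,                    # the shipped code copies fp16/bf16/params_dtype
        params_dtype=torch.bfloat16,  # from get_model_config(self.model[0])
        lr=lr,
        weight_decay=weight_decay,
        adam_beta1=betas[0],
        adam_beta2=betas[1],
        clip_grad=clip_grad,          # gradient clipping now happens inside the optimizer step
        use_distributed_optimizer=True,
        overlap_grad_reduce=True,
        overlap_param_gather=True,
    )
    # get_megatron_optimizer builds Mcore's distributed optimizer over the DDP-wrapped chunks;
    # McoreDistributedOptimizer wraps it so NeMo/PTL can drive it like a regular optimizer.
    return McoreDistributedOptimizer(get_megatron_optimizer(megatron_optim_config, ddp_wrapped_chunks))
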
1 change: 1 addition & 0 deletions nemo/core/optim/__init__.py
@@ -28,6 +28,7 @@
WarmupPolicy,
prepare_lr_scheduler,
)
from nemo.core.optim.mcore_optim import McoreDistributedOptimizer
from nemo.core.optim.novograd import Novograd
from nemo.core.optim.optimizer_with_main_params import MainParamsOptimizerWrapper
from nemo.core.optim.optimizers import get_optimizer, parse_optimizer_args, register_optimizer
(The diff for the seventh changed file did not load on this page.)
