[NeuronxDistributed] Add doc for pipeline parallelism
YangFei1990 authored and aws-mesharma committed Oct 27, 2023
1 parent f561c27 commit 340aa91
Showing 12 changed files with 712 additions and 19 deletions.
3 changes: 2 additions & 1 deletion conf.py
@@ -232,6 +232,7 @@
# Exclude private github from linkcheck. Readthedocs only exposes the ssh-agent to the 'checkout' build step, which is too early for the linkchecker to run.
linkcheck_ignore = [r'http://localhost:\d+/',r'https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/announcements/neuron2.x/dlami-pytorch-introduce.html' ,r'https://github\.com/aws-neuron/private-aws-neuron-sdk-staging/',r'https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/announcements/neuron2.x/dlami-pytorch-introduce.html', r'https://awsdocs-neuron-staging.readthedocs-hosted.com/en/latest/frameworks/tensorflow/tensorflow-neuronx/setup/tensorflow-neuronx-install.html#install-tensorflow-neuronx',r'https://github.com/aws-neuron/aws-neuron-samples/tree/master/torch-neuronx#inference',r'https://github.com/aws-neuron/aws-neuron-samples/tree/master/torch-neuronx#training', r'https://github.com/aws/deep-learning-containers/blob/master/available_images.md#neuron-containers',r'https://github.com/aws-neuron/aws-neuron-sagemaker-samples/tree/master/inference/inf2-bert-on-sagemaker'
,r'https://github.com/awslabs/multi-model-server/blob/master/docs/management_api.md',r'https://github.com/aws-neuron/aws-neuron-samples/blob/master/torch-neuronx/training/dp_bert_hf_pretrain/run_dp_bert_large_hf_pretrain_bf16_s128.sh',r' https://github.com/pytorch/xla/blob/master/test/test_train_mp_mnist.py',r'https://github.com/pytorch/xla/blob/v1.10.0/TROUBLESHOOTING.md'
,r'https://github.com/tensorflow/docs/blob/master/site/en/r1/guide/saved_model.md',r'https://github.com/tensorflow/tensorflow/blob/master/tensorflow/compiler/xla/g3doc/index.md',r'https://github.com/pytorch/xla/blob/master/test/test_train_mp_mnist.py',r'https://github.com/aws-neuron/aws-neuron-samples/blob/master/torch-neuronx/transformers-neuronx/inference/meta-llama-2-13b-sampling.ipynb',r'https://github.com/aws-neuron/aws-neuron-sdk/blob/master/src/examples/pytorch/torch-neuronx/t5-inference-tutorial.ipynb',r'https://github.com/aws-neuron/aws-neuron-parallelcluster-samples/blob/master/examples/jobs/neuronx-nemo-megatron-llamav2-job.md', 'https://github.com/aws-neuron/aws-neuron-samples/blob/master/torch-neuronx/training/tp_dp_gpt_neox_hf_pretrain/tp_dp_gpt_neox_20b_hf_pretrain/tp_dp_gpt_neox_20b_hf_pretrain.py#L273C1-L289C55', 'https://github.com/aws-neuron/neuronx-distributed/blob/main/src/neuronx_distributed/parallel_layers/layer_norm.py#L32']
,r'https://github.com/tensorflow/docs/blob/master/site/en/r1/guide/saved_model.md',r'https://github.com/tensorflow/tensorflow/blob/master/tensorflow/compiler/xla/g3doc/index.md',r'https://github.com/pytorch/xla/blob/master/test/test_train_mp_mnist.py',r'https://github.com/aws-neuron/aws-neuron-samples/blob/master/torch-neuronx/transformers-neuronx/inference/meta-llama-2-13b-sampling.ipynb'
,r'https://github.com/aws-neuron/aws-neuron-sdk/blob/master/src/examples/pytorch/torch-neuronx/t5-inference-tutorial.ipynb',r'https://github.com/aws-neuron/aws-neuron-parallelcluster-samples/blob/master/examples/jobs/neuronx-nemo-megatron-llamav2-job.md',r'https://github.com/pytorch/PiPPy/blob/main/pippy/IR.py#L697', r'https://github.com/pytorch/pytorch/blob/main/torch/fx/_symbolic_trace.py#L241', r'https://github.com/pytorch/xla/blob/master/torch_xla/utils/checkpoint.py#L129', r'https://github.com/aws-neuron/neuronx-distributed/blob/main/src/neuronx_distributed/parallel_layers/layer_norm.py#L32', r'https://github.com/aws-neuron/aws-neuron-samples/blob/master/torch-neuronx/training/tp_dp_gpt_neox_hf_pretrain/tp_dp_gpt_neox_20b_hf_pretrain/tp_dp_gpt_neox_20b_hf_pretrain.py#L273C1-L289C55']
linkcheck_exclude_documents = [r'src/examples/.*', 'general/announcements/neuron1.x/announcements', r'release-notes/.*',r'containers/.*',r'general/.*']
nitpicky = True
144 changes: 134 additions & 10 deletions libraries/neuronx-distributed/api_guide.rst
@@ -4,8 +4,8 @@ API Reference Guide (``neuronx-distributed`` )
======================================================================

Neuronx-Distributed is an XLA-based library for distributed training and inference.
As part of this library, currently we support 2D parallelism: Tensor-Parallelism
and DataParallelism. We also support Zero1 optimizer to shard the optimizer weights.
As part of this library, we support 3D parallelism: Tensor-Parallelism, Pipeline-Parallelism
and Data-Parallelism. We also support Zero1 optimizer to shard the optimizer weights.
To support tensor-parallelism on Neuron, we adopted the Apex Library
built for CUDA devices. We modified the implementations to work with
XLA. This document lists the different APIs and modules provided by the library
@@ -29,6 +29,8 @@ Parameters:

``tensor_model_parallel_size`` : This sets the number of tensor
parallel workers. Note the default value is 1.
``pipeline_model_parallel_size`` : This sets the number of pipeline
parallel workers. Note the default value is 1.

Other helper APIs:
''''''''''''''''''
@@ -40,6 +42,10 @@
: Returns the tensor parallel world size.
- ``neuronx_distributed.parallel_state.get_tensor_model_parallel_rank()``
: Returns the rank of the worker within the tensor parallel group
- ``neuronx_distributed.parallel_state.get_pipeline_model_parallel_size()``
: Returns the pipeline parallel world size.
- ``neuronx_distributed.parallel_state.get_pipeline_model_parallel_rank()``
: Returns the rank of the worker within the pipeline parallel group
- ``neuronx_distributed.parallel_state.get_data_parallel_rank()`` :
Returns the rank of the worker in the data parallel group.
- ``neuronx_distributed.parallel_state.get_data_parallel_group(as_list=False)``
@@ -52,6 +58,11 @@
tensor parallel size and the global world size. as_list argument when
set to True, would return the group as a List[List] otherwise it
would return a torch.distributed.group.
- ``neuronx_distributed.parallel_state.get_pipeline_model_parallel_group(as_list=False)``
: Returns the pipeline parallel group after taking into account the
pipeline parallel size and the global world size. When the ``as_list`` argument is
set to True, the group is returned as a List[List]; otherwise it
returns a torch.distributed group.
- ``move_model_to_device(model, device)``: This API moves the model to the device while
preserving tensor parallel attributes.
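
A minimal sketch of how these APIs fit together is shown below. The parallel sizes are
illustrative, and it assumes a torch-xla (torchrun-launched) environment:

.. code:: python

    import torch.distributed as dist
    import torch_xla.distributed.xla_backend  # registers the "xla" backend (torch-xla environment assumed)
    from neuronx_distributed import parallel_state

    # Illustrative sizes: 8-way tensor parallelism x 4-way pipeline parallelism.
    dist.init_process_group("xla")
    parallel_state.initialize_model_parallel(
        tensor_model_parallel_size=8,
        pipeline_model_parallel_size=4,
    )

    tp_rank = parallel_state.get_tensor_model_parallel_rank()
    pp_rank = parallel_state.get_pipeline_model_parallel_rank()
    dp_rank = parallel_state.get_data_parallel_rank()
    print(f"tp rank {tp_rank}, pp rank {pp_rank}, dp rank {dp_rank}")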

@@ -246,6 +257,110 @@ Parameters:
smoothing when computing the loss, where 0.0 means no smoothing




Pipeline parallelism:
^^^^^^^^^^^^^^^^^^^^^

Neuron Distributed Pipeline Model
'''''''''''''''''''''''''''''''''

::

class NxDPPModel(module: torch.nn.Module,
transformer_layer_cls: Optional[Any] = None,
num_microbatches: int = 1,
output_loss_value_spec: Optional[Union[Dict, Tuple]] = None,
return_mb_loss: bool = False,
broadcast_and_average_loss: bool = False,
pipeline_cuts: Optional[List[str]] = None,
input_names: Optional[List[str]] = None,
leaf_module_cls: Optional[List[Any]] = None,
autowrap_functions: Optional[Tuple[ModuleType]] = None,
autowrap_modules: Optional[Tuple[Callable, ...]] = None,
tracer_cls: Optional[Union[str, Any]] = None,
param_init_fn: Optional[Any] = None,
trace_file_path: Optional[str] = None,
use_zero1_optimizer: bool = False,
)

Parameters:

- ``module``: Module to be distributed with pipeline parallelism

- ``transformer_layer_cls``: The module class of transformer layers

- ``num_microbatches``: Number of pipeline microbatches

- ``output_loss_value_spec``:
The ``output_loss_value_spec`` value can be specified to disambiguate
which value in the output of `forward` is the loss value on which NxDPPModel should apply
backpropagation. For example, if your ``forward`` returns a tuple ``(loss, model_out)``,
you can specify ``output_loss_value_spec=(True, False)``. Or, if your ``forward`` returns
a dict ``{'loss': loss_value, 'model_out': model_out}``, you can specify
``output_loss_value_spec={'loss': True, 'model_out': False}``
Referenced from `PiPPy <https://github.com/pytorch/PiPPy/blob/main/pippy/IR.py#L697>`__

- ``return_mb_loss``: Whether to return a list of losses for all microbatches

- ``broadcast_and_average_loss``: Whether to broadcast the loss to all PP ranks and average it across DP ranks. When set to True, ``return_mb_loss`` must be False

- ``pipeline_cuts``: A list of layer names that will be used to annotate pipeline stage boundaries

- ``input_names``: The input names that will be used for tracing, which will be the same as the model inputs during runtime.

- ``leaf_module_cls``: A list of module classes that should be treated as leaf nodes during tracing. Note the transformer layer class is treated as a leaf node by default.

- ``autowrap_modules``: (symbolic tracing only)
Python modules whose functions should be wrapped automatically
without needing to use fx.wrap().
reference `here <https://github.com/pytorch/pytorch/blob/main/torch/fx/_symbolic_trace.py#L241>`__

- ``autowrap_functions``: (symbolic tracing only)
Python functions that should be wrapped automatically without
needing to use fx.wrap().
reference `here <https://github.com/pytorch/pytorch/blob/main/torch/fx/_symbolic_trace.py#L241>`__

- ``tracer_cls``: User-provided tracer class for symbolic tracing. It can be "hf", "torch" or any tracer class the user created.

- ``param_init_fn``:
Function used to initialize parameters. This is useful if the user wants to use meta device to do
delayed parameter initialization. param_init_fn should take a module as input and initialize the
parameters that belong to this module only (not its submodules).

- ``use_zero1_optimizer``: Whether to use the ZeRO-1 optimizer. When set to True, gradient averaging is handled by the ZeRO-1 optimizer.
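
As a rough illustration of how these arguments are typically combined, here is a
sketch; the model, transformer layer class, cut points and input names below are
placeholders for your own model rather than a prescribed configuration:

.. code:: python

    from neuronx_distributed.pipeline import NxDPPModel

    # `model`, `MyTransformerLayer` and the layer names in `pipeline_cuts` are
    # hypothetical placeholders; forward() is assumed to return (loss, logits).
    model = NxDPPModel(
        model,
        transformer_layer_cls=MyTransformerLayer,
        num_microbatches=8,
        output_loss_value_spec=(True, False),
        pipeline_cuts=["layers.11", "layers.23"],
        input_names=["input_ids", "attention_mask", "labels"],
        use_zero1_optimizer=False,
    )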

Commonly used APIs

::

NxDPPModel.run_train(**kwargs)

Trains the model with the pipeline-parallel schedule, running both the forward and backward passes in a pipelined manner.
The kwargs should be the same as the ``input_names`` provided to the trace function.
Returns the loss selected by the user-provided ``output_loss_value_spec``.

::

NxDPPModel.run_eval(**kwargs)

Evaluates the model with the pipeline-parallel schedule, running the forward pass only.
The kwargs should be the same as the ``input_names`` provided to the trace function.
Returns the loss selected by the user-provided ``output_loss_value_spec``.

::

NxDPPModel.local_named_parameters(**kwargs)

The parameters that are local to this PP rank. This must be called after the model is partitioned.

::

NxDPPModel.local_named_modules(**kwargs)

The graph modules that are local to this PP rank. This must be called after the model is partitioned.
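
A sketch of how these APIs might be used together in a training step is shown below;
the optimizer choice, ``dataloader`` and input names are illustrative, and ``model``
is assumed to be an already-partitioned ``NxDPPModel``:

.. code:: python

    import torch

    # Only the parameters local to this pipeline rank are given to the optimizer.
    optimizer = torch.optim.AdamW(
        [p for _, p in model.local_named_parameters()], lr=1e-4
    )

    for batch in dataloader:
        optimizer.zero_grad()
        # Runs forward and backward over all microbatches with the pipeline
        # schedule and returns the loss selected by output_loss_value_spec.
        loss = model.run_train(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["labels"],
        )
        optimizer.step()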


Checkpointing:
^^^^^^^^^^^^^^

@@ -258,13 +373,15 @@ Save Checkpoint:

::

def neuronx_distributed.parallel_layers.save(state_dict, save_dir, save_serially = True, down_cast_bf16 = False)
def neuronx_distributed.parallel_layers.save(state_dict, save_dir, save_serially=True, save_xser: bool=False, down_cast_bf16=False)

This API will save the model from each tensor-parallel rank in
``save_dir``. Only workers with data parallel rank equal to 0 save
the checkpoints. Each tensor parallel rank creates a
``tp_rank_i`` folder inside ``save_dir`` and each ones saves its shard
in the ``tp_rank_i`` folder.
``tp_rank_ii_pp_rank_ii`` folder inside ``save_dir`` and each one saves its shard
in the ``tp_rank_ii_pp_rank_ii`` folder.
If ``save_xser`` is enabled, the folder name would be ``tp_rank_ii_pp_rank_ii.tensors``
and there will be a ref data file named ``tp_rank_ii_pp_rank_ii`` in ``save_dir`` for each rank.

.. _parameters-4:

@@ -274,17 +391,21 @@ Parameters:
- ``state_dict: (dict)`` : Model state dict. It's the same dict that you
would save using torch.save
- ``save_dir: (str)`` : Model save directory.
- ``save_serially: (bool)``: This flag would save checkpoints one data-parallel rank at a time.
- ``save_serially: (bool)``: This flag would save checkpoints one model-parallel rank at a time.
This is particularly useful when we are checkpointing large models.
- ``down_cast_bf16: (bool)``: This flag would downcast the state_dict to bf16 before saving.
- ``save_xser: (bool)``: This flag would save the model with torch xla serialization.
This could significantly reduce checkpoint saving time when checkpointing large models, so it's recommended
to enable xser when the model is large.
Note that if a checkpoint is saved with ``save_xser``, it needs to be loaded with ``load_xser``, and vice versa.
- ``down_cast_bf16: (bool)``: This flag would downcast the state_dict to bf16 before saving.
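
For illustration, a typical save call might look like the following; the directory
name and flag values are examples only:

.. code:: python

    from neuronx_distributed import parallel_layers

    # Saves one shard per model-parallel rank under ./ckpt, using torch-xla
    # serialization and downcasting the weights to bf16 before writing.
    parallel_layers.save(
        model.state_dict(),
        "./ckpt",
        save_xser=True,
        down_cast_bf16=True,
    )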

Load Checkpoint
'''''''''''''''

::

def neuronx_distributed.parallel_layers.load(
load_dir, model=None, model_key='model', sharded=True)
load_dir, model_or_optimizer=None, model_key='model', load_xser=False, sharded=True)

This API will automatically load the checkpoint depending on the tensor
parallel rank. For large models, one should pass the model object to the
@@ -298,9 +419,12 @@ Parameters:


- ``load_dir: (str)`` : Directory where the checkpoint is saved.
- ``model``: (torch.nn.Module): Model object
- ``model_key: (str)`` :The model key used when saving the model in the
- ``model_or_optimizer``: (torch.nn.Module or torch.optim.Optimizer): Model or Optimizer object.
- ``model``: (torch.nn.Module or torch.optim.Optimizer): Model or Optimizer object, equivalent to ``model_or_optimizer``
- ``model_key: (str)`` : The model key used when saving the model in the
state_dict.
- ``load_xser: (bool)`` : Load the model with torch-xla serialization.
Note that if a checkpoint is saved with ``save_xser``, it needs to be loaded with ``load_xser``, and vice versa.
- ``sharded: (bool)`` : If the checkpoint is not sharded, pass False.
This is useful (especially during inference) when the model is
trained using a different strategy and you end up saving a single
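
A sketch of the matching load call; the path mirrors the save example above, and a
checkpoint written with ``save_xser`` must be read back with ``load_xser``:

.. code:: python

    from neuronx_distributed import parallel_layers

    # Loads the shard belonging to this rank directly into the model object.
    parallel_layers.load(
        "./ckpt",
        model_or_optimizer=model,
        load_xser=True,
    )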
1 change: 1 addition & 0 deletions libraries/neuronx-distributed/app_notes.rst
@@ -8,6 +8,7 @@ App Notes (``neuronx-distributed`` )
:hidden:

/libraries/neuronx-distributed/tensor_parallelism_overview
/libraries/neuronx-distributed/pipeline_parallelism_overview
/libraries/neuronx-distributed/activation_memory_reduction


1 change: 1 addition & 0 deletions libraries/neuronx-distributed/app_notes.txt
@@ -1,2 +1,3 @@
* :ref:`tensor_parallelism_overview`
* :ref:`pipeline_parallelism_overview`
* :ref:`activation_memory_reduction`
1 change: 1 addition & 0 deletions libraries/neuronx-distributed/developer-guide.rst
@@ -8,6 +8,7 @@ Developer Guide (``neuronx-distributed`` )
:hidden:

/libraries/neuronx-distributed/tp_developer_guide
/libraries/neuronx-distributed/pp_developer_guide
/libraries/neuronx-distributed/activation_memory_reduction_developer_guide


1 change: 1 addition & 0 deletions libraries/neuronx-distributed/developer-guide.txt
@@ -1,2 +1,3 @@
* :ref:`tp_developer_guide`
* :ref:`pp_developer_guide`
* :ref:`activation_memory_reduction_developer_guide`
97 changes: 97 additions & 0 deletions libraries/neuronx-distributed/pipeline_parallelism_overview.rst
@@ -0,0 +1,97 @@
.. _pipeline_parallelism_overview:

Pipeline Parallelism Overview
=============================

Pipeline parallelism is a technique used in deep learning model training to improve efficiency
and reduce the training time of large neural networks.
Currently NeuronxDistributed's pipeline parallelism is built specifically for transformer-based models,
where each Neuron core is assigned a subset of the transformer layers.
Pipelining achieves true parallelization by having the Neuron cores compute
simultaneously on different data samples, overcoming the performance loss
that sequential computation would otherwise incur.
When you use pipeline parallelism, the training job is executed in a pipelined
fashion over microbatches to maximize device usage.

Model partitioning
---------------------

In NeuronxDistributed, we use `PyTorch's FX <https://pytorch.org/docs/stable/fx.html>`__ to trace the model and partition it on the FX IR.
The user simply needs to specify where to cut the pipeline stages, and our algorithm will cut the
pipeline stages and assign the corresponding modules to each Neuron core automatically.
Currently we require the user to provide the partition decision, but auto-partition will be supported in the future.
Here is an example of a simple partition with 5 linear layers:

.. code:: ipython3

    import torch

    # original NN module
    class MyModule(torch.nn.Module):
        def __init__(self):
            super().__init__()
            self.linears = torch.nn.ModuleList([torch.nn.Linear(4, 4) for _ in range(5)])

        def forward(self, x):
            for lin in self.linears:
                x = lin(x)
            return x

    m = MyModule()
    gm = torch.fx.symbolic_trace(m)
    print(gm)
    """
    GraphModule(
      (linears): Module(
        (0): Linear(in_features=4, out_features=4, bias=True)
        (1): Linear(in_features=4, out_features=4, bias=True)
        (2): Linear(in_features=4, out_features=4, bias=True)
        (3): Linear(in_features=4, out_features=4, bias=True)
        (4): Linear(in_features=4, out_features=4, bias=True)
      )
    )
    def forward(self, x):
        linears_0 = getattr(self.linears, "0")(x); x = None
        linears_1 = getattr(self.linears, "1")(linears_0); linears_0 = None
        linears_2 = getattr(self.linears, "2")(linears_1); linears_1 = None
        linears_3 = getattr(self.linears, "3")(linears_2); linears_2 = None
        linears_4 = getattr(self.linears, "4")(linears_3); linears_3 = None
        return linears_4
    """

If the user decides to cut the pipeline stage at the 3rd linear call, after partition
there will be 2 submodules, where ``submod_0`` contains the first 3 linear layers
and ``submod_1`` contains the last 2 linear layers.

.. code:: ipython3

    # After the split, the module looks like:
    GraphModule(
      (submod_0): GraphModule(
        (linears_0): Linear(in_features=4, out_features=4, bias=True)
        (linears_1): Linear(in_features=4, out_features=4, bias=True)
        (linears_2): Linear(in_features=4, out_features=4, bias=True)
      )
      (submod_1): GraphModule(
        (linears_3): Linear(in_features=4, out_features=4, bias=True)
        (linears_4): Linear(in_features=4, out_features=4, bias=True)
      )
    )
    def forward(self, x):
        submod_0 = self.submod_0(x); x = None
        submod_1 = self.submod_1(submod_0); submod_0 = None
        return submod_1

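
For reference, one way such a cut could be expressed when wrapping the module with
``NxDPPModel`` (see the API guide) is sketched below. Treating ``torch.nn.Linear`` as
the transformer layer class is purely illustrative, it assumes the model-parallel groups
were initialized with ``pipeline_model_parallel_size=2``, and the exact layer name to
pass follows the library's ``pipeline_cuts`` semantics:

.. code:: ipython3

    from neuronx_distributed.pipeline import NxDPPModel

    # Illustrative only: each name in pipeline_cuts marks a stage boundary at
    # that layer in the traced module; a single cut yields two pipeline stages.
    pp_model = NxDPPModel(
        MyModule(),
        transformer_layer_cls=torch.nn.Linear,
        num_microbatches=4,
        pipeline_cuts=["linears.2"],
        input_names=["x"],
    )
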
Pipeline Execution Schedule
----------------------------

Pipelining is based on splitting a mini-batch into microbatches, which are
fed into the training pipeline one-by-one and follow an execution schedule defined
by the library runtime. A microbatch is a smaller subset of a given training mini-batch.
The pipeline schedule determines which microbatch is executed by which device for every time slot.
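
As a concrete illustration (the numbers are arbitrary), a mini-batch of 32 samples
split into 4 microbatches yields 4 slices of 8 samples each; conceptually this is
similar to:

.. code:: ipython3

    import torch

    # Conceptual sketch only; NxDPPModel performs this split internally based on
    # its num_microbatches argument.
    mini_batch = torch.randn(32, 128)          # (batch, hidden)
    microbatches = torch.chunk(mini_batch, 4)  # 4 tensors of shape (8, 128)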

For example, depending on the pipeline schedule and the model partition,
Neuron core i might perform (forward or backward) computation on microbatch b while Neuron core i+1 performs
computation on microbatch b+1, thereby keeping both Neuron cores active at the same time. An example taken from
Megatron paper is showed as below

.. image:: images/pp_schedule.png
   :alt: Pipeline parallelism execution schedule with microbatches
