Merge remote-tracking branch 'upstream/main' into undecorate-typecheck
borisfom committed Jun 15, 2024
2 parents c7a5e84 + ec0eb59 commit 1a28fe1
Showing 48 changed files with 4,120 additions and 497 deletions.
31 changes: 16 additions & 15 deletions .github/workflows/cicd-main.yml
@@ -213,10 +213,10 @@ jobs:
     with:
       RUNNER: self-hosted-azure
       SCRIPT: |
-        python examples/nlp/language_modeling/megatron_quantization.py \
-            model_file=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
+        python examples/nlp/language_modeling/megatron_gpt_quantization.py \
+            model.restore_from_path=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
             quantization.algorithm=null \
-            model_save=/home/TestData/nlp/megatron_llama/ci_baseline
+            export.save_path=/home/TestData/nlp/megatron_llama/ci_baseline
     AFTER_SCRIPT: |
       rm -rf /home/TestData/nlp/megatron_llama/ci_baseline
@@ -226,16 +226,16 @@ jobs:
     with:
       RUNNER: self-hosted-azure
       SCRIPT: |
-        python examples/nlp/language_modeling/megatron_quantization.py \
-            model_file=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
-            tensor_model_parallel_size=2 \
+        python examples/nlp/language_modeling/megatron_gpt_quantization.py \
+            model.restore_from_path=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
+            model.tensor_model_parallel_size=2 \
             trainer.devices=2 \
             quantization.calib_dataset=/home/TestData/nlp/test_quantization/test.json \
             quantization.algorithm=fp8 \
             quantization.num_calib_size=8 \
             inference.batch_size=2 \
             export.inference_tensor_parallel=2 \
-            model_save=/home/TestData/nlp/megatron_llama/ci_fp8.qnemo
+            export.save_path=/home/TestData/nlp/megatron_llama/ci_fp8.qnemo
     AFTER_SCRIPT: |
       rm -rf /home/TestData/nlp/megatron_llama/ci_fp8.qnemo
@@ -245,13 +245,13 @@ jobs:
     with:
       RUNNER: self-hosted-azure
       SCRIPT: |
-        python examples/nlp/language_modeling/megatron_quantization.py \
-            model_file=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
+        python examples/nlp/language_modeling/megatron_gpt_quantization.py \
+            model.restore_from_path=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
             quantization.calib_dataset=/home/TestData/nlp/test_quantization/test.json \
             quantization.algorithm=int8_sq \
             quantization.num_calib_size=8 \
             inference.batch_size=2 \
-            model_save=/home/TestData/nlp/megatron_llama/ci_int8_sq.qnemo
+            export.save_path=/home/TestData/nlp/megatron_llama/ci_int8_sq.qnemo
     AFTER_SCRIPT: |
       rm -rf /home/TestData/nlp/megatron_llama/ci_int8_sq.qnemo
@@ -274,15 +274,15 @@ jobs:
 #      - name: Checkout repository
 #        uses: actions/checkout@v4
 #      - run: |
-#          python examples/nlp/language_modeling/megatron_quantization.py \
-#          model_file=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
-#          tensor_model_parallel_size=1 \
+#          python examples/nlp/language_modeling/megatron_gpt_quantization.py \
+#          model.restore_from_path=/home/TestData/nlp/megatron_llama/llama_ci.nemo \
+#          model.tensor_model_parallel_size=1 \
 #          trainer.devices=1 \
 #          quantization.calib_dataset=/home/TestData/nlp/test_quantization/test.json \
 #          quantization.algorithm=int4_awq \
 #          quantization.num_calib_size=8 \
 #          inference.batch_size=2 \
-#          model_save=/home/TestData/nlp/megatron_llama/ci_int4_awq.qnemo
+#          export.save_path=/home/TestData/nlp/megatron_llama/ci_int4_awq.qnemo
 #
 #          rm -rf /home/TestData/nlp/megatron_llama/ci_int4_awq.qnemo
 #- uses: "NVIDIA/NeMo/.github/actions/cancel-workflow@main"
@@ -4310,7 +4310,8 @@ jobs:
             }
           '
-          JOBS_URL="https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/jobs"
+          # We are close to reaching 100 jobs: Once we break that barrier, we have to iterate pages
+          JOBS_URL="https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/jobs?per_page=100"
           SUMMARY="[]"
           while IFS= read -r JOB; do
             JOB_NAME="$(echo $JOB | jq '.key' | tr -d '"') / main"
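
The per_page bump above defers pagination; a short sketch of how the jobs listing could be paginated once a run exceeds 100 jobs (illustrative Python with placeholder repository, run, and token values, not part of this commit):

    import requests

    def list_workflow_jobs(repo: str, run_id: int, token: str) -> list:
        """Collect all jobs for a workflow run, following GitHub API pagination."""
        jobs, page = [], 1
        while True:
            resp = requests.get(
                f"https://api.github.com/repos/{repo}/actions/runs/{run_id}/jobs",
                params={"per_page": 100, "page": page},
                headers={"Authorization": f"Bearer {token}"},
                timeout=30,
            )
            resp.raise_for_status()
            batch = resp.json()["jobs"]
            jobs.extend(batch)
            if len(batch) < 100:  # fewer than a full page means we're done
                return jobs
            page += 1
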
1 change: 1 addition & 0 deletions docs/source/common/intro.rst
@@ -11,3 +11,4 @@ The common collection contains things that could be used across all collections.
    metrics
    tokenizers
    data
+   s3_checkpointing
96 changes: 96 additions & 0 deletions docs/source/common/s3_checkpointing.rst
@@ -0,0 +1,96 @@
****************
S3 Checkpointing
****************

S3CheckpointIO
==============

This checkpoint_io is used for saving and loading files to and from S3.
Initializing it requires the dirpath to be an S3 dirpath.

**Example Usage:**

.. code-block:: bash

    async_checkpointing = self.cfg.s3_checkpointing.get('enable_async_checkpointing', False)
    chunk_size_MB = self.cfg.s3_checkpointing.get('chunk_size_MB')
    max_read_concurrency = self.cfg.s3_checkpointing.get('max_read_concurrency')
    max_write_concurrency = self.cfg.s3_checkpointing.get('max_write_concurrency')
    dirpath = self.cfg.exp_manager.checkpoint_callback_params.get('dirpath')

    s3_checkpoint_io = S3CheckpointIO(
        dirpath=dirpath,
        chunk_size_MB=chunk_size_MB,
        max_read_concurrency=max_read_concurrency,
        max_write_concurrency=max_write_concurrency,
        async_checkpointing=async_checkpointing,
    )

    strategy = NLPDDPStrategy(
        no_ddp_communication_hook=True,
        checkpoint_io=s3_checkpoint_io,
        gradient_as_bucket_view=self.cfg.model.gradient_as_bucket_view,
        find_unused_parameters=False,
        nccl_communicator_config_path=self.cfg.model.get('nccl_communicator_config_path', None),
        sharp=self.cfg.model.get('sharp', False),
    )
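
The strategy built above is then handed to the trainer in the usual way. A minimal sketch of that wiring, assuming the standard PyTorch Lightning / NeMo pattern rather than code taken from this commit:

.. code-block:: python

    import pytorch_lightning as pl
    from nemo.utils.exp_manager import exp_manager

    # Assumed wiring: the Trainer picks up the S3 checkpoint_io through the strategy,
    # and exp_manager attaches the checkpoint callback that writes to the S3 dirpath.
    trainer = pl.Trainer(strategy=strategy, **self.cfg.trainer)
    exp_manager(trainer, self.cfg.get('exp_manager', None))
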
**Config changes:**

.. code-block:: bash

    checkpoint_callback_params:
        dirpath: s3://mstar-eks-dev-us-east-2/alxzhang/nemo123/1n/checkpoints
        ...

    s3_checkpointing:
        # write_concurrency * tp * pp * 1.15 (buffer) should be within 3500 S3 TPS limit per partition
        max_write_concurrency: 10
        # read_concurrency * tp * pp * 1.15 (buffer) should be within 5500 S3 TPS limit per partition
        max_read_concurrency: 15
        chunk_size_MB: 64
        # enables asynchronous checkpoint writing to S3
        enable_async_checkpointing: False
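
As a worked example of the concurrency comments above (the parallelism values here are illustrative assumptions, not values from this commit), the default write concurrency stays well under the per-partition limit:

.. code-block:: python

    # Illustrative only: assumed tensor parallelism 8 and pipeline parallelism 4.
    tp, pp = 8, 4
    max_write_concurrency = 10
    estimated_write_tps = max_write_concurrency * tp * pp * 1.15  # = 368.0
    assert estimated_write_tps <= 3500  # within the 3500 S3 TPS limit per partition
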
**Asynchronous**

By default, the S3CheckpointIO class acts synchronously.
The asynchronous mode currently does not check whether the previous async save has completed, so it is possible
that an old checkpoint is removed even when the current save fails.
To guard against this, the feature is meant to be used in conjunction with saving the top-k checkpoints.


S3Utils and Dependencies
========================

This utility class is used by the S3CheckpointIO and the exp_manager to perform S3-related operations.
It depends on:

1. boto3[crt]

2. s3fs==0.4.2

3. tenacity

If any of these are missing, this class can't be used.
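
A minimal sketch of how such optional dependencies are typically guarded at import time; this is an illustration only, not the actual NeMo code:

.. code-block:: python

    # Illustrative guard; the real module may report missing dependencies differently.
    try:
        import boto3      # provided by boto3[crt]
        import s3fs       # the list above pins s3fs==0.4.2
        import tenacity

        HAVE_S3_DEPENDENCIES = True
    except ImportError:
        HAVE_S3_DEPENDENCIES = False
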



s3_dirpath_utils
================

Used to operate on strings: checking whether they are S3 dirpaths, or converting a bucket and key into an S3 dirpath.
This does not rely on the S3Utils utility class and can be used without any new dependencies.
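
A self-contained sketch of the two operations described above; the function names are illustrative placeholders, not necessarily those used in the NeMo module:

.. code-block:: python

    def is_s3_dirpath(path: str) -> bool:
        """Check whether a string looks like an S3 dirpath, e.g. s3://bucket/prefix."""
        return path.startswith("s3://")

    def build_s3_dirpath(bucket: str, key: str) -> str:
        """Convert a bucket and key into an S3 dirpath."""
        return f"s3://{bucket}/{key.lstrip('/')}"

    assert is_s3_dirpath("s3://mstar-eks-dev-us-east-2/alxzhang/nemo123/1n/checkpoints")
    assert build_s3_dirpath("my-bucket", "run1/checkpoints") == "s3://my-bucket/run1/checkpoints"
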


S3 Demands and ExpManager Details When Running at Scale
=======================================================

Typically, in the ExpManager, every rank looks for the checkpoint file to load from. At large scale, thousands of ranks may query S3 for dirpaths, which can cause slowdowns or throttling errors.

To avoid overloading S3 when resuming from a checkpoint, only rank 0 needs to identify the checkpoint path and find the correct resumption file. Rank 0 then broadcasts the checkpoint path to the other ranks.

.. code-block:: bash

    trainer._checkpoint_connector = NeMoCheckpointConnector(trainer)

The NeMoModelCheckpoint setup() method will automatically broadcast the checkpoint path.

The NeMoCheckpointConnector is defined in exp_manager.py; when resuming training from an existing checkpoint, all ranks use the checkpoint path found and broadcast by rank 0.

Setting trainer._checkpoint_connector must happen before the ExpManager call, because the ExpManager updates the trainer's checkpoint connector.
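
A minimal sketch of the rank-0 lookup-and-broadcast pattern described above, assuming an initialized torch.distributed process group; it illustrates the mechanism rather than reproducing the NeMo implementation:

.. code-block:: python

    import torch.distributed as dist

    def resolve_checkpoint_path(find_checkpoint_on_s3) -> str:
        """Only rank 0 queries S3; every other rank receives the path via broadcast."""
        payload = [find_checkpoint_on_s3() if dist.get_rank() == 0 else None]
        dist.broadcast_object_list(payload, src=0)  # rank 0's value overwrites the others
        return payload[0]
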
10 changes: 5 additions & 5 deletions docs/source/nlp/quantization.rst
@@ -73,17 +73,17 @@ The script must be launched correctly with the number of processes equal to tens
 
 .. code-block:: bash
 
-    torchrun --nproc-per-node 8 examples/nlp/language_modeling/megatron_quantization.py \
-        model_file=llama2-70b-base-bf16.nemo \
-        tensor_model_parallel_size=8 \
-        pipeline_model_parallel_size=1 \
+    torchrun --nproc-per-node 8 examples/nlp/language_modeling/megatron_gpt_quantization.py \
+        model.restore_from_path=llama2-70b-base-bf16.nemo \
+        model.tensor_model_parallel_size=8 \
+        model.pipeline_model_parallel_size=1 \
         trainer.num_nodes=1 \
         trainer.devices=8 \
         trainer.precision=bf16 \
         quantization.algorithm=fp8 \
         export.decoder_type=llama \
         export.inference_tensor_parallel=2 \
-        model_save=llama2-70b-base-fp8-qnemo
+        export.save_path=llama2-70b-base-fp8-qnemo
11 changes: 11 additions & 0 deletions examples/nlp/language_modeling/conf/megatron_gpt_config.yaml
@@ -24,6 +24,16 @@ trainer:
   benchmark: False
   enable_model_summary: False # default PTL callback for this does not support model parallelism, instead we log manually
 
+# Used for S3 Checkpointing
+s3_checkpointing:
+  # write_concurrency * tp * pp * 1.15 (buffer) should be within 3500 S3 TPS limit per partition
+  max_write_concurrency: 10
+  # read_concurrency * tp * pp * 1.15 (buffer) should be within 5500 S3 TPS limit per partition
+  max_read_concurrency: 15
+  chunk_size_MB: 64
+  # enables asynchronous checkpoint writing to S3 dirpath. the feature is experimental and currently does not check if the past save succeeded. Therefore, use in conjunction with save_top_k.
+  enable_async_checkpointing: False
+
 exp_manager:
   explicit_log_dir: null
   exp_dir: null
@@ -45,6 +55,7 @@ exp_manager:
   resume_from_checkpoint: ${model.resume_from_checkpoint}
   create_checkpoint_callback: True
   checkpoint_callback_params:
+    dirpath: null # to use S3 checkpointing, set the dirpath in format s3://bucket/key
     monitor: val_loss
     save_top_k: 10
     mode: min
