From 1045c9250609bfb72f791f4d2a5d04fa882a7842 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Thu, 14 Sep 2023 16:53:58 +0530 Subject: [PATCH 01/26] add fsdp tests --- src/transformers/modeling_utils.py | 2 +- src/transformers/testing_utils.py | 10 ++ src/transformers/trainer.py | 15 +- src/transformers/utils/__init__.py | 1 + src/transformers/utils/import_utils.py | 4 + tests/fsdp/test_fsdp.py | 188 +++++++++++++++++++++++++ 6 files changed, 212 insertions(+), 8 deletions(-) create mode 100644 tests/fsdp/test_fsdp.py diff --git a/src/transformers/modeling_utils.py b/src/transformers/modeling_utils.py index 9279950d3ac..a856f40e434 100644 --- a/src/transformers/modeling_utils.py +++ b/src/transformers/modeling_utils.py @@ -128,7 +128,7 @@ def is_fsdp_enabled(): def is_fsdp_enabled_and_dist_rank_0(): - return is_fsdp_enabled() and torch.distributed.get_rank() == 0 + return is_fsdp_enabled() and int(os.environ.get("LOCAL_RANK", -1)) == 0 if is_sagemaker_mp_enabled(): diff --git a/src/transformers/testing_utils.py b/src/transformers/testing_utils.py index a7e36322a6b..eb5e4bd1578 100644 --- a/src/transformers/testing_utils.py +++ b/src/transformers/testing_utils.py @@ -105,6 +105,7 @@ is_torchdynamo_available, is_torchvision_available, is_vision_available, + is_fsdp_available, strtobool, ) @@ -315,6 +316,15 @@ def require_accelerate(test_case): return unittest.skipUnless(is_accelerate_available(), "test requires accelerate")(test_case) +def require_fsdp(test_case, min_version: str = "1.12.0"): + """ + Decorator marking a test that requires accelerate. These tests are skipped when accelerate isn't installed. + """ + return unittest.skipUnless(is_fsdp_available(min_version), f"test requires torch version >= {min_version}")( + test_case + ) + + def require_safetensors(test_case): """ Decorator marking a test that requires safetensors. These tests are skipped when safetensors isn't installed. diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index abf4c9a34ba..8f7ef1344dc 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -1690,9 +1690,6 @@ def _inner_training_loop( model = self._wrap_model(self.model_wrapped) - if (is_sagemaker_mp_enabled() or self.is_fsdp_enabled) and resume_from_checkpoint is not None: - self._load_from_checkpoint(resume_from_checkpoint, model) - # as the model is wrapped, don't use `accelerator.prepare` # this is for unhandled cases such as # Fairscale Sharded DDP, FSDP-XLA, SageMaker MP/DP, DataParallel, IPEX @@ -1728,16 +1725,20 @@ def _inner_training_loop( if self.is_deepspeed_enabled: self.deepspeed = self.model_wrapped - # deepspeed ckpt loading - if resume_from_checkpoint is not None and self.is_deepspeed_enabled: - deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) + # ckpt loading + if resume_from_checkpoint is not None: + if self.is_deepspeed_enabled: + deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) + elif (is_sagemaker_mp_enabled() or self.is_fsdp_enabled) and resume_from_checkpoint is not None: + self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped) # Check if saved optimizer or scheduler states exist self._load_optimizer_and_scheduler(resume_from_checkpoint) # important: at this point: # self.model is the Transformers Model - # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), etc. 
+ # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), + # FSDP(Transformers Model), Dynamo Optimized Module(Transformers Model) etc. # Train! logger.info("***** Running training *****") diff --git a/src/transformers/utils/__init__.py b/src/transformers/utils/__init__.py index 68c39c732e3..7e648c9a04b 100644 --- a/src/transformers/utils/__init__.py +++ b/src/transformers/utils/__init__.py @@ -103,6 +103,7 @@ direct_transformers_import, get_torch_version, is_accelerate_available, + is_fsdp_available, is_apex_available, is_auto_gptq_available, is_bitsandbytes_available, diff --git a/src/transformers/utils/import_utils.py b/src/transformers/utils/import_utils.py index 6829ca9ad67..01db9505c4f 100644 --- a/src/transformers/utils/import_utils.py +++ b/src/transformers/utils/import_utils.py @@ -605,6 +605,10 @@ def is_accelerate_available(min_version: str = None): return _accelerate_available +def is_fsdp_available(min_version: str = "1.12.0"): + return version.parse(_torch_version) >= version.parse(min_version) + + def is_optimum_available(): return _optimum_available diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py new file mode 100644 index 00000000000..bc3b8028643 --- /dev/null +++ b/tests/fsdp/test_fsdp.py @@ -0,0 +1,188 @@ +# Copyright 2023 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses +import io +import itertools +import json +import os +import unittest +from copy import deepcopy +from functools import partial + +import datasets +from parameterized import parameterized + +import tests.trainer.test_trainer +from tests.trainer.test_trainer import TrainerIntegrationCommon # noqa +from transformers import AutoModel, TrainingArguments, is_torch_available, logging + +from transformers.testing_utils import ( + CaptureLogger, + CaptureStd, + CaptureStderr, + LoggingLevel, + TestCasePlus, + execute_subprocess_async, + get_gpu_count, + mockenv_context, + require_deepspeed, + require_optuna, + require_torch_gpu, + require_accelerate, + require_fsdp, + require_torch_multi_gpu, + slow, +) +from transformers.trainer_utils import get_last_checkpoint, set_seed, FSDPOption +from transformers.utils import WEIGHTS_NAME, is_torch_bf16_gpu_available, is_accelerate_available + + +def get_master_port(real_launcher=False): + """ + When using a single gpu launcher emulation (i.e. not deepspeed or python -m torch.distributed) + the issue is that once the port is tied it can't be used anywhere else outside of this process, + since torch.dist doesn't free the port until the process exits. Therefore for the sake of being + able to run both emulated launcher and normal launcher tests we need 2 distinct ports. + + This function will give the right port in the right context. For real launcher it'll give the + base port, for emulated launcher it'll give the base port + 1. In both cases a string is + returned. 
+ + Args: + `real_launcher`: whether a real launcher is going to be used, or the emulated one + + """ + + master_port_base = os.environ.get("DS_TEST_PORT", DEFAULT_MASTER_PORT) + if not real_launcher: + master_port_base = str(int(master_port_base) + 1) + return master_port_base + + +if is_torch_available(): + from tests.trainer.test_trainer import ( # noqa + RegressionModelConfig, + RegressionPreTrainedModel, + ) + + # hack to restore original logging level pre #21700 + get_regression_trainer = partial(tests.trainer.test_trainer.get_regression_trainer, log_level="info") + +if is_accelerate_available(): + from accelerate.utils.constants import ( + FSDP_AUTO_WRAP_POLICY, + FSDP_BACKWARD_PREFETCH, + FSDP_SHARDING_STRATEGY, + FSDP_STATE_DICT_TYPE, + FSDP_PYTORCH_VERSION, + ) + + require_fsdp_version = partial(require_fsdp, min_version=FSDP_PYTORCH_VERSION) + +FP16 = "fp16" +BF16 = "bf16" +if is_torch_bf16_gpu_available(): + dtypes = [FP16, BF16] +else: + dtypes = [FP16] + +FULL_SHARD = "full_shard" +SHARD_GRAD_OP = "shard_grad_op" +sharding_strategies = [FULL_SHARD, SHARD_GRAD_OP] + + +def parameterized_custom_name_func(func, param_num, param): + # customize the test name generator function as we want both params to appear in the sub-test + # name, as by default it shows only the first param + param_based_name = parameterized.to_safe_name("_".join(str(x) for x in param.args)) + return f"{func.__name__}_{param_based_name}" + + +params = list(itertools.product(sharding_strategies, dtypes)) + +set_seed(42) + + +@require_accelerate +@require_torch_gpu +@require_fsdp_version +class TrainerIntegrationFSDP(TestCasePlus, TrainerIntegrationCommon): + def setUp(self): + super().setUp() + master_port = get_master_port(real_launcher=False) + self.dist_env_1_gpu = { + "MASTER_ADDR": "localhost", + "MASTER_PORT": master_port, + "RANK": "0", + "LOCAL_RANK": "0", + "WORLD_SIZE": "1", + } + + self.fsdp_config = { + "backward_prefetch": "backward_pre", + "forward_prefetch": "False", + "limit_all_gathers": "False", + "use_orig_params": "True", + "sync_module_states": "True", + "activation_checkpointing": "False", + "min_num_params": 1, + } + + def tearDown(self): + super().tearDown() + + @parameterized.expand(params, name_func=parameterized_custom_name_func) + def test_fsdp_config(self, sharding_strategy, dtype): + output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) + kwargs = { + "output_dir": output_dir, + "train_len": 128, + "save_steps": 5, + "learning_rate": 0.1, + "fsdp": f"{sharding_strategy} auto_wrap", + "fsdp_config": self.fsdp_config, + } + kwargs[dtype] = True + with mockenv_context(**self.dist_env_1_gpu): + trainer = get_regression_trainer(**kwargs) + self.assertEqual(trainer.args.fsdp[0], sharding_strategy) + self.assertEqual(trainer.args.fsdp[1], FSDPOption.AUTO_WRAP) + for k, v in trainer.args.fsdp_config.items(): + self.assertEqual(v, self.fsdp_config[k]) + self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") + trainer.train() + + def test_fsdp_offload(self): + output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) + kwargs = { + "output_dir": output_dir, + "train_len": 128, + "save_steps": 5, + "learning_rate": 0.1, + "fsdp": f"full_shard offload auto_wrap", + "fsdp_config": self.fsdp_config, + } + with mockenv_context(**self.dist_env_1_gpu): + trainer = get_regression_trainer(**kwargs) + self.assertEqual(trainer.args.fsdp[0], FSDPOption.FULL_SHARD) + self.assertEqual(trainer.args.fsdp[1], FSDPOption.OFFLOAD) + self.assertEqual(trainer.args.fsdp[2], 
FSDPOption.AUTO_WRAP) + for k, v in trainer.args.fsdp_config.items(): + self.assertEqual(v, self.fsdp_config[k]) + self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") + trainer.train() + + def test_can_resume_training_normal(self): + pass From f172658f7b2c8547025d6677b8fb29aa1fb0bc73 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Thu, 14 Sep 2023 16:59:34 +0530 Subject: [PATCH 02/26] Update test_fsdp.py --- tests/fsdp/test_fsdp.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index bc3b8028643..f5e3a85d099 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -183,6 +183,7 @@ def test_fsdp_offload(self): self.assertEqual(v, self.fsdp_config[k]) self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") trainer.train() + print(trainer.model_wrapped) def test_can_resume_training_normal(self): pass From 498f4a15fc902ff358d85bd9828be2e6b564c092 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:00:51 +0530 Subject: [PATCH 03/26] Update test_fsdp.py --- tests/fsdp/test_fsdp.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index f5e3a85d099..380ef14fd54 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -49,6 +49,11 @@ from transformers.utils import WEIGHTS_NAME, is_torch_bf16_gpu_available, is_accelerate_available +# default torch.distributed port +DEFAULT_MASTER_PORT = "10999" +set_seed(42) + + def get_master_port(real_launcher=False): """ When using a single gpu launcher emulation (i.e. not deepspeed or python -m torch.distributed) @@ -112,8 +117,6 @@ def parameterized_custom_name_func(func, param_num, param): params = list(itertools.product(sharding_strategies, dtypes)) -set_seed(42) - @require_accelerate @require_torch_gpu From 141dce52b4475e9f211069cba838c109d39031cc Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:38:29 +0530 Subject: [PATCH 04/26] fixes --- src/transformers/trainer.py | 2 ++ tests/fsdp/test_fsdp.py | 37 +++++++++++++++---------------------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index 8f7ef1344dc..478b16d51c0 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -2128,6 +2128,8 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): # release memory del state_dict elif self.is_fsdp_enabled: + print(model) + print(self.accelerator.state.fsdp_plugin) load_fsdp_model(self.accelerator.state.fsdp_plugin, self.accelerator, model, resume_from_checkpoint) else: # We load the model state dict on the CPU to avoid an OOM error. diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 380ef14fd54..9a639bf6ec1 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -96,6 +96,19 @@ def get_master_port(real_launcher=False): require_fsdp_version = partial(require_fsdp, min_version=FSDP_PYTORCH_VERSION) + +def get_launcher(distributed=False, use_accelerate=False): + # 1. explicitly set --num_nodes=1 just in case these tests end up run on a multi-node setup + # - it won't be able to handle that + # 2. 
for now testing with just 2 gpus max (since some quality tests may give different + # results with mode gpus because we use very little data) + num_gpus = min(2, get_gpu_count()) if distributed else 1 + master_port = get_master_port(real_launcher=True) + if use_accelerate: + return f"accelerate launch --num_processes={num_gpus} --main_process_port={master_port}".split() + return f"torchrun --nnodes 1 --nproc-per-node {num_gpus} --master-port {master_port}".split() + + FP16 = "fp16" BF16 = "bf16" if is_torch_bf16_gpu_available(): @@ -154,39 +167,19 @@ def test_fsdp_config(self, sharding_strategy, dtype): "train_len": 128, "save_steps": 5, "learning_rate": 0.1, - "fsdp": f"{sharding_strategy} auto_wrap", + "fsdp": f"{sharding_strategy} offload auto_wrap", "fsdp_config": self.fsdp_config, } kwargs[dtype] = True with mockenv_context(**self.dist_env_1_gpu): trainer = get_regression_trainer(**kwargs) self.assertEqual(trainer.args.fsdp[0], sharding_strategy) - self.assertEqual(trainer.args.fsdp[1], FSDPOption.AUTO_WRAP) - for k, v in trainer.args.fsdp_config.items(): - self.assertEqual(v, self.fsdp_config[k]) - self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") - trainer.train() - - def test_fsdp_offload(self): - output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) - kwargs = { - "output_dir": output_dir, - "train_len": 128, - "save_steps": 5, - "learning_rate": 0.1, - "fsdp": f"full_shard offload auto_wrap", - "fsdp_config": self.fsdp_config, - } - with mockenv_context(**self.dist_env_1_gpu): - trainer = get_regression_trainer(**kwargs) - self.assertEqual(trainer.args.fsdp[0], FSDPOption.FULL_SHARD) self.assertEqual(trainer.args.fsdp[1], FSDPOption.OFFLOAD) self.assertEqual(trainer.args.fsdp[2], FSDPOption.AUTO_WRAP) for k, v in trainer.args.fsdp_config.items(): self.assertEqual(v, self.fsdp_config[k]) self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") trainer.train() - print(trainer.model_wrapped) - def test_can_resume_training_normal(self): + def test_basic_run(self): pass From 98c608dffc47df1099b5f231bbbc453936bf68d3 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:40:43 +0530 Subject: [PATCH 05/26] checks --- src/transformers/trainer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index 478b16d51c0..b3fc7341ecc 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -1729,7 +1729,8 @@ def _inner_training_loop( if resume_from_checkpoint is not None: if self.is_deepspeed_enabled: deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) - elif (is_sagemaker_mp_enabled() or self.is_fsdp_enabled) and resume_from_checkpoint is not None: + elif is_sagemaker_mp_enabled() or self.is_fsdp_enabled: + print(f"{self.model_wrapped=}") self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped) # Check if saved optimizer or scheduler states exist From 986c8b5bc045a985b3002c8578530ea58989d01e Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:43:49 +0530 Subject: [PATCH 06/26] Update trainer.py --- src/transformers/trainer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index b3fc7341ecc..f924e0e326f 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -1715,6 +1715,7 @@ def _inner_training_loop( ) if 
self.is_fsdp_enabled: + print(f"{model=}") self.model = model # for the rest of this function `model` is the outside model, whether it was wrapped or not From 4239fa116953b1bf13578df4a223a1cdb9ef3e26 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:47:38 +0530 Subject: [PATCH 07/26] fix --- src/transformers/trainer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index f924e0e326f..290a10d5a87 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -1716,7 +1716,7 @@ def _inner_training_loop( if self.is_fsdp_enabled: print(f"{model=}") - self.model = model + self.model = self.model_wrapped = model # for the rest of this function `model` is the outside model, whether it was wrapped or not if model is not self.model: From b61509e3030a915d27c941641b8a44ff390c6b48 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 09:30:56 +0530 Subject: [PATCH 08/26] fixes for saving/resuming checkpoints --- src/transformers/trainer.py | 40 ++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index 290a10d5a87..5fb18973026 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -2082,16 +2082,23 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): safe_weights_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_NAME) safe_weights_index_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_INDEX_NAME) - if not any( - os.path.isfile(f) - for f in [ - weights_file, - safe_weights_file, - weights_index_file, - safe_weights_index_file, - adapter_weights_file, - adapter_safe_weights_file, - ] + if not ( + any( + os.path.isfile(f) + for f in [ + weights_file, + safe_weights_file, + weights_index_file, + safe_weights_index_file, + adapter_weights_file, + adapter_safe_weights_file, + ] + ) + or any( + WEIGHTS_NAME.split(".")[0] in folder_name + for folder_name in os.listdir(resume_from_checkpoint) + if os.path.isdir(os.path.join(resume_from_checkpoint, folder_name)) + ) ): raise ValueError(f"Can't find a valid checkpoint at {resume_from_checkpoint}") @@ -2179,6 +2186,10 @@ def _load_best_model(self): model = self.model_wrapped if is_sagemaker_mp_enabled() else self.model if self.is_deepspeed_enabled: deepspeed_load_checkpoint(self.model_wrapped, self.state.best_model_checkpoint) + elif self.is_fsdp_enabled: + load_result = load_fsdp_model( + self.accelerator.state.fsdp_plugin, self.accelerator, model, self.state.best_model_checkpoint + ) elif ( os.path.exists(best_model_path) or os.path.exists(best_safe_model_path) @@ -2206,10 +2217,6 @@ def _load_best_model(self): state_dict["_smp_is_partial"] = False load_result = model.load_state_dict(state_dict, strict=True) - elif self.is_fsdp_enabled: - load_result = load_fsdp_model( - self.accelerator.state.fsdp_plugin, self.accelerator, model, self.state.best_model_checkpoint - ) else: if is_peft_available() and isinstance(model, PeftModel): # If train a model using PEFT & LoRA, assume that adapter have been saved properly. 
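
The checkpoint hunks above and the optimizer hunk below rely on the same observation: a sharded FSDP checkpoint is a directory of sub-folders rather than a single weights file, so the Trainer can recognize it by scanning the checkpoint directory for a sub-folder whose name contains the stem of the expected file name. A minimal standalone sketch of that check follows; the literal constant values and the example directory layout are assumptions for illustration, not taken from the diff.

import os

WEIGHTS_NAME = "pytorch_model.bin"        # assumed value; stem "pytorch_model"
OPTIMIZER_NAME_BIN = "optimizer.bin"      # assumed value; stem "optimizer"

def has_sharded_subfolder(checkpoint_dir, file_name):
    # True when any subdirectory of the checkpoint carries the file stem in its name,
    # e.g. checkpoint-115/pytorch_model_0/ holding the model shards (assumed layout).
    stem = file_name.split(".")[0]
    return os.path.isdir(checkpoint_dir) and any(
        stem in folder_name
        for folder_name in os.listdir(checkpoint_dir)
        if os.path.isdir(os.path.join(checkpoint_dir, folder_name))
    )

# is_fsdp_ckpt = has_sharded_subfolder(resume_from_checkpoint, WEIGHTS_NAME)
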
@@ -2498,6 +2505,11 @@ def _load_optimizer_and_scheduler(self, checkpoint): else ( os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME)) or os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME_BIN)) + or any( + OPTIMIZER_NAME_BIN.split(".")[0] in folder_name + for folder_name in os.listdir(checkpoint) + if os.path.isdir(os.path.join(checkpoint, folder_name)) + ) ) ) if checkpoint_file_exists and os.path.isfile(os.path.join(checkpoint, SCHEDULER_NAME)): From 085bcb37a848bf99a28b72f358c8fc9acdf4259d Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 09:52:13 +0530 Subject: [PATCH 09/26] fixes --- src/transformers/trainer.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index 5fb18973026..80cf11d0c3a 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -2081,6 +2081,11 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): weights_index_file = os.path.join(resume_from_checkpoint, WEIGHTS_INDEX_NAME) safe_weights_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_NAME) safe_weights_index_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_INDEX_NAME) + is_fsdp_ckpt = any( + WEIGHTS_NAME.split(".")[0] in folder_name + for folder_name in os.listdir(resume_from_checkpoint) + if os.path.isdir(os.path.join(resume_from_checkpoint, folder_name)) + ) if not ( any( @@ -2094,11 +2099,7 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): adapter_safe_weights_file, ] ) - or any( - WEIGHTS_NAME.split(".")[0] in folder_name - for folder_name in os.listdir(resume_from_checkpoint) - if os.path.isdir(os.path.join(resume_from_checkpoint, folder_name)) - ) + or is_fsdp_ckpt ): raise ValueError(f"Can't find a valid checkpoint at {resume_from_checkpoint}") @@ -2114,7 +2115,7 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): "yield to errors or unwanted behaviors." ) - if os.path.isfile(weights_file) or os.path.isfile(safe_weights_file): + if os.path.isfile(weights_file) or os.path.isfile(safe_weights_file) or is_fsdp_ckpt: # If the model is on the GPU, it still works! 
if is_sagemaker_mp_enabled(): if os.path.isfile(os.path.join(resume_from_checkpoint, "user_content.pt")): From 85357d35a14281a841037fea690a2b3173c67dac Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:17:40 +0530 Subject: [PATCH 10/26] add tests and delete debug statements --- src/transformers/trainer.py | 4 - tests/fsdp/test_fsdp.py | 181 +++++++++++++++++++++++++++++++++++- 2 files changed, 177 insertions(+), 8 deletions(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index 80cf11d0c3a..2b1bb78298b 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -1715,7 +1715,6 @@ def _inner_training_loop( ) if self.is_fsdp_enabled: - print(f"{model=}") self.model = self.model_wrapped = model # for the rest of this function `model` is the outside model, whether it was wrapped or not @@ -1731,7 +1730,6 @@ def _inner_training_loop( if self.is_deepspeed_enabled: deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) elif is_sagemaker_mp_enabled() or self.is_fsdp_enabled: - print(f"{self.model_wrapped=}") self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped) # Check if saved optimizer or scheduler states exist @@ -2138,8 +2136,6 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): # release memory del state_dict elif self.is_fsdp_enabled: - print(model) - print(self.accelerator.state.fsdp_plugin) load_fsdp_model(self.accelerator.state.fsdp_plugin, self.accelerator, model, resume_from_checkpoint) else: # We load the model state dict on the CPU to avoid an OOM error. diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 9a639bf6ec1..7407df17186 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -27,6 +27,7 @@ import tests.trainer.test_trainer from tests.trainer.test_trainer import TrainerIntegrationCommon # noqa from transformers import AutoModel, TrainingArguments, is_torch_available, logging +from transformers.trainer_callback import TrainerState from transformers.testing_utils import ( CaptureLogger, @@ -105,7 +106,13 @@ def get_launcher(distributed=False, use_accelerate=False): num_gpus = min(2, get_gpu_count()) if distributed else 1 master_port = get_master_port(real_launcher=True) if use_accelerate: - return f"accelerate launch --num_processes={num_gpus} --main_process_port={master_port}".split() + return f"""accelerate launch + --num_processes {num_gpus} + --main_process_port {master_port} + --use_fsdp + --fsdp_auto_wrap_policy TRANSFORMER_BASED_WRAP + --fsdp_state_dict_type SHARDED_STATE_DICT + --fsdp_transformer_layer_cls_to_wrap BertLayer""".split() return f"torchrun --nnodes 1 --nproc-per-node {num_gpus} --master-port {master_port}".split() @@ -129,6 +136,7 @@ def parameterized_custom_name_func(func, param_num, param): params = list(itertools.product(sharding_strategies, dtypes)) +params_with_state_dict_type = list(itertools.product(sharding_strategies, dtypes, FSDP_STATE_DICT_TYPE)) @require_accelerate @@ -161,7 +169,7 @@ def tearDown(self): @parameterized.expand(params, name_func=parameterized_custom_name_func) def test_fsdp_config(self, sharding_strategy, dtype): - output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) + output_dir = self.get_auto_remove_tmp_dir() kwargs = { "output_dir": output_dir, "train_len": 128, @@ -179,7 +187,172 @@ def test_fsdp_config(self, sharding_strategy, dtype): for k, v in trainer.args.fsdp_config.items(): self.assertEqual(v, self.fsdp_config[k]) 
self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") - trainer.train() + @require_torch_multi_gpu + @slow def test_basic_run(self): - pass + launcher = get_launcher(distributed=True, use_accelerate=False) + output_dir = self.get_auto_remove_tmp_dir() + args = f""" + --model_name_or_path bert-base-cased + --task_name mrpc + --output_dir {output_dir} + --overwrite_output_dir + --do_train + --max_seq_length 128 + --per_device_train_batch_size 16 + --learning_rate 5e-5 + --num_train_epochs 3 + --lr_scheduler_type cosine + --logging_steps 1 + --save_strategy epoch + --do_eval + --evaluation_strategy epoch + --load_best_model_at_end + --skip_memory_metrics False + """.split() + + fsdp_args = """ + --fsdp shard_grad_op auto_wrap + --fsdp_transformer_layer_cls_to_wrap BertLayer + """.split() + + script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + cmd = launcher + script + args + fsdp_args + + # keep for quick debug + # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die + execute_subprocess_async(cmd, env=self.get_env()) + + @parameterized.expand(dtypes) + @require_torch_multi_gpu + @slow + def test_basic_run_with_cpu_offload(self, dtype): + launcher = get_launcher(distributed=True, use_accelerate=False) + output_dir = self.get_auto_remove_tmp_dir() + args = f""" + --model_name_or_path bert-base-cased + --task_name mrpc + --output_dir {output_dir} + --overwrite_output_dir + --do_train + --max_seq_length 128 + --per_device_train_batch_size 16 + --learning_rate 5e-5 + --num_train_epochs 3 + --lr_scheduler_type cosine + --logging_steps 1 + --save_strategy epoch + --do_eval + --evaluation_strategy epoch + --load_best_model_at_end + --skip_memory_metrics False + --{dtype} + """.split() + + fsdp_args = """ + --fsdp shard_grad_op auto_wrap offload + --fsdp_transformer_layer_cls_to_wrap BertLayer + """.split() + + script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + cmd = launcher + script + args + fsdp_args + + # keep for quick debug + # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die + execute_subprocess_async(cmd, env=self.get_env()) + + @parameterized.expand(params_with_state_dict_type, name_func=parameterized_custom_name_func) + @require_torch_multi_gpu + @slow + def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_dict_type): + output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) + + if state_dict_type == "LOCAL_STATE_DICT": + return + + use_accelerate = state_dict_type == "SHARDED_STATE_DICT" + launcher = get_launcher(True, use_accelerate=use_accelerate) + + args = f""" + --model_name_or_path bert-base-cased + --task_name mrpc + --output_dir {output_dir} + --overwrite_output_dir + --do_train + --max_seq_length 128 + --per_device_train_batch_size 16 + --learning_rate 5e-5 + --num_train_epochs 2 + --lr_scheduler_type cosine + --logging_steps 1 + --save_strategy epoch + --do_eval + --evaluation_strategy epoch + --{dtype} + """.split() + + script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + + if not use_accelerate: + fsdp_args = f""" + --fsdp {sharding_strategy} auto_wrap + --fsdp_transformer_layer_cls_to_wrap BertLayer + """.split() + + cmd = launcher + script + args + fsdp_args + else: + fsdp_config = f""" + --fsdp_sharding_strategy={FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} + """.split() + cmd = launcher + fsdp_config + script + args + + # keep for quick debug + # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] 
+cmd)); die + execute_subprocess_async(cmd, env=self.get_env()) + + logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history + + # resume from ckpt + checkpoint = os.path.join(output_dir, "checkpoint-115") + resume_args = f""" + --model_name_or_path bert-base-cased + --task_name mrpc + --output_dir {output_dir} + --overwrite_output_dir + --do_train + --max_seq_length 128 + --per_device_train_batch_size 16 + --learning_rate 5e-5 + --num_train_epochs 2 + --lr_scheduler_type cosine + --logging_steps 1 + --save_strategy epoch + --do_eval + --evaluation_strategy epoch + --{dtype} + --resume_from_checkpoint {checkpoint} + """.split() + + if not use_accelerate: + fsdp_args = f""" + --fsdp {sharding_strategy} auto_wrap + --fsdp_transformer_layer_cls_to_wrap BertLayer + """.split() + + cmd = launcher + script + resume_args + fsdp_args + else: + fsdp_config = f""" + --fsdp_sharding_strategy={FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} + """.split() + cmd = launcher + fsdp_config + script + resume_args + + # keep for quick debug + # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die + execute_subprocess_async(cmd, env=self.get_env()) + + logs_resume = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history + + for log, log1 in zip(logs, logs_resume): + if "learning_rate" in log: + self.assertEqual(log["learning_rate"], log1["learning_rate"]) From 816ccb2b18759f69afebd1138622edcaf51c34a1 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:28:43 +0530 Subject: [PATCH 11/26] fixing tests --- tests/fsdp/test_fsdp.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 7407df17186..511859eaf3d 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -213,7 +213,7 @@ def test_basic_run(self): """.split() fsdp_args = """ - --fsdp shard_grad_op auto_wrap + --fsdp "shard_grad_op auto_wrap" --fsdp_transformer_layer_cls_to_wrap BertLayer """.split() @@ -251,7 +251,7 @@ def test_basic_run_with_cpu_offload(self, dtype): """.split() fsdp_args = """ - --fsdp shard_grad_op auto_wrap offload + --fsdp "shard_grad_op auto_wrap offload" --fsdp_transformer_layer_cls_to_wrap BertLayer """.split() @@ -296,7 +296,7 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ if not use_accelerate: fsdp_args = f""" - --fsdp {sharding_strategy} auto_wrap + --fsdp "{sharding_strategy} auto_wrap" --fsdp_transformer_layer_cls_to_wrap BertLayer """.split() @@ -336,7 +336,7 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ if not use_accelerate: fsdp_args = f""" - --fsdp {sharding_strategy} auto_wrap + --fsdp "{sharding_strategy} auto_wrap" --fsdp_transformer_layer_cls_to_wrap BertLayer """.split() @@ -355,4 +355,4 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ for log, log1 in zip(logs, logs_resume): if "learning_rate" in log: - self.assertEqual(log["learning_rate"], log1["learning_rate"]) + self.assertAlmostEqual(log["learning_rate"], log1["learning_rate"], delta=1e-5) From c3e0a1a68cc28058d3fd10eac12bff35283773c3 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:31:11 +0530 Subject: [PATCH 12/26] Update test_fsdp.py --- tests/fsdp/test_fsdp.py | 1 + 1 file changed, 1 insertion(+) diff --git 
a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 511859eaf3d..cdaeb4a7446 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -257,6 +257,7 @@ def test_basic_run_with_cpu_offload(self, dtype): script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args + cmd = cmd.replace('"', "") # keep for quick debug # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die From 8d57ed8291a8b6dee393c4b413a56b2cdcea282f Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:34:28 +0530 Subject: [PATCH 13/26] fix tests --- tests/fsdp/test_fsdp.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index cdaeb4a7446..4dd37c98f58 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -212,10 +212,7 @@ def test_basic_run(self): --skip_memory_metrics False """.split() - fsdp_args = """ - --fsdp "shard_grad_op auto_wrap" - --fsdp_transformer_layer_cls_to_wrap BertLayer - """.split() + fsdp_args = ["--fsdp", "shard_grad_op auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args @@ -250,14 +247,10 @@ def test_basic_run_with_cpu_offload(self, dtype): --{dtype} """.split() - fsdp_args = """ - --fsdp "shard_grad_op auto_wrap offload" - --fsdp_transformer_layer_cls_to_wrap BertLayer - """.split() + fsdp_args = ["--fsdp", "shard_grad_op auto_wrap offload", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args - cmd = cmd.replace('"', "") # keep for quick debug # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die From c4f8b685d31515713701116b5b80a0816148dd55 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:39:00 +0530 Subject: [PATCH 14/26] fix tests --- tests/fsdp/test_fsdp.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 4dd37c98f58..f9058edf7fd 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -210,6 +210,7 @@ def test_basic_run(self): --evaluation_strategy epoch --load_best_model_at_end --skip_memory_metrics False + --report_to none """.split() fsdp_args = ["--fsdp", "shard_grad_op auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] @@ -236,7 +237,7 @@ def test_basic_run_with_cpu_offload(self, dtype): --max_seq_length 128 --per_device_train_batch_size 16 --learning_rate 5e-5 - --num_train_epochs 3 + --num_train_epochs 1 --lr_scheduler_type cosine --logging_steps 1 --save_strategy epoch @@ -244,6 +245,7 @@ def test_basic_run_with_cpu_offload(self, dtype): --evaluation_strategy epoch --load_best_model_at_end --skip_memory_metrics False + --report_to none --{dtype} """.split() @@ -283,21 +285,23 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ --save_strategy epoch --do_eval --evaluation_strategy epoch + --report_to none --{dtype} """.split() script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] if not use_accelerate: - fsdp_args = f""" - --fsdp "{sharding_strategy} auto_wrap" - --fsdp_transformer_layer_cls_to_wrap BertLayer - """.split() - 
+ fsdp_args = [ + "--fsdp", + f"{sharding_strategy} auto_wrap", + "--fsdp_transformer_layer_cls_to_wrap", + "BertLayer", + ] cmd = launcher + script + args + fsdp_args else: fsdp_config = f""" - --fsdp_sharding_strategy={FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} + --fsdp_sharding_strategy {FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} """.split() cmd = launcher + fsdp_config + script + args @@ -324,20 +328,22 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ --save_strategy epoch --do_eval --evaluation_strategy epoch + --report_to none --{dtype} --resume_from_checkpoint {checkpoint} """.split() if not use_accelerate: - fsdp_args = f""" - --fsdp "{sharding_strategy} auto_wrap" - --fsdp_transformer_layer_cls_to_wrap BertLayer - """.split() - + fsdp_args = [ + "--fsdp", + f"{sharding_strategy} auto_wrap", + "--fsdp_transformer_layer_cls_to_wrap", + "BertLayer", + ] cmd = launcher + script + resume_args + fsdp_args else: fsdp_config = f""" - --fsdp_sharding_strategy={FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} + --fsdp_sharding_strategy {FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} """.split() cmd = launcher + fsdp_config + script + resume_args From c0fc9300f4bf2986fa0ca32d2c430d522cac669d Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:45:00 +0530 Subject: [PATCH 15/26] minor nits --- tests/fsdp/test_fsdp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index f9058edf7fd..377e55c0db9 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -204,7 +204,7 @@ def test_basic_run(self): --learning_rate 5e-5 --num_train_epochs 3 --lr_scheduler_type cosine - --logging_steps 1 + --logging_steps 50 --save_strategy epoch --do_eval --evaluation_strategy epoch @@ -239,7 +239,7 @@ def test_basic_run_with_cpu_offload(self, dtype): --learning_rate 5e-5 --num_train_epochs 1 --lr_scheduler_type cosine - --logging_steps 1 + --logging_steps 50 --save_strategy epoch --do_eval --evaluation_strategy epoch From dd2431b112c8d35b3742e8c86358d3c4830f923d Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:48:42 +0530 Subject: [PATCH 16/26] fix code style and quality --- src/transformers/testing_utils.py | 2 +- src/transformers/utils/__init__.py | 2 +- tests/fsdp/test_fsdp.py | 31 ++++++++---------------------- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/src/transformers/testing_utils.py b/src/transformers/testing_utils.py index eb5e4bd1578..45a2d33907a 100644 --- a/src/transformers/testing_utils.py +++ b/src/transformers/testing_utils.py @@ -61,6 +61,7 @@ is_essentia_available, is_faiss_available, is_flax_available, + is_fsdp_available, is_ftfy_available, is_ipex_available, is_jieba_available, @@ -105,7 +106,6 @@ is_torchdynamo_available, is_torchvision_available, is_vision_available, - is_fsdp_available, strtobool, ) diff --git a/src/transformers/utils/__init__.py b/src/transformers/utils/__init__.py index 7e648c9a04b..701c094e246 100644 --- a/src/transformers/utils/__init__.py +++ b/src/transformers/utils/__init__.py @@ -103,7 +103,6 @@ direct_transformers_import, get_torch_version, is_accelerate_available, - is_fsdp_available, is_apex_available, is_auto_gptq_available, is_bitsandbytes_available, @@ -116,6 +115,7 @@ is_essentia_available, is_faiss_available, 
is_flax_available, + is_fsdp_available, is_ftfy_available, is_in_notebook, is_ipex_available, diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 377e55c0db9..64a6f7a86e7 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -12,42 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. -import dataclasses -import io import itertools -import json import os -import unittest -from copy import deepcopy from functools import partial -import datasets from parameterized import parameterized import tests.trainer.test_trainer from tests.trainer.test_trainer import TrainerIntegrationCommon # noqa -from transformers import AutoModel, TrainingArguments, is_torch_available, logging -from transformers.trainer_callback import TrainerState - +from transformers import is_torch_available from transformers.testing_utils import ( - CaptureLogger, - CaptureStd, - CaptureStderr, - LoggingLevel, TestCasePlus, execute_subprocess_async, get_gpu_count, mockenv_context, - require_deepspeed, - require_optuna, - require_torch_gpu, require_accelerate, require_fsdp, + require_torch_gpu, require_torch_multi_gpu, slow, ) -from transformers.trainer_utils import get_last_checkpoint, set_seed, FSDPOption -from transformers.utils import WEIGHTS_NAME, is_torch_bf16_gpu_available, is_accelerate_available +from transformers.trainer_callback import TrainerState +from transformers.trainer_utils import FSDPOption, set_seed +from transformers.utils import is_accelerate_available, is_torch_bf16_gpu_available # default torch.distributed port @@ -88,11 +75,9 @@ def get_master_port(real_launcher=False): if is_accelerate_available(): from accelerate.utils.constants import ( - FSDP_AUTO_WRAP_POLICY, - FSDP_BACKWARD_PREFETCH, + FSDP_PYTORCH_VERSION, FSDP_SHARDING_STRATEGY, FSDP_STATE_DICT_TYPE, - FSDP_PYTORCH_VERSION, ) require_fsdp_version = partial(require_fsdp, min_version=FSDP_PYTORCH_VERSION) @@ -208,7 +193,7 @@ def test_basic_run(self): --save_strategy epoch --do_eval --evaluation_strategy epoch - --load_best_model_at_end + --load_best_model_at_end --skip_memory_metrics False --report_to none """.split() @@ -243,7 +228,7 @@ def test_basic_run_with_cpu_offload(self, dtype): --save_strategy epoch --do_eval --evaluation_strategy epoch - --load_best_model_at_end + --load_best_model_at_end --skip_memory_metrics False --report_to none --{dtype} From 6cb7db30cafba2d0f7dda76c76fa175e4a0dbc28 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:02:53 +0530 Subject: [PATCH 17/26] refactor and modularize test code --- tests/fsdp/test_fsdp.py | 129 +++++++++------------------------------- 1 file changed, 27 insertions(+), 102 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 64a6f7a86e7..18980556062 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -178,33 +178,10 @@ def test_fsdp_config(self, sharding_strategy, dtype): def test_basic_run(self): launcher = get_launcher(distributed=True, use_accelerate=False) output_dir = self.get_auto_remove_tmp_dir() - args = f""" - --model_name_or_path bert-base-cased - --task_name mrpc - --output_dir {output_dir} - --overwrite_output_dir - --do_train - --max_seq_length 128 - --per_device_train_batch_size 16 - --learning_rate 5e-5 - --num_train_epochs 3 - --lr_scheduler_type cosine - --logging_steps 50 - --save_strategy epoch - --do_eval - --evaluation_strategy epoch - --load_best_model_at_end - 
--skip_memory_metrics False - --report_to none - """.split() - - fsdp_args = ["--fsdp", "shard_grad_op auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] - + args = self.get_base_args(output_dir, 3, 50).split() + fsdp_args = ["--fsdp", "full_shard auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args - - # keep for quick debug - # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die execute_subprocess_async(cmd, env=self.get_env()) @parameterized.expand(dtypes) @@ -213,34 +190,10 @@ def test_basic_run(self): def test_basic_run_with_cpu_offload(self, dtype): launcher = get_launcher(distributed=True, use_accelerate=False) output_dir = self.get_auto_remove_tmp_dir() - args = f""" - --model_name_or_path bert-base-cased - --task_name mrpc - --output_dir {output_dir} - --overwrite_output_dir - --do_train - --max_seq_length 128 - --per_device_train_batch_size 16 - --learning_rate 5e-5 - --num_train_epochs 1 - --lr_scheduler_type cosine - --logging_steps 50 - --save_strategy epoch - --do_eval - --evaluation_strategy epoch - --load_best_model_at_end - --skip_memory_metrics False - --report_to none - --{dtype} - """.split() - - fsdp_args = ["--fsdp", "shard_grad_op auto_wrap offload", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] - + args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}"] + fsdp_args = ["--fsdp", "full_shard auto_wrap offload", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args - - # keep for quick debug - # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die execute_subprocess_async(cmd, env=self.get_env()) @parameterized.expand(params_with_state_dict_type, name_func=parameterized_custom_name_func) @@ -254,28 +207,27 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ use_accelerate = state_dict_type == "SHARDED_STATE_DICT" launcher = get_launcher(True, use_accelerate=use_accelerate) + args = self.get_base_args(output_dir, 2, 50).split() + [f"--{dtype}"] + script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + logs = self.run_cmd_and_get_logs(use_accelerate, sharding_strategy, launcher, script, args, output_dir) - args = f""" - --model_name_or_path bert-base-cased - --task_name mrpc - --output_dir {output_dir} - --overwrite_output_dir - --do_train - --max_seq_length 128 - --per_device_train_batch_size 16 - --learning_rate 5e-5 - --num_train_epochs 2 - --lr_scheduler_type cosine - --logging_steps 1 - --save_strategy epoch - --do_eval - --evaluation_strategy epoch - --report_to none - --{dtype} + # resume from ckpt + checkpoint = os.path.join(output_dir, "checkpoint-115") + resume_args = ( + args + + f""" + --resume_from_checkpoint {checkpoint} """.split() + ) + logs_resume = self.run_cmd_and_get_logs( + use_accelerate, sharding_strategy, launcher, script, resume_args, output_dir + ) - script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + for log, log1 in zip(logs, logs_resume): + if "learning_rate" in log: + self.assertAlmostEqual(log["learning_rate"], log1["learning_rate"], delta=1e-5) + def run_cmd_and_get_logs(self, use_accelerate, sharding_strategy, launcher, script, args, output_dir): if not use_accelerate: fsdp_args = [ "--fsdp", @@ -293,12 +245,11 @@ def 
test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ # keep for quick debug # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die execute_subprocess_async(cmd, env=self.get_env()) - logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history + return logs - # resume from ckpt - checkpoint = os.path.join(output_dir, "checkpoint-115") - resume_args = f""" + def get_base_args(self, output_dir, num_epochs, logging_steps): + return f""" --model_name_or_path bert-base-cased --task_name mrpc --output_dir {output_dir} @@ -307,37 +258,11 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ --max_seq_length 128 --per_device_train_batch_size 16 --learning_rate 5e-5 - --num_train_epochs 2 + --num_train_epochs {num_epochs} --lr_scheduler_type cosine - --logging_steps 1 + --logging_steps {logging_steps} --save_strategy epoch --do_eval --evaluation_strategy epoch --report_to none - --{dtype} - --resume_from_checkpoint {checkpoint} - """.split() - - if not use_accelerate: - fsdp_args = [ - "--fsdp", - f"{sharding_strategy} auto_wrap", - "--fsdp_transformer_layer_cls_to_wrap", - "BertLayer", - ] - cmd = launcher + script + resume_args + fsdp_args - else: - fsdp_config = f""" - --fsdp_sharding_strategy {FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} - """.split() - cmd = launcher + fsdp_config + script + resume_args - - # keep for quick debug - # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die - execute_subprocess_async(cmd, env=self.get_env()) - - logs_resume = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history - - for log, log1 in zip(logs, logs_resume): - if "learning_rate" in log: - self.assertAlmostEqual(log["learning_rate"], log1["learning_rate"], delta=1e-5) + """ From 5ae3309cc5457c093a52357d4e86bd6c63987d70 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:04:49 +0530 Subject: [PATCH 18/26] reduce the time of tests --- tests/fsdp/test_fsdp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 18980556062..96f7e194ab5 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -190,7 +190,7 @@ def test_basic_run(self): def test_basic_run_with_cpu_offload(self, dtype): launcher = get_launcher(distributed=True, use_accelerate=False) output_dir = self.get_auto_remove_tmp_dir() - args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}"] + args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}", "--max_steps 10"] fsdp_args = ["--fsdp", "full_shard auto_wrap offload", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args @@ -207,7 +207,7 @@ def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_ use_accelerate = state_dict_type == "SHARDED_STATE_DICT" launcher = get_launcher(True, use_accelerate=use_accelerate) - args = self.get_base_args(output_dir, 2, 50).split() + [f"--{dtype}"] + args = self.get_base_args(output_dir, 2, 25).split() + [f"--{dtype}"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] logs = self.run_cmd_and_get_logs(use_accelerate, sharding_strategy, launcher, script, args, output_dir) From 28a0300c2fe37ff3130b1b53741e5b56b7b225f1 Mon Sep 17 00:00:00 2001 From: Sourab 
Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:11:19 +0530 Subject: [PATCH 19/26] reduce the test time --- tests/fsdp/test_fsdp.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 96f7e194ab5..193bf26d5fc 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -77,7 +77,6 @@ def get_master_port(real_launcher=False): from accelerate.utils.constants import ( FSDP_PYTORCH_VERSION, FSDP_SHARDING_STRATEGY, - FSDP_STATE_DICT_TYPE, ) require_fsdp_version = partial(require_fsdp, min_version=FSDP_PYTORCH_VERSION) @@ -112,6 +111,10 @@ def get_launcher(distributed=False, use_accelerate=False): SHARD_GRAD_OP = "shard_grad_op" sharding_strategies = [FULL_SHARD, SHARD_GRAD_OP] +FULL_STATE_DICT = "FULL_STATE_DICT" +SHARDED_STATE_DICT = "SHARDED_STATE_DICT" +STATE_DICT_TYPE = [FULL_STATE_DICT, SHARDED_STATE_DICT] + def parameterized_custom_name_func(func, param_num, param): # customize the test name generator function as we want both params to appear in the sub-test @@ -121,7 +124,7 @@ def parameterized_custom_name_func(func, param_num, param): params = list(itertools.product(sharding_strategies, dtypes)) -params_with_state_dict_type = list(itertools.product(sharding_strategies, dtypes, FSDP_STATE_DICT_TYPE)) +params_with_state_dict_type = list(itertools.product(dtypes, STATE_DICT_TYPE)) @require_accelerate @@ -173,13 +176,14 @@ def test_fsdp_config(self, sharding_strategy, dtype): self.assertEqual(v, self.fsdp_config[k]) self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") + @parameterized.expand(sharding_strategies) @require_torch_multi_gpu @slow - def test_basic_run(self): + def test_basic_run(self, sharding_strategy): launcher = get_launcher(distributed=True, use_accelerate=False) output_dir = self.get_auto_remove_tmp_dir() args = self.get_base_args(output_dir, 3, 50).split() - fsdp_args = ["--fsdp", "full_shard auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] + fsdp_args = ["--fsdp", f"{sharding_strategy} auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args execute_subprocess_async(cmd, env=self.get_env()) @@ -199,12 +203,10 @@ def test_basic_run_with_cpu_offload(self, dtype): @parameterized.expand(params_with_state_dict_type, name_func=parameterized_custom_name_func) @require_torch_multi_gpu @slow - def test_training_and_can_resume_normally(self, sharding_strategy, dtype, state_dict_type): + def test_training_and_can_resume_normally(self, dtype, state_dict_type): output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) - if state_dict_type == "LOCAL_STATE_DICT": - return - + sharding_strategy = "full_shard" use_accelerate = state_dict_type == "SHARDED_STATE_DICT" launcher = get_launcher(True, use_accelerate=use_accelerate) args = self.get_base_args(output_dir, 2, 25).split() + [f"--{dtype}"] From 7ec28e7e7fb33ac4ffd2c7fbfc788712c7fed728 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:15:19 +0530 Subject: [PATCH 20/26] fix test --- tests/fsdp/test_fsdp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 193bf26d5fc..0c18eb26455 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -194,7 +194,7 @@ def test_basic_run(self, 
sharding_strategy): def test_basic_run_with_cpu_offload(self, dtype): launcher = get_launcher(distributed=True, use_accelerate=False) output_dir = self.get_auto_remove_tmp_dir() - args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}", "--max_steps 10"] + args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}", "--max_steps", "10"] fsdp_args = ["--fsdp", "full_shard auto_wrap offload", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args From 56dfb476313e26f24e42a5913f29a53c2499b853 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:24:40 +0530 Subject: [PATCH 21/26] reduce test time --- tests/fsdp/test_fsdp.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 0c18eb26455..7ad337df4ec 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -176,13 +176,13 @@ def test_fsdp_config(self, sharding_strategy, dtype): self.assertEqual(v, self.fsdp_config[k]) self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") - @parameterized.expand(sharding_strategies) + @parameterized.expand(params, name_func=parameterized_custom_name_func) @require_torch_multi_gpu @slow - def test_basic_run(self, sharding_strategy): + def test_basic_run(self, sharding_strategy, dtype): launcher = get_launcher(distributed=True, use_accelerate=False) output_dir = self.get_auto_remove_tmp_dir() - args = self.get_base_args(output_dir, 3, 50).split() + args = self.get_base_args(output_dir, 3, 50).split() + [f"--{dtype}"] fsdp_args = ["--fsdp", f"{sharding_strategy} auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args @@ -200,16 +200,16 @@ def test_basic_run_with_cpu_offload(self, dtype): cmd = launcher + script + args + fsdp_args execute_subprocess_async(cmd, env=self.get_env()) - @parameterized.expand(params_with_state_dict_type, name_func=parameterized_custom_name_func) + @parameterized.expand(STATE_DICT_TYPE, name_func=parameterized_custom_name_func) @require_torch_multi_gpu @slow - def test_training_and_can_resume_normally(self, dtype, state_dict_type): + def test_training_and_can_resume_normally(self, state_dict_type): output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) sharding_strategy = "full_shard" use_accelerate = state_dict_type == "SHARDED_STATE_DICT" launcher = get_launcher(True, use_accelerate=use_accelerate) - args = self.get_base_args(output_dir, 2, 25).split() + [f"--{dtype}"] + args = self.get_base_args(output_dir, 2, 25).split() script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] logs = self.run_cmd_and_get_logs(use_accelerate, sharding_strategy, launcher, script, args, output_dir) From 28cd21d1a996fa1e70aa5900ec9eb91b2070159c Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:37:13 +0530 Subject: [PATCH 22/26] reduce test time --- tests/fsdp/test_fsdp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 7ad337df4ec..a07644fad20 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -182,7 +182,7 @@ def test_fsdp_config(self, sharding_strategy, dtype): def 
test_basic_run(self, sharding_strategy, dtype): launcher = get_launcher(distributed=True, use_accelerate=False) output_dir = self.get_auto_remove_tmp_dir() - args = self.get_base_args(output_dir, 3, 50).split() + [f"--{dtype}"] + args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}"] fsdp_args = ["--fsdp", f"{sharding_strategy} auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] cmd = launcher + script + args + fsdp_args From a6c27b2da460cdc591547b347763025979278129 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:55:44 +0530 Subject: [PATCH 23/26] fix failing tests --- src/transformers/trainer.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index 2b1bb78298b..a6cfa3ce913 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -2079,7 +2079,7 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): weights_index_file = os.path.join(resume_from_checkpoint, WEIGHTS_INDEX_NAME) safe_weights_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_NAME) safe_weights_index_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_INDEX_NAME) - is_fsdp_ckpt = any( + is_fsdp_ckpt = resume_from_checkpoint.is_dir() and any( WEIGHTS_NAME.split(".")[0] in folder_name for folder_name in os.listdir(resume_from_checkpoint) if os.path.isdir(os.path.join(resume_from_checkpoint, folder_name)) @@ -2502,10 +2502,13 @@ def _load_optimizer_and_scheduler(self, checkpoint): else ( os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME)) or os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME_BIN)) - or any( - OPTIMIZER_NAME_BIN.split(".")[0] in folder_name - for folder_name in os.listdir(checkpoint) - if os.path.isdir(os.path.join(checkpoint, folder_name)) + or ( + checkpoint.is_dir() + and any( + OPTIMIZER_NAME_BIN.split(".")[0] in folder_name + for folder_name in os.listdir(checkpoint) + if os.path.isdir(os.path.join(checkpoint, folder_name)) + ) ) ) ) From 94a5e0a59d3829d32b7cefab10720718b260eaea Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:58:08 +0530 Subject: [PATCH 24/26] fix --- src/transformers/trainer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index a6cfa3ce913..df8bffadafe 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -2079,7 +2079,7 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): weights_index_file = os.path.join(resume_from_checkpoint, WEIGHTS_INDEX_NAME) safe_weights_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_NAME) safe_weights_index_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_INDEX_NAME) - is_fsdp_ckpt = resume_from_checkpoint.is_dir() and any( + is_fsdp_ckpt = os.path.isdir(resume_from_checkpoint) and any( WEIGHTS_NAME.split(".")[0] in folder_name for folder_name in os.listdir(resume_from_checkpoint) if os.path.isdir(os.path.join(resume_from_checkpoint, folder_name)) @@ -2503,7 +2503,7 @@ def _load_optimizer_and_scheduler(self, checkpoint): os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME)) or os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME_BIN)) or ( - checkpoint.is_dir() + os.path.isdir(checkpoint) and any( 
OPTIMIZER_NAME_BIN.split(".")[0] in folder_name for folder_name in os.listdir(checkpoint) From a80385a9ae2c369ebf9cefd9d53a7b3891f849fb Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Tue, 19 Sep 2023 10:19:13 +0530 Subject: [PATCH 25/26] Apply suggestions from code review Co-authored-by: Arthur <48595927+ArthurZucker@users.noreply.github.com> --- src/transformers/testing_utils.py | 2 +- tests/fsdp/test_fsdp.py | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/transformers/testing_utils.py b/src/transformers/testing_utils.py index 45a2d33907a..f5f36a8e25b 100644 --- a/src/transformers/testing_utils.py +++ b/src/transformers/testing_utils.py @@ -318,7 +318,7 @@ def require_accelerate(test_case): def require_fsdp(test_case, min_version: str = "1.12.0"): """ - Decorator marking a test that requires accelerate. These tests are skipped when accelerate isn't installed. + Decorator marking a test that requires fsdp. These tests are skipped when fsdp isn't installed. """ return unittest.skipUnless(is_fsdp_available(min_version), f"test requires torch version >= {min_version}")( test_case diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index a07644fad20..3d51715bb25 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -100,13 +100,9 @@ def get_launcher(distributed=False, use_accelerate=False): return f"torchrun --nnodes 1 --nproc-per-node {num_gpus} --master-port {master_port}".split() -FP16 = "fp16" -BF16 = "bf16" +dtypes = ["fp16"] if is_torch_bf16_gpu_available(): - dtypes = [FP16, BF16] -else: - dtypes = [FP16] - + dtypes += ["bf16"] FULL_SHARD = "full_shard" SHARD_GRAD_OP = "shard_grad_op" sharding_strategies = [FULL_SHARD, SHARD_GRAD_OP] @@ -116,7 +112,7 @@ def get_launcher(distributed=False, use_accelerate=False): STATE_DICT_TYPE = [FULL_STATE_DICT, SHARDED_STATE_DICT] -def parameterized_custom_name_func(func, param_num, param): +def _parameterized_custom_name_func(func, param_num, param): # customize the test name generator function as we want both params to appear in the sub-test # name, as by default it shows only the first param param_based_name = parameterized.to_safe_name("_".join(str(x) for x in param.args)) From eb7cb98af861eacddb7e7b198807f127fc9fe5d7 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Tue, 19 Sep 2023 10:39:55 +0530 Subject: [PATCH 26/26] resolve comments --- src/transformers/trainer.py | 3 +++ tests/fsdp/test_fsdp.py | 35 ++++++++++------------------------- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index df8bffadafe..6f3605da2c7 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -2085,6 +2085,9 @@ def _load_from_checkpoint(self, resume_from_checkpoint, model=None): if os.path.isdir(os.path.join(resume_from_checkpoint, folder_name)) ) + if is_fsdp_ckpt and not self.is_fsdp_enabled: + raise ValueError(f"Checkpoint found at {resume_from_checkpoint} is only supported when using PyTorch FSDP") + if not ( any( os.path.isfile(f) diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py index 3d51715bb25..1a968f68faf 100644 --- a/tests/fsdp/test_fsdp.py +++ b/tests/fsdp/test_fsdp.py @@ -39,7 +39,13 @@ # default torch.distributed port DEFAULT_MASTER_PORT = "10999" +dtypes = ["fp16"] +if is_torch_bf16_gpu_available(): + dtypes += ["bf16"] +sharding_strategies = ["full_shard", "shard_grad_op"] 
+state_dict_types = ["FULL_STATE_DICT", "SHARDED_STATE_DICT"] set_seed(42) +params = list(itertools.product(sharding_strategies, dtypes)) def get_master_port(real_launcher=False): @@ -100,18 +106,6 @@ def get_launcher(distributed=False, use_accelerate=False): return f"torchrun --nnodes 1 --nproc-per-node {num_gpus} --master-port {master_port}".split() -dtypes = ["fp16"] -if is_torch_bf16_gpu_available(): - dtypes += ["bf16"] -FULL_SHARD = "full_shard" -SHARD_GRAD_OP = "shard_grad_op" -sharding_strategies = [FULL_SHARD, SHARD_GRAD_OP] - -FULL_STATE_DICT = "FULL_STATE_DICT" -SHARDED_STATE_DICT = "SHARDED_STATE_DICT" -STATE_DICT_TYPE = [FULL_STATE_DICT, SHARDED_STATE_DICT] - - def _parameterized_custom_name_func(func, param_num, param): # customize the test name generator function as we want both params to appear in the sub-test # name, as by default it shows only the first param @@ -119,10 +113,6 @@ def _parameterized_custom_name_func(func, param_num, param): return f"{func.__name__}_{param_based_name}" -params = list(itertools.product(sharding_strategies, dtypes)) -params_with_state_dict_type = list(itertools.product(dtypes, STATE_DICT_TYPE)) - - @require_accelerate @require_torch_gpu @require_fsdp_version @@ -151,7 +141,7 @@ def setUp(self): def tearDown(self): super().tearDown() - @parameterized.expand(params, name_func=parameterized_custom_name_func) + @parameterized.expand(params, name_func=_parameterized_custom_name_func) def test_fsdp_config(self, sharding_strategy, dtype): output_dir = self.get_auto_remove_tmp_dir() kwargs = { @@ -172,7 +162,7 @@ def test_fsdp_config(self, sharding_strategy, dtype): self.assertEqual(v, self.fsdp_config[k]) self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") - @parameterized.expand(params, name_func=parameterized_custom_name_func) + @parameterized.expand(params, name_func=_parameterized_custom_name_func) @require_torch_multi_gpu @slow def test_basic_run(self, sharding_strategy, dtype): @@ -196,7 +186,7 @@ def test_basic_run_with_cpu_offload(self, dtype): cmd = launcher + script + args + fsdp_args execute_subprocess_async(cmd, env=self.get_env()) - @parameterized.expand(STATE_DICT_TYPE, name_func=parameterized_custom_name_func) + @parameterized.expand(state_dict_types, name_func=_parameterized_custom_name_func) @require_torch_multi_gpu @slow def test_training_and_can_resume_normally(self, state_dict_type): @@ -211,12 +201,7 @@ def test_training_and_can_resume_normally(self, state_dict_type): # resume from ckpt checkpoint = os.path.join(output_dir, "checkpoint-115") - resume_args = ( - args - + f""" - --resume_from_checkpoint {checkpoint} - """.split() - ) + resume_args = args + f"--resume_from_checkpoint {checkpoint}".split() logs_resume = self.run_cmd_and_get_logs( use_accelerate, sharding_strategy, launcher, script, resume_args, output_dir )
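
For reference, the pattern these patches converge on — a cross product of sharding strategies and dtypes, expanded with a custom name function so every parameter shows up in the sub-test name — can be sketched in isolation as below. This is a minimal illustration only, assuming just the `parameterized` package and the standard library; the `DummyFSDPMatrixTest` class is a hypothetical stand-in for the real trainer integration test and does not appear in the patches.

    import itertools
    import unittest

    from parameterized import parameterized

    # Same matrix the patches build: every sharding strategy crossed with every dtype.
    sharding_strategies = ["full_shard", "shard_grad_op"]
    dtypes = ["fp16", "bf16"]
    params = list(itertools.product(sharding_strategies, dtypes))


    def _parameterized_custom_name_func(func, param_num, param):
        # By default `parameterized` shows only the first argument in the sub-test name;
        # joining all args yields names like test_matrix_full_shard_fp16.
        param_based_name = parameterized.to_safe_name("_".join(str(x) for x in param.args))
        return f"{func.__name__}_{param_based_name}"


    class DummyFSDPMatrixTest(unittest.TestCase):  # hypothetical, for illustration only
        @parameterized.expand(params, name_func=_parameterized_custom_name_func)
        def test_matrix(self, sharding_strategy, dtype):
            # A real test would launch distributed training here; this just checks the inputs.
            self.assertIn(sharding_strategy, sharding_strategies)
            self.assertIn(dtype, dtypes)


    if __name__ == "__main__":
        unittest.main()

Running this standalone file produces one sub-test per (sharding_strategy, dtype) pair, which is the same behaviour the final patch relies on after consolidating the matrix definitions at module top level.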