diff --git a/dockers/base-cuda/Dockerfile b/dockers/base-cuda/Dockerfile index 843e47ca91289..a3624f536a0f2 100644 --- a/dockers/base-cuda/Dockerfile +++ b/dockers/base-cuda/Dockerfile @@ -113,11 +113,6 @@ RUN \ pip install --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" ./apex && \ rm -rf apex -RUN \ - # install DeepSpeed from source. - # todo: swap to pypi release once DeepSpeed releases a new version >= 0.3.10 - pip install deepspeed@git+https://github.com/microsoft/DeepSpeed@ec8b1cb - RUN \ # Show what we have pip --version && \ diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index 7d16d91e3bf82..569af875e6c64 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -441,7 +441,7 @@ def results(self) -> Any: return self.training_type_plugin.results @contextlib.contextmanager - def model_sharded_context(self) -> Generator: + def model_sharded_context(self) -> Generator[None, None, None]: """ Provide hook to create modules in a distributed aware context. This is useful for when we'd like to shard the model instantly - useful for extremely large models. Can save memory and @@ -511,3 +511,6 @@ def setup_optimizers_in_pre_dispatch(self) -> bool: Returns: If True, delay setup optimizers till pre_dispatch, else call within setup. """ return self.training_type_plugin.setup_optimizers_in_pre_dispatch + + def update_global_step(self, total_batch_idx: int, current_global_step: int) -> int: + return self.training_type_plugin.update_global_step(total_batch_idx, current_global_step) diff --git a/pytorch_lightning/plugins/precision/double.py b/pytorch_lightning/plugins/precision/double.py index 4720f0f874fd0..6e37c79f2b163 100644 --- a/pytorch_lightning/plugins/precision/double.py +++ b/pytorch_lightning/plugins/precision/double.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from functools import wraps -from typing import Any, Sequence, Tuple, TYPE_CHECKING, List +from typing import Any, List, Sequence, Tuple, TYPE_CHECKING import torch @@ -44,9 +44,7 @@ def _to_double_precision(data: torch.Tensor) -> torch.Tensor: @staticmethod def _move_float_tensors_to_double(collection: Any) -> Any: - return apply_to_collection( - collection, torch.Tensor, function=_DoublePrecisionPatch._to_double_precision - ) + return apply_to_collection(collection, torch.Tensor, function=_DoublePrecisionPatch._to_double_precision) @classmethod def patch(cls, model: 'Module', method_name: str) -> '_DoublePrecisionPatch': diff --git a/pytorch_lightning/plugins/training_type/deepspeed.py b/pytorch_lightning/plugins/training_type/deepspeed.py index b196044937414..3dc52b60055d8 100644 --- a/pytorch_lightning/plugins/training_type/deepspeed.py +++ b/pytorch_lightning/plugins/training_type/deepspeed.py @@ -11,17 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import contextlib import json import logging import os +from collections import OrderedDict from pathlib import Path from types import SimpleNamespace -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union import torch -from torch.nn.parallel import DistributedDataParallel +from pytorch_lightning.callbacks import GradientAccumulationScheduler from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.overrides.base import _LightningModuleWrapperBase from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment @@ -37,6 +38,17 @@ import deepspeed +def remove_module_hooks(model: torch.nn.Module) -> None: + # todo (tchaton) awaiting this feature to move upstream to DeepSpeed + for module in model.modules(): + module._backward_hooks = OrderedDict() + module._is_full_backward_hook = None + module._forward_hooks = OrderedDict() + module._forward_pre_hooks = OrderedDict() + module._state_dict_hooks = OrderedDict() + module._load_state_dict_pre_hooks = OrderedDict() + + class LightningDeepSpeedModule(_LightningModuleWrapperBase): def __init__(self, pl_module: LightningModule, precision: int): @@ -67,6 +79,8 @@ def __init__( zero_optimization: bool = True, stage: int = 2, cpu_offload: bool = False, + cpu_offload_params: bool = False, + cpu_offload_use_pin_memory: bool = False, contiguous_gradients: bool = True, overlap_comm: bool = True, allgather_partitions: bool = True, @@ -80,10 +94,14 @@ def __init__( parallel_devices: Optional[List[torch.device]] = None, cluster_environment: Optional[ClusterEnvironment] = None, loss_scale: float = 0, - initial_scale_power: int = 32, + initial_scale_power: int = 16, loss_scale_window: int = 1000, hysteresis: int = 2, - min_loss_scale: int = 1 + min_loss_scale: int = 1, + partition_activations: bool = False, + cpu_checkpointing: bool = False, + contiguous_memory_optimization: bool = False, + synchronize_checkpoint_boundary: bool = False, ) -> None: """ @@ -106,6 +124,10 @@ def __init__( cpu_offload: Enable offloading optimizer memory and computation to CPU + cpu_offload_params: When using ZeRO stage 3, offload parameters to CPU + + cpu_offload_use_pin_memory: When using ZeRO stage 3, pin memory on CPU + contiguous_gradients: Copies gradients to a continuous buffer as they are produced. Avoids memory fragmentation during backwards. Useful when training large models. (default: True) @@ -144,6 +166,17 @@ def __init__( min_loss_scale: The minimum FP16 dynamic loss scaling value (Default: 1000) + partition_activations: Enables partition activation when used with ZeRO stage 3. + Still requires you to wrap your forward functions in deepspeed.checkpointing.checkpoint. + See `deepspeed tutorial + `_ + + cpu_checkpointing: Offloads partitioned activations to CPU if ``partition_activations`` is enabled + + contiguous_memory_optimization: Copies partitioned activations so that they are contiguous in memory. + Not supported by all models + + synchronize_checkpoint_boundary: Insert :func:`torch.cuda.synchronize` at each checkpoint boundary. 
""" if not _DEEPSPEED_AVAILABLE: raise MisconfigurationException( @@ -159,8 +192,14 @@ def __init__( self.config = self._create_default_config( zero_optimization, zero_allow_untested_optimizer, + partition_activations=partition_activations, + cpu_checkpointing=cpu_checkpointing, + contiguous_memory_optimization=contiguous_memory_optimization, + synchronize_checkpoint_boundary=synchronize_checkpoint_boundary, stage=stage, cpu_offload=cpu_offload, + cpu_offload_params=cpu_offload_params, + cpu_offload_use_pin_memory=cpu_offload_use_pin_memory, contiguous_gradients=contiguous_gradients, overlap_comm=overlap_comm, allgather_partitions=allgather_partitions, @@ -200,9 +239,14 @@ def init_deepspeed(self): self._format_config() self._config_initialized = True + self._handle_gradient_accumulation_steps() + precision = self.lightning_module.trainer.accelerator.precision model = LightningDeepSpeedModule(pl_module=self.model, precision=precision) + if self.on_gpu: + torch.cuda.set_device(self.root_device) + if self.lightning_module.trainer and self.lightning_module.trainer.training: self._initialize_deepspeed_train(model) else: @@ -220,9 +264,11 @@ def _init_scheduler_optimizer(self): optimizer = optimizers[0] return optimizer, scheduler, optimizer_frequencies + @property + def zero_stage_3(self) -> bool: + return self.config.get('zero_optimization') and self.config.get('zero_optimization').get('stage') == 3 + def _initialize_deepspeed_train(self, model): - if self.on_gpu: - torch.cuda.set_device(self.root_device) optimizer, lightning_scheduler, optimizer_frequencies = None, None, None if "optimizer" not in self.config: rank_zero_info( @@ -239,21 +285,65 @@ def _initialize_deepspeed_train(self, model): lr_scheduler=lightning_scheduler, config_params=self.config, ) + self._set_deepspeed_activation_checkpointing() # set optimizer for save/load, but deepspeed manages the specific optimizer logic self.lightning_module.trainer.optimizers = [optimizer] + self.lightning_module.trainer.schedulers = [lr_scheduler] self.model = model + @contextlib.contextmanager + def model_sharded_context(self) -> Generator[None, None, None]: + if self.zero_stage_3: + model_parallel_context = deepspeed.zero.Init(remote_device="cpu", pin_memory=True) + else: + model_parallel_context = super().model_sharded_context() + + with model_parallel_context: + yield + + def _set_deepspeed_activation_checkpointing(self): + if self.config.get('activation_checkpointing'): + checkpoint_config = self.config['activation_checkpointing'] + deepspeed.checkpointing.configure( + mpu_=None, + partition_activations=checkpoint_config.get('partition_activations'), + contiguous_checkpointing=checkpoint_config.get('contiguous_checkpointing'), + checkpoint_in_cpu=checkpoint_config.get('checkpoint_in_cpu'), + profile=checkpoint_config.get('profile'), + ) + def _initialize_deepspeed_inference(self, model): - # move the model to the correct device - self.model_to_device() - - self.pre_configure_ddp() - self.model = DistributedDataParallel( - model, - device_ids=self.determine_ddp_device_ids(), - **self._ddp_kwargs, + # todo: Currently DeepSpeed requires optimizers at inference to partition weights correctly + optimizer, lightning_scheduler, optimizer_frequencies = None, None, None + if "optimizer" not in self.config: + rank_zero_info( + "You have not specified an optimizer or scheduler within the DeepSpeed config." + "Using `configure_optimizers` to define optimizer and scheduler." 
+ ) + optimizer, lightning_scheduler, optimizer_frequencies = self._init_scheduler_optimizer() + inference_config = { + # todo: this is required for DeepSpeed throughput timers, or throughput timers will be incorrect + 'train_micro_batch_size_per_gpu': 1, + } + if 'fp16' in self.config: + inference_config.update({"fp16": self.config["fp16"]}) + if self.zero_stage_3: + inference_config.update({ + "zero_allow_untested_optimizer": self.config['zero_allow_untested_optimizer'], + "zero_optimization": self.config['zero_optimization'], + }) + # Remove all module hooks before initializing new model + remove_module_hooks(model) + model, _, _, _ = deepspeed.initialize( + args=SimpleNamespace(local_rank=self.local_rank), + model=model, + optimizer=optimizer, + lr_scheduler=lightning_scheduler, + config_params=inference_config, + model_parameters=[], ) + self.model = model def configure_scheduler(self, lr_scheduler): scheduler = _get_default_scheduler_config() @@ -282,6 +372,20 @@ def optimizer_step(self, optimizer: torch.optim.Optimizer, lambda_closure: Calla # internally, the engine has a reference to the optimizer already. self.model.step(**kwargs) + def _handle_gradient_accumulation_steps(self): + """ + This function overrides the trainer.accumulation_scheduler to generate + ``accumulate_grad_batches=1``. + Therefore, ``optimizer_step`` will be called for every batch seen, + so the DeepSpeed engine handles the gradient accumulation logic internally. + """ + if self.config.get("gradient_accumulation_steps") > 1: + self._original_accumulate_grad_batches = self.lightning_module.trainer.accumulate_grad_batches + # todo (tchaton) Add support for accumulate_grad_batches being a dictionary. + self.lightning_module.trainer.accumulation_scheduler = GradientAccumulationScheduler({0: 1}) + else: + self._original_accumulate_grad_batches = None + def _format_config(self): if self.config is None: raise MisconfigurationException( @@ -300,14 +404,13 @@ def _format_batch_size_and_grad_accum_config(self): if "train_micro_batch_size_per_gpu" not in self.config: # train_micro_batch_size_per_gpu is used for throughput logging purposes # by default we use the batch size of the loader which may be incorrect if a batch sampler is passed - batch_size = self.lightning_module.train_dataloader().batch_size + batch_size = self.lightning_module.train_dataloader().batch_sampler.batch_size self.config["train_micro_batch_size_per_gpu"] = batch_size self.config["gradient_accumulation_steps"] = self.lightning_module.trainer.accumulate_grad_batches if "gradient_clipping" not in self.config: self.config["gradient_clipping"] = self.lightning_module.trainer.gradient_clip_val def _format_precision_config(self): - amp_type = self.lightning_module.trainer.accelerator_connector.amp_type amp_level = self.lightning_module.trainer.accelerator_connector.amp_level precision = self.lightning_module.trainer.accelerator_connector.precision @@ -333,8 +436,87 @@ def _format_precision_config(self): raise MisconfigurationException("To use DeepSpeed ZeRO Optimization, you must set precision=16.") def _create_default_config( - self, zero_optimization: bool, zero_allow_untested_optimizer: bool, **zero_kwargs + self, + zero_optimization: bool, + zero_allow_untested_optimizer: bool, + partition_activations: bool, + cpu_checkpointing: bool, + contiguous_memory_optimization: bool, + synchronize_checkpoint_boundary: bool, + **zero_kwargs, + ) -> Dict: + cfg = { + 'activation_checkpointing': { + "partition_activations": partition_activations, + "cpu_checkpointing":
cpu_checkpointing, + "contiguous_memory_optimization": contiguous_memory_optimization, + "synchronize_checkpoint_boundary": synchronize_checkpoint_boundary + } + } if zero_optimization: - return {"zero_allow_untested_optimizer": zero_allow_untested_optimizer, "zero_optimization": zero_kwargs} - return {} + cfg = { + "zero_allow_untested_optimizer": zero_allow_untested_optimizer, + "zero_optimization": zero_kwargs, + **cfg + } + return cfg + + def _filepath_to_dir(self, filepath: str) -> str: + return os.path.dirname(filepath) + + @property + def deepspeed_engine(self): + return self.model + + def save_checkpoint(self, checkpoint: Dict, filepath: str) -> None: + """Save model/training states as a checkpoint file through state-dump and file-write. + + Args: + checkpoint: the checkpoint state dictionary to dump + filepath: write-target file's path + """ + if self.world_size > 1 and self.zero_stage_3: + # Use deepspeed's internal checkpointing function to handle partitioned weights across processes + # dump states as a checkpoint dictionary object + save_dir = self._filepath_to_dir(filepath) + _exclude_keys = ['state_dict', 'optimizer_states', 'lr_schedulers'] + checkpoint = {k: v for k, v in checkpoint.items() if k not in _exclude_keys} + self.deepspeed_engine.save_checkpoint(save_dir, client_state=checkpoint) + + else: + super().save_checkpoint(checkpoint, filepath) + + def restore_model_state_from_ckpt_path( + self, + ckpt_path: str, + map_location: Callable = lambda storage, loc: storage, + ) -> Tuple[Dict, bool]: + if self.world_size > 1: + from pytorch_lightning.trainer.states import TrainerState + stage_is_fit = self.lightning_module.trainer.state == TrainerState.FITTING + save_dir = self._filepath_to_dir(ckpt_path) + + if self.zero_stage_3: + # TODO: Currently required as this call is missing within the deepspeed engine. + self.deepspeed_engine.optimizer._partition_all_parameters() + + _, client_state = self.deepspeed_engine.load_checkpoint( + save_dir, load_optimizer_states=stage_is_fit, load_lr_scheduler_states=stage_is_fit + ) + + # restore datamodule states + if self.lightning_module.trainer.datamodule is not None: + self.lightning_module.trainer.datamodule.on_load_checkpoint(client_state) + + # hook: give user access to checkpoint if needed. + self.lightning_module.on_load_checkpoint(client_state) + return client_state, False + return super().restore_model_state_from_ckpt_path(ckpt_path, map_location=map_location) + + def update_global_step(self, total_batch_idx: int, current_global_step: int) -> int: + if self._original_accumulate_grad_batches is None: + return super().update_global_step(total_batch_idx, current_global_step) + else: + if total_batch_idx % self._original_accumulate_grad_batches == 0: + current_global_step += 1 + return current_global_step diff --git a/pytorch_lightning/plugins/training_type/training_type_plugin.py b/pytorch_lightning/plugins/training_type/training_type_plugin.py index 1eac88212e0fb..01c23504b7773 100644 --- a/pytorch_lightning/plugins/training_type/training_type_plugin.py +++ b/pytorch_lightning/plugins/training_type/training_type_plugin.py @@ -13,7 +13,7 @@ # limitations under the License.
import contextlib from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, Generator, Iterable, Optional, TYPE_CHECKING, Union +from typing import Any, Callable, Dict, Generator, Iterable, Optional, Tuple, TYPE_CHECKING, Union import torch from torch.nn import Module @@ -25,6 +25,7 @@ from pytorch_lightning.plugins.base_plugin import Plugin from pytorch_lightning.utilities import rank_zero_warn from pytorch_lightning.utilities.cloud_io import atomic_save +from pytorch_lightning.utilities.cloud_io import load as pl_load if TYPE_CHECKING: from pytorch_lightning.trainer.trainer import Trainer @@ -197,6 +198,45 @@ def setup_optimizers_in_pre_dispatch(self) -> bool: """ return False + def restore_model_state_from_ckpt_path( + self, + ckpt_path: str, + map_location: Callable = lambda storage, loc: storage, + ) -> Tuple[Dict, bool]: + """ + This function is used to load and restore the model state. + + Args: + ckpt_path: Path to a checkpoint + map_location: lambda function to map checkpoint location + + Returns: + checkpoint: The loaded checkpoint + bool: Whether to load optimizer / lr_scheduler states from the checkpoint + + """ + ckpt = pl_load(ckpt_path, map_location=map_location) + # restore datamodule states + if self.lightning_module.trainer.datamodule is not None: + self.lightning_module.trainer.datamodule.on_load_checkpoint(ckpt) + + # hook: give user access to checkpoint if needed. + self.lightning_module.on_load_checkpoint(ckpt) + self.lightning_module.load_state_dict(ckpt['state_dict']) + return ckpt, True + + def update_global_step(self, total_batch_idx: int, current_global_step: int) -> int: + """ + Provide a hook to count optimizer step calls. + + Args: + total_batch_idx: Total number of batches seen for training + current_global_step: Current number of optimizer step calls + + Returns: The new number of optimizer step calls + """ + return current_global_step + 1 + def save_checkpoint(self, checkpoint: Dict[str, Any], filepath: str) -> None: """Save model/training states as a checkpoint file through state-dump and file-write. diff --git a/pytorch_lightning/trainer/connectors/checkpoint_connector.py b/pytorch_lightning/trainer/connectors/checkpoint_connector.py index 8b602fa6caa69..4ae42e4bad6ac 100644 --- a/pytorch_lightning/trainer/connectors/checkpoint_connector.py +++ b/pytorch_lightning/trainer/connectors/checkpoint_connector.py @@ -90,20 +90,17 @@ def restore(self, checkpoint_path: str, on_gpu: bool) -> bool: rank_zero_warn("No checkpoint file exists at `resume_from_checkpoint`.
Start from scratch") return False - # read a checkpoint dictionary object from the 'PyTorch-Lightning checkpoint' file at `checkpoint_path` - checkpoint = pl_load(checkpoint_path, map_location=lambda storage, loc: storage) + checkpoint, load_optimizer_states = self.trainer.training_type_plugin.restore_model_state_from_ckpt_path( + checkpoint_path, map_location=lambda storage, loc: storage + ) - # acquire the model model = self.trainer.lightning_module - # restore model and datamodule state - self.restore_model_state(model, checkpoint) - if on_gpu: model.cuda(self.trainer.root_gpu) # restore training state - self.restore_training_state(checkpoint) + self.restore_training_state(checkpoint, load_optimizer_states) rank_zero_info(f"Restored states from the checkpoint file at {checkpoint_path}") return True @@ -123,7 +120,7 @@ def restore_model_state(self, model: LightningModule, checkpoint) -> None: # restore model state_dict model.load_state_dict(checkpoint['state_dict']) - def restore_training_state(self, checkpoint): + def restore_training_state(self, checkpoint, load_optimizer_states: bool = True): """ Restore trainer state. Model will get its change to update @@ -131,7 +128,7 @@ def restore_training_state(self, checkpoint): :return: """ # validation - if 'optimizer_states' not in checkpoint or 'lr_schedulers' not in checkpoint: + if load_optimizer_states and ('optimizer_states' not in checkpoint or 'lr_schedulers' not in checkpoint): raise KeyError( 'Trying to restore training state but checkpoint contains only the model.' ' This is probably due to `ModelCheckpoint.save_weights_only` being set to `True`.' @@ -177,6 +174,9 @@ def restore_training_state(self, checkpoint): " consider using an end of epoch checkpoint." ) + if not load_optimizer_states: + return + # restore the optimizers optimizer_states = checkpoint['optimizer_states'] for optimizer, opt_state in zip(self.trainer.optimizers, optimizer_states): @@ -238,10 +238,8 @@ def hpc_save(self, folderpath: str, logger): def dump_checkpoint(self, weights_only: bool = False) -> dict: """Creating a model checkpoint dictionary object from various component states. - Args: weights_only: saving model weights only - Return: structured dictionary: { 'epoch': training epoch @@ -350,11 +348,9 @@ def hpc_load(self, checkpoint_path: str, on_gpu: bool): def max_ckpt_in_folder(self, dir_path: Union[str, Path], name_key: str = 'ckpt_') -> Optional[int]: """List up files in `dir_path` with `name_key`, then yield maximum suffix number. 
- Args: dir_path: path of directory which may contain files whose name include `name_key` name_key: file name prefix - Returns: None if no-corresponding-file else maximum suffix number """ diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 78d9602f8e529..d565f0906e59e 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -58,7 +58,6 @@ from pytorch_lightning.trainer.training_tricks import TrainerTrainingTricksMixin from pytorch_lightning.tuner.tuning import Tuner from pytorch_lightning.utilities import DeviceType, rank_zero_warn -from pytorch_lightning.utilities.cloud_io import load as pl_load from pytorch_lightning.utilities.debugging import InternalDebugger from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning.utilities.memory import recursive_detach @@ -982,12 +981,13 @@ def __load_ckpt_weights( ) # only one process running at this point for TPUs, as spawn isn't triggered yet - if self._device_type != DeviceType.TPU: + # todo: move this logic internally within the barrier. + if not self._device_type == DeviceType.TPU: self.training_type_plugin.barrier() - ckpt = pl_load(ckpt_path, map_location=lambda storage, loc: storage) - model.load_state_dict(ckpt['state_dict']) - + self.training_type_plugin.restore_model_state_from_ckpt_path( + ckpt_path, map_location=lambda storage, loc: storage + ) return ckpt_path def predict( @@ -1086,10 +1086,14 @@ def call_setup_hook(self, model: LightningModule) -> None: def call_configure_sharded_model(self, model: LightningModule) -> None: # Call configure sharded model hook if accelerator requests. In some cases # we will not call the hook; the hook has initialized the sharded model for example. 
- if self.accelerator.call_configure_sharded_model_hook: + + # used on the model if the user re-creates a trainer with resume_from_checkpoint + model_call_configure_sharded_model_hook = getattr(model, "call_configure_sharded_model_hook", False) + if self.accelerator.call_configure_sharded_model_hook and not model_call_configure_sharded_model_hook: with self.accelerator.model_sharded_context(): model.configure_sharded_model() self.configure_sharded_model(model) + model.call_configure_sharded_model_hook = True self.accelerator.call_configure_sharded_model_hook = False def call_teardown_hook(self, model: LightningModule) -> None: diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py index 696f14742935c..4640343710f81 100644 --- a/pytorch_lightning/trainer/training_loop.py +++ b/pytorch_lightning/trainer/training_loop.py @@ -771,7 +771,9 @@ def increment_accumulated_grad_global_step(self): # progress global step according to grads progress if num_accumulated_batches_reached or num_training_batches_reached: - self.trainer.global_step += 1 + self.trainer.global_step = self.trainer.accelerator.update_global_step( + self.trainer.total_batch_idx, self.trainer.global_step + ) def _accumulated_batches_reached(self): return (self.trainer.batch_idx + 1) % self.trainer.accumulate_grad_batches == 0 diff --git a/pytorch_lightning/utilities/__init__.py b/pytorch_lightning/utilities/__init__.py index 03981b0042eac..28cb05bc06f2d 100644 --- a/pytorch_lightning/utilities/__init__.py +++ b/pytorch_lightning/utilities/__init__.py @@ -14,6 +14,7 @@ """General utilities""" import numpy + from pytorch_lightning.utilities.apply_func import move_data_to_device # noqa: F401 from pytorch_lightning.utilities.distributed import ( # noqa: F401 AllGatherGrad, diff --git a/pytorch_lightning/utilities/imports.py b/pytorch_lightning/utilities/imports.py index baeac9be57218..001b9a67c5703 100644 --- a/pytorch_lightning/utilities/imports.py +++ b/pytorch_lightning/utilities/imports.py @@ -70,7 +70,6 @@ def _compare_version(package: str, op, version) -> bool: _TORCH_GREATER_EQUAL_1_7 = _compare_version("torch", operator.ge, "1.7.0") _TORCH_GREATER_EQUAL_1_8 = _compare_version("torch", operator.ge, "1.8.0") -_KINETO_AVAILABLE = torch.profiler.kineto_available() if _TORCH_GREATER_EQUAL_1_8 else False _APEX_AVAILABLE = _module_available("apex.amp") _BOLTS_AVAILABLE = _module_available('pl_bolts') _DEEPSPEED_AVAILABLE = not _IS_WINDOWS and _module_available('deepspeed') @@ -80,6 +79,7 @@ def _compare_version(package: str, op, version) -> bool: _HOROVOD_AVAILABLE = _module_available("horovod.torch") _HYDRA_AVAILABLE = _module_available("hydra") _HYDRA_EXPERIMENTAL_AVAILABLE = _module_available("hydra.experimental") +_KINETO_AVAILABLE = torch.profiler.kineto_available() if _TORCH_GREATER_EQUAL_1_8 else False _NATIVE_AMP_AVAILABLE = _module_available("torch.cuda.amp") and hasattr(torch.cuda.amp, "autocast") _OMEGACONF_AVAILABLE = _module_available("omegaconf") _RPC_AVAILABLE = not _IS_WINDOWS and _module_available('torch.distributed.rpc') diff --git a/requirements/extra.txt b/requirements/extra.txt index 715916c4e36ac..cee1fd0eb07e1 100644 --- a/requirements/extra.txt +++ b/requirements/extra.txt @@ -9,3 +9,4 @@ onnxruntime>=1.3.0 hydra-core>=1.0 # todo: when switch to standard package stream, drop `fairscale` from hard mocked docs libs https://github.com/PyTorchLightning/fairscale/archive/pl_1.2.0.zip +deepspeed>=0.3.13 diff --git a/tests/models/test_hooks.py b/tests/models/test_hooks.py
index 1d55d4a5a63b7..57af82ccc3e08 100644 --- a/tests/models/test_hooks.py +++ b/tests/models/test_hooks.py @@ -19,7 +19,7 @@ from pytorch_lightning import Callback, Trainer from pytorch_lightning.trainer.states import TrainerState -from tests.helpers import BoringModel, RandomDataset, BoringDataModule +from tests.helpers import BoringDataModule, BoringModel, RandomDataset from tests.helpers.runif import RunIf @@ -515,6 +515,7 @@ def test_trainer_datamodule_hook_system(tmpdir): """Test the LightningDataModule hook system.""" class HookedDataModule(BoringDataModule): + def __init__(self): super().__init__() self.called = [] @@ -574,23 +575,10 @@ def on_after_batch_transfer(self, *args, **kwargs): trainer.fit(model, datamodule=dm) expected = [ - 'prepare_data', - 'setup_fit', - 'val_dataloader', - 'on_before_batch_transfer', - 'transfer_batch_to_device', - 'on_after_batch_transfer', - 'train_dataloader', - 'on_before_batch_transfer', - 'transfer_batch_to_device', - 'on_after_batch_transfer', - 'on_before_batch_transfer', - 'transfer_batch_to_device', - 'on_after_batch_transfer', - 'val_dataloader', - 'on_before_batch_transfer', - 'transfer_batch_to_device', - 'on_after_batch_transfer', + 'prepare_data', 'setup_fit', 'val_dataloader', 'on_before_batch_transfer', 'transfer_batch_to_device', + 'on_after_batch_transfer', 'train_dataloader', 'on_before_batch_transfer', 'transfer_batch_to_device', + 'on_after_batch_transfer', 'on_before_batch_transfer', 'transfer_batch_to_device', 'on_after_batch_transfer', + 'val_dataloader', 'on_before_batch_transfer', 'transfer_batch_to_device', 'on_after_batch_transfer', 'teardown_fit' ] assert dm.called == expected @@ -599,13 +587,8 @@ def on_after_batch_transfer(self, *args, **kwargs): trainer.validate(model, datamodule=dm, verbose=False) expected = [ - 'prepare_data', - 'setup_validate', - 'val_dataloader', - 'on_before_batch_transfer', - 'transfer_batch_to_device', - 'on_after_batch_transfer', - 'teardown_validate' + 'prepare_data', 'setup_validate', 'val_dataloader', 'on_before_batch_transfer', 'transfer_batch_to_device', + 'on_after_batch_transfer', 'teardown_validate' ] assert dm.called == expected @@ -613,12 +596,7 @@ def on_after_batch_transfer(self, *args, **kwargs): trainer.test(model, datamodule=dm, verbose=False) expected = [ - 'prepare_data', - 'setup_test', - 'test_dataloader', - 'on_before_batch_transfer', - 'transfer_batch_to_device', - 'on_after_batch_transfer', - 'teardown_test' + 'prepare_data', 'setup_test', 'test_dataloader', 'on_before_batch_transfer', 'transfer_batch_to_device', + 'on_after_batch_transfer', 'teardown_test' ] assert dm.called == expected diff --git a/tests/plugins/test_deepspeed_plugin.py b/tests/plugins/test_deepspeed_plugin.py index e6b15069f256a..b2637a114a09c 100644 --- a/tests/plugins/test_deepspeed_plugin.py +++ b/tests/plugins/test_deepspeed_plugin.py @@ -1,22 +1,37 @@ import json import os +from typing import Any import pytest import torch -from torch import Tensor +import torch.nn.functional as F +from torch import nn, Tensor from torch.optim import Optimizer -from pytorch_lightning import Trainer +from pytorch_lightning import LightningModule, seed_everything, Trainer +from pytorch_lightning.callbacks import Callback, ModelCheckpoint +from pytorch_lightning.metrics import Accuracy from pytorch_lightning.plugins import DeepSpeedPlugin, DeepSpeedPrecisionPlugin from pytorch_lightning.plugins.training_type.deepspeed import LightningDeepSpeedModule from pytorch_lightning.utilities.exceptions import 
MisconfigurationException from tests.helpers.boring_model import BoringModel +from tests.helpers.datamodules import ClassifDataModule from tests.helpers.runif import RunIf +class ModelParallelBoringModel(BoringModel): + + def __init__(self): + super().__init__() + self.linear = None + + def configure_sharded_model(self) -> None: + self.linear = torch.nn.Linear(32, 2) + + def test_deepspeed_lightning_module(tmpdir): """ - Test to ensure that a model wrapped in `LightningDeepSpeedModule` moves types and device correctly. + Test to ensure that a model wrapped in `LightningDeepSpeedModule` moves types and device correctly. """ model = BoringModel() @@ -34,7 +49,7 @@ def test_deepspeed_lightning_module(tmpdir): @RunIf(min_gpus=1) def test_deepspeed_lightning_module_precision(tmpdir): """ - Test to ensure that a model wrapped in `LightningDeepSpeedModule` moves tensors to half when precision 16. + Test to ensure that a model wrapped in `LightningDeepSpeedModule` moves tensors to half when precision 16. """ model = BoringModel() @@ -84,7 +99,7 @@ def deepspeed_zero_config(deepspeed_config): @pytest.mark.parametrize("input", ("deepspeed", DeepSpeedPlugin)) def test_deepspeed_plugin_string(tmpdir, input): """ - Test to ensure that the plugin can be passed via string or instance, and parallel devices is correctly set. + Test to ensure that the plugin can be passed via string or instance, and parallel devices is correctly set. """ trainer = Trainer( @@ -128,8 +143,8 @@ def test_deepspeed_plugin_env(tmpdir, monkeypatch, deepspeed_config): ) def test_deepspeed_precision_choice(amp_backend, tmpdir): """ - Test to ensure precision plugin is also correctly chosen. - DeepSpeed handles precision via Custom DeepSpeedPrecisionPlugin + Test to ensure precision plugin is also correctly chosen. + DeepSpeed handles precision via Custom DeepSpeedPrecisionPlugin """ trainer = Trainer( @@ -160,7 +175,7 @@ def test_deepspeed_with_invalid_config_path(tmpdir): @RunIf(deepspeed=True) def test_deepspeed_with_env_path(tmpdir, monkeypatch, deepspeed_config): """ - Test to ensure if we pass an env variable, we load the config from the path. + Test to ensure if we pass an env variable, we load the config from the path. """ config_path = os.path.join(tmpdir, 'temp.json') with open(config_path, 'w') as f: @@ -180,7 +195,7 @@ def test_deepspeed_defaults(tmpdir): assert isinstance(plugin.config["zero_optimization"], dict) -@RunIf(min_gpus=1, deepspeed=True, special=True) +@RunIf(min_gpus=1, deepspeed=True) def test_invalid_deepspeed_defaults_no_precision(tmpdir): """Test to ensure that using defaults, if precision is not set to 16, we throw an exception.""" model = BoringModel() @@ -218,27 +233,30 @@ def backward(self, loss: Tensor, optimizer: Optimizer, optimizer_idx: int, *args @RunIf(min_gpus=1, deepspeed=True, special=True) def test_deepspeed_run_configure_optimizers(tmpdir): - """Test end to end that deepspeed works with defaults (without ZeRO as that requires compilation), - whilst using configure_optimizers for optimizers and schedulers.""" + """ + Test end to end that deepspeed works with defaults (without ZeRO as that requires compilation), + whilst using configure_optimizers for optimizers and schedulers. 
+ """ - class TestModel(BoringModel): + class TestCB(Callback): - def on_train_start(self) -> None: + def on_train_start(self, trainer, pl_module) -> None: from deepspeed.runtime.zero.stage2 import FP16_DeepSpeedZeroOptimizer - assert isinstance(self.trainer.optimizers[0], FP16_DeepSpeedZeroOptimizer) - assert isinstance(self.trainer.optimizers[0].optimizer, torch.optim.SGD) - assert self.trainer.lr_schedulers == [] # DeepSpeed manages LR scheduler internally + assert isinstance(trainer.optimizers[0], FP16_DeepSpeedZeroOptimizer) + assert isinstance(trainer.optimizers[0].optimizer, torch.optim.SGD) + assert trainer.lr_schedulers == [] # DeepSpeed manages LR scheduler internally # Ensure DeepSpeed engine has initialized with our optimizer/lr_scheduler - assert isinstance(self.trainer.model.lr_scheduler, torch.optim.lr_scheduler.StepLR) + assert isinstance(trainer.model.lr_scheduler, torch.optim.lr_scheduler.StepLR) - model = TestModel() + model = BoringModel() trainer = Trainer( plugins=DeepSpeedPlugin(), # disable ZeRO so our optimizers are not wrapped default_root_dir=tmpdir, gpus=1, fast_dev_run=True, precision=16, + callbacks=[TestCB()] ) trainer.fit(model) @@ -249,29 +267,30 @@ def on_train_start(self) -> None: @RunIf(min_gpus=1, deepspeed=True, special=True) def test_deepspeed_config(tmpdir, deepspeed_zero_config): """ - Test to ensure deepspeed works correctly when passed a DeepSpeed config object including optimizers/schedulers - and saves the model weights to load correctly. + Test to ensure deepspeed works correctly when passed a DeepSpeed config object including optimizers/schedulers + and saves the model weights to load correctly. """ - class TestModel(BoringModel): + class TestCB(Callback): - def on_train_start(self) -> None: + def on_train_start(self, trainer, pl_module) -> None: from deepspeed.runtime.lr_schedules import WarmupLR from deepspeed.runtime.zero.stage2 import FP16_DeepSpeedZeroOptimizer - assert isinstance(self.trainer.optimizers[0], FP16_DeepSpeedZeroOptimizer) - assert isinstance(self.trainer.optimizers[0].optimizer, torch.optim.SGD) - assert self.trainer.lr_schedulers == [] # DeepSpeed manages LR scheduler internally + assert isinstance(trainer.optimizers[0], FP16_DeepSpeedZeroOptimizer) + assert isinstance(trainer.optimizers[0].optimizer, torch.optim.SGD) + assert trainer.lr_schedulers == [] # DeepSpeed manages LR scheduler internally # Ensure DeepSpeed engine has initialized with our optimizer/lr_scheduler - assert isinstance(self.trainer.model.lr_scheduler, WarmupLR) + assert isinstance(trainer.model.lr_scheduler, WarmupLR) - model = TestModel() + model = BoringModel() trainer = Trainer( plugins=[DeepSpeedPlugin(config=deepspeed_zero_config)], default_root_dir=tmpdir, gpus=1, fast_dev_run=True, precision=16, + callbacks=[TestCB()] ) trainer.fit(model) @@ -284,54 +303,72 @@ def on_train_start(self) -> None: def test_deepspeed_custom_precision_params(tmpdir): """Ensure if we modify the FP16 parameters via the DeepSpeedPlugin, the deepspeed config contains these changes.""" - class TestModel(BoringModel): + class TestCB(Callback): - def on_train_start(self) -> None: - assert self.trainer.training_type_plugin.config['fp16']['loss_scale'] == 10 - assert self.trainer.training_type_plugin.config['fp16']['initial_scale_power'] == 10 - assert self.trainer.training_type_plugin.config['fp16']['loss_scale_window'] == 10 - assert self.trainer.training_type_plugin.config['fp16']['hysteresis'] == 10 - assert 
self.trainer.training_type_plugin.config['fp16']['min_loss_scale'] == 10 + def on_train_start(self, trainer, pl_module) -> None: + assert trainer.training_type_plugin.config['fp16']['loss_scale'] == 10 + assert trainer.training_type_plugin.config['fp16']['initial_scale_power'] == 10 + assert trainer.training_type_plugin.config['fp16']['loss_scale_window'] == 10 + assert trainer.training_type_plugin.config['fp16']['hysteresis'] == 10 + assert trainer.training_type_plugin.config['fp16']['min_loss_scale'] == 10 raise SystemExit() - model = TestModel() + model = BoringModel() ds = DeepSpeedPlugin(loss_scale=10, initial_scale_power=10, loss_scale_window=10, hysteresis=10, min_loss_scale=10) - trainer = Trainer(default_root_dir=tmpdir, plugins=[ds], precision=16, gpus=1) + trainer = Trainer(default_root_dir=tmpdir, plugins=[ds], precision=16, gpus=1, callbacks=[TestCB()]) with pytest.raises(SystemExit): trainer.fit(model) -@RunIf(min_gpus=1, deepspeed=True, special=True) +@RunIf(deepspeed=True) +def test_deepspeed_custom_activation_checkpointing_params(tmpdir): + """Ensure if we modify the activation checkpointing parameters, the deepspeed config contains these changes.""" + ds = DeepSpeedPlugin( + partition_activations=True, + cpu_checkpointing=True, + contiguous_memory_optimization=True, + synchronize_checkpoint_boundary=True + ) + checkpoint_config = ds.config['activation_checkpointing'] + assert checkpoint_config['partition_activations'] + assert checkpoint_config['cpu_checkpointing'] + assert checkpoint_config['contiguous_memory_optimization'] + assert checkpoint_config['synchronize_checkpoint_boundary'] + + +@RunIf(min_gpus=1, deepspeed=True) def test_deepspeed_assert_config_zero_offload_disabled(tmpdir, deepspeed_zero_config): """Ensure if we use a config and turn off cpu_offload, that this is set to False within the config.""" deepspeed_zero_config['zero_optimization']['cpu_offload'] = False - class TestModel(BoringModel): + class TestCallback(Callback): - def on_train_start(self) -> None: - assert self.trainer.training_type_plugin.config['zero_optimization']['cpu_offload'] is False + def on_before_accelerator_backend_setup(self, trainer, pl_module) -> None: + assert trainer.training_type_plugin.config['zero_optimization']['cpu_offload'] is False raise SystemExit() - model = TestModel() + model = BoringModel() trainer = Trainer( + max_epochs=1, plugins=[DeepSpeedPlugin(config=deepspeed_zero_config)], precision=16, gpus=1, default_root_dir=tmpdir, + callbacks=[TestCallback()] ) with pytest.raises(SystemExit): trainer.fit(model) -@RunIf(min_gpus=2, special=True, deepspeed=True) +@RunIf(min_gpus=2, deepspeed=True, special=True) def test_deepspeed_multigpu(tmpdir, deepspeed_config): """ - Test to ensure that DeepSpeed with multiple GPUs works, without ZeRO Optimization as this requires compilation. + Test to ensure that DeepSpeed with multiple GPUs works, without ZeRO Optimization as this requires compilation. 
""" model = BoringModel() trainer = Trainer( - plugins=[DeepSpeedPlugin()], + plugins=[DeepSpeedPlugin(zero_optimization=False, stage=2)], default_root_dir=tmpdir, gpus=2, fast_dev_run=True, @@ -343,12 +380,184 @@ def test_deepspeed_multigpu(tmpdir, deepspeed_config): _assert_save_model_is_equal(model, tmpdir, trainer) -def _assert_save_model_is_equal(model, tmpdir, trainer): +class ModelParallelClassificationModel(LightningModule): + + def __init__(self, lr: float = 0.01, num_blocks: int = 5): + super().__init__() + self.lr = lr + self.num_blocks = num_blocks + + self.train_acc = Accuracy() + self.valid_acc = Accuracy() + self.test_acc = Accuracy() + + def make_block(self): + return nn.Sequential(nn.Linear(32, 32, bias=False), nn.ReLU()) + + def configure_sharded_model(self) -> None: + self.model = nn.Sequential(*(self.make_block() for x in range(self.num_blocks)), nn.Linear(32, 3)) + + def forward(self, x): + x = self.model(x) + # Ensure output is in float32 for softmax operation + x = x.float() + logits = F.softmax(x, dim=1) + return logits + + def training_step(self, batch, batch_idx): + x, y = batch + logits = self.forward(x) + loss = F.cross_entropy(logits, y) + self.log('train_loss', loss, prog_bar=True) + self.log('train_acc', self.train_acc(logits, y), prog_bar=True, sync_dist=True) + return {"loss": loss} + + def validation_step(self, batch, batch_idx): + x, y = batch + logits = self.forward(x) + self.log('val_loss', F.cross_entropy(logits, y), prog_bar=False, sync_dist=True) + self.log('val_acc', self.valid_acc(logits, y), prog_bar=True, sync_dist=True) + + def test_step(self, batch, batch_idx): + x, y = batch + logits = self.forward(x) + self.log('test_loss', F.cross_entropy(logits, y), prog_bar=False, sync_dist=True) + self.log('test_acc', self.test_acc(logits, y), prog_bar=True, sync_dist=True) + + def predict_step(self, batch, batch_idx, dataloader_idx=None): + x, y = batch + logits = self.forward(x) + self.test_acc(logits, y) + return self.test_acc.compute() + + def configure_optimizers(self): + optimizer = torch.optim.Adam(self.parameters(), lr=self.lr) + + lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99) + return [optimizer], [{ + 'scheduler': lr_scheduler, + 'interval': 'step', + }] + + +@RunIf(min_gpus=2, deepspeed=True, special=True) +def test_deepspeed_multigpu_stage_3(tmpdir, deepspeed_config): + """ + Test to ensure ZeRO Stage 3 works with a parallel model. + """ + model = ModelParallelBoringModel() + trainer = Trainer( + plugins=[DeepSpeedPlugin(stage=3)], + default_root_dir=tmpdir, + gpus=2, + fast_dev_run=True, + precision=16, + ) + trainer.fit(model) + trainer.test(model) + + # todo (tchaton) Currently load_from_checkpoint is not support for zero-v3 + # _assert_save_model_is_equal(model, tmpdir, trainer) + + +@RunIf(min_gpus=2, deepspeed=True, special=True) +def test_deepspeed_multigpu_stage_3_checkpointing(tmpdir): + """ + Test to ensure with Stage 3 and multiple GPUs that we can save/load a model resuming from a checkpoint, + and see convergence. 
+ """ + seed_everything(42) + model = ModelParallelClassificationModel() + dm = ClassifDataModule() + ck = ModelCheckpoint(monitor="val_acc", mode="max", save_last=True, save_top_k=-1) + trainer = Trainer( + max_epochs=10, + plugins=[DeepSpeedPlugin(stage=3)], + default_root_dir=tmpdir, + gpus=2, + precision=16, + accumulate_grad_batches=2, + callbacks=[ck] + ) + trainer.fit(model, datamodule=dm) + + results = trainer.test(model, datamodule=dm) + assert results[0]['test_acc'] > 0.7 + + saved_results = trainer.test(ckpt_path=ck.best_model_path, datamodule=dm) + assert saved_results[0]['test_acc'] > 0.7 + assert saved_results == results + + trainer = Trainer( + max_epochs=10, + plugins=[DeepSpeedPlugin(stage=3)], + default_root_dir=tmpdir, + gpus=2, + precision=16, + accumulate_grad_batches=2, + callbacks=[ck], + resume_from_checkpoint=ck.best_model_path + ) + results = trainer.test(model, datamodule=dm) + assert results[0]['test_acc'] > 0.7 + + dm.predict_dataloader = dm.test_dataloader + results = trainer.predict(datamodule=dm) + assert results[-1] > 0.7 + + +@RunIf(min_gpus=2, deepspeed=True, special=True) +@pytest.mark.parametrize('cpu_offload', [True, False]) +def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, cpu_offload): + """ + Test to ensure with Stage 2 and multiple GPUs, accumulated grad batches works. + """ + seed_everything(42) + + class VerificationCallback(Callback): + + def on_train_batch_start( + self, trainer, pl_module: LightningModule, batch: Any, batch_idx: int, dataloader_idx: int + ) -> None: + deepspeed_engine = trainer.training_type_plugin.model + assert trainer.global_step == deepspeed_engine.global_steps + + model = ModelParallelClassificationModel() + dm = ClassifDataModule() + trainer = Trainer( + max_epochs=5, + plugins=[DeepSpeedPlugin(stage=2, cpu_offload=cpu_offload)], + gpus=2, + limit_val_batches=2, + precision=16, + accumulate_grad_batches=2, + callbacks=[VerificationCallback()] + ) + trainer.fit(model, datamodule=dm) + + +@RunIf(min_gpus=2, deepspeed=True, special=True) +def test_deepspeed_multigpu_test(tmpdir, deepspeed_config): + """ + Test to ensure we can use DeepSpeed with just test using ZeRO Stage 3. + """ + model = ModelParallelBoringModel() + trainer = Trainer( + plugins=[DeepSpeedPlugin(stage=3)], + default_root_dir=tmpdir, + gpus=2, + fast_dev_run=True, + precision=16, + ) + trainer.test(model) + + +def _assert_save_model_is_equal(model, tmpdir, trainer, cls=BoringModel): checkpoint_path = os.path.join(tmpdir, 'model.pt') trainer.save_checkpoint(checkpoint_path) # carry out the check only on rank 0 if trainer.global_rank == 0: - saved_model = BoringModel.load_from_checkpoint(checkpoint_path) + saved_model = cls.load_from_checkpoint(checkpoint_path) if model.dtype == torch.half: saved_model = saved_model.half() # model is loaded in float32 as default, move it to float16 model = model.cpu() diff --git a/tests/plugins/test_double_plugin.py b/tests/plugins/test_double_plugin.py index f089b1c23149e..175ca5ecaba6b 100644 --- a/tests/plugins/test_double_plugin.py +++ b/tests/plugins/test_double_plugin.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import pytest - import torch from torch.utils.data import DataLoader, Dataset @@ -107,10 +106,7 @@ def predict_dataloader(self): return DataLoader(RandomDataset(32, 64)) -@pytest.mark.parametrize( - 'boring_model', - (DoublePrecisionBoringModel, DoublePrecisionBoringModelNoForward) -) +@pytest.mark.parametrize('boring_model', (DoublePrecisionBoringModel, DoublePrecisionBoringModelNoForward)) def test_double_precision(tmpdir, boring_model): model = boring_model() original_training_step = model.training_step diff --git a/tests/special_tests.sh b/tests/special_tests.sh index aa5d65844a1c5..cf81700291b8d 100755 --- a/tests/special_tests.sh +++ b/tests/special_tests.sh @@ -52,6 +52,14 @@ for i in "${!files_arr[@]}"; do break fi + # SPECIAL_PATTERN allows filtering the tests to run when debugging. + # use as `SPECIAL_PATTERN="foo_bar" ./special_tests.sh` to run only those + # tests with `foo_bar` in their name + if [[ $line != *$SPECIAL_PATTERN* ]]; then + report+="Skipped\t$file:$lineno::$test_name\n" + break + fi + # run the test report+="Ran\t$file:$lineno::$test_name\n" python ${defaults} "${file}::${test_name}" diff --git a/tests/trainer/test_trainer.py b/tests/trainer/test_trainer.py index ee93ca59eca76..9c3ee6ceeef5b 100644 --- a/tests/trainer/test_trainer.py +++ b/tests/trainer/test_trainer.py @@ -1475,7 +1475,7 @@ def test_trainer_predict_dp(tmpdir, num_gpus): predict(tmpdir, "dp", num_gpus, None) -@RunIf(min_gpus=2, special=True) +@RunIf(min_gpus=2, special=True, fairscale=True) def test_trainer_predict_ddp(tmpdir): predict(tmpdir, "ddp", 2, None, plugins=["ddp_sharded"])
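For context, a minimal usage sketch of the ZeRO Stage 3 options introduced by this patch (not part of the diff itself; the argument names come from the new DeepSpeedPlugin signature above, and the values shown are purely illustrative):

# Illustrative only: wiring the new DeepSpeedPlugin arguments into a Trainer.
from pytorch_lightning import Trainer
from pytorch_lightning.plugins import DeepSpeedPlugin

trainer = Trainer(
    gpus=2,
    precision=16,  # ZeRO optimization requires precision=16 (see _format_precision_config)
    plugins=[
        DeepSpeedPlugin(
            stage=3,                          # enable ZeRO stage 3 parameter partitioning
            cpu_offload=True,                 # offload optimizer memory/computation to CPU
            cpu_offload_params=True,          # new: offload partitioned parameters to CPU
            cpu_offload_use_pin_memory=True,  # new: pin CPU memory for offloaded parameters
            partition_activations=True,       # new: requires deepspeed.checkpointing.checkpoint in forward
            cpu_checkpointing=True,           # new: push partitioned activations to CPU
        )
    ],
)
# trainer.fit(model) / trainer.test(model) as usual; with stage 3 active, the LightningModule's
# configure_sharded_model() hook runs inside deepspeed.zero.Init via model_sharded_context().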