Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CheckpointIO Plugin #8743

Merged
merged 40 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
a93e452
poc API
Jul 19, 2021
72f4dfd
Merge branch 'master' into feat/ckpt_plugin
Aug 5, 2021
e7d2b66
Fix up the API, unsure on connection
Aug 5, 2021
b41e794
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 5, 2021
9161980
Example API
Aug 5, 2021
7aa4e8c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 5, 2021
dffe088
Update all constructors
Aug 5, 2021
cacf0e5
Move towards having the checkpoint plugin not require the plugin, and…
Aug 6, 2021
028ac38
Remove import
Aug 6, 2021
99c7a46
Fix tests
Aug 6, 2021
3adc486
Change name
Aug 6, 2021
b7d5b55
Cleanups
Aug 9, 2021
0a0a068
Fixes/Cleanups
Aug 9, 2021
97fb2a2
Use property
Aug 9, 2021
402156e
Fixes to signature
Aug 9, 2021
5310a7f
Merge branch 'master' into feat/ckpt_plugin
Aug 9, 2021
d7f567a
Add warning for TPU plugins that they do not support custom checkpoin…
Aug 9, 2021
fcc24b4
Cleanup API, introduce storage options
Aug 10, 2021
4421276
Update signature to be more general
Aug 10, 2021
d84cce1
Address feedback, add test for support check
Aug 11, 2021
38c22a2
Merge branch 'master' into feat/ckpt_plugin
Aug 11, 2021
b7f37ee
Add CHANGELOG.md
Aug 11, 2021
49086cc
fix tests
Aug 11, 2021
936f65a
change name
Aug 11, 2021
049a676
Fix mypy
Aug 11, 2021
1ff0912
Reviews
Aug 12, 2021
1841d3b
Add ability to pass checkpoint plugin through the trainer
Aug 12, 2021
b909dfe
Add constraints
Aug 12, 2021
50b11b5
Match signature to see if mypy works
Aug 12, 2021
9e16e34
Address review points
Aug 13, 2021
642e6fa
Revert changes to typing
Aug 13, 2021
5c9e973
Add docs/doc strings and API
Aug 13, 2021
6361c87
Address feedback
Aug 13, 2021
c921dbb
Update pytorch_lightning/plugins/training_type/training_type_plugin.py
Aug 13, 2021
fd82276
Address reviews
Aug 13, 2021
2fc3558
Update typing
Aug 13, 2021
21783f6
Refactor name
Aug 13, 2021
3b8c3f5
Clear up signature of function; checkpoint_plugin -> checkpoint_io
Aug 13, 2021
9cfe98f
Slightly cleaner
Aug 13, 2021
8f234e0
Address reviews
Aug 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
* Added `FastForwardSampler` and `CaptureIterableDataset` injection to data loading utilities ([#8366](https://github.com/PyTorchLightning/pytorch-lightning/pull/8366))


- Added `CheckpointIOPlugin` to expose checkpoint IO from training type plugin ([#8743](https://github.com/PyTorchLightning/pytorch-lightning/pull/8743))


### Changed

- Parsing of the `gpus` Trainer argument has changed: `gpus="n"` (str) no longer selects the GPU index n and instead selects the first n devices. ([#8770](https://github.com/PyTorchLightning/pytorch-lightning/pull/8770))
Expand Down
4 changes: 4 additions & 0 deletions pytorch_lightning/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from pytorch_lightning.plugins.base_plugin import Plugin
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.checkpoint.torch import TorchCheckpointIOPlugin
from pytorch_lightning.plugins.plugins_registry import ( # noqa: F401
call_training_type_register_plugins,
TrainingTypePluginsRegistry,
Expand Down Expand Up @@ -29,6 +31,8 @@
from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin

__all__ = [
"CheckpointIOPlugin",
"TorchCheckpointIOPlugin",
"ApexMixedPrecisionPlugin",
"DataParallelPlugin",
"DDP2Plugin",
Expand Down
2 changes: 2 additions & 0 deletions pytorch_lightning/plugins/checkpoint/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin # noqa: F401
from pytorch_lightning.plugins.checkpoint.torch import TorchCheckpointIOPlugin # noqa: F401
33 changes: 33 additions & 0 deletions pytorch_lightning/plugins/checkpoint/checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
from pathlib import Path
from typing import Any, Dict, Optional, TypeVar, Union

TSaveStorageOptions = TypeVar("TSaveStorageOptions")
TLoadStorageOptions = TypeVar("TLoadStorageOptions")


class CheckpointIOPlugin(ABC):
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
@abstractmethod
def save_checkpoint(
self, checkpoint: Dict[str, Any], path: Union[str, Path], storage_options: Optional[TSaveStorageOptions] = None
) -> None:
"""Save model/training states as a checkpoint file through state-dump and file-write.

Args:
checkpoint: dict containing model and trainer state
path: write-target path
storage_options: Optional parameters when saving the model/training states.
"""

@abstractmethod
def load_checkpoint(
self, path: Union[str, Path], storage_options: Optional[TLoadStorageOptions] = None
) -> Dict[str, Any]:
"""
Load checkpoint from a path when resuming or loading ckpt for test/validate/predict stages.
Args:
path: Path to checkpoint
storage_options: Optional parameters when loading the model/training states.

Returns: The loaded checkpoint.
"""
27 changes: 27 additions & 0 deletions pytorch_lightning/plugins/checkpoint/torch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pathlib import Path
from typing import Any, Dict, Optional, Union

import pytorch_lightning as pl
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin, TLoadStorageOptions, TSaveStorageOptions
from pytorch_lightning.utilities import rank_zero_warn
from pytorch_lightning.utilities.cloud_io import atomic_save
from pytorch_lightning.utilities.cloud_io import load as pl_load


class TorchCheckpointIOPlugin(CheckpointIOPlugin):
def save_checkpoint(
self, checkpoint: Dict[str, Any], path: Union[str, Path], storage_options: Optional[TSaveStorageOptions] = None
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
try:
# write the checkpoint dictionary on the file
atomic_save(checkpoint, path)
except AttributeError as err:
key = pl.LightningModule.CHECKPOINT_HYPER_PARAMS_KEY
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
checkpoint.pop(key, None)
rank_zero_warn(f"Warning, `{key}` dropped from checkpoint. An attribute is not picklable: {err}")
atomic_save(checkpoint, path)

def load_checkpoint(
self, path: Union[str, Path], map_location: Optional[TLoadStorageOptions] = lambda storage, loc: storage
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
) -> Dict[str, Any]:
return pl_load(path, map_location=map_location)
10 changes: 8 additions & 2 deletions pytorch_lightning/plugins/training_type/ddp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from pytorch_lightning.distributed import LightningDistributed
from pytorch_lightning.overrides import LightningDistributedModule
from pytorch_lightning.overrides.distributed import prepare_for_backward
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin
from pytorch_lightning.utilities import (
Expand Down Expand Up @@ -75,14 +76,19 @@ def __init__(
self,
parallel_devices: Optional[List[torch.device]] = None,
num_nodes: Optional[int] = None,
cluster_environment: ClusterEnvironment = None,
cluster_environment: Optional[ClusterEnvironment] = None,
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
sync_batchnorm: Optional[bool] = None,
ddp_comm_state: Optional[object] = None,
ddp_comm_hook: Optional[callable] = None,
ddp_comm_wrapper: Optional[callable] = None,
**kwargs: Union[Any, Dict[str, Any]],
) -> None:
super().__init__(parallel_devices=parallel_devices, cluster_environment=cluster_environment)
super().__init__(
parallel_devices=parallel_devices,
cluster_environment=cluster_environment,
checkpoint_plugin=checkpoint_plugin,
)
self.interactive_ddp_procs = []
if num_nodes is not None:
rank_zero_deprecation(
Expand Down
8 changes: 7 additions & 1 deletion pytorch_lightning/plugins/training_type/ddp_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pytorch_lightning.distributed.dist import LightningDistributed
from pytorch_lightning.overrides import LightningDistributedModule
from pytorch_lightning.overrides.distributed import prepare_for_backward
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin
from pytorch_lightning.trainer.states import TrainerFn
Expand Down Expand Up @@ -63,13 +64,18 @@ def __init__(
parallel_devices: Optional[List[torch.device]] = None,
num_nodes: Optional[int] = None,
cluster_environment: ClusterEnvironment = None,
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
sync_batchnorm: Optional[bool] = None,
ddp_comm_state: Optional[object] = None,
ddp_comm_hook: Optional[callable] = None,
ddp_comm_wrapper: Optional[callable] = None,
**kwargs: Any,
):
super().__init__(parallel_devices=parallel_devices, cluster_environment=cluster_environment)
super().__init__(
parallel_devices=parallel_devices,
cluster_environment=cluster_environment,
checkpoint_plugin=checkpoint_plugin,
)
if num_nodes is not None:
rank_zero_deprecation(
"Argument `num_nodes` in `DDPSpawnPlugin` is deprecated in v1.4, and will be removed in v1.6. "
Expand Down
19 changes: 15 additions & 4 deletions pytorch_lightning/plugins/training_type/deepspeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pytorch_lightning as pl
from pytorch_lightning.callbacks import GradientAccumulationScheduler
from pytorch_lightning.overrides.base import _LightningModuleWrapperBase
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.plugins.training_type.ddp import DDPPlugin
from pytorch_lightning.trainer.optimizers import _get_default_scheduler_config
Expand Down Expand Up @@ -274,8 +275,11 @@ def __init__(
pin_memory = cpu_offload_use_pin_memory

super().__init__(
parallel_devices=parallel_devices, num_nodes=num_nodes, cluster_environment=cluster_environment
parallel_devices=parallel_devices,
num_nodes=num_nodes,
cluster_environment=cluster_environment,
)

self.config = self._load_config(config)
if self.config is None:
# User has not overridden config, set defaults
Expand Down Expand Up @@ -679,12 +683,11 @@ def save_checkpoint(self, checkpoint: Dict, filepath: str) -> None:
filepath: write-target file's path
"""
if self.zero_stage_3 and self._multi_device and self.is_global_zero:
# todo (sean): Add link to docs once docs are merged.
warning_cache.warn(
"When saving the DeepSpeed Stage 3 checkpoint, "
"each worker will save a shard of the checkpoint within a directory. "
"If a single file is required after training, "
"see https://pytorch-lightning.readthedocs.io/en/latest/advanced/advanced_gpu.html#"
"deepspeed-zero-stage-3-single-file for instructions."
"If a single file is required after training, see <TODO> for instructions."
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
)
# Use deepspeed's internal checkpointing function to handle partitioned weights across processes
# dump states as a checkpoint dictionary object
Expand Down Expand Up @@ -818,3 +821,11 @@ def register_plugins(cls, plugin_registry: Dict) -> None:
offload_params_device="nvme",
offload_optimizer_device="nvme",
)

@property
def checkpoint_plugin(self) -> CheckpointIOPlugin:
return self._checkpoint_plugin

@checkpoint_plugin.setter
def checkpoint_plugin(self, plugin: CheckpointIOPlugin) -> None:
raise MisconfigurationException("DeepSpeed currently does not support custom checkpoint plugins.")
11 changes: 9 additions & 2 deletions pytorch_lightning/plugins/training_type/dp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from torch.nn import DataParallel

from pytorch_lightning.overrides.data_parallel import LightningParallelModule
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin
from pytorch_lightning.utilities.apply_func import apply_to_collection
from pytorch_lightning.utilities.model_helpers import is_overridden
Expand All @@ -29,8 +30,14 @@ class DataParallelPlugin(ParallelPlugin):
device and each gets a split of the data.
"""

def __init__(self, parallel_devices: Optional[List[torch.device]]):
super().__init__(parallel_devices=parallel_devices, cluster_environment=None)
def __init__(
self,
parallel_devices: Optional[List[torch.device]],
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
):
super().__init__(
parallel_devices=parallel_devices, cluster_environment=None, checkpoint_plugin=checkpoint_plugin
)

@property
def global_rank(self) -> int:
Expand Down
17 changes: 8 additions & 9 deletions pytorch_lightning/plugins/training_type/fully_sharded.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
from typing import Any, Dict, Generator, List, Optional, Union
from typing import Dict, Generator, List, Optional

import torch
from torch import Tensor

from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.plugins.training_type.ddp import DDPPlugin
from pytorch_lightning.utilities import _FAIRSCALE_FULLY_SHARDED_AVAILABLE
Expand All @@ -41,6 +41,7 @@ def __init__(
state_dict_to_cpu: bool = True,
parallel_devices: Optional[List[torch.device]] = None,
cluster_environment: ClusterEnvironment = None,
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
):
"""
Plugin for Fully Sharded Data Parallel provided by FairScale.
Expand Down Expand Up @@ -89,7 +90,11 @@ def __init__(
(Defautl: True).
"""

super().__init__(parallel_devices=parallel_devices, cluster_environment=cluster_environment)
super().__init__(
parallel_devices=parallel_devices,
cluster_environment=cluster_environment,
checkpoint_plugin=checkpoint_plugin,
)
self.cpu_offload = cpu_offload
self.move_grads_to_cpu = move_grads_to_cpu
self.flatten_parameters = flatten_parameters
Expand Down Expand Up @@ -169,12 +174,6 @@ def model_to_device(self) -> None:
# ensure we update the device type in the lightning module
self.lightning_module.to(self.root_device)

def lightning_module_state_dict(self) -> Dict[str, Union[Any, Tensor]]:
# Currently it is same as default TrainingTypePlugin, i.e. return
# the full state dict for FSDP, in the future, we will provide sharded
# state dict.
return super().lightning_module_state_dict()

@property
def setup_optimizers_in_pre_dispatch(self) -> bool:
# Setup optimizers after the Fully Sharded Model has been made
Expand Down
11 changes: 9 additions & 2 deletions pytorch_lightning/plugins/training_type/horovod.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from torch.optim.lr_scheduler import _LRScheduler

from pytorch_lightning.core.optimizer import LightningOptimizer
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin
from pytorch_lightning.utilities import _HOROVOD_AVAILABLE
from pytorch_lightning.utilities.distributed import distributed_available
Expand All @@ -33,8 +34,14 @@
class HorovodPlugin(ParallelPlugin):
"""Plugin for Horovod distributed training integration."""

def __init__(self, parallel_devices: Optional[List[torch.device]] = None):
super().__init__(parallel_devices=parallel_devices, cluster_environment=None)
def __init__(
self,
parallel_devices: Optional[List[torch.device]] = None,
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
):
super().__init__(
parallel_devices=parallel_devices, cluster_environment=None, checkpoint_plugin=checkpoint_plugin
)
rank_zero_only.rank = self.global_rank

@property
Expand Down
8 changes: 7 additions & 1 deletion pytorch_lightning/plugins/training_type/ipu.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pytorch_lightning as pl
from pytorch_lightning.callbacks import GradientAccumulationScheduler
from pytorch_lightning.overrides.base import _LightningModuleWrapperBase
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin
from pytorch_lightning.trainer.states import RunningStage
Expand Down Expand Up @@ -67,6 +68,7 @@ def __init__(
autoreport_dir: Optional[str] = None,
parallel_devices: Optional[List[torch.device]] = None,
cluster_environment: Optional[ClusterEnvironment] = None,
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
training_opts: Optional["poptorch.Options"] = None,
inference_opts: Optional["poptorch.Options"] = None,
) -> None:
Expand All @@ -83,7 +85,11 @@ def __init__(
inference_opts: Optional ``poptorch.Options`` to override the default
created options for validation/testing and predicting.
"""
super().__init__(parallel_devices, cluster_environment)
super().__init__(
parallel_devices=parallel_devices,
cluster_environment=cluster_environment,
checkpoint_plugin=checkpoint_plugin,
)
if not _POPTORCH_AVAILABLE or not poptorch.ipuHardwareIsAvailable():
raise MisconfigurationException(
"The IPU Accelerator requires IPU devices to run. "
Expand Down
4 changes: 3 additions & 1 deletion pytorch_lightning/plugins/training_type/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import pytorch_lightning as pl
from pytorch_lightning.overrides.base import unwrap_lightning_module
from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin
from pytorch_lightning.utilities import _XLA_AVAILABLE
Expand All @@ -34,8 +35,9 @@ def __init__(
self,
parallel_devices: Optional[List[torch.device]] = None,
cluster_environment: Optional[ClusterEnvironment] = None,
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
):
super().__init__()
super().__init__(checkpoint_plugin)
self.parallel_devices = parallel_devices
self.cluster_environment = cluster_environment

Expand Down
9 changes: 7 additions & 2 deletions pytorch_lightning/plugins/training_type/single_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@

import torch

from pytorch_lightning.plugins.checkpoint.checkpoint import CheckpointIOPlugin
from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin
from pytorch_lightning.utilities import _XLA_AVAILABLE


class SingleDevicePlugin(TrainingTypePlugin):
"""Plugin that handles communication on a single device."""

def __init__(self, device: torch.device):
super().__init__()
def __init__(
self,
device: torch.device,
checkpoint_plugin: Optional[CheckpointIOPlugin] = None,
):
super().__init__(checkpoint_plugin)
self.device: torch.device = device
self.global_rank = 0
self.local_rank = 0
Expand Down
Loading