From 3f69e5eac6e2f2c68d9a2c8abeece96adb8dd44d Mon Sep 17 00:00:00 2001
From: Louis Dupont
Date: Thu, 24 Aug 2023 22:23:55 +0300
Subject: [PATCH 1/7] fix

---
 src/super_gradients/common/deprecate.py       |  61 +++++++
 .../environment/checkpoints_dir_utils.py      |   4 +-
 .../common/environment/ddp_utils.py           | 163 +++++++++++++++++-
 .../common/environment/omegaconf_utils.py     |   3 +-
 .../training/dataloaders/dataloaders.py       |   2 +-
 .../training/datasets/datasets_utils.py       |   2 +-
 .../training/losses/ppyolo_loss.py            |   4 +-
 .../training/metrics/detection_metrics.py     |   3 +-
 .../metrics/pose_estimation_metrics.py        |   3 +-
 .../training/models/__init__.py               |  14 +-
 .../models/detection_models/csp_resnet.py     |   3 +-
 .../training/sg_trainer/sg_trainer.py         |   6 +-
 .../training/utils/checkpoint_utils.py        |   3 +-
 .../utils/distributed_training_utils.py       |  32 ++--
 .../training/utils/quantization/calibrator.py |   2 +-
 15 files changed, 262 insertions(+), 43 deletions(-)
 create mode 100644 src/super_gradients/common/deprecate.py

diff --git a/src/super_gradients/common/deprecate.py b/src/super_gradients/common/deprecate.py
new file mode 100644
index 0000000000..a88c3203ec
--- /dev/null
+++ b/src/super_gradients/common/deprecate.py
@@ -0,0 +1,61 @@
+import warnings
+from typing import Optional
+from functools import wraps
+
+
+def make_function_deprecated(version: str, new_function_name: Optional[str] = None, new_module_name: Optional[str] = None):
+    """
+    Decorator to mark a function as deprecated, providing a warning message with the version in which it will be removed and how to replace it.
+
+    :param version:             Version number when the function will be removed.
+    :param new_function_name:   New name of the function if it has been renamed.
+    :param new_module_name:     Module where the function has been moved. If None, this will be set as the module of the decorated function.
+
+    Example usage:
+        >> @make_function_deprecated('4.0.0', new_function_name='new_get_local_rank', new_module_name='new.module.path')
+        >> def get_local_rank():
+        >>     from new_module import get_local_rank as _get_local_rank
+        >>     return _get_local_rank()
+
+        >> from deprecated_module import get_local_rank
+        >> get_local_rank()
+        DeprecationWarning: You are using `deprecated_module.get_local_rank` which is deprecated and will be removed in 4.0.0.
+        Please update your code to import it as follows:
+          [-] from deprecated_module import get_local_rank
+          [+] from new.module.path import new_get_local_rank
+    """
+
+    def decorator(old_func):
+        @wraps(old_func)
+        def wrapper(*args, **kwargs):
+            if not wrapper._warned:
+                new_name = new_function_name or old_func.__name__
+                new_module = new_module_name or old_func.__module__
+                reason = (
+                    f"You are using `{old_func.__module__}.{old_func.__name__}` which is deprecated and will be removed in {version}.\n"
+                    f"Please update your code to import it as follows:\n"
+                    f"  [-] from {old_func.__module__} import {old_func.__name__}\n"
+                    f"  [+] from {new_module} import {new_name}\n"
+                )
+                warnings.warn(reason, DeprecationWarning, stacklevel=2)
+                wrapper._warned = True
+
+            return old_func(*args, **kwargs)
+
+        # Each decorated object will have its own _warned state.
+        # This state ensures that the warning appears only once, to avoid polluting the console when the function is called often.
+ wrapper._warned = False + return wrapper + + return decorator + + +def make_deprecated(func, reason): + def inner(*args, **kwargs): + with warnings.catch_warnings(): + warnings.simplefilter("once", DeprecationWarning) + warnings.warn(reason, category=DeprecationWarning, stacklevel=2) + warnings.warn(reason, DeprecationWarning) + return func(*args, **kwargs) + + return inner diff --git a/src/super_gradients/common/environment/checkpoints_dir_utils.py b/src/super_gradients/common/environment/checkpoints_dir_utils.py index 00beb53149..56b8270c16 100644 --- a/src/super_gradients/common/environment/checkpoints_dir_utils.py +++ b/src/super_gradients/common/environment/checkpoints_dir_utils.py @@ -5,6 +5,7 @@ from datetime import datetime from super_gradients.common.abstractions.abstract_logger import get_logger +from super_gradients.common.environment.ddp_utils import execute_and_distribute_from_master try: @@ -16,6 +17,7 @@ logger = get_logger(__name__) +@execute_and_distribute_from_master def generate_run_id() -> str: """Generate a unique run ID based on the current timestamp. @@ -51,7 +53,7 @@ def get_latest_run_id(experiment_name: str, checkpoints_root_dir: Optional[str] f"Trying to load the n-1 most recent run..." ) else: - return run_dir + return os.path.basename(run_dir) def validate_run_id(run_id: str, experiment_name: str, ckpt_root_dir: Optional[str] = None): diff --git a/src/super_gradients/common/environment/ddp_utils.py b/src/super_gradients/common/environment/ddp_utils.py index e583a9f4ea..0b5238cf8d 100755 --- a/src/super_gradients/common/environment/ddp_utils.py +++ b/src/super_gradients/common/environment/ddp_utils.py @@ -1,6 +1,11 @@ -import os import socket from functools import wraps +import os +import pickle +from typing import Any, List, Callable + +import torch +import torch.distributed as dist from super_gradients.common.environment.device_utils import device_config from super_gradients.common.environment.omegaconf_utils import register_hydra_resolvers @@ -77,3 +82,159 @@ def find_free_port() -> int: sock.bind(("", 0)) _ip, port = sock.getsockname() return port + + +def get_local_rank(): + """ + Returns the local rank if running in DDP, and 0 otherwise + :return: local rank + """ + return dist.get_rank() if dist.is_initialized() else 0 + + +def require_ddp_setup() -> bool: + from super_gradients.common import MultiGPUMode + + return device_config.multi_gpu == MultiGPUMode.DISTRIBUTED_DATA_PARALLEL and device_config.assigned_rank != get_local_rank() + + +def is_ddp_subprocess(): + return torch.distributed.get_rank() > 0 if dist.is_initialized() else False + + +def get_world_size() -> int: + """ + Returns the world size if running in DDP, and 1 otherwise + :return: world size + """ + if not dist.is_available(): + return 1 + if not dist.is_initialized(): + return 1 + return dist.get_world_size() + + +def get_device_ids() -> List[int]: + return list(range(get_world_size())) + + +def count_used_devices() -> int: + return len(get_device_ids()) + + +def execute_and_distribute_from_master(func: Callable[..., Any]) -> Callable[..., Any]: + """ + Decorator to execute a function on the master process and distribute the result to all other processes. + Useful in parallel computing scenarios where a computational task needs to be performed only on the master + node (e.g., a computational-heavy calculation), and the result must be shared with other nodes without + redundant computation. 
+ + Example usage: + >>> @execute_and_distribute_from_master + >>> def some_code_to_run(param1, param2): + >>> return param1 + param2 + + The wrapped function will only be executed on the master node, and the result will be propagated to all + other nodes. + + :param func: The function to be executed on the master process and whose result is to be distributed. + :return: A wrapper function that encapsulates the execute-and-distribute logic. + """ + + @wraps(func) + def wrapper(*args, **kwargs): + # Run the function only if it's the master process + if device_config.assigned_rank <= 0: + result = func(*args, **kwargs) + else: + result = None + + # Broadcast the result from the master process to all nodes + return broadcast_from_master(result) + + return wrapper + + +def broadcast_from_master(data: Any) -> Any: + """ + Broadcast data from master node to all other nodes. This may be required when you + want to compute something only on master node (e.g computational-heavy metric) and + don't want to vaste CPU of other nodes doing same work simultaneously. + + >>> if device_config.assigned_rank <= 0: + >>> result = some_code_to_run(...) + >>> else: + >>> result = None + >>> # 'result' propagated to all nodes from master + >>> result = broadcast_from_master(result) + + :param data: Data to be broadcasted from master node (rank 0) + :return: Data from rank 0 node + """ + world_size = get_world_size() + if world_size == 1: + return data + + local_rank = get_local_rank() + storage: torch.Tensor + + if local_rank == 0: + buffer = pickle.dumps(data) + storage = torch.ByteStorage.from_buffer(buffer) + payload = torch.ByteTensor(storage).to("cuda") + local_size = payload.numel() + else: + local_size = 0 + + # Propagate target tensor size to all nodes + local_size = max(all_gather(local_size)) + if local_rank != 0: + payload = torch.empty((local_size,), dtype=torch.uint8, device="cuda") + + dist.broadcast(payload, 0) + buffer = payload.cpu().numpy().tobytes() + return pickle.loads(buffer) + + +def all_gather(data: Any) -> List[Any]: + """ + Run all_gather on arbitrary picklable data (not necessarily tensors) + :param data: Any picklable object + :return: List of data gathered from each rank + """ + world_size = get_world_size() + if world_size == 1: + return [data] + + # serialized to a Tensor + buffer = pickle.dumps(data) + try: + storage = torch.UntypedStorage.from_buffer(buffer, dtype=torch.uint8) + except AttributeError: + storage = torch._UntypedStorage.from_buffer(buffer, dtype=torch.uint8) + tensor = torch.ByteTensor(storage).to("cuda") + + # obtain Tensor size of each rank + local_size = torch.tensor([tensor.numel()], device="cuda") + size_list = [torch.tensor([0], device="cuda") for _ in range(world_size)] + dist.all_gather(size_list, local_size) + size_list = [int(size.item()) for size in size_list] + max_size = max(size_list) + + # receiving Tensor from all ranks + # we pad the tensor because torch all_gather does not support + # gathering tensors of different shapes + tensor_list = [] + for _ in size_list: + tensor_list.append(torch.empty((max_size,), dtype=torch.uint8, device="cuda")) + if local_size != max_size: + padding = torch.empty(size=(max_size - local_size,), dtype=torch.uint8, device="cuda") + tensor = torch.cat((tensor, padding), dim=0) + dist.all_gather(tensor_list, tensor) + + data_list = [] + for size, tensor in zip(size_list, tensor_list): + buffer = tensor.cpu().numpy().tobytes()[:size] + data_list.append(pickle.loads(buffer)) + + return data_list diff --git 
a/src/super_gradients/common/environment/omegaconf_utils.py b/src/super_gradients/common/environment/omegaconf_utils.py index 3c83880368..6eb6f93d37 100644 --- a/src/super_gradients/common/environment/omegaconf_utils.py +++ b/src/super_gradients/common/environment/omegaconf_utils.py @@ -4,7 +4,6 @@ from omegaconf import OmegaConf, DictConfig -from super_gradients.common.environment.checkpoints_dir_utils import get_checkpoints_dir_path from hydra.experimental.callback import Callback @@ -72,6 +71,8 @@ def get_cls(cls_path: str): def hydra_output_dir_resolver(ckpt_root_dir: str, experiment_name: str) -> str: + from super_gradients.common.environment.checkpoints_dir_utils import get_checkpoints_dir_path + return get_checkpoints_dir_path(experiment_name=experiment_name, ckpt_root_dir=ckpt_root_dir) diff --git a/src/super_gradients/training/dataloaders/dataloaders.py b/src/super_gradients/training/dataloaders/dataloaders.py index a506346946..17872e4f61 100644 --- a/src/super_gradients/training/dataloaders/dataloaders.py +++ b/src/super_gradients/training/dataloaders/dataloaders.py @@ -37,8 +37,8 @@ from super_gradients.training.utils import get_param from super_gradients.training.utils.distributed_training_utils import ( wait_for_the_master, - get_local_rank, ) +from super_gradients.common.environment.ddp_utils import get_local_rank from super_gradients.training.utils.utils import override_default_params_without_nones from super_gradients.common.environment.cfg_utils import load_dataset_params import torch.distributed as dist diff --git a/src/super_gradients/training/datasets/datasets_utils.py b/src/super_gradients/training/datasets/datasets_utils.py index da8e0f8326..b48f590626 100755 --- a/src/super_gradients/training/datasets/datasets_utils.py +++ b/src/super_gradients/training/datasets/datasets_utils.py @@ -23,7 +23,7 @@ from super_gradients.common.registry.registry import register_collate_function, register_callback, register_transform from super_gradients.training.datasets.auto_augment import rand_augment_transform from super_gradients.training.utils.detection_utils import DetectionVisualization, Anchors -from super_gradients.training.utils.distributed_training_utils import get_local_rank, get_world_size +from super_gradients.common.environment.ddp_utils import get_local_rank, get_world_size from super_gradients.training.utils.utils import AverageMeter diff --git a/src/super_gradients/training/losses/ppyolo_loss.py b/src/super_gradients/training/losses/ppyolo_loss.py index d5a8dd49ad..e42588f2e7 100644 --- a/src/super_gradients/training/losses/ppyolo_loss.py +++ b/src/super_gradients/training/losses/ppyolo_loss.py @@ -10,9 +10,7 @@ from super_gradients.common.registry.registry import register_loss from super_gradients.training.datasets.data_formats.bbox_formats.cxcywh import cxcywh_to_xyxy from super_gradients.training.utils.bbox_utils import batch_distance2bbox -from super_gradients.training.utils.distributed_training_utils import ( - get_world_size, -) +from super_gradients.common.environment.ddp_utils import get_world_size def batch_iou_similarity(box1: torch.Tensor, box2: torch.Tensor, eps: float = 1e-9) -> float: diff --git a/src/super_gradients/training/metrics/detection_metrics.py b/src/super_gradients/training/metrics/detection_metrics.py index 5078886022..dca113c3da 100755 --- a/src/super_gradients/training/metrics/detection_metrics.py +++ b/src/super_gradients/training/metrics/detection_metrics.py @@ -6,6 +6,7 @@ from torchmetrics import Metric import super_gradients +import 
super_gradients.common.environment.ddp_utils from super_gradients.common.object_names import Metrics from super_gradients.common.registry.registry import register_metric from super_gradients.training.utils import tensor_container_to_device @@ -222,7 +223,7 @@ def _sync_dist(self, dist_sync_fn=None, process_group=None): :return: """ if self.world_size is None: - self.world_size = torch.distributed.get_world_size() if self.is_distributed else -1 + self.world_size = super_gradients.common.environment.ddp_utils.get_world_size() if self.is_distributed else -1 if self.rank is None: self.rank = torch.distributed.get_rank() if self.is_distributed else -1 diff --git a/src/super_gradients/training/metrics/pose_estimation_metrics.py b/src/super_gradients/training/metrics/pose_estimation_metrics.py index 9675a0dd1d..4a0ac61fd3 100644 --- a/src/super_gradients/training/metrics/pose_estimation_metrics.py +++ b/src/super_gradients/training/metrics/pose_estimation_metrics.py @@ -6,6 +6,7 @@ from torch import Tensor from torchmetrics import Metric +import super_gradients.common.environment.ddp_utils from super_gradients.common.abstractions.abstract_logger import get_logger from super_gradients.common.environment.ddp_utils import is_distributed from super_gradients.common.object_names import Metrics @@ -258,7 +259,7 @@ def _sync_dist(self, dist_sync_fn=None, process_group=None): :return: """ if self.world_size is None: - self.world_size = torch.distributed.get_world_size() if self.is_distributed else -1 + self.world_size = super_gradients.common.environment.ddp_utils.get_world_size() if self.is_distributed else -1 if self.rank is None: self.rank = torch.distributed.get_rank() if self.is_distributed else -1 diff --git a/src/super_gradients/training/models/__init__.py b/src/super_gradients/training/models/__init__.py index 64becb156b..61db5ca613 100755 --- a/src/super_gradients/training/models/__init__.py +++ b/src/super_gradients/training/models/__init__.py @@ -1,7 +1,6 @@ -import warnings - from .sg_module import SgModule from .classification_models.base_classifer import BaseClassifier +from super_gradients.common.deprecate import make_deprecated # Classification models from super_gradients.training.models.classification_models.beit import Beit, BeitLargePatch16_224, BeitBasePatch16_224 @@ -135,17 +134,6 @@ from super_gradients.training.utils import make_divisible as _make_divisible_current_version, HpmStruct as CurrVersionHpmStruct -def make_deprecated(func, reason): - def inner(*args, **kwargs): - with warnings.catch_warnings(): - warnings.simplefilter("once", DeprecationWarning) - warnings.warn(reason, category=DeprecationWarning, stacklevel=2) - warnings.warn(reason, DeprecationWarning) - return func(*args, **kwargs) - - return inner - - make_divisible = make_deprecated( func=_make_divisible_current_version, reason="You're importing `make_divisible` from `super_gradients.training.models`. 
This is deprecated since SuperGradients 3.1.0.\n" diff --git a/src/super_gradients/training/models/detection_models/csp_resnet.py b/src/super_gradients/training/models/detection_models/csp_resnet.py index 4d72ed1bc9..056fe5e0a2 100644 --- a/src/super_gradients/training/models/detection_models/csp_resnet.py +++ b/src/super_gradients/training/models/detection_models/csp_resnet.py @@ -13,7 +13,8 @@ __all__ = ["CSPResNetBackbone", "CSPResNetBasicBlock"] -from super_gradients.training.utils.distributed_training_utils import wait_for_the_master, get_local_rank +from super_gradients.training.utils.distributed_training_utils import wait_for_the_master +from super_gradients.common.environment.ddp_utils import get_local_rank class CSPResNetBasicBlock(nn.Module): diff --git a/src/super_gradients/training/sg_trainer/sg_trainer.py b/src/super_gradients/training/sg_trainer/sg_trainer.py index 10031418ff..c04df5db63 100755 --- a/src/super_gradients/training/sg_trainer/sg_trainer.py +++ b/src/super_gradients/training/sg_trainer/sg_trainer.py @@ -61,14 +61,10 @@ compute_precise_bn_stats, setup_device, get_gpu_mem_utilization, - get_world_size, - get_local_rank, - require_ddp_setup, - get_device_ids, - is_ddp_subprocess, wait_for_the_master, DDPNotSetupException, ) +from super_gradients.common.environment.ddp_utils import get_local_rank, require_ddp_setup, is_ddp_subprocess, get_world_size, get_device_ids from super_gradients.training.utils.ema import ModelEMA from super_gradients.training.utils.optimizer_utils import build_optimizer from super_gradients.training.utils.sg_trainer_utils import MonitoredValue, log_main_training_params diff --git a/src/super_gradients/training/utils/checkpoint_utils.py b/src/super_gradients/training/utils/checkpoint_utils.py index 304ee27216..a16de11d4e 100644 --- a/src/super_gradients/training/utils/checkpoint_utils.py +++ b/src/super_gradients/training/utils/checkpoint_utils.py @@ -13,7 +13,8 @@ from super_gradients.common.decorators.explicit_params_validator import explicit_params_validation from super_gradients.module_interfaces import HasPredict from super_gradients.training.pretrained_models import MODEL_URLS -from super_gradients.training.utils.distributed_training_utils import get_local_rank, wait_for_the_master +from super_gradients.training.utils.distributed_training_utils import wait_for_the_master +from super_gradients.common.environment.ddp_utils import get_local_rank from super_gradients.training.utils.utils import unwrap_model try: diff --git a/src/super_gradients/training/utils/distributed_training_utils.py b/src/super_gradients/training/utils/distributed_training_utils.py index 3e97caf553..0a42008fd3 100755 --- a/src/super_gradients/training/utils/distributed_training_utils.py +++ b/src/super_gradients/training/utils/distributed_training_utils.py @@ -14,6 +14,7 @@ from torch.distributed.elastic.multiprocessing.errors import record from torch.distributed.launcher.api import LaunchConfig, elastic_launch +from super_gradients.common.deprecate import make_function_deprecated from super_gradients.common.environment.ddp_utils import init_trainer from super_gradients.common.data_types.enum import MultiGPUMode from super_gradients.common.environment.argparse_utils import EXTRA_ARGS @@ -145,40 +146,47 @@ def compute_precise_bn_stats(model: nn.Module, loader: torch.utils.data.DataLoad bn.momentum = momentums[i] +@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils", new_function_name="get_local_rank") def 
get_local_rank(): """ Returns the local rank if running in DDP, and 0 otherwise :return: local rank """ - return dist.get_rank() if dist.is_initialized() else 0 + from super_gradients.common.environment.ddp_utils import get_local_rank as _get_local_rank - -def require_ddp_setup() -> bool: - return device_config.multi_gpu == MultiGPUMode.DISTRIBUTED_DATA_PARALLEL and device_config.assigned_rank != get_local_rank() + return _get_local_rank() +@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") def is_ddp_subprocess(): - return torch.distributed.get_rank() > 0 if dist.is_initialized() else False + from super_gradients.common.environment.ddp_utils import is_ddp_subprocess as _is_ddp_subprocess + + return _is_ddp_subprocess() +@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") def get_world_size() -> int: """ Returns the world size if running in DDP, and 1 otherwise :return: world size """ - if not dist.is_available(): - return 1 - if not dist.is_initialized(): - return 1 - return dist.get_world_size() + from super_gradients.common.environment.ddp_utils import get_world_size as _get_world_size + + return _get_world_size() +@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") def get_device_ids() -> List[int]: - return list(range(get_world_size())) + from super_gradients.common.environment.ddp_utils import get_device_ids as _get_device_ids + return _get_device_ids() + +@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") def count_used_devices() -> int: - return len(get_device_ids()) + from super_gradients.common.environment.ddp_utils import count_used_devices as _count_used_devices + + return _count_used_devices() @contextmanager diff --git a/src/super_gradients/training/utils/quantization/calibrator.py b/src/super_gradients/training/utils/quantization/calibrator.py index 31381d5897..4dd36db652 100644 --- a/src/super_gradients/training/utils/quantization/calibrator.py +++ b/src/super_gradients/training/utils/quantization/calibrator.py @@ -12,7 +12,7 @@ from tqdm import tqdm from super_gradients.common.abstractions.abstract_logger import get_logger -from super_gradients.training.utils.distributed_training_utils import get_local_rank, get_world_size +from super_gradients.common.environment.ddp_utils import get_local_rank, get_world_size from torch.distributed import all_gather from super_gradients.training.utils.utils import infer_model_device From 768cf2c1defae5d6faee43570d6638e55f23d54e Mon Sep 17 00:00:00 2001 From: Louis Dupont Date: Sat, 26 Aug 2023 22:45:02 +0300 Subject: [PATCH 2/7] finamlize fix --- .../common/environment/checkpoints_dir_utils.py | 5 +++-- src/super_gradients/common/sg_loggers/base_sg_logger.py | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/super_gradients/common/environment/checkpoints_dir_utils.py b/src/super_gradients/common/environment/checkpoints_dir_utils.py index 56b8270c16..2a7f29a1aa 100644 --- a/src/super_gradients/common/environment/checkpoints_dir_utils.py +++ b/src/super_gradients/common/environment/checkpoints_dir_utils.py @@ -37,11 +37,12 @@ def is_run_dir(dirname: str) -> bool: def get_latest_run_id(experiment_name: str, checkpoints_root_dir: Optional[str] = None) -> Optional[str]: """ - :param experiment_name: Name of the experiment. 
- :param checkpoints_root_dir: Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment. + :param experiment_name: Name of the experiment. + :param checkpoints_root_dir: Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment. If None, SG will first check if a package named 'checkpoints' exists. If not, SG will look for the root of the project that includes the script that was launched. If not found, raise an error. + :return: Latest valid run ID. in the format "RUN_" """ experiment_dir = get_experiment_dir_path(checkpoints_root_dir=checkpoints_root_dir, experiment_name=experiment_name) diff --git a/src/super_gradients/common/sg_loggers/base_sg_logger.py b/src/super_gradients/common/sg_loggers/base_sg_logger.py index 0ba5f47788..e5210c528f 100644 --- a/src/super_gradients/common/sg_loggers/base_sg_logger.py +++ b/src/super_gradients/common/sg_loggers/base_sg_logger.py @@ -133,7 +133,8 @@ def _setup_dir(self): # Only if it exists, i.e. if hydra was used. if os.path.exists(source_hydra_path): destination_hydra_path = os.path.join(self._local_dir, ".hydra") - shutil.copytree(source_hydra_path, destination_hydra_path, dirs_exist_ok=True) + if not os.path.exists(destination_hydra_path): + shutil.copytree(source_hydra_path, destination_hydra_path) @multi_process_safe def _init_log_file(self): From b9898e446e9fd66a6b68ee9a5fd8fa59884decb1 Mon Sep 17 00:00:00 2001 From: Louis Dupont Date: Wed, 30 Aug 2023 15:10:31 +0300 Subject: [PATCH 3/7] rollback --- src/super_gradients/common/deprecate.py | 76 +++++++++++++++++-------- 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/src/super_gradients/common/deprecate.py b/src/super_gradients/common/deprecate.py index 48f8de2f1e..516ec85ff4 100644 --- a/src/super_gradients/common/deprecate.py +++ b/src/super_gradients/common/deprecate.py @@ -1,44 +1,72 @@ import warnings -from typing import Optional from functools import wraps +from typing import Optional +from pkg_resources import parse_version -def make_function_deprecated(version: str, new_function_name: Optional[str] = None, new_module_name: Optional[str] = None): +def deprecated(deprecated_since: str, removed_from: str, target: Optional[callable] = None, reason: str = ""): """ - Decorator to mark a function as deprecated, providing a warning message with the version number when it will be removed, and how to replace it. + Decorator to mark a callable as deprecated. Works on functions and classes. + It provides a clear and actionable warning message informing + the user about the version in which the function was deprecated, the version in which it will be removed, + and guidance on how to replace it. - :param version: Version number when the function will be removed. - :param new_function_name: New name of the function if it has been renamed. - :param new_module_name: Module where the function has been moved. If None, this will be set as the + :param deprecated_since: Version number when the function was deprecated. + :param removed_from: Version number when the function will be removed. + :param target: (Optional) The new function that should be used as a replacement. If provided, it will guide the user to the updated function. + :param reason: (Optional) Additional information or reason for the deprecation. 
Example usage: - >> @make_deprecated('4.0.0', new_function_name='new_get_local_rank', new_module_name='new.module.path') + If a direct replacement function exists: + >> from new.module.path import new_get_local_rank + + >> @deprecated(deprecated_since='3.2.0', removed_from='4.0.0', target=new_get_local_rank, reason="Replaced for optimization") >> def get_local_rank(): - >> from new_module import get_local_rank as _get_local_rank - >> return _get_local_rank() + >> return new_get_local_rank() + + If there's no direct replacement: + >> @deprecated(deprecated_since='3.2.0', removed_from='4.0.0', reason="Function is no longer needed due to XYZ reason") + >> def some_old_function(): + >> # ... function logic ... - >> from deprecated_module import get_local_rank + When calling a deprecated function: + >> from some_module import get_local_rank >> get_local_rank() - DeprecationWarning: You are using `deprecated_module.get_local_rank` which is deprecated and will be removed in 4.0.0. - Please update your code to import it as follows: - [-] from deprecated_module import get_local_rank - [+] from new.module.path import new_get_local_rank + DeprecationWarning: Function `some_module.get_local_rank` is deprecated. Deprecated since version `3.2.0` + and will be removed in version `4.0.0`. Reason: `Replaced for optimization`. + Please update your code: + [-] from `some_module` import `get_local_rank` + [+] from `new.module.path` import `new_get_local_rank`. """ - def decorator(old_func): + def decorator(old_func: callable) -> callable: @wraps(old_func) def wrapper(*args, **kwargs): if not wrapper._warned: - new_name = new_function_name or old_func.__name__ - new_module = new_module_name or old_func.__module__ - reason = ( - f"You are using `{old_func.__module__}.{old_func.__name__}` which is deprecated and will be removed in {version}.\n" - f"Please update your code to import it as follows:\n" - f" [-] from {old_func.__module__} import {old_func.__name__}\n" - f" [+] from {new_module} import {new_name}\n." + import super_gradients + + is_still_supported = parse_version(super_gradients.__version__) < parse_version(removed_from) + status_msg = "is deprecated" if is_still_supported else "was deprecated and has been removed" + message = ( + f"Callable `{old_func.__module__}.{old_func.__name__}` {status_msg} since version `{deprecated_since}` " + f"and will be removed in version `{removed_from}`.\n" ) - warnings.warn(reason, DeprecationWarning, stacklevel=2) - wrapper._warned = True + if reason: + message += f"Reason: {reason}.\n" + + if target is not None: + message += ( + f"Please update your code:\n" + f" [-] from `{old_func.__module__}` import `{old_func.__name__}`\n" + f" [+] from `{target.__module__}` import `{target.__name__}`" + ) + + if is_still_supported: + warnings.simplefilter("once", DeprecationWarning) # Required, otherwise the warning may never be displayed. 
+ warnings.warn(message, DeprecationWarning, stacklevel=2) + wrapper._warned = True + else: + raise ImportError(message) return old_func(*args, **kwargs) From 337c24d8c42389a9c6ce5636ee620a7df0c22696 Mon Sep 17 00:00:00 2001 From: Louis Dupont Date: Wed, 30 Aug 2023 15:34:24 +0300 Subject: [PATCH 4/7] fix --- .../utils/distributed_training_utils.py | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/super_gradients/training/utils/distributed_training_utils.py b/src/super_gradients/training/utils/distributed_training_utils.py index 0a42008fd3..9fde5f3348 100755 --- a/src/super_gradients/training/utils/distributed_training_utils.py +++ b/src/super_gradients/training/utils/distributed_training_utils.py @@ -14,7 +14,7 @@ from torch.distributed.elastic.multiprocessing.errors import record from torch.distributed.launcher.api import LaunchConfig, elastic_launch -from super_gradients.common.deprecate import make_function_deprecated +from super_gradients.common.deprecate import deprecated from super_gradients.common.environment.ddp_utils import init_trainer from super_gradients.common.data_types.enum import MultiGPUMode from super_gradients.common.environment.argparse_utils import EXTRA_ARGS @@ -28,6 +28,13 @@ from super_gradients.common.decorators.factory_decorator import resolve_param from super_gradients.common.factories.type_factory import TypeFactory +from super_gradients.common.environment.ddp_utils import get_local_rank as _get_local_rank +from super_gradients.common.environment.ddp_utils import is_ddp_subprocess as _is_ddp_subprocess +from super_gradients.common.environment.ddp_utils import get_world_size as _get_world_size +from super_gradients.common.environment.ddp_utils import get_device_ids as _get_device_ids +from super_gradients.common.environment.ddp_utils import count_used_devices as _count_used_devices + + logger = get_logger(__name__) @@ -146,46 +153,36 @@ def compute_precise_bn_stats(model: nn.Module, loader: torch.utils.data.DataLoad bn.momentum = momentums[i] -@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils", new_function_name="get_local_rank") +@deprecated(deprecated_since="3.2.1", removed_from="3.5.0", target=_get_local_rank) def get_local_rank(): """ Returns the local rank if running in DDP, and 0 otherwise :return: local rank """ - from super_gradients.common.environment.ddp_utils import get_local_rank as _get_local_rank - return _get_local_rank() -@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") +@deprecated(deprecated_since="3.2.1", removed_from="3.5.0", target=_is_ddp_subprocess) def is_ddp_subprocess(): - from super_gradients.common.environment.ddp_utils import is_ddp_subprocess as _is_ddp_subprocess - return _is_ddp_subprocess() -@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") +@deprecated(deprecated_since="3.2.1", removed_from="3.5.0", target=_get_world_size) def get_world_size() -> int: """ Returns the world size if running in DDP, and 1 otherwise :return: world size """ - from super_gradients.common.environment.ddp_utils import get_world_size as _get_world_size - return _get_world_size() -@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") +@deprecated(deprecated_since="3.2.1", removed_from="3.5.0", target=_get_device_ids) def get_device_ids() -> List[int]: - from 
super_gradients.common.environment.ddp_utils import get_device_ids as _get_device_ids - return _get_device_ids() -@make_function_deprecated(version="3.5.0", new_module_name="super_gradients.common.environment.ddp_utils") +@deprecated(deprecated_since="3.2.1", removed_from="3.5.0", target=_count_used_devices) def count_used_devices() -> int: - from super_gradients.common.environment.ddp_utils import count_used_devices as _count_used_devices - return _count_used_devices() From 36abc20222d1cc94e0bd50c0b503037e83af1f05 Mon Sep 17 00:00:00 2001 From: Louis Dupont Date: Wed, 30 Aug 2023 16:02:10 +0300 Subject: [PATCH 5/7] simplyfy broadcastin --- .../common/environment/ddp_utils.py | 80 +------------------ 1 file changed, 4 insertions(+), 76 deletions(-) diff --git a/src/super_gradients/common/environment/ddp_utils.py b/src/super_gradients/common/environment/ddp_utils.py index 0b5238cf8d..f57ab6567d 100755 --- a/src/super_gradients/common/environment/ddp_utils.py +++ b/src/super_gradients/common/environment/ddp_utils.py @@ -1,7 +1,6 @@ import socket from functools import wraps import os -import pickle from typing import Any, List, Callable import torch @@ -159,82 +158,11 @@ def broadcast_from_master(data: Any) -> Any: """ Broadcast data from master node to all other nodes. This may be required when you want to compute something only on master node (e.g computational-heavy metric) and - don't want to vaste CPU of other nodes doing same work simultaneously. - - >>> if device_config.assigned_rank <= 0: - >>> result = some_code_to_run(...) - >>> else: - >>> result = None - >>> # 'result' propagated to all nodes from master - >>> result = broadcast_from_master(result) + don't want to waste CPU of other nodes doing the same work simultaneously. :param data: Data to be broadcasted from master node (rank 0) :return: Data from rank 0 node """ - world_size = get_world_size() - if world_size == 1: - return data - - local_rank = get_local_rank() - storage: torch.Tensor - - if local_rank == 0: - buffer = pickle.dumps(data) - storage = torch.ByteStorage.from_buffer(buffer) - payload = torch.ByteTensor(storage).to("cuda") - local_size = payload.numel() - else: - local_size = 0 - - # Propagate target tensor size to all nodes - local_size = max(all_gather(local_size)) - if local_rank != 0: - payload = torch.empty((local_size,), dtype=torch.uint8, device="cuda") - - dist.broadcast(payload, 0) - buffer = payload.cpu().numpy().tobytes() - return pickle.loads(buffer) - - -def all_gather(data: Any) -> List[Any]: - """ - Run all_gather on arbitrary picklable data (not necessarily tensors) - :param data: Any picklable object - :return: List of data gathered from each rank - """ - world_size = get_world_size() - if world_size == 1: - return [data] - - # serialized to a Tensor - buffer = pickle.dumps(data) - try: - storage = torch.UntypedStorage.from_buffer(buffer, dtype=torch.uint8) - except AttributeError: - storage = torch._UntypedStorage.from_buffer(buffer, dtype=torch.uint8) - tensor = torch.ByteTensor(storage).to("cuda") - - # obtain Tensor size of each rank - local_size = torch.tensor([tensor.numel()], device="cuda") - size_list = [torch.tensor([0], device="cuda") for _ in range(world_size)] - dist.all_gather(size_list, local_size) - size_list = [int(size.item()) for size in size_list] - max_size = max(size_list) - - # receiving Tensor from all ranks - # we pad the tensor because torch all_gather does not support - # gathering tensors of different shapes - tensor_list = [] - for _ in size_list: - 
tensor_list.append(torch.empty((max_size,), dtype=torch.uint8, device="cuda")) - if local_size != max_size: - padding = torch.empty(size=(max_size - local_size,), dtype=torch.uint8, device="cuda") - tensor = torch.cat((tensor, padding), dim=0) - dist.all_gather(tensor_list, tensor) - - data_list = [] - for size, tensor in zip(size_list, tensor_list): - buffer = tensor.cpu().numpy().tobytes()[:size] - data_list.append(pickle.loads(buffer)) - - return data_list + broadcast_list = [data] if dist.get_rank() == 0 else [None] + dist.broadcast_object_list(broadcast_list, src=0) + return broadcast_list[0] From 6d9ef6233331661257332d7e8593a30e8a0fb466 Mon Sep 17 00:00:00 2001 From: Louis Dupont Date: Wed, 30 Aug 2023 16:17:53 +0300 Subject: [PATCH 6/7] fix --- .../training/utils/distributed_training_utils.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/super_gradients/training/utils/distributed_training_utils.py b/src/super_gradients/training/utils/distributed_training_utils.py index 9fde5f3348..da790170a5 100755 --- a/src/super_gradients/training/utils/distributed_training_utils.py +++ b/src/super_gradients/training/utils/distributed_training_utils.py @@ -33,6 +33,7 @@ from super_gradients.common.environment.ddp_utils import get_world_size as _get_world_size from super_gradients.common.environment.ddp_utils import get_device_ids as _get_device_ids from super_gradients.common.environment.ddp_utils import count_used_devices as _count_used_devices +from super_gradients.common.environment.ddp_utils import require_ddp_setup as _require_ddp_setup logger = get_logger(__name__) @@ -186,6 +187,11 @@ def count_used_devices() -> int: return _count_used_devices() +@deprecated(deprecated_since="3.2.1", removed_from="3.5.0", target=_require_ddp_setup) +def require_ddp_setup() -> bool: + return _require_ddp_setup() + + @contextmanager def wait_for_the_master(local_rank: int): """ From 16c8c78fdb247fa393687c732394d9b355164e81 Mon Sep 17 00:00:00 2001 From: Louis Dupont Date: Wed, 30 Aug 2023 17:12:21 +0300 Subject: [PATCH 7/7] fix --- src/super_gradients/common/environment/ddp_utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/super_gradients/common/environment/ddp_utils.py b/src/super_gradients/common/environment/ddp_utils.py index f57ab6567d..a745130974 100755 --- a/src/super_gradients/common/environment/ddp_utils.py +++ b/src/super_gradients/common/environment/ddp_utils.py @@ -163,6 +163,9 @@ def broadcast_from_master(data: Any) -> Any: :param data: Data to be broadcasted from master node (rank 0) :return: Data from rank 0 node """ + world_size = get_world_size() + if world_size == 1: + return data broadcast_list = [data] if dist.get_rank() == 0 else [None] dist.broadcast_object_list(broadcast_list, src=0) return broadcast_list[0]
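
Reviewer note: a minimal usage sketch of the helpers this series converges on. The import paths mirror the new locations introduced above; `old_get_local_rank`, `mean_of`, `mean_of_explicit`, and `values` are hypothetical names used for illustration only and are not part of the patch.

    from super_gradients.common.deprecate import deprecated
    from super_gradients.common.environment.ddp_utils import (
        broadcast_from_master,
        execute_and_distribute_from_master,
        get_local_rank,
    )


    @deprecated(deprecated_since="3.2.1", removed_from="3.5.0", target=get_local_rank)
    def old_get_local_rank():
        # Thin alias kept at an old import location: warns once while still supported, then delegates.
        return get_local_rank()


    @execute_and_distribute_from_master
    def mean_of(values):
        # Body runs only on the master process; the decorator broadcasts the
        # returned object to every other rank via broadcast_from_master().
        return sum(values) / len(values)


    # Equivalent explicit form, without the decorator:
    def mean_of_explicit(values):
        result = sum(values) / len(values) if get_local_rank() == 0 else None
        return broadcast_from_master(result)  # returns `result` as-is when world_size == 1

The decorator form and the explicit form are interchangeable; the decorator simply packages the rank check and the broadcast so call sites stay one line.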