Better than PL auto strategy selection #159

Merged: 3 commits, merged on Feb 3, 2024

8 changes: 7 additions & 1 deletion basics/base_task.py
@@ -408,7 +408,13 @@ def start(cls):
             accelerator=hparams['pl_trainer_accelerator'],
             devices=hparams['pl_trainer_devices'],
             num_nodes=hparams['pl_trainer_num_nodes'],
-            strategy=get_strategy(hparams['pl_trainer_strategy']),
+            strategy=get_strategy(
+                hparams['pl_trainer_devices'],
+                hparams['pl_trainer_num_nodes'],
+                hparams['pl_trainer_accelerator'],
+                hparams['pl_trainer_strategy'],
+                hparams['pl_trainer_precision'],
+            ),
             precision=hparams['pl_trainer_precision'],
             callbacks=[
                 DsModelCheckpoint(
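
For reference, the call above threads five `pl_trainer_*` entries from `hparams` into both `Trainer` and the new `get_strategy`. A minimal sketch of such a configuration is below; the keys match the code above, but the concrete values are illustrative assumptions rather than the project's defaults.

# Hypothetical hparams excerpt (values assumed for illustration only).
hparams = {
    'pl_trainer_accelerator': 'auto',
    'pl_trainer_devices': 'auto',
    'pl_trainer_num_nodes': 1,
    'pl_trainer_precision': '32-true',
    # After this PR the strategy entry is a dict: 'name' selects the strategy,
    # and any remaining keys are forwarded to that strategy's constructor
    # (see get_strategy in utils/training_utils.py below).
    'pl_trainer_strategy': {'name': 'auto'},
}
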
91 changes: 78 additions & 13 deletions utils/training_utils.py
@@ -364,17 +364,82 @@ def __getstate__(self):
         del state["_all_rank_experiment"]
         return state
 
+def get_strategy(
+        devices="auto",
+        num_nodes=1,
+        accelerator="auto",
+        strategy={"name": "auto"},
+        precision=None,
+):
+    from lightning.fabric.utilities.device_parser import _determine_root_gpu_device
+    from lightning.pytorch.accelerators import AcceleratorRegistry
+    from lightning.pytorch.accelerators.cuda import CUDAAccelerator
+    from lightning.pytorch.accelerators.mps import MPSAccelerator
+    from lightning.pytorch.strategies import Strategy, SingleDeviceStrategy, StrategyRegistry
+    from lightning.pytorch.trainer.connectors import accelerator_connector
+    from lightning.pytorch.utilities.rank_zero import rank_zero_warn
+    class _DsAcceleratorConnector(accelerator_connector._AcceleratorConnector):
+        def __init__(self) -> None:
+            accelerator_connector._register_external_accelerators_and_strategies()
+            self._registered_strategies = StrategyRegistry.available_strategies()
+            self._accelerator_types = AcceleratorRegistry.available_accelerators()
+            self._parallel_devices = []
+            self._check_config_and_set_final_flags(
+                strategy=strategy["name"],
+                accelerator=accelerator,
+                precision=precision,
+                plugins=[],
+                sync_batchnorm=False,
+            )
+            if self._accelerator_flag == "auto":
+                self._accelerator_flag = self._choose_auto_accelerator()
+            elif self._accelerator_flag == "gpu":
+                self._accelerator_flag = self._choose_gpu_accelerator_backend()
+            self._check_device_config_and_set_final_flags(devices=devices, num_nodes=num_nodes)
+            self._set_parallel_devices_and_init_accelerator()
+            if self._strategy_flag == "auto":
+                self._strategy_flag = self._choose_strategy()
+            self._check_strategy_and_fallback()
+            self._init_strategy()
+            for k in ["colossalai", "bagua", "hpu", "hpu_parallel", "hpu_single", "ipu", "ipu_strategy"]:
+                if k in StrategyRegistry:
+                    StrategyRegistry.remove(k)
+
+        def _init_strategy(self) -> None:
+            assert isinstance(self._strategy_flag, (str, Strategy))
+            if isinstance(self._strategy_flag, str):
+                if self._strategy_flag not in StrategyRegistry:
+                    available_names = ", ".join(sorted(StrategyRegistry.available_strategies())) or "none"
+                    raise KeyError(f"Invalid strategy name {strategy['name']}. Available names: {available_names}")
+                data = StrategyRegistry[self._strategy_flag]
+                params = {}
+                # Replicate additional logic for _choose_strategy when dealing with single device strategies
+                if issubclass(data["strategy"], SingleDeviceStrategy):
+                    if self._accelerator_flag == "hpu":
+                        params = {"device": torch.device("hpu")}
+                    elif self._accelerator_flag == "tpu":
+                        params = {"device": self._parallel_devices[0]}
+                    elif data["strategy"] is SingleDeviceStrategy:
+                        if isinstance(self._accelerator_flag, (CUDAAccelerator, MPSAccelerator)) or (
+                                isinstance(self._accelerator_flag, str) and self._accelerator_flag in ("cuda", "gpu", "mps")
+                        ):
+                            params = {"device": _determine_root_gpu_device(self._parallel_devices)}
+                        else:
+                            params = {"device": "cpu"}
+                    else:
+                        raise NotImplementedError
+                params.update(data["init_params"])
+                params.update({k: v for k, v in strategy.items() if k != "name"})
+                self.strategy = data["strategy"](**utils.filter_kwargs(params, data["strategy"]))
+            elif isinstance(self._strategy_flag, SingleDeviceStrategy):
+                params = {"device": self._strategy_flag.root_device}
+                params.update({k: v for k, v in strategy.items() if k != "name"})
+                self.strategy = self._strategy_flag.__class__(**utils.filter_kwargs(params, self._strategy_flag.__class__))
+            else:
+                rank_zero_warn(
+                    f"Inferred strategy {self._strategy_flag.__class__.__name__} cannot take custom configurations."
+                    f"To use custom configurations, please specify the strategy name explicitly."
+                )
+                self.strategy = self._strategy_flag
 
-def get_strategy(strategy):
-    if strategy['name'] == 'auto':
-        return 'auto'
-
-    from lightning.pytorch.strategies import StrategyRegistry
-    if strategy['name'] not in StrategyRegistry:
-        available_names = ", ".join(sorted(StrategyRegistry.keys())) or "none"
-        raise ValueError(f"Invalid strategy name {strategy['name']}. Available names: {available_names}")
-
-    data = StrategyRegistry[strategy['name']]
-    params = data['init_params']
-    params.update({k: v for k, v in strategy.items() if k != 'name'})
-    return data['strategy'](**utils.filter_kwargs(params, data['strategy']))
+    return _DsAcceleratorConnector().strategy
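
A minimal usage sketch of the new `get_strategy`, under assumed values: when `strategy['name']` is not `'auto'`, every key besides `'name'` is forwarded to the strategy class looked up in Lightning's `StrategyRegistry` (filtered through `utils.filter_kwargs`), while device, accelerator, and precision resolution still go through the connector above. The `'ddp'` name and `find_unused_parameters` argument come from Lightning's built-in `DDPStrategy`; the device and precision values are invented for the example.

# Illustrative call only; not taken from the PR.
strategy = get_strategy(
    devices=2,
    num_nodes=1,
    accelerator='gpu',
    strategy={'name': 'ddp', 'find_unused_parameters': True},
    precision='16-mixed',
)
# 'strategy' is now a constructed DDPStrategy instance, which base_task.py passes to Trainer(strategy=...).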