[accelerator][BugFix] Resolve some test for 1 gpu #5863

Merged Feb 8, 2021 (47 commits).

Commits:
02f4818
update
Feb 7, 2021
18b4b25
revert init
tchaton Feb 7, 2021
6cdf71d
resolve a bug
Feb 7, 2021
95870b9
Merge branch 'resolve_tests' of https://github.com/PyTorchLightning/p…
Feb 7, 2021
ffdddb9
update
Feb 8, 2021
6f9830a
resolve flake8
tchaton Feb 8, 2021
b02b7b0
update
Feb 8, 2021
67a8cb3
Merge branch 'resolve_tests' of https://github.com/PyTorchLightning/…
Feb 8, 2021
701539f
update
Feb 8, 2021
b8a8d81
update
Feb 7, 2021
eea223f
revert init
tchaton Feb 7, 2021
e85e213
resolve a bug
Feb 7, 2021
337f723
update
Feb 8, 2021
b41fc9f
resolve flake8
tchaton Feb 8, 2021
951cc4d
update
Feb 8, 2021
e8cc904
update
Feb 8, 2021
3e79a6d
update
Feb 8, 2021
f9666f1
update
Feb 8, 2021
6ac21c5
Merge branch 'resolve_tests' of https://github.com/PyTorchLightning/p…
Feb 8, 2021
5890da3
update
Feb 7, 2021
83ff23f
revert init
tchaton Feb 7, 2021
cde3781
resolve a bug
Feb 7, 2021
0f6eeb4
update
Feb 8, 2021
47ef8e0
resolve flake8
tchaton Feb 8, 2021
35a6f53
update
Feb 8, 2021
f7689b4
update
Feb 8, 2021
e411983
update
Feb 7, 2021
60082d7
revert init
tchaton Feb 7, 2021
8153efd
update
Feb 8, 2021
f53aa29
resolve flake8
tchaton Feb 8, 2021
4bfc621
update
Feb 8, 2021
77b5e87
update
Feb 8, 2021
9f7e41f
update
Feb 8, 2021
d96b249
Merge branch 'resolve_tests' of https://github.com/PyTorchLightning/p…
Feb 8, 2021
3b1e784
update
Feb 8, 2021
f2214ef
update
Feb 8, 2021
c5029f7
all_gather
justusschock Feb 8, 2021
af791a7
update
Feb 8, 2021
7378e2e
make plugins work, add misconfig for RPC
justusschock Feb 8, 2021
b2812c2
Merge branch 'resolve_tests' of github.com:PytorchLightning/pytorch-l…
justusschock Feb 8, 2021
28c8005
update
Feb 8, 2021
1f96f00
Merge branch 'resolve_tests' of https://github.com/PyTorchLightning/p…
Feb 8, 2021
13972e7
update
Feb 8, 2021
b77003e
remove breaking test
Feb 8, 2021
0c7e10d
resolve some tests
Feb 8, 2021
1c247dc
resolve flake8
tchaton Feb 8, 2021
c3594b0
revert to ddp_spawn
tchaton Feb 8, 2021
1 change: 1 addition & 0 deletions .gitignore
@@ -151,3 +151,4 @@ wandb

# dataset generated from bolts in examples.
cifar-10-batches-py
*.pt
23 changes: 17 additions & 6 deletions pytorch_lightning/accelerators/accelerator_connector.py
@@ -33,7 +33,6 @@
HorovodPlugin,
NativeMixedPrecisionPlugin,
PrecisionPlugin,
RPCPlugin,
ShardedNativeMixedPrecisionPlugin,
SingleDevicePlugin,
SingleTPUPlugin,
@@ -116,11 +115,11 @@ def __init__(
self.parallel_device_ids = device_parser.parse_gpu_ids(self.gpus)
self.root_gpu = device_parser.determine_root_gpu_device(self.parallel_device_ids)

self.handle_given_plugins(plugins)

self.set_distributed_mode()
self.configure_slurm_ddp()

self.handle_given_plugins(plugins)

self.accelerator = self.select_accelerator()

# override dist backend when using tpus
@@ -148,6 +147,7 @@ def __init__(

def handle_given_plugins(self, plugins: Optional[Sequence]):
if plugins is None:
self._cluster_environment = self.select_cluster_environment()
return

if not isinstance(plugins, Sequence):
@@ -194,6 +194,13 @@ def handle_given_plugins(self, plugins: Optional[Sequence]):
self._precision_plugin = precision
self._cluster_environment = cluster_environment or self.select_cluster_environment()

@property
def local_rank(self):
try:
return self._cluster_environment.local_rank()
except KeyError:
return None

@property
def precision_plugin(self) -> PrecisionPlugin:
if self._precision_plugin is None:
@@ -206,6 +213,8 @@ def training_type_plugin(self) -> TrainingTypePlugin:
self._training_type_plugin = self.select_training_type_plugin()
else:
self._training_type_plugin = self.resolve_training_type_plugin(self._training_type_plugin)
# attach local_rank
self._training_type_plugin.task_idx = self.local_rank
return self._training_type_plugin

@property
@@ -294,7 +303,7 @@ def select_precision_plugin(self):
if not _APEX_AVAILABLE and self.on_cpu:
raise MisconfigurationException(
"You have asked for native AMP on CPU, but AMP is only available on GPU."
)
)
self.amp_type = "apex"
elif self.on_cpu:
raise MisconfigurationException(
@@ -372,7 +381,6 @@ def select_training_type_plugin(self):
plugin = SingleDevicePlugin(device=torch.device(f"cuda:{self.root_gpu}" if self.on_gpu else "cpu"))
return plugin


def resolve_training_type_plugin(self, training_type: TrainingTypePlugin) -> TrainingTypePlugin:
# necessary for RPC, when user has to provide balance
if hasattr(training_type, 'parallel_devices') and not getattr(training_type, 'parallel_devices'):
@@ -481,10 +489,13 @@ def set_distributed_mode(self):
# for DDP overwrite nb processes by requested GPUs
if (
self._device_type == DeviceType.GPU
and self._distrib_type in (DistributedType.DDP, DistributedType.DDP_SPAWN, DistributedType.DDP2)
and self._distrib_type in (DistributedType.DDP, DistributedType.DDP_SPAWN)
[Review comment — Member]: What about DDP2?

):
self.num_processes = self.num_gpus

if (self._device_type == DeviceType.GPU and self._distrib_type == DistributedType.DDP2):
self.num_processes = self.num_nodes

# Horovod is an extra case...
if self.distributed_backend == "horovod":
self._set_horovod_backend()
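The hunk above answers the reviewer's question in code: DDP2 is dropped from the per-GPU branch and handled separately, because DDP2 runs DP within each node and DDP across nodes, so it needs one process per node rather than one per GPU. A minimal sketch of that rule, with illustrative names (`Distrib`, `pick_num_processes`) that are not part of the actual connector API:

# Minimal sketch of the process-count rule this hunk implements.
# `Distrib` and `pick_num_processes` are illustrative stand-ins, not
# the real accelerator-connector API.
from enum import Enum


class Distrib(Enum):
    DDP = "ddp"
    DDP_SPAWN = "ddp_spawn"
    DDP2 = "ddp2"


def pick_num_processes(distrib: Distrib, num_gpus: int, num_nodes: int) -> int:
    # DDP / DDP_SPAWN: one process per requested GPU.
    if distrib in (Distrib.DDP, Distrib.DDP_SPAWN):
        return num_gpus
    # DDP2: DP inside each node, DDP across nodes -> one process per node.
    if distrib is Distrib.DDP2:
        return num_nodes
    return 1


assert pick_num_processes(Distrib.DDP, num_gpus=4, num_nodes=2) == 4
assert pick_num_processes(Distrib.DDP2, num_gpus=4, num_nodes=2) == 2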
14 changes: 12 additions & 2 deletions pytorch_lightning/plugins/precision/apex_amp.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Tuple
from typing import List, Tuple, Callable

import torch
from torch.optim import Optimizer
@@ -71,7 +71,7 @@ def backward(
# do backward pass
# TODO: not entirely sure, why we need this
if model is not None and isinstance(model, LightningModule):
model.backward(closure_loss, optimizer, opt_idx)
model.backward(closure_loss, optimizer, opt_idx, **kwargs)

# TODO: avoid dev_debugger and track these calls with mock
model.trainer.dev_debugger.track_event('AMP', str(AMPType.APEX))
@@ -90,6 +90,16 @@ def backward(
closure_loss = closure_loss.detach()
return closure_loss

def pre_optimizer_step(
self, pl_module: LightningModule, optimizer: Optimizer, optimizer_idx: int, closure: Callable, **kwargs
) -> bool:
"""Hook to do something before each optimizer step."""
# Apex: Amp does not support closure use with optimizers
closure()
optimizer.step()
return False


def configure_apex(
self,
amp: object,
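The new `pre_optimizer_step` works around the fact that Apex patches the optimizer, and the patched `step()` cannot evaluate a closure the way native PyTorch optimizers can; the plugin therefore runs the closure itself, steps the optimizer with no arguments, and returns `False` to signal that the step has already been taken. A self-contained sketch of the two call shapes using plain `torch.optim` (no Apex needed to run it):

import torch

# Native-style: the optimizer evaluates the closure itself.
param = torch.nn.Parameter(torch.ones(1))
opt = torch.optim.SGD([param], lr=0.1)


def closure():
    opt.zero_grad()
    loss = (param ** 2).sum()
    loss.backward()
    return loss


opt.step(closure)  # works for optimizers that accept a closure

# Apex-style workaround, as in pre_optimizer_step: evaluate the closure
# (forward + backward) first, then call step() with no arguments.
closure()
opt.step()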
2 changes: 1 addition & 1 deletion pytorch_lightning/trainer/trainer.py
100644 → 100755
@@ -458,6 +458,7 @@ def fit(
# ----------------------------
# SET UP TRAINING
# ----------------------------
self.call_hook("on_before_accelerator_backend_setup", model)
self.accelerator_backend.setup(self, model)
self.setup_trainer(model)

@@ -469,7 +470,6 @@

# plugin will setup training (e.g. ddp will launch child processes)
# TODO: the old setup is now called "pre_training", where should this hook be called now?
self.call_hook("on_before_accelerator_backend_setup", model)
self.training_type_plugin.pre_training()
self.precision_plugin.pre_training()

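This reordering makes `on_before_accelerator_backend_setup` the first fit-time callback hook, firing before `accelerator_backend.setup()` and therefore before `on_fit_start`, which is exactly what the updated expectations in `tests/callbacks/test_callbacks.py` below encode. A simplified sketch of the resulting call order (a toy trainer, not the real `Trainer.fit`):

# Toy illustration of the hook ordering after this change; the real
# Trainer.fit does much more between these calls.
class MiniTrainer:
    def __init__(self, callbacks):
        self.callbacks = callbacks

    def call_hook(self, name, *args):
        for cb in self.callbacks:
            getattr(cb, name, lambda *a: None)(*args)

    def fit(self, model):
        # 1. fires before any accelerator/plugin setup happens
        self.call_hook("on_before_accelerator_backend_setup", self, model)
        # 2. accelerator + training-type plugin are configured here
        self.setup_accelerator(model)
        # 3. on_fit_start, setup('fit'), and the rest follow

    def setup_accelerator(self, model):
        pass  # stand-in for accelerator_backend.setup(self, model)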
32 changes: 17 additions & 15 deletions tests/accelerators/legacy/test_accelerator_connector.py
@@ -75,7 +75,7 @@ def test_accelerator_choice_ddp_spawn(cuda_available_mock, device_count_mock):
assert isinstance(trainer.training_type_plugin.cluster_environment, TorchElasticEnvironment)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="test requires GPU")
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
@mock.patch.dict(
os.environ, {
"CUDA_VISIBLE_DEVICES": "0,1",
@@ -89,14 +89,14 @@ def test_accelerator_choice_ddp_slurm():

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert trainer.use_ddp
assert trainer.accelerator_connector.is_slurm_managing_tasks
assert isinstance(trainer.accelerator_backend, GPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDPPlugin)
assert isinstance(trainer.training_type_plugin.cluster_environment, SLURMEnvironment)
assert trainer.training_type_plugin.task_idx == 10
assert trainer.training_type_plugin.cluster_environment.local_rank() == 10
assert trainer.training_type_plugin.task_idx == 10
raise SystemExit()

model = BoringModel()
@@ -127,15 +127,14 @@ def test_accelerator_choice_ddp2_slurm(device_count_mock):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert trainer.use_ddp2
assert trainer.accelerator_connector.is_slurm_managing_tasks
assert isinstance(trainer.accelerator_backend, GPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDP2Plugin)
assert isinstance(trainer.training_type_plugin.cluster_environment, SLURMEnvironment)
assert trainer.training_type_plugin.task_idx == 10
assert trainer.training_type_plugin.cluster_environment.local_rank() == 10

assert trainer.training_type_plugin.task_idx == 10
raise SystemExit()

model = BoringModel()
@@ -157,13 +156,13 @@ def test_accelerator_choice_ddp_te(device_count_mock):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, GPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDPPlugin)
assert isinstance(trainer.training_type_plugin.cluster_environment, TorchElasticEnvironment)
assert trainer.training_type_plugin.task_idx == 10
assert trainer.training_type_plugin.cluster_environment.local_rank() == 10
assert trainer.training_type_plugin.task_idx == 10
raise SystemExit()

model = BoringModel()
@@ -185,13 +184,13 @@ def test_accelerator_choice_ddp2_te(device_count_mock):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert trainer.use_ddp2
assert isinstance(trainer.accelerator_backend, GPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDP2Plugin)
assert isinstance(trainer.training_type_plugin.cluster_environment, TorchElasticEnvironment)
assert trainer.training_type_plugin.task_idx == 10
assert trainer.training_type_plugin.cluster_environment.local_rank() == 10
assert trainer.training_type_plugin.task_idx == 10
raise SystemExit()

model = BoringModel()
@@ -216,13 +215,13 @@ def test_accelerator_choice_ddp_cpu_te(device_count_mock):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, CPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDPPlugin)
assert isinstance(trainer.training_type_plugin.cluster_environment, TorchElasticEnvironment)
assert trainer.training_type_plugin.task_idx == 10
assert trainer.training_type_plugin.cluster_environment.local_rank() == 10
assert trainer.training_type_plugin.task_idx == 10
raise SystemExit()

model = BoringModel()
@@ -251,12 +250,13 @@ def test_accelerator_choice_ddp_cpu_slurm(device_count_mock):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert trainer.use_ddp
assert trainer.accelerator_connector.is_slurm_managing_tasks
assert isinstance(trainer.accelerator_backend, CPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDPPlugin)
assert isinstance(trainer.training_type_plugin.cluster_environment, SLURMEnvironment)
assert trainer.training_type_plugin.task_idx == 0
raise SystemExit()

model = BoringModel()
@@ -293,11 +293,12 @@ def master_address(self):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, CPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDPPlugin)
assert isinstance(trainer.training_type_plugin.cluster_environment, CustomCluster)
assert trainer.training_type_plugin.task_idx is None
raise SystemExit()

model = BoringModel()
@@ -362,9 +363,10 @@ def test_dist_backend_accelerator_mapping(device_count_mock):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert isinstance(trainer.accelerator_backend, CPUAccelerator)
assert isinstance(trainer.training_type_plugin, DDPPlugin)
assert trainer.training_type_plugin.task_idx == 0
raise SystemExit()

model = BoringModel()
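All of these tests follow the same probe pattern: a throwaway `Callback` asserts the accelerator/plugin wiring from the early hook (the connector has already chosen plugins during `Trainer.__init__`), then raises `SystemExit` so no actual training starts. A generic sketch of the pattern; the `BoringModel` import path is an assumption about this era's test helpers:

import pytest
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import Callback
from tests.helpers.boring_model import BoringModel  # import path assumed


class ProbeCallback(Callback):

    def on_before_accelerator_backend_setup(self, trainer, pl_module):
        # inspect the wiring chosen by the accelerator connector, e.g.:
        # assert isinstance(trainer.training_type_plugin, DDPPlugin)
        raise SystemExit()  # abort before any real training work


def test_plugin_wiring(tmpdir):
    trainer = Trainer(default_root_dir=tmpdir, fast_dev_run=True, callbacks=[ProbeCallback()])
    with pytest.raises(SystemExit):
        trainer.fit(BoringModel())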
4 changes: 2 additions & 2 deletions tests/callbacks/test_callbacks.py
@@ -53,8 +53,8 @@ def test_trainer_callback_system(torch_save):
assert callback_mock.method_calls == [
call.on_init_start(trainer),
call.on_init_end(trainer),
call.on_fit_start(trainer, model),
call.on_before_accelerator_backend_setup(trainer, model),
call.on_fit_start(trainer, model),
call.setup(trainer, model, 'fit'),
call.on_pretrain_routine_start(trainer, model),
call.on_pretrain_routine_end(trainer, model),
@@ -108,8 +108,8 @@ def test_trainer_callback_system(torch_save):
assert callback_mock.method_calls == [
call.on_init_start(trainer),
call.on_init_end(trainer),
call.on_fit_start(trainer, model),
call.on_before_accelerator_backend_setup(trainer, model),
call.on_fit_start(trainer, model),
call.setup(trainer, model, 'test'),
call.on_test_start(trainer, model),
call.on_test_epoch_start(trainer, model),
2 changes: 1 addition & 1 deletion tests/deprecated_api/test_remove_1-4.py
@@ -163,7 +163,7 @@ def configure_ddp(self):
assert isinstance(self.model.module, LightningDistributedModule)


@pytest.mark.skipif(not torch.cuda.is_available(), reason="test requires GPU machine")
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
@pytest.mark.skipif(sys.platform == "win32", reason="DDP not available on windows")
def test_v1_4_0_deprecated_lightning_distributed_data_parallel(tmpdir):
model = BoringModel()
7 changes: 6 additions & 1 deletion tests/models/test_sync_batchnorm.py
@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import pytest
import torch
import torch.nn as nn
@@ -67,6 +69,9 @@ def configure_optimizers(self):


@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
@pytest.mark.skipif(
not os.getenv("PL_RUNNING_SPECIAL_TESTS", '0') == '1', reason="test should be run outside of pytest"
)
def test_sync_batchnorm_ddp(tmpdir):
seed_everything(234)
set_random_master_port()
@@ -105,7 +110,7 @@ def test_sync_batchnorm_ddp(tmpdir):
trainer = Trainer(
gpus=2,
num_nodes=1,
accelerator='ddp_spawn',
accelerator='ddp',
max_epochs=1,
max_steps=3,
sync_batchnorm=True,
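The added skipif gates the test behind `PL_RUNNING_SPECIAL_TESTS=1`, which `tests/special_tests.sh` exports (see its diff below), so that the `accelerator='ddp'` run — which launches child processes — happens in a dedicated interpreter rather than inside a shared pytest session. The guard pattern, as a minimal sketch:

# Guard pattern: the test is skipped under plain `pytest` and only runs
# when tests/special_tests.sh sets PL_RUNNING_SPECIAL_TESTS=1.
import os

import pytest

special_test = pytest.mark.skipif(
    os.getenv("PL_RUNNING_SPECIAL_TESTS", "0") != "1",
    reason="test should be run by tests/special_tests.sh",
)


@special_test
def test_something_ddp():
    ...  # DDP logic that must own the whole interpreter process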
2 changes: 1 addition & 1 deletion tests/plugins/legacy/test_rpc_plugin.py
@@ -33,7 +33,7 @@ def test_rpc_choice(tmpdir, ddp_backend, gpus, num_processes):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert isinstance(trainer.training_type_plugin, RPCPlugin)
raise RuntimeError('finished plugin check')

4 changes: 2 additions & 2 deletions tests/plugins/test_apex_plugin.py
@@ -30,7 +30,7 @@ def test_amp_choice_default_ddp_cpu(tmpdir, ddp_backend, gpus, num_processes):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert isinstance(trainer.precision_plugin, ApexMixedPrecisionPlugin)
raise SystemExit()

@@ -72,7 +72,7 @@ class MyApexPlugin(ApexMixedPrecisionPlugin):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert isinstance(trainer.precision_plugin, MyApexPlugin)
raise SystemExit()

6 changes: 3 additions & 3 deletions tests/plugins/test_sharded_plugin.py
@@ -21,7 +21,7 @@ def test_sharded_ddp_choice(tmpdir, accelerator):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
if accelerator == 'ddp_sharded':
assert isinstance(trainer.accelerator_backend.training_type_plugin, DDPShardedPlugin)
elif accelerator == 'ddp_sharded_spawn':
@@ -68,7 +68,7 @@ def test_ddp_choice_sharded_amp(tmpdir, accelerator):

class CB(Callback):

def on_fit_start(self, trainer, pl_module):
def on_before_accelerator_backend_setup(self, trainer, pl_module):
assert isinstance(trainer.accelerator_backend.precision_plugin, ShardedNativeMixedPrecisionPlugin)
raise SystemExit()

@@ -131,7 +131,7 @@ def test_ddp_sharded_plugin_checkpoint_multi_gpu(tmpdir):

# Assert model parameters are identical after loading
for ddp_param, shard_param in zip(model.parameters(), saved_model.parameters()):
assert torch.equal(ddp_param, shard_param)
assert torch.equal(ddp_param.to("cpu"), shard_param)


@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
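The adjusted assertion moves `ddp_param` to CPU first because, after the multi-GPU run, the two models' parameters are not guaranteed to live on the same device, and comparing tensors across devices with `torch.equal` is unreliable (it can raise). A tiny, hedged illustration that runs with or without a GPU:

import torch

a = torch.ones(3)
b = torch.ones(3)  # e.g. parameters reloaded from a checkpoint on CPU
if torch.cuda.is_available():
    a = a.cuda()  # simulate a parameter left on the GPU after training

# Bring both tensors onto a common device before comparing, exactly as
# the updated assertion does with ddp_param.to("cpu").
assert torch.equal(a.to("cpu"), b)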
2 changes: 2 additions & 0 deletions tests/special_tests.sh
@@ -16,6 +16,7 @@ set -e
export PL_RUNNING_SPECIAL_TESTS=1
DEFAULTS="-m coverage run --source pytorch_lightning -a -m pytest --verbose --capture=no"
python ${DEFAULTS} tests/trainer/optimization/test_manual_optimization.py::test_step_with_optimizer_closure_with_different_frequencies_ddp
python ${DEFAULTS} tests/models/test_sync_batchnorm.py::test_sync_batchnorm_ddp
python ${DEFAULTS} tests/plugins/legacy/test_rpc_plugin.py::test_rpc_function_calls_ddp
python ${DEFAULTS} tests/plugins/legacy/test_ddp_sequential_plugin.py::test_ddp_sequential_plugin_ddp_rpc_manual
python ${DEFAULTS} tests/plugins/legacy/test_ddp_sequential_plugin.py::test_ddp_sequential_plugin_ddp_rpc_manual_amp
@@ -29,3 +30,4 @@ python ${DEFAULTS} tests/callbacks/test_pruning.py::test_pruning_callback_ddp
python ${DEFAULTS} tests/trainer/test_trainer.py::test_pytorch_profiler_trainer_ddp
python ${DEFAULTS} tests/models/test_hooks.py::test_transfer_batch_hook_ddp
python ${DEFAULTS} tests/trainer/test_data_loading.py::test_replace_distrubuted_sampler_custom_dataloader_custom_batch_sampler
python ${DEFAULTS} tests/trainer/test_data_loading.py::test_replace_distrubuted_sampler_custom_dataloader_custom_batch_sampler