
Commit

Run ddp_spawn dataloader checks on windows (#6930)
carmocca authored Apr 9, 2021
1 parent 3baac71 commit b85cfbe
Showing 2 changed files with 45 additions and 41 deletions.
71 changes: 35 additions & 36 deletions pytorch_lightning/trainer/data_loading.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 import inspect
 import multiprocessing
-import platform
 from abc import ABC
 from copy import deepcopy
 from typing import Iterable, List, Tuple, Union
@@ -54,53 +53,53 @@ class TrainerDataLoadingMixin(ABC):
     dev_debugger: InternalDebugger

     def _worker_check(self, dataloader: DataLoader, name: str) -> None:
-        on_windows = platform.system() == 'Windows'
+        if not isinstance(dataloader, DataLoader):
+            return

-        # ddp_spawn + num_workers > 0 don't mix! tell the user
-        is_dataloader = isinstance(dataloader, DataLoader)
         using_spawn = self.accelerator_connector.distributed_backend == "ddp_spawn"
-        if is_dataloader and not on_windows:
-            if dataloader.num_workers > 0 and using_spawn:
-                # checks for the attr persistent_workers available in pytorch >= 1.7
-                if hasattr(dataloader, "persistent_workers"):
-                    if not dataloader.persistent_workers:
-                        rank_zero_warn(
-                            'num_workers>0, persistent_workers=False, and accelerator=ddp_spawn'
-                            ' may result in data loading bottlenecks.'
-                            ' Consider setting persistent_workers=True'
-                            ' (this is a limitation of Python .spawn() and PyTorch)'
-                        )
-                else:
-                    rank_zero_warn(
-                        'num_workers>0 and accelerator=ddp_spawn do not mix well'
-                        ' and may result in data loading bottlenecks.'
-                        ' Consider setting accelerator=ddp to use num_workers>0'
-                        ' (this is a limitation of Python .spawn() and PyTorch)'
-                    )
+        num_cpus = multiprocessing.cpu_count()
+
+        # ddp_spawn + num_workers > 0 don't mix! tell the user
+        if dataloader.num_workers > 0 and using_spawn:
+            # checks for the attr persistent_workers available in pytorch >= 1.7
+            if hasattr(dataloader, "persistent_workers"):
+                if not dataloader.persistent_workers:
+                    rank_zero_warn(
+                        'num_workers>0, persistent_workers=False, and accelerator=ddp_spawn'
+                        ' may result in data loading bottlenecks.'
+                        ' Consider setting persistent_workers=True'
+                        ' (this is a limitation of Python .spawn() and PyTorch)'
+                    )
+            else:
+                rank_zero_warn(
+                    'num_workers>0 and accelerator=ddp_spawn do not mix well'
+                    ' and may result in data loading bottlenecks.'
+                    ' Consider setting accelerator=ddp to use num_workers>0'
+                    ' (this is a limitation of Python .spawn() and PyTorch)'
+                )

-            elif dataloader.num_workers == 0 and using_spawn:
-                # checks for the attr persistent_workers available in pytorch >= 1.7
-                if hasattr(dataloader, "persistent_workers"):
-                    if not dataloader.persistent_workers:
-                        rank_zero_warn(
-                            'accelerator=ddp_spawn and num_workers=0 may result in data loading bottlenecks.'
-                            ' Consider setting num_workers>0 and persistent_workers=True'
-                        )
-                else:
-                    rank_zero_warn(
-                        'accelerator=ddp_spawn and num_workers=0 may result in data loading bottlenecks.'
-                        ' Consider setting accelerator=ddp and set num_workers>0'
-                    )
+        elif dataloader.num_workers == 0 and using_spawn:
+            # checks for the attr persistent_workers available in pytorch >= 1.7
+            if hasattr(dataloader, "persistent_workers"):
+                if not dataloader.persistent_workers:
+                    rank_zero_warn(
+                        'accelerator=ddp_spawn and num_workers=0 may result in data loading bottlenecks.'
+                        ' Consider setting num_workers>0 and persistent_workers=True'
+                    )
+            else:
+                rank_zero_warn(
+                    'accelerator=ddp_spawn and num_workers=0 may result in data loading bottlenecks.'
+                    ' Consider setting accelerator=ddp and set num_workers>0'
+                )

-            elif dataloader.num_workers <= 2 and multiprocessing.cpu_count() > 2 and not using_spawn:
-                num_cpus = multiprocessing.cpu_count()
-                rank_zero_warn(
-                    f'The dataloader, {name}, does not have many workers which may be a bottleneck.'
-                    ' Consider increasing the value of the `num_workers` argument`'
-                    f' (try {num_cpus} which is the number of cpus on this machine)'
-                    f' in the `DataLoader` init to improve performance.'
-                )
+        elif dataloader.num_workers <= 2 < num_cpus and not using_spawn:
+            rank_zero_warn(
+                f'The dataloader, {name}, does not have many workers which may be a bottleneck.'
+                ' Consider increasing the value of the `num_workers` argument`'
+                f' (try {num_cpus} which is the number of cpus on this machine)'
+                f' in the `DataLoader` init to improve performance.'
+            )

     def auto_add_sampler(self, dataloader: DataLoader, shuffle: bool) -> DataLoader:

         # don't do anything if it's not a dataloader
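The warnings above fire when accelerator='ddp_spawn' is combined with an unfavourable DataLoader configuration, and with this change the check also runs on Windows. Below is a minimal sketch (not part of the commit) of a loader configuration the updated _worker_check would accept without warning, assuming PyTorch >= 1.7 for persistent_workers and a hypothetical MyDataset:

    import multiprocessing

    from torch.utils.data import DataLoader, Dataset


    class MyDataset(Dataset):
        # hypothetical toy dataset, used only for illustration
        def __len__(self):
            return 64

        def __getitem__(self, idx):
            return idx


    train_loader = DataLoader(
        MyDataset(),
        batch_size=8,
        num_workers=min(4, multiprocessing.cpu_count()),  # num_workers > 0 under ddp_spawn
        persistent_workers=True,  # keep workers alive between epochs (PyTorch >= 1.7)
    )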
15 changes: 10 additions & 5 deletions tests/models/test_horovod.py
@@ -49,9 +49,9 @@ def _run_horovod(trainer_options, on_gpu=False):
     # for Horovod, we interpret `gpus` to be set per worker
     trainer_options.update(gpus=1 if on_gpu else None)
     tutils.reset_seed()
-    # todo: Find why coverage breaks CI.
+    # TODO: Find out why coverage breaks CI.
     # append = '-a' if '.coverage' in os.listdir(_PROJECT_ROOT) else ''
-    # str(num_processes), sys.executable, '-m', 'coverage', 'run', '--source', 'pytorch_lightning', append, # noqa E265
+    # str(num_processes), sys.executable, '-m', 'coverage', 'run', '--source', 'pytorch_lightning', append,
     cmdline = [
         'horovodrun', '-np',
         str(num_processes), sys.executable, TEST_SCRIPT, '--trainer-options',
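The cmdline list above launches a standalone training script under horovodrun. A minimal sketch (not part of the commit, with hypothetical stand-ins for num_processes, TEST_SCRIPT, and the trainer-options format) of how such a command can be executed and checked from a test:

    import json
    import subprocess
    import sys

    # hypothetical stand-ins for the test's num_processes / TEST_SCRIPT / trainer options
    num_processes = 2
    TEST_SCRIPT = 'train_default_model.py'
    trainer_options = {'max_epochs': 1}

    cmdline = [
        'horovodrun', '-np',
        str(num_processes), sys.executable, TEST_SCRIPT, '--trainer-options',
        json.dumps(trainer_options),
    ]
    exit_code = subprocess.call(cmdline)
    assert exit_code == 0, 'horovodrun exited with a non-zero status'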
@@ -151,9 +151,10 @@ def test_horovod_multi_gpu_grad_by_value(tmpdir):
     _run_horovod(trainer_options, on_gpu=True)


+# todo: need to be fixed :]
 # https://discuss.pytorch.org/t/torch-cuda-amp-vs-nvidia-apex/74994
 # Check with (tgaddair) on Horovod issues if this feature is needed
-@pytest.mark.skip(reason="Horovod currently doesn't work with Apex") # todo
+@pytest.mark.skip(reason="TODO: Horovod currently doesn't work with Apex")
 @RunIf(min_gpus=2, skip_windows=True, amp_apex=True, horovod_nccl=True)
 def test_horovod_apex(tmpdir):
     """Test Horovod with multi-GPU support using apex amp."""
@@ -240,6 +241,8 @@ def validation_step(self, batch, *args, **kwargs):
     tpipes.run_model_test_without_loggers(trainer_options, model)


+# todo: need to be fixed :]
+@pytest.mark.skip('TODO: flaky test - Fatal Python error: Aborted')
 @RunIf(skip_windows=True, horovod=True)
 def test_horovod_multi_optimizer(tmpdir):
     model = BasicGAN()
@@ -272,7 +275,8 @@ def get_optimizer_params(optimizer):
     assert get_model_params(model.discriminator) == get_optimizer_params(trainer.optimizers[1])


-@pytest.mark.skipif(reason="CI agent.jobstatus=Succeeded: Permission denied")
+# todo: need to be fixed :]
+@pytest.mark.skip(reason="TODO: CI agent.jobstatus=Succeeded: Permission denied")
 @RunIf(skip_windows=True, horovod=True)
 def test_result_reduce_horovod(tmpdir):
     """Make sure result logging works with Horovod.
@@ -322,7 +326,8 @@ def training_epoch_end(self, outputs) -> None:
     horovod.run(hvd_test_fn, np=2)


-@pytest.mark.skipif(reason="CI agent.jobstatus=Succeeded: Permission denied")
+# todo: need to be fixed :]
+@pytest.mark.skip(reason="TODO: CI agent.jobstatus=Succeeded: Permission denied")
 @RunIf(skip_windows=True, horovod=True, num_gpus=2)
 def test_accuracy_metric_horovod():
     num_batches = 10
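Two of the hunks above swap @pytest.mark.skipif(reason=...) for @pytest.mark.skip(reason=...). skipif only skips when a truthy condition is passed, so with nothing but a reason keyword the marker does not actually skip anything, whereas skip needs no condition. A minimal sketch (not part of the commit) of the difference:

    import pytest


    @pytest.mark.skipif(reason="no condition given")
    def test_runs_anyway():
        # skipif without a condition does not skip, so this test executes
        assert True


    @pytest.mark.skip(reason="always skipped")
    def test_never_runs():
        # skip unconditionally skips this test
        assert False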
