From 7a2e177151b38ce2138ea3c9a6632c3cbd70188b Mon Sep 17 00:00:00 2001
From: taozhiwei
Date: Fri, 15 Aug 2025 15:52:30 +0800
Subject: [PATCH 1/5] let `_get_default_process_group_backend_for_device`
 support more hardware platforms (#21057)

* support more hardware platforms and no longer hard-code CUDA when calling `_get_default_process_group_backend_for_device`

* Apply suggestions from code review

---------

Signed-off-by: taozhiwei
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Nicki Skafte Detlefsen
Co-authored-by: Jirka Borovec <6035284+Borda@users.noreply.github.com>
(cherry picked from commit 119a640e43ee676d8491609f739a31b69857f4fe)
---
 src/lightning/fabric/utilities/distributed.py |  6 ++++-
 .../utilities/test_distributed.py             | 22 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/src/lightning/fabric/utilities/distributed.py b/src/lightning/fabric/utilities/distributed.py
index ec4eb261f2d3e..500f3a3e2aa92 100644
--- a/src/lightning/fabric/utilities/distributed.py
+++ b/src/lightning/fabric/utilities/distributed.py
@@ -319,7 +319,11 @@ def _destroy_dist_connection() -> None:
 
 
 def _get_default_process_group_backend_for_device(device: torch.device) -> str:
-    return "nccl" if device.type == "cuda" else "gloo"
+    """Return corresponding distributed backend for a given device."""
+    device_backend_map = torch.distributed.Backend.default_device_backend_map
+    if device.type in device_backend_map:
+        return device_backend_map[device.type]
+    return "gloo"
 
 
 class _DatasetSamplerWrapper(Dataset):
diff --git a/tests/tests_fabric/utilities/test_distributed.py b/tests/tests_fabric/utilities/test_distributed.py
index d65eaa810ff4d..51c4b320d5525 100644
--- a/tests/tests_fabric/utilities/test_distributed.py
+++ b/tests/tests_fabric/utilities/test_distributed.py
@@ -17,6 +17,7 @@
 from lightning.fabric.utilities.distributed import (
     _destroy_dist_connection,
     _gather_all_tensors,
+    _get_default_process_group_backend_for_device,
     _InfiniteBarrier,
     _init_dist_connection,
     _is_dtensor,
@@ -243,6 +244,27 @@ def test_init_dist_connection_registers_destruction_handler(_, atexit_mock):
     atexit_mock.register.assert_not_called()
 
 
+def test_get_default_process_group_backend_for_device():
+    """Test that each device type maps to its correct default process group backend."""
+    # register a custom backend for test
+    torch.utils.rename_privateuse1_backend("pcu")
+
+    def mock_backend(store, group_rank, group_size, timeout):
+        pass
+
+    torch.distributed.Backend.register_backend(
+        "pccl",
+        lambda store, group_rank, group_size, timeout: mock_backend(store, group_rank, group_size, timeout),
+        devices=["pcu"],
+    )
+
+    # test that the default backend is correctly set for each device
+    devices = [torch.device("cpu"), torch.device("cuda:0"), torch.device("pcu:0")]
+    backends = ["gloo", "nccl", "pccl"]
+    for device, backend in zip(devices, backends):
+        assert _get_default_process_group_backend_for_device(device) == backend
+
+
 @RunIf(min_torch="2.4")
 def test_is_dtensor(monkeypatch):
     from torch.distributed._tensor import DTensor
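For context on what [PATCH 1/5] changes: the helper now defers to PyTorch's device-to-backend registry instead of hard-coding `"nccl"` for CUDA. Below is a minimal, self-contained sketch of that lookup; `pick_backend` is an illustrative name rather than part of the patch, it assumes a PyTorch build with `torch.distributed` available, and the printed values assume the stock `default_device_backend_map` (`cpu -> gloo`, `cuda -> nccl`).

```python
import torch
import torch.distributed


def pick_backend(device: torch.device) -> str:
    # Same idea as the patched helper: consult PyTorch's device -> backend map
    # and fall back to "gloo" for device types it does not know about.
    backend_map = torch.distributed.Backend.default_device_backend_map
    return backend_map.get(device.type, "gloo")


if __name__ == "__main__":
    print(pick_backend(torch.device("cpu")))     # expected: gloo
    print(pick_backend(torch.device("cuda:0")))  # expected: nccl
    print(pick_backend(torch.device("mps")))     # falls back to gloo unless a backend is registered for "mps"
```

This is also why the test above registers a fake `pcu` device with a `pccl` backend: any backend registered in the map is picked up without further changes to the helper.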
From 3bdd540b42e4ce6bce75506294e2280ab3d21756 Mon Sep 17 00:00:00 2001
From: Jirka B
Date: Wed, 3 Sep 2025 12:37:28 +0200
Subject: [PATCH 2/5] PrivateUse1HooksInterface

---
 src/lightning/fabric/strategies/ddp.py | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/src/lightning/fabric/strategies/ddp.py b/src/lightning/fabric/strategies/ddp.py
index ce47e4e403c34..881f4844e6a13 100644
--- a/src/lightning/fabric/strategies/ddp.py
+++ b/src/lightning/fabric/strategies/ddp.py
@@ -15,6 +15,7 @@
 from datetime import timedelta
 from typing import Any, Literal, Optional, Union
 
+import inspect
 import torch
 import torch.distributed
 from lightning_utilities.core.rank_zero import rank_zero_only as utils_rank_zero_only
@@ -156,10 +157,24 @@ def all_reduce(
     def barrier(self, *args: Any, **kwargs: Any) -> None:
         if not _distributed_is_initialized():
             return
-        if torch.distributed.get_backend() == "nccl":
+        backend = torch.distributed.get_backend()
+        if backend == "nccl":
             torch.distributed.barrier(device_ids=self._determine_ddp_device_ids())
-        else:
+            return
+        # For CPU backends (e.g., gloo), recent PyTorch may attempt to resolve an accelerator and crash on CPU-only runs.
+        try:
             torch.distributed.barrier()
+        except RuntimeError as e:
+            # Handle: "Please register PrivateUse1HooksInterface by `RegisterPrivateUse1HooksInterface` first."
+            if "PrivateUse1HooksInterface" in str(e):
+                # Use explicit CPU device if supported in this PyTorch version
+                if "device" in inspect.signature(torch.distributed.barrier).parameters:
+                    torch.distributed.barrier(device=torch.device("cpu"))
+                else:
+                    # Older versions shouldn't trigger this path; re-raise to avoid masking other issues
+                    raise
+            else:
+                raise
 
     @override
     def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:

From a1ccbb8cf70657cc5a6a4fc918de0b29d32088e8 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 3 Sep 2025 10:38:01 +0000
Subject: [PATCH 3/5] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 src/lightning/fabric/strategies/ddp.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lightning/fabric/strategies/ddp.py b/src/lightning/fabric/strategies/ddp.py
index 881f4844e6a13..124f54a83c805 100644
--- a/src/lightning/fabric/strategies/ddp.py
+++ b/src/lightning/fabric/strategies/ddp.py
@@ -11,11 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import inspect
 from contextlib import AbstractContextManager, nullcontext
 from datetime import timedelta
 from typing import Any, Literal, Optional, Union
 
-import inspect
 import torch
 import torch.distributed
 from lightning_utilities.core.rank_zero import rank_zero_only as utils_rank_zero_only
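[PATCH 2/5] guards the optional `device` keyword behind an `inspect.signature` check before calling `torch.distributed.barrier`. The same pattern, reduced to a generic sketch: the helper and the two toy callables below are illustrative only, not Lightning or PyTorch API.

```python
import inspect
from typing import Any, Callable


def call_with_supported_kwargs(fn: Callable[..., Any], **kwargs: Any) -> Any:
    # Pass only the keyword arguments that `fn` actually declares; an older
    # version of an API simply never sees the newer keyword.
    params = inspect.signature(fn).parameters
    supported = {name: value for name, value in kwargs.items() if name in params}
    return fn(**supported)


def old_barrier() -> str:  # stand-in for an API without a `device` keyword
    return "synced (no device kwarg)"


def new_barrier(device: str = "cpu") -> str:  # stand-in for an API with a `device` keyword
    return f"synced on {device}"


print(call_with_supported_kwargs(old_barrier, device="cpu"))  # -> synced (no device kwarg)
print(call_with_supported_kwargs(new_barrier, device="cpu"))  # -> synced on cpu
```

[PATCH 4/5] below drops this signature probing in favour of an `all_reduce`-based fallback, so the sketch documents the intermediate approach only.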
From f6aab1527da4b4ca53a8988ac950979d10e54289 Mon Sep 17 00:00:00 2001
From: Jirka B
Date: Wed, 3 Sep 2025 16:05:01 +0200
Subject: [PATCH 4/5] try it

---
 src/lightning/fabric/strategies/ddp.py | 27 +++++++++++---------------
 1 file changed, 11 insertions(+), 16 deletions(-)

diff --git a/src/lightning/fabric/strategies/ddp.py b/src/lightning/fabric/strategies/ddp.py
index 2314323e338a3..af182ad7f422f 100644
--- a/src/lightning/fabric/strategies/ddp.py
+++ b/src/lightning/fabric/strategies/ddp.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import inspect
 from contextlib import AbstractContextManager, nullcontext
 from datetime import timedelta
 from typing import Any, Literal, Optional, Union
@@ -158,24 +157,20 @@ def all_reduce(
     def barrier(self, *args: Any, **kwargs: Any) -> None:
         if not _distributed_is_initialized():
             return
-        backend = torch.distributed.get_backend()
-        if backend == "nccl":
+        if torch.distributed.get_backend() == "nccl":
             torch.distributed.barrier(device_ids=self._determine_ddp_device_ids())
-            return
-        # For CPU backends (e.g., gloo), recent PyTorch may attempt to resolve an accelerator and crash on CPU-only runs.
-        try:
-            torch.distributed.barrier()
-        except RuntimeError as e:
-            # Handle: "Please register PrivateUse1HooksInterface by `RegisterPrivateUse1HooksInterface` first."
-            if "PrivateUse1HooksInterface" in str(e):
-                # Use explicit CPU device if supported in this PyTorch version
-                if "device" in inspect.signature(torch.distributed.barrier).parameters:
-                    torch.distributed.barrier(device=torch.device("cpu"))
+        else:
+            # Handle PyTorch bug where barrier() fails on CPU with "PrivateUse1HooksInterface" error
+            try:
+                torch.distributed.barrier()
+            except RuntimeError as e:
+                if "PrivateUse1HooksInterface" in str(e):
+                    # Fallback: Use all_reduce as barrier - all processes must participate
+                    # This achieves the same synchronization effect as barrier()
+                    dummy_tensor = torch.tensor(0.0, device=self.root_device)
+                    torch.distributed.all_reduce(dummy_tensor)
                 else:
-                    # Older versions shouldn't trigger this path; re-raise to avoid masking other issues
                     raise
-            else:
-                raise
 
     @override
     def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:

From f9a6728b6f212f85f6242dbcc0026d707b7a85bb Mon Sep 17 00:00:00 2001
From: Jirka B
Date: Wed, 3 Sep 2025 17:51:52 +0200
Subject: [PATCH 5/5] chlog

---
 src/lightning/fabric/CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/lightning/fabric/CHANGELOG.md b/src/lightning/fabric/CHANGELOG.md
index 9f55de94d9135..b4611e165f917 100644
--- a/src/lightning/fabric/CHANGELOG.md
+++ b/src/lightning/fabric/CHANGELOG.md
@@ -22,7 +22,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 ### Changed
 
--
+- let `_get_default_process_group_backend_for_device` support more hardware platforms (
+  [#21057](https://github.com/Lightning-AI/pytorch-lightning/pull/21057), [#21093](https://github.com/Lightning-AI/pytorch-lightning/pull/21093))
 
 
 ### Fixed
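For completeness, the fallback that [PATCH 4/5] settles on, an `all_reduce` over a dummy tensor standing in for `barrier()`, can be exercised outside the strategy class. The sketch below is illustrative only: `barrier_with_fallback` is not part of the patch, and it spins up a single-process `gloo` group (the local address and port are arbitrary) just so the synchronization path runs end to end.

```python
import torch
import torch.distributed as dist


def barrier_with_fallback(device: torch.device) -> None:
    """Synchronize ranks, falling back to all_reduce if barrier() raises the hooks error."""
    try:
        dist.barrier()
    except RuntimeError as e:
        if "PrivateUse1HooksInterface" not in str(e):
            raise
        # Every rank must contribute to the all_reduce, so it blocks like a barrier.
        dummy = torch.tensor(0.0, device=device)
        dist.all_reduce(dummy)


if __name__ == "__main__":
    # Single-process group purely for demonstration; pick any free local port.
    dist.init_process_group("gloo", init_method="tcp://127.0.0.1:29501", rank=0, world_size=1)
    barrier_with_fallback(torch.device("cpu"))
    print("barrier passed")
    dist.destroy_process_group()
```

The all_reduce fallback keeps every rank blocked until all ranks participate, which is the synchronization property the original `barrier()` call provided, without depending on how the backend resolves an accelerator on CPU-only runs.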