diff --git a/python/paddle/distributed/fleet/fleet.py b/python/paddle/distributed/fleet/fleet.py
index de003916b7d255..9f6422b34a49e2 100755
--- a/python/paddle/distributed/fleet/fleet.py
+++ b/python/paddle/distributed/fleet/fleet.py
@@ -272,20 +272,7 @@ def init(
         self.strategy_compiler = StrategyCompiler()
 
-        if self._role_maker._is_non_distributed() and self._is_collective:
-            if paddle.framework.core.is_compiled_with_cuda():
-                gpus_num = paddle.framework.core.get_cuda_device_count()
-                if gpus_num != 1:
-                    raise ValueError(
-                        "CUDA_VISIBLE_DEVICES shoule be set only 1 card if you use `python` to launch fleet program."
-                    )
-        if in_dynamic_mode():
-            if self.worker_num() == 1:
-                # if worker_num is 1, should construct default topology & hcg
-                self._topology = tp.CommunicateTopology()
-                self._hcg = tp.HybridCommunicateGroup(self._topology)
-                return
         if parallel_helper._is_parallel_ctx_initialized():
             logger.warning(
                 "The dygraph parallel environment has been initialized."
diff --git a/python/paddle/distributed/fleet/model.py b/python/paddle/distributed/fleet/model.py
index af230e28aca6f8..e7cd0b33e9d16d 100755
--- a/python/paddle/distributed/fleet/model.py
+++ b/python/paddle/distributed/fleet/model.py
@@ -85,7 +85,7 @@ def forward(self, x):
     fleet_env = fleet.fleet
 
     assert model is not None, "model should not be None"
-    if fleet_env.worker_num() <= 1:
+    if paddle.distributed.get_world_size() <= 1:
         return model
 
     amp_enable = False
diff --git a/python/paddle/distributed/launch/controllers/collective.py b/python/paddle/distributed/launch/controllers/collective.py
index 6ab47b8ba54da0..bb938dd5c1f1d8 100644
--- a/python/paddle/distributed/launch/controllers/collective.py
+++ b/python/paddle/distributed/launch/controllers/collective.py
@@ -120,12 +120,14 @@ def _build_pod_with_args(self):
                 "PADDLE_LOCAL_RANK": f"{i}",
                 "PADDLE_NNODES": f"{len(ips)}",
                 # compatible env
-                "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
                 "PADDLE_CURRENT_ENDPOINT": job_endpoints[i + rank_offset],
                 "PADDLE_TRAINER_ID": f"{i + rank_offset}",
                 "PADDLE_TRAINERS_NUM": f"{len(job_endpoints)}",
                 "PADDLE_RANK_IN_NODE": str(i),
             }
+            if len(",".join(job_endpoints)) < 120 * 1024:
+                e.update({"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints)})
+
             if self._tuner_run_mode is not None:
                 e.update(
                     {
@@ -213,12 +215,14 @@ def _build_pod_with_master(self):
                 "PADDLE_LOCAL_RANK": f"{i}",
                 "PADDLE_NNODES": f"{self.job.replicas}",
                 # compatible env
-                "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
                 "PADDLE_CURRENT_ENDPOINT": endpoints[i],
                 "PADDLE_TRAINER_ID": f"{i + rank_offset}",
                 "PADDLE_TRAINERS_NUM": f"{global_size}",
                 "PADDLE_RANK_IN_NODE": str(i),
             }
+            if len(",".join(job_endpoints)) < 120 * 1024:
+                e.update({"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints)})
+
             if self._tuner_run_mode is not None:
                 e.update(
                     {
diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py
index ebbcbdf1c7c875..1d5d07f01cbd69 100644
--- a/python/paddle/distributed/parallel.py
+++ b/python/paddle/distributed/parallel.py
@@ -38,7 +38,11 @@
     _set_group_map_by_name,
     _valid_backend_list,
 )
-from paddle.distributed.communication.group import _add_new_group
+from paddle.distributed.communication.group import (
+    _add_new_group,
+    _get_global_group,
+    is_initialized,
+)
 from paddle.distributed.fleet.base.private_helper_function import (  # noqa: F401
     wait_server_ready,
 )
@@ -1017,7 +1021,6 @@ def train():
         _check_var_exists("PADDLE_TRAINER_ID")
         _check_var_exists("PADDLE_CURRENT_ENDPOINT")
         _check_var_exists("PADDLE_TRAINERS_NUM")
-        _check_var_exists("PADDLE_TRAINER_ENDPOINTS")
 
     # NOTE(chenweihang): [ why config global place here? ]
     # the dygraph mode will be set to default mode,
@@ -1242,6 +1245,10 @@ def get_world_size(group=None):
             print("The world_size is %d" % dist.get_world_size())
             # The world_size is 1
     """
+    if in_dynamic_mode() and (group is None):
+        if is_initialized():
+            group = _get_global_group()
+
     if in_dynamic_mode() and group:
         return group.world_size
 
diff --git a/test/legacy_test/test_spawn_and_init_parallel_env.py b/test/legacy_test/test_spawn_and_init_parallel_env.py
index a9e3e7f8f672d8..290d48d72c9113 100644
--- a/test/legacy_test/test_spawn_and_init_parallel_env.py
+++ b/test/legacy_test/test_spawn_and_init_parallel_env.py
@@ -37,7 +37,7 @@ class TestInitParallelEnv(unittest.TestCase):
     def test_check_env_failed(self):
         os.environ['FLAGS_selected_gpus'] = '0'
         os.environ['PADDLE_TRAINER_ID'] = '0'
-        os.environ['PADDLE_CURRENT_ENDPOINT'] = '127.0.0.1:6170'
+        # os.environ['PADDLE_CURRENT_ENDPOINT'] = '127.0.0.1:6170'
         os.environ['PADDLE_TRAINERS_NUM'] = '2'
         with self.assertRaises(ValueError):
             dist.init_parallel_env()