[LAUNCH] no endpoints env in dynamic mode (#54636)
* no endpoints env in dynamic mode

* fix fleet API inconsistency
kuizhiqing authored Jun 19, 2023
1 parent 855650e commit cf515d9
Showing 5 changed files with 17 additions and 19 deletions.
13 changes: 0 additions & 13 deletions python/paddle/distributed/fleet/fleet.py
@@ -272,20 +272,7 @@ def init(

self.strategy_compiler = StrategyCompiler()

if self._role_maker._is_non_distributed() and self._is_collective:
if paddle.framework.core.is_compiled_with_cuda():
gpus_num = paddle.framework.core.get_cuda_device_count()
if gpus_num != 1:
raise ValueError(
"CUDA_VISIBLE_DEVICES shoule be set only 1 card if you use `python` to launch fleet program."
)

if in_dynamic_mode():
if self.worker_num() == 1:
# if worker_num is 1, should construct default topology & hcg
self._topology = tp.CommunicateTopology()
self._hcg = tp.HybridCommunicateGroup(self._topology)
return
if parallel_helper._is_parallel_ctx_initialized():
logger.warning(
"The dygraph parallel environment has been initialized."
2 changes: 1 addition & 1 deletion python/paddle/distributed/fleet/model.py
@@ -85,7 +85,7 @@ def forward(self, x):
fleet_env = fleet.fleet

assert model is not None, "model should not be None"
if fleet_env.worker_num() <= 1:
if paddle.distributed.get_world_size() <= 1:
return model

amp_enable = False
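For context, the user-visible effect of this hunk is that the wrapper's early return is now keyed to the framework-wide world size rather than fleet's own worker_num(). A minimal sketch of that condition (assuming a plain single-process dygraph run, not taken from the diff):

```python
# Minimal sketch, assuming a plain single-process dygraph run with no launcher:
# the condition model.py now uses for its early return.
import paddle
import paddle.distributed as dist

model = paddle.nn.Linear(8, 8)
if dist.get_world_size() <= 1:
    # fleet.distributed_model(model) would return `model` unchanged here
    print("single process, model is not wrapped")
```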
8 changes: 6 additions & 2 deletions python/paddle/distributed/launch/controllers/collective.py
@@ -120,12 +120,14 @@ def _build_pod_with_args(self):
"PADDLE_LOCAL_RANK": f"{i}",
"PADDLE_NNODES": f"{len(ips)}",
# compatible env
"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
"PADDLE_CURRENT_ENDPOINT": job_endpoints[i + rank_offset],
"PADDLE_TRAINER_ID": f"{i + rank_offset}",
"PADDLE_TRAINERS_NUM": f"{len(job_endpoints)}",
"PADDLE_RANK_IN_NODE": str(i),
}
if len(",".join(job_endpoints)) < 120 * 1024:
e.update({"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints)})

if self._tuner_run_mode is not None:
e.update(
{
@@ -213,12 +215,14 @@ def _build_pod_with_master(self):
"PADDLE_LOCAL_RANK": f"{i}",
"PADDLE_NNODES": f"{self.job.replicas}",
# compatible env
"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
"PADDLE_CURRENT_ENDPOINT": endpoints[i],
"PADDLE_TRAINER_ID": f"{i + rank_offset}",
"PADDLE_TRAINERS_NUM": f"{global_size}",
"PADDLE_RANK_IN_NODE": str(i),
}
if len(",".join(job_endpoints)) < 120 * 1024:
e.update({"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints)})

if self._tuner_run_mode is not None:
e.update(
{
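Both launcher hunks apply the same guard, presumably to keep the compatibility variable within typical per-variable environment size limits (on Linux a single environment string is capped at roughly 128 KB, so 120 * 1024 leaves headroom). A standalone sketch of the guard with made-up endpoints:

```python
# Standalone sketch of the guard added above; the endpoint values are made up.
job_endpoints = [f"10.0.0.{i % 250}:{6170 + i}" for i in range(8)]

e = {
    "PADDLE_TRAINERS_NUM": str(len(job_endpoints)),
}
joined = ",".join(job_endpoints)
if len(joined) < 120 * 1024:
    # only export the compatibility variable when it fits comfortably
    # inside the per-variable environment size limit
    e["PADDLE_TRAINER_ENDPOINTS"] = joined
```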
11 changes: 9 additions & 2 deletions python/paddle/distributed/parallel.py
@@ -38,7 +38,11 @@
_set_group_map_by_name,
_valid_backend_list,
)
from paddle.distributed.communication.group import _add_new_group
from paddle.distributed.communication.group import (
_add_new_group,
_get_global_group,
is_initialized,
)
from paddle.distributed.fleet.base.private_helper_function import ( # noqa: F401
wait_server_ready,
)
@@ -1017,7 +1021,6 @@ def train():
_check_var_exists("PADDLE_TRAINER_ID")
_check_var_exists("PADDLE_CURRENT_ENDPOINT")
_check_var_exists("PADDLE_TRAINERS_NUM")
_check_var_exists("PADDLE_TRAINER_ENDPOINTS")

# NOTE(chenweihang): [ why config global place here? ]
# the dygraph mode will be set to default mode,
@@ -1242,6 +1245,10 @@ def get_world_size(group=None):
print("The world_size is %d" % dist.get_world_size())
# The world_size is 1
"""
if in_dynamic_mode() and (group is None):
if is_initialized():
group = _get_global_group()

if in_dynamic_mode() and group:
return group.world_size

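With this hunk, get_world_size() in dygraph mode consults the global process group once it has been initialized, instead of only the environment-derived value, which is what the model.py change above relies on. A minimal usage sketch, assuming the script is started by paddle.distributed.launch:

```python
# Minimal usage sketch, assuming this script is started by
# `python -m paddle.distributed.launch`: after init_parallel_env() the global
# group exists, so get_world_size() resolves through it even when the launcher
# leaves PADDLE_TRAINER_ENDPOINTS unset.
import paddle.distributed as dist

dist.init_parallel_env()
print(dist.get_world_size())  # number of launched ranks
```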
2 changes: 1 addition & 1 deletion test/legacy_test/test_spawn_and_init_parallel_env.py
@@ -37,7 +37,7 @@ class TestInitParallelEnv(unittest.TestCase):
def test_check_env_failed(self):
os.environ['FLAGS_selected_gpus'] = '0'
os.environ['PADDLE_TRAINER_ID'] = '0'
os.environ['PADDLE_CURRENT_ENDPOINT'] = '127.0.0.1:6170'
# os.environ['PADDLE_CURRENT_ENDPOINT'] = '127.0.0.1:6170'
os.environ['PADDLE_TRAINERS_NUM'] = '2'
with self.assertRaises(ValueError):
dist.init_parallel_env()
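Because PADDLE_TRAINER_ENDPOINTS is no longer required, the test keeps its ValueError expectation by leaving a still-required variable, PADDLE_CURRENT_ENDPOINT, unset. A simplified, paraphrased sketch of the check it exercises (the helper body below is a stand-in, not copied from parallel.py):

```python
# Simplified, paraphrased sketch of the env check the test still hits:
# a required variable that is absent raises ValueError.
import os

def check_var_exists(var_name):  # stand-in for parallel.py's _check_var_exists
    if os.environ.get(var_name) is None:
        raise ValueError(f"environment variable {var_name} is needed but not set")

os.environ.pop("PADDLE_CURRENT_ENDPOINT", None)
try:
    check_var_exists("PADDLE_CURRENT_ENDPOINT")
except ValueError as exc:
    print(exc)
```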
