diff --git a/ci/test/2node_op_test_multi_client.sh b/ci/test/2node_op_test_multi_client.sh
index 33efe6c8ce3..148ae2fe374 100755
--- a/ci/test/2node_op_test_multi_client.sh
+++ b/ci/test/2node_op_test_multi_client.sh
@@ -17,7 +17,7 @@ cd ${test_tmp_dir}/$(basename $test_dir)
 
 for device_num in 1 2 4
 do
-    ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr 192.168.1.12 -m unittest discover ${PWD} --failfast --verbose
+    ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr $_MASTER_ADDR -m unittest discover ${PWD} --failfast --verbose
     # use a invalid ibverbs lib to test if falling back to epoll works
-    ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num ONEFLOW_LIBIBVERBS_PATH=invalid_lib python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr 192.168.1.12 -m unittest discover ${PWD} --failfast --verbose
+    ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num ONEFLOW_LIBIBVERBS_PATH=invalid_lib python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr $_MASTER_ADDR -m unittest discover ${PWD} --failfast --verbose
 done
diff --git a/ci/test/distributed_run.py b/ci/test/distributed_run.py
index 4002befd658..55e97d9edde 100644
--- a/ci/test/distributed_run.py
+++ b/ci/test/distributed_run.py
@@ -60,13 +60,13 @@ def find_free_port():
         return s.getsockname()[1]
 
 
-async def spawn_shell_and_check(cmd: str = None):
+async def spawn_shell(cmd: str = None):
     p = await asyncio.create_subprocess_shell(cmd,)
     await p.wait()
     assert p.returncode == 0, cmd
 
 
-async def spawn_shell(cmd: str = None):
+async def spawn_shell_ignoring_failure(cmd: str = None):
     p = await asyncio.create_subprocess_shell(cmd,)
     await p.wait()
 
@@ -74,34 +74,32 @@ async def spawn_shell(cmd: str = None):
 async def build_docker_img(remote_host=None, workspace_dir=None):
     if remote_host:
         assert workspace_dir
-        await spawn_shell_and_check("rm -f > oneflow-src.zip")
-        await spawn_shell_and_check("git archive --format zip HEAD > oneflow-src.zip")
-        await spawn_shell_and_check(
+        await spawn_shell("rm -f > oneflow-src.zip")
+        await spawn_shell("git archive --format zip HEAD > oneflow-src.zip")
+        await spawn_shell(
             f"scp oneflow-src.zip {remote_host}:{workspace_dir}/oneflow-src.zip",
         )
-        await spawn_shell_and_check(
+        await spawn_shell(
             f"ssh {remote_host} unzip {workspace_dir}/oneflow-src.zip -d {workspace_dir}/oneflow-src",
         )
-        await spawn_shell_and_check(
+        await spawn_shell(
             f"ssh {remote_host} bash {workspace_dir}/oneflow-src/docker/ci/test/build.sh",
         )
     else:
-        await spawn_shell_and_check(f"bash docker/ci/test/build.sh")
+        await spawn_shell(f"bash docker/ci/test/build.sh")
 
 
 async def create_remote_workspace_dir(
     remote_host=None, workspace_dir=None, copy_files=None
 ):
-    await spawn_shell_and_check(f"ssh {remote_host} mkdir -p {workspace_dir}")
+    await spawn_shell(f"ssh {remote_host} mkdir -p {workspace_dir}")
     if copy_files is not None:
         for path in copy_files:
             # Reference: https://stackoverflow.com/a/31278462
             if os.path.isdir(path) and path[-1] != "/":
                 path += "/"
-            await spawn_shell_and_check(
-                f"ssh {remote_host} mkdir -p {workspace_dir}/{path}"
-            )
-            await spawn_shell_and_check(
+            await spawn_shell(f"ssh {remote_host} mkdir -p {workspace_dir}/{path}")
+            await spawn_shell(
                 f"rsync -azPq --omit-dir-times --no-perms --no-group --copy-links --exclude='__pycache__' {path} {remote_host}:{workspace_dir}/{path}"
             )
     print("create_remote_workspace_dir done")
@@ -126,9 +124,17 @@ async def launch_remote_container(
     oneflow_python_path=None,
     cmd=None,
     node_rank=None,
+    master_addr=None,
 ):
     print("launching remote container at", remote_host)
     assert img_tag
+    multi_client_args = [node_rank, master_addr]
+    multi_client_arg_has_value = [x is not None for x in multi_client_args]
+    if any(multi_client_arg_has_value):
+        assert all(multi_client_arg_has_value)
+        is_multi_client = True
+    else:
+        is_multi_client = False
     pythonpath_args = None
     if oneflow_wheel_path:
         pythonpath_args = ""
@@ -138,25 +144,28 @@ async def launch_remote_container(
         raise ValueError("must have oneflow_wheel_path or oneflow_python_path")
     docker_cmd = f"""docker run --privileged -d --network host --shm-size=8g --rm {get_docker_cache_args()} -v {workspace_dir}:{workspace_dir} -w {workspace_dir} -v /dataset:/dataset -v /model_zoo:/model_zoo --name {container_name} {pythonpath_args} {img_tag} sleep {survival_time}
 """
-    await spawn_shell_and_check(f"ssh {remote_host} {docker_cmd}")
+    await spawn_shell(f"ssh {remote_host} {docker_cmd}")
     if oneflow_wheel_path:
         whl_basename = os.path.basename(oneflow_wheel_path)
-        await spawn_shell_and_check(
+        await spawn_shell(
             f"ssh {remote_host} docker exec {container_name} python3 -m pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple"
         )
-        await spawn_shell_and_check(
+        await spawn_shell(
             f"ssh {remote_host} docker exec {container_name} python3 -m pip install {workspace_dir}/{whl_basename}"
         )
     await spawn_shell(
         f"ssh {remote_host} docker exec {container_name} python3 -m oneflow --doctor"
     )
     if cmd:
-        if node_rank is not None:
-            node_rank_args = f"--env NODE_RANK={node_rank}"
+        if is_multi_client:
+            multi_client_docker_args = (
+                # Use _MASTER_ADDR to avoid name conflict with OneFlow's built-in MASTER_ADDR
+                f"--env NODE_RANK={node_rank} --env _MASTER_ADDR={master_addr}"
+            )
         else:
-            node_rank_args = ""
+            multi_client_docker_args = ""
         await spawn_shell(
-            f"ssh {remote_host} docker exec {node_rank_args} {container_name} {cmd}"
+            f"ssh {remote_host} docker exec {multi_client_docker_args} {container_name} {cmd}"
         )
 
 
@@ -318,7 +327,7 @@ async def fix_and_sync_libs(oneflow_internal_path=None, remote_hosts=None):
     tmp_dir = tempfile.TemporaryDirectory()
     tmp_lib_dir = os.path.join(tmp_dir.name, "libs")
     os.mkdir(tmp_lib_dir)
-    await spawn_shell_and_check(
+    await spawn_shell(
         """ldd file | grep "=> /" | awk '{print $3}' | xargs -I '{}' cp -v '{}' destination""".replace(
             "file", oneflow_internal_path
         ).replace(
@@ -331,17 +340,15 @@
         pathlib.Path(__file__).parent.absolute(), "excludelist"
     )
     excludelist = open(excludelist_path).read().split("\n")
-    await spawn_shell_and_check(f"cp {oneflow_internal_path} {tmp_dir.name}")
+    await spawn_shell(f"cp {oneflow_internal_path} {tmp_dir.name}")
 
     def handle_lib(lib):
         if lib in excludelist or "libpython" in lib:
             print("excluding", lib)
-            return spawn_shell_and_check(f"rm {tmp_lib_dir}/{lib}")
+            return spawn_shell(f"rm {tmp_lib_dir}/{lib}")
         else:
             print("keeping", lib)
-            return spawn_shell_and_check(
-                f"patchelf --set-rpath '$ORIGIN' {tmp_lib_dir}/{lib}"
-            )
+            return spawn_shell(f"patchelf --set-rpath '$ORIGIN' {tmp_lib_dir}/{lib}")
 
     await asyncio.gather(*(handle_lib(lib) for lib in libs))
 
@@ -349,15 +356,15 @@ def handle_lib(lib):
         tmp_dir.name, pathlib.Path(oneflow_internal_path).name
     )
     print("before fixing .so")
-    await spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}")
+    await spawn_shell(f"ldd {tmp_oneflow_internal_path}")
     print("fixing .so")
-    await spawn_shell_and_check(
+    await spawn_shell(
         f"patchelf --set-rpath '$ORIGIN/libs' {tmp_oneflow_internal_path}"
     )
 
     await asyncio.gather(
         *[
-            spawn_shell_and_check(
+            spawn_shell(
                 f"ssh {remote_host} 'mkdir -p {workspace_dir}/python/oneflow/libs'",
             )
             for remote_host in remote_hosts
@@ -366,7 +373,7 @@ def handle_lib(lib):
 
     async def copy_file(path=None, remote_host=None):
         relpath = os.path.relpath(path, tmp_dir.name)
-        await spawn_shell_and_check(
+        await spawn_shell(
             f"scp {path} {remote_host}:{workspace_dir}/python/oneflow/{relpath}",
         )
 
@@ -382,7 +389,7 @@ async def copy_file(path=None, remote_host=None):
             for remote_host in remote_hosts
             for f in files
         ],
-        spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}"),
+        spawn_shell(f"ldd {tmp_oneflow_internal_path}"),
     )
 
 
@@ -391,8 +398,11 @@ async def remove_containers_by_name(remote_hosts=None, container_name=None):
     assert container_name
     assert remote_hosts
    await asyncio.gather(
-        *[spawn_shell(f"ssh {remote_host} {rm_cmd}") for remote_host in remote_hosts],
-        spawn_shell(rm_cmd),
+        *[
+            spawn_shell_ignoring_failure(f"ssh {remote_host} {rm_cmd}")
+            for remote_host in remote_hosts
+        ],
+        spawn_shell_ignoring_failure(rm_cmd),
     )
 
 
@@ -504,9 +514,7 @@ def get_remote_hosts(args):
     loop.run_until_complete(
         asyncio.gather(
             *[
-                spawn_shell_and_check(
-                    f"ssh -o StrictHostKeyChecking=no {remote_host} true"
-                )
+                spawn_shell(f"ssh -o StrictHostKeyChecking=no {remote_host} true")
                 for remote_host in remote_hosts
             ],
         ),
@@ -545,7 +553,7 @@ def get_remote_hosts(args):
     loop.run_until_complete(
         asyncio.gather(
             *[
-                spawn_shell_and_check(
+                spawn_shell(
                     f"rsync -azPq --omit-dir-times --no-perms --no-group --copy-links --include='*.py' --exclude='*.so' --exclude='__pycache__' --exclude='oneflow/include' --include='*/' --exclude='*' {args.oneflow_python_path} {remote_host}:{workspace_dir}"
                 )
                 for remote_host in remote_hosts
@@ -564,7 +572,7 @@ def get_remote_hosts(args):
     loop.run_until_complete(
         asyncio.gather(
             *[
-                spawn_shell_and_check(
+                spawn_shell(
                     f"rsync -azPq --omit-dir-times --no-perms --no-group {oneflow_wheel_path} {remote_host}:{workspace_dir}"
                 )
                 for remote_host in remote_hosts
@@ -611,7 +619,7 @@ def exit_handler():
         loop.run_until_complete(
             asyncio.gather(
                 *[
-                    spawn_shell(
+                    spawn_shell_ignoring_failure(
                         f"ssh {remote_host} docker run --rm -v {workspace_dir}:/p -w /p busybox chmod -R 777 .",
                     )
                     for remote_host in remote_hosts
@@ -625,7 +633,7 @@ def exit_handler():
         loop.run_until_complete(
             asyncio.gather(
                 *[
-                    spawn_shell(
+                    spawn_shell_ignoring_failure(
                         f"rsync -azPq --omit-dir-times --no-perms --no-group --exclude='*.whl' --exclude='python' {extra_exclude_args} {remote_host}:{workspace_dir}/ {args.oneflow_test_tmp_dir}/{remote_host}"
                     )
                     for remote_host in remote_hosts
@@ -638,7 +646,9 @@ def exit_handler():
         loop.run_until_complete(
             asyncio.gather(
                 *[
-                    spawn_shell(f"ssh {remote_host} rm -rf {workspace_dir}",)
+                    spawn_shell_ignoring_failure(
+                        f"ssh {remote_host} rm -rf {workspace_dir}",
+                    )
                     for remote_host in remote_hosts
                 ],
             )
@@ -667,6 +677,7 @@ def exit_handler():
                     img_tag=img_tag,
                     cmd=args.cmd,
                     node_rank=node_rank,
+                    master_addr=this_host,
                 )
                 for node_rank, remote_host in enumerate(remote_hosts)
             ],
diff --git a/python/oneflow/framework/env_util.py b/python/oneflow/framework/env_util.py
index e6d6d0f629b..fa5461f002f 100644
--- a/python/oneflow/framework/env_util.py
+++ b/python/oneflow/framework/env_util.py
@@ -337,10 +337,10 @@ def _FindFreePort():
 
 def HasAllMultiClientEnvVars():
     env_var_names = ["MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK", "LOCAL_RANK"]
-    has_all_env_vars = all([os.getenv(x) for x in env_var_names])
-    if not has_all_env_vars:
-        has_at_least_one_env_var = any([os.getenv(x) for x in env_var_names])
-        assert not has_at_least_one_env_var
+    env_var_values = [os.getenv(x) for x in env_var_names]
+    has_no_env_vars = not any(env_var_values)
+    has_all_env_vars = all(env_var_values)
+    assert has_no_env_vars or has_all_env_vars, list(zip(env_var_names, env_var_values))
     return has_all_env_vars
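
For reference, a minimal standalone sketch of the all-or-none validation that the env_util.py hunk above introduces; the function name and the env parameter below are illustrative only, not part of the patch (the patched function is HasAllMultiClientEnvVars).

import os

def check_multi_client_env_vars(env=None):
    # Hypothetical helper mirroring the patched logic: either every
    # multi-client variable is set or none of them is; a partial
    # configuration (e.g. only MASTER_ADDR) trips the assertion, and the
    # assertion message lists each variable with its current value.
    env = os.environ if env is None else env
    names = ["MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK", "LOCAL_RANK"]
    values = [env.get(x) for x in names]
    has_none = not any(values)
    has_all = all(values)
    assert has_none or has_all, list(zip(names, values))
    return has_all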