Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

distributed test bugfix #7057

Merged
merged 17 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/test/2node_op_test_multi_client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cd ${test_tmp_dir}/$(basename $test_dir)

for device_num in 1 2 4
do
ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr 192.168.1.12 -m unittest discover ${PWD} --failfast --verbose
ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr $_MASTER_ADDR -m unittest discover ${PWD} --failfast --verbose
# use an invalid ibverbs lib to test if falling back to epoll works
ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num ONEFLOW_LIBIBVERBS_PATH=invalid_lib python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr 192.168.1.12 -m unittest discover ${PWD} --failfast --verbose
ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num ONEFLOW_LIBIBVERBS_PATH=invalid_lib python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr $_MASTER_ADDR -m unittest discover ${PWD} --failfast --verbose
done
93 changes: 52 additions & 41 deletions ci/test/distributed_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,48 +60,46 @@ def find_free_port():
return s.getsockname()[1]


async def spawn_shell_and_check(cmd: str = None):
async def spawn_shell(cmd: str = None):
p = await asyncio.create_subprocess_shell(cmd,)
await p.wait()
assert p.returncode == 0, cmd


async def spawn_shell(cmd: str = None):
async def spawn_shell_ignoring_failure(cmd: str = None):
p = await asyncio.create_subprocess_shell(cmd,)
await p.wait()


async def build_docker_img(remote_host=None, workspace_dir=None):
if remote_host:
assert workspace_dir
await spawn_shell_and_check("rm -f > oneflow-src.zip")
await spawn_shell_and_check("git archive --format zip HEAD > oneflow-src.zip")
await spawn_shell_and_check(
await spawn_shell("rm -f > oneflow-src.zip")
await spawn_shell("git archive --format zip HEAD > oneflow-src.zip")
await spawn_shell(
f"scp oneflow-src.zip {remote_host}:{workspace_dir}/oneflow-src.zip",
)
await spawn_shell_and_check(
await spawn_shell(
f"ssh {remote_host} unzip {workspace_dir}/oneflow-src.zip -d {workspace_dir}/oneflow-src",
)
await spawn_shell_and_check(
await spawn_shell(
f"ssh {remote_host} bash {workspace_dir}/oneflow-src/docker/ci/test/build.sh",
)
else:
await spawn_shell_and_check(f"bash docker/ci/test/build.sh")
await spawn_shell(f"bash docker/ci/test/build.sh")


async def create_remote_workspace_dir(
remote_host=None, workspace_dir=None, copy_files=None
):
await spawn_shell_and_check(f"ssh {remote_host} mkdir -p {workspace_dir}")
await spawn_shell(f"ssh {remote_host} mkdir -p {workspace_dir}")
if copy_files is not None:
for path in copy_files:
# Reference: https://stackoverflow.com/a/31278462
if os.path.isdir(path) and path[-1] != "/":
path += "/"
await spawn_shell_and_check(
f"ssh {remote_host} mkdir -p {workspace_dir}/{path}"
)
await spawn_shell_and_check(
await spawn_shell(f"ssh {remote_host} mkdir -p {workspace_dir}/{path}")
await spawn_shell(
f"rsync -azPq --omit-dir-times --no-perms --no-group --copy-links --exclude='__pycache__' {path} {remote_host}:{workspace_dir}/{path}"
)
print("create_remote_workspace_dir done")
Expand All @@ -126,9 +124,17 @@ async def launch_remote_container(
oneflow_python_path=None,
cmd=None,
node_rank=None,
master_addr=None,
):
print("launching remote container at", remote_host)
assert img_tag
multi_client_args = [node_rank, master_addr]
multi_client_arg_has_value = [x is not None for x in multi_client_args]
if any(multi_client_arg_has_value):
assert all(multi_client_arg_has_value)
is_multi_client = True
else:
is_multi_client = False
pythonpath_args = None
if oneflow_wheel_path:
pythonpath_args = ""
Expand All @@ -138,25 +144,28 @@ async def launch_remote_container(
raise ValueError("must have oneflow_wheel_path or oneflow_python_path")
docker_cmd = f"""docker run --privileged -d --network host --shm-size=8g --rm {get_docker_cache_args()} -v {workspace_dir}:{workspace_dir} -w {workspace_dir} -v /dataset:/dataset -v /model_zoo:/model_zoo --name {container_name} {pythonpath_args} {img_tag} sleep {survival_time}
"""
await spawn_shell_and_check(f"ssh {remote_host} {docker_cmd}")
await spawn_shell(f"ssh {remote_host} {docker_cmd}")
if oneflow_wheel_path:
whl_basename = os.path.basename(oneflow_wheel_path)
await spawn_shell_and_check(
await spawn_shell(
f"ssh {remote_host} docker exec {container_name} python3 -m pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple"
)
await spawn_shell_and_check(
await spawn_shell(
f"ssh {remote_host} docker exec {container_name} python3 -m pip install {workspace_dir}/{whl_basename}"
)
await spawn_shell(
f"ssh {remote_host} docker exec {container_name} python3 -m oneflow --doctor"
)
if cmd:
if node_rank is not None:
node_rank_args = f"--env NODE_RANK={node_rank}"
if is_multi_client:
multi_client_docker_args = (
# Use _MASTER_ADDR to avoid name conflict with OneFlow's built-in MASTER_ADDR
f"--env NODE_RANK={node_rank} --env _MASTER_ADDR={master_addr}"
)
else:
node_rank_args = ""
multi_client_docker_args = ""
await spawn_shell(
f"ssh {remote_host} docker exec {node_rank_args} {container_name} {cmd}"
f"ssh {remote_host} docker exec {multi_client_docker_args} {container_name} {cmd}"
)


Expand Down Expand Up @@ -318,7 +327,7 @@ async def fix_and_sync_libs(oneflow_internal_path=None, remote_hosts=None):
tmp_dir = tempfile.TemporaryDirectory()
tmp_lib_dir = os.path.join(tmp_dir.name, "libs")
os.mkdir(tmp_lib_dir)
await spawn_shell_and_check(
await spawn_shell(
"""ldd file | grep "=> /" | awk '{print $3}' | xargs -I '{}' cp -v '{}' destination""".replace(
"file", oneflow_internal_path
).replace(
Expand All @@ -331,33 +340,31 @@ async def fix_and_sync_libs(oneflow_internal_path=None, remote_hosts=None):
pathlib.Path(__file__).parent.absolute(), "excludelist"
)
excludelist = open(excludelist_path).read().split("\n")
await spawn_shell_and_check(f"cp {oneflow_internal_path} {tmp_dir.name}")
await spawn_shell(f"cp {oneflow_internal_path} {tmp_dir.name}")

def handle_lib(lib):
if lib in excludelist or "libpython" in lib:
print("excluding", lib)
return spawn_shell_and_check(f"rm {tmp_lib_dir}/{lib}")
return spawn_shell(f"rm {tmp_lib_dir}/{lib}")
else:
print("keeping", lib)
return spawn_shell_and_check(
f"patchelf --set-rpath '$ORIGIN' {tmp_lib_dir}/{lib}"
)
return spawn_shell(f"patchelf --set-rpath '$ORIGIN' {tmp_lib_dir}/{lib}")

await asyncio.gather(*(handle_lib(lib) for lib in libs))

tmp_oneflow_internal_path = os.path.join(
tmp_dir.name, pathlib.Path(oneflow_internal_path).name
)
print("before fixing .so")
await spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}")
await spawn_shell(f"ldd {tmp_oneflow_internal_path}")
print("fixing .so")
await spawn_shell_and_check(
await spawn_shell(
f"patchelf --set-rpath '$ORIGIN/libs' {tmp_oneflow_internal_path}"
)

await asyncio.gather(
*[
spawn_shell_and_check(
spawn_shell(
f"ssh {remote_host} 'mkdir -p {workspace_dir}/python/oneflow/libs'",
)
for remote_host in remote_hosts
Expand All @@ -366,7 +373,7 @@ def handle_lib(lib):

async def copy_file(path=None, remote_host=None):
relpath = os.path.relpath(path, tmp_dir.name)
await spawn_shell_and_check(
await spawn_shell(
f"scp {path} {remote_host}:{workspace_dir}/python/oneflow/{relpath}",
)

Expand All @@ -382,7 +389,7 @@ async def copy_file(path=None, remote_host=None):
for remote_host in remote_hosts
for f in files
],
spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}"),
spawn_shell(f"ldd {tmp_oneflow_internal_path}"),
)


Expand All @@ -391,8 +398,11 @@ async def remove_containers_by_name(remote_hosts=None, container_name=None):
assert container_name
assert remote_hosts
await asyncio.gather(
*[spawn_shell(f"ssh {remote_host} {rm_cmd}") for remote_host in remote_hosts],
spawn_shell(rm_cmd),
*[
spawn_shell_ignoring_failure(f"ssh {remote_host} {rm_cmd}")
for remote_host in remote_hosts
],
spawn_shell_ignoring_failure(rm_cmd),
)


Expand Down Expand Up @@ -504,9 +514,7 @@ def get_remote_hosts(args):
loop.run_until_complete(
asyncio.gather(
*[
spawn_shell_and_check(
f"ssh -o StrictHostKeyChecking=no {remote_host} true"
)
spawn_shell(f"ssh -o StrictHostKeyChecking=no {remote_host} true")
for remote_host in remote_hosts
],
),
Expand Down Expand Up @@ -545,7 +553,7 @@ def get_remote_hosts(args):
loop.run_until_complete(
asyncio.gather(
*[
spawn_shell_and_check(
spawn_shell(
f"rsync -azPq --omit-dir-times --no-perms --no-group --copy-links --include='*.py' --exclude='*.so' --exclude='__pycache__' --exclude='oneflow/include' --include='*/' --exclude='*' {args.oneflow_python_path} {remote_host}:{workspace_dir}"
)
for remote_host in remote_hosts
Expand All @@ -564,7 +572,7 @@ def get_remote_hosts(args):
loop.run_until_complete(
asyncio.gather(
*[
spawn_shell_and_check(
spawn_shell(
f"rsync -azPq --omit-dir-times --no-perms --no-group {oneflow_wheel_path} {remote_host}:{workspace_dir}"
)
for remote_host in remote_hosts
Expand Down Expand Up @@ -611,7 +619,7 @@ def exit_handler():
loop.run_until_complete(
asyncio.gather(
*[
spawn_shell(
spawn_shell_ignoring_failure(
f"ssh {remote_host} docker run --rm -v {workspace_dir}:/p -w /p busybox chmod -R 777 .",
)
for remote_host in remote_hosts
Expand All @@ -625,7 +633,7 @@ def exit_handler():
loop.run_until_complete(
asyncio.gather(
*[
spawn_shell(
spawn_shell_ignoring_failure(
f"rsync -azPq --omit-dir-times --no-perms --no-group --exclude='*.whl' --exclude='python' {extra_exclude_args} {remote_host}:{workspace_dir}/ {args.oneflow_test_tmp_dir}/{remote_host}"
)
for remote_host in remote_hosts
Expand All @@ -638,7 +646,9 @@ def exit_handler():
loop.run_until_complete(
asyncio.gather(
*[
spawn_shell(f"ssh {remote_host} rm -rf {workspace_dir}",)
spawn_shell_ignoring_failure(
f"ssh {remote_host} rm -rf {workspace_dir}",
)
for remote_host in remote_hosts
],
)
Expand Down Expand Up @@ -667,6 +677,7 @@ def exit_handler():
img_tag=img_tag,
cmd=args.cmd,
node_rank=node_rank,
master_addr=this_host,
)
for node_rank, remote_host in enumerate(remote_hosts)
],
Expand Down
8 changes: 4 additions & 4 deletions python/oneflow/framework/env_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ def _FindFreePort():

def HasAllMultiClientEnvVars():
env_var_names = ["MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK", "LOCAL_RANK"]
has_all_env_vars = all([os.getenv(x) for x in env_var_names])
if not has_all_env_vars:
has_at_least_one_env_var = any([os.getenv(x) for x in env_var_names])
assert not has_at_least_one_env_var
env_var_values = [os.getenv(x) for x in env_var_names]
has_no_env_vars = not any(env_var_values)
has_all_env_vars = all(env_var_values)
assert has_no_env_vars or has_all_env_vars, list(zip(env_var_names, env_var_values))
return has_all_env_vars


Expand Down