Fix cuda12 timeout #54540

Merged · 3 commits · Jun 13, 2023
2 changes: 1 addition & 1 deletion test/auto_parallel/CMakeLists.txt
@@ -24,7 +24,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
py_test_modules(test_optimization_tuner_api MODULES
test_optimization_tuner_api)
set_tests_properties(test_optimization_tuner_api
PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 80)
PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 100)
py_test_modules(test_converter MODULES test_converter)
set_tests_properties(test_converter PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE"
TIMEOUT 50)
2 changes: 1 addition & 1 deletion test/legacy_test/CMakeLists.txt
@@ -1304,4 +1304,4 @@ set_tests_properties(test_reduce_op_static_build PROPERTIES TIMEOUT 500)
set_tests_properties(test_sync_batch_norm_op_static_build
PROPERTIES LABELS "RUN_TYPE=DIST")
set_tests_properties(test_sync_batch_norm_op_static_build PROPERTIES TIMEOUT
120)
160)
5 changes: 2 additions & 3 deletions test/legacy_test/dist_sharding_save.py
@@ -13,10 +13,9 @@
# limitations under the License.

import os
import pickle
import sys

from dist_mnist import cnn_model # noqa: F401
from test_dist_base import dump_output

import paddle
from paddle import fluid
@@ -83,7 +82,7 @@ def runtime_main():
)

out_losses = []
sys.stdout.buffer.write(pickle.dumps(out_losses))
dump_output(out_losses)


if __name__ == "__main__":
68 changes: 55 additions & 13 deletions test/legacy_test/test_dist_base.py
@@ -48,6 +48,19 @@ def print_to_out(out_losses):
sys.stdout.buffer.write(pickle.dumps(out_losses))


def dump_output(x):
path = os.environ['DUMP_FILE']
with open(path, 'wb') as f:
pickle.dump(x, f)


def load_and_remove(path):
with open(path, 'rb') as f:
out = pickle.load(f)
os.remove(path)
return out


def print_to_err(class_name, log_str):
localtime = time.asctime(time.localtime(time.time()))
print_str = localtime + "\t" + class_name + "\t" + log_str
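Note (not part of the diff): the new helpers replace pickling the losses to stdout with dumping them to a file whose path is passed to the trainer through the `DUMP_FILE` environment variable, so stray log output on stdout can no longer corrupt the pickled payload. A minimal sketch of the round trip, using hypothetical values and assuming the child process inherits `DUMP_FILE` from the launching test:

```python
import os
import pickle
import subprocess
import sys
import tempfile

# Hypothetical sketch: the launcher picks a dump path, the trainer writes its
# losses there (as dump_output does), and the launcher reads the file back.
dump_file = os.path.join(tempfile.gettempdir(), f"out_data_demo_{os.getpid()}.pickled")
env = dict(os.environ, DUMP_FILE=dump_file)

# Stand-in for a trainer process calling dump_output(out_losses).
child_code = (
    "import os, pickle\n"
    "out_losses = [0.9, 0.5, 0.1]\n"
    "with open(os.environ['DUMP_FILE'], 'wb') as f:\n"
    "    pickle.dump(out_losses, f)\n"
    "print('stdout is now free for log noise')\n"
)
subprocess.check_call([sys.executable, "-c", child_code], env=env)

# Mirror of load_and_remove(): read the result, then delete the file.
with open(dump_file, "rb") as f:
    out_losses = pickle.load(f)
os.remove(dump_file)
print(out_losses)  # -> [0.9, 0.5, 0.1]
```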
@@ -210,7 +223,7 @@ def run_pipeline_trainer(self, args):
data_loader.reset()
print_to_err(type(self).__name__, "trainer run finished")

sys.stdout.buffer.write(pickle.dumps(out_losses))
dump_output(out_losses)

def run_use_fleet_api_20_trainer(self, args):
"""
@@ -291,7 +304,7 @@ def get_data():
print_to_err(type(self).__name__, "trainer run finished")
print_to_err(type(self).__name__, f"dist losses: {out_losses}")

sys.stdout.buffer.write(pickle.dumps(out_losses))
dump_output(out_losses)

def run_use_fleet_api_trainer(self, args):
assert args.update_method == "nccl2" or "bkcl"
@@ -386,7 +399,7 @@ def get_data():
print_to_err(type(self).__name__, "run step %d finished" % i)
print_to_err(type(self).__name__, "trainer run finished")

sys.stdout.buffer.write(pickle.dumps(out_losses))
dump_output(out_losses)

if args.save_model:
model_save_dir = "/tmp"
@@ -628,7 +641,7 @@ def get_data():
# print_to_err(type(self).__name__, "out_losses")

sys.stdout = old_stdout
print_to_out(out_losses)
dump_output(out_losses)


class TestParallelDyGraphRunnerBase:
@@ -751,7 +764,7 @@ def run_trainer(self, args):
opt.minimize(loss)
if not args.accumulate_gradient:
model.clear_gradients()
print_to_out(out_losses)
dump_output(out_losses)

def run_trainer_with_spawn(self, args):
# 1. enable dygraph
@@ -836,7 +849,7 @@ def run_use_fleet_api_trainer(self, args):
opt.step()
if not args.accumulate_gradient:
opt.clear_grad()
print_to_out(out_losses)
dump_output(out_losses)


def runtime_main(test_class):
@@ -1140,6 +1153,9 @@ def _run_local(
cmd += " --find_unused_parameters"

env_local.update(envs)
cur_pid = os.getpid()
dump_file = f"out_data_local_{cur_pid}.pickled"
env_local["DUMP_FILE"] = dump_file
print(f"local_cmd: {cmd}, env: {env_local}")

if check_error_log:
@@ -1165,9 +1181,8 @@
err_log.close()

sys.stderr.write('local_stderr: %s\n' % local_err)
sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))

return pickle.loads(local_out)
return load_and_remove(dump_file)

def _run_local_gloo(
self,
@@ -1246,6 +1261,14 @@ def _run_cluster(self, model, envs, check_error_log, log_name):
env0.update(envs)
env1.update(envs)

cur_pid = os.getpid()
dump_files = [
f'./out_data_0_{cur_pid}.pickled',
f'./out_data_1_{cur_pid}.pickled',
]
env0["DUMP_FILE"] = dump_files[0]
env1["DUMP_FILE"] = dump_files[1]

print(f"tr0_cmd: {tr0_cmd}, env: {env0}")
print(f"tr1_cmd: {tr1_cmd}, env: {env1}")

@@ -1293,7 +1316,7 @@ def _run_cluster(self, model, envs, check_error_log, log_name):
ps0.terminate()
ps1.terminate()

return pickle.loads(tr0_out), pickle.loads(tr1_out)
return load_and_remove(dump_files[0]), load_and_remove(dump_files[1])
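Aside (not part of the diff): the dump paths embed the parent test's PID (`out_data_<rank>_<pid>.pickled`), presumably so concurrent test runs in the same working directory do not read each other's results; each trainer rank writes its own file, and the harness collects them after the processes exit. A rough sketch of that collection step, with a hypothetical helper name and assuming the trainer processes have already finished and written their dumps:

```python
import os
import pickle

def collect_trainer_outputs(cur_pid, trainer_num=2):
    """Hypothetical helper mirroring how the test gathers per-rank dumps."""
    outputs = []
    for rank in range(trainer_num):
        path = f"./out_data_{rank}_{cur_pid}.pickled"
        with open(path, "rb") as f:   # raises if a trainer died before dumping
            outputs.append(pickle.load(f))
        os.remove(path)               # clean up so a rerun starts fresh
    return tuple(outputs)
```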

def _get_gloo_trainer_cmd(
self, model, ep, update_method, trainer_id, trainer_num
@@ -1480,13 +1503,19 @@ def _run_cluster_gloo(

procs = []
pipes = []
dump_files = []
cur_pid = os.getpid()
for i in range(0, trainer_num):
tr_cmd, tr_env = self._get_gloo_trainer_cmd(
model, worker_endpoints[i], update_method, i, trainer_num
)
tr_env.update(envs)
tr_env["GLOG_vmodule"] = 'gloo_context=4'
tr_env["GLOG_v"] = '3'

dump_file = f'./out_data_{i}_{cur_pid}.pickled'
dump_files.append(dump_file)
tr_env["DUMP_FILE"] = dump_file
print(
"use_hallreduce:{} tr_cmd:{}, env: {}".format(
self._use_hallreduce, tr_cmd, tr_env
@@ -1522,13 +1551,15 @@ def _run_cluster_gloo(
if trainer_num == 1:
if check_error_log:
print("outs[0]:", outs[0])
return pickle.loads(outs[0])
return load_and_remove(dump_files[0])

else:
if check_error_log:
print("outs[0]:", outs[0])
print("outs[1]:", outs[1])
return pickle.loads(outs[0]), pickle.loads(outs[1])
return load_and_remove(dump_files[0]), load_and_remove(
dump_files[1]
)

def _run_cluster_nccl2(
self, model, envs, update_method, check_error_log, log_name
@@ -1556,11 +1587,16 @@ def _run_cluster_nccl2(

procs = []
pipes = []
cur_pid = os.getpid()
dump_files = []
for i in range(0, trainer_num):
tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
model, worker_endpoints[i], update_method, i, trainer_num
)
tr_env.update(envs)
dump_file = f'./out_data_{i}_{cur_pid}.pickled'
dump_files.append(dump_file)
tr_env["DUMP_FILE"] = dump_file
print(
"use_hallreduce:{} tr_cmd:{}, env: {}".format(
self._use_hallreduce, tr_cmd, tr_env
@@ -1597,7 +1633,7 @@ def _run_cluster_nccl2(
print("outs[0]:", outs[0])
print("outs[1]:", outs[1])

return pickle.loads(outs[0]), pickle.loads(outs[1])
return load_and_remove(dump_files[0]), load_and_remove(dump_files[1])

def _run_pipeline(self, model, envs, check_error_log, log_name):
# NOTE: we reuse ps_endpoints as nccl2 worker endpoints
@@ -1608,6 +1644,8 @@ def _run_pipeline(self, model, envs, check_error_log, log_name):

procs = []
pipes = []
cur_pid = os.getpid()
dump_files = []
for i in range(0, trainer_num):
tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
model, worker_endpoints[i], update_method, i, trainer_num
@@ -1617,6 +1655,10 @@ def _run_pipeline(self, model, envs, check_error_log, log_name):
tr_env['NCCL_SHM_DISABLE'] = '1'
tr_env['FLAGS_selected_gpus'] = str(i)
tr_env['FLAGS_cudnn_deterministic'] = '0'

dump_file = f'./out_data_{i}_{cur_pid}.pickled'
dump_files.append(dump_file)
tr_env["DUMP_FILE"] = dump_file
print(f"tr_cmd:{tr_cmd}, env: {tr_env}")

path = os.path.join(self.temp_dir.name + f"tr{i}_err.log")
@@ -1646,7 +1688,7 @@ def _run_pipeline(self, model, envs, check_error_log, log_name):
if check_error_log:
print("outs[0]:", outs[0])
print("outs[1]:", outs[1])
return pickle.loads(outs[0]), pickle.loads(outs[1])
return load_and_remove(dump_files[0]), load_and_remove(dump_files[1])

def _get_required_envs(self, check_error_log=False, need_envs={}):
# TODO(typhoonzero): should auto adapt GPU count on the machine.