diff --git a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt index 482dc9cb1f3f6..04f01714b371b 100644 --- a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt +++ b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt @@ -5,6 +5,7 @@ if(WITH_CUSTOM_DEVICE) "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") + list(REMOVE_ITEM TEST_OPS "test_collective_process_group_xccl") foreach(TEST_OP ${TEST_OPS}) py_test(${TEST_OP} SRCS ${TEST_OP}.py) endforeach() diff --git a/python/paddle/fluid/tests/custom_runtime/custom_device_multi_process_collective.py b/python/paddle/fluid/tests/custom_runtime/custom_device_multi_process_collective.py new file mode 100644 index 0000000000000..903cc9c7899ed --- /dev/null +++ b/python/paddle/fluid/tests/custom_runtime/custom_device_multi_process_collective.py @@ -0,0 +1,42 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time + + +def train(prefix): + selected_accelerators = os.getenv("FLAGS_selected_accelerators") + selected_custom_devices = os.getenv("FLAGS_selected_custom_cpus") + trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS") + current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") + worker_endpoints = worker_endpoints_env + trainers_num = len(worker_endpoints.split(',')) + device_ids = os.getenv("PADDLE_WORLD_DEVICE_IDS") + current_device_id = os.getenv("PADDLE_LOCAL_DEVICE_IDS") + + details = "selected_accelerators:{} selected_custom_devices:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{} device_ids:{} device_id:{}"\ + .format(selected_accelerators, selected_custom_devices, worker_endpoints, trainers_num, current_endpoint,trainer_id,device_ids, current_device_id) + + print(details) + with open("multi_process_{}.check_{}.log".format(prefix, trainer_id), + "w") as f: + f.write(details) + + +if __name__ == '__main__': + prefix = sys.argv[1] + train(prefix) diff --git a/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py new file mode 100644 index 0000000000000..cba032241fb22 --- /dev/null +++ b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py @@ -0,0 +1,241 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest +import random +import numpy as np +import os +import shutil + +import paddle +from paddle.fluid import core +from datetime import timedelta +import paddle.fluid.core as core +from paddle.fluid.framework import _test_eager_guard +from paddle.fluid.dygraph.parallel import ParallelEnv + + +def init_process_group(strategy=None): + nranks = ParallelEnv().nranks + rank = ParallelEnv().local_rank + is_master = True if rank == 0 else False + store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks) + pg_group = core.ProcessGroupCustom( + store, rank, nranks, + paddle.CustomPlace(ParallelEnv().device_type, + ParallelEnv().device_id)) + + return pg_group + + +class TestProcessGroupFp32(unittest.TestCase): + + def setUp(self): + paddle.seed(2022) + random.seed(2022) + np.random.seed(2022) + self.config() + + def config(self): + self.dtype = "float32" + self.shape = (2, 10, 5) + + def test_create_process_group_xccl(self): + with _test_eager_guard(): + paddle.set_device('custom_cpu:%d' % + paddle.distributed.ParallelEnv().dev_id) + + pg = init_process_group() + + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = pg.allreduce(tensor_x) + task.wait() + # assert np.array_equal(tensor_x, sum_result) + else: + task = pg.allreduce(tensor_y) + task.wait() + # assert np.array_equal(tensor_y, sum_result) + + print("test allreduce sum api ok") + + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + max_result = paddle.maximum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = pg.allreduce(tensor_x, core.ReduceOp.MAX) + task.wait() + # assert np.array_equal(tensor_x, max_result) + else: + task = pg.allreduce(tensor_y, core.ReduceOp.MAX) + task.wait() + # assert np.array_equal(tensor_y, max_result) + + print("test allreduce max api ok") + + # test broadcast + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + broadcast_result = paddle.assign(tensor_x) + if pg.rank() == 0: + task = pg.broadcast(tensor_x, 0) + task.synchronize() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + assert task.is_completed() + # assert np.array_equal(broadcast_result, tensor_x) + else: + task = pg.broadcast(tensor_y, 0) + task.synchronize() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + assert task.is_completed() + # assert np.array_equal(broadcast_result, tensor_y) + + print("test broadcast api ok") + + # test barrier + # rank 0 + if pg.rank() == 0: + task = pg.barrier() + task.wait() + # rank 1 + else: + task = pg.barrier() + task.wait() + + print("test barrier api ok\n") + return + + # test allgather + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + out_shape = list(self.shape) + out_shape[0] *= 2 + out = np.random.random(out_shape).astype(self.dtype) + tensor_out = paddle.to_tensor(out) + if pg.rank() == 0: + task = pg.all_gather(tensor_x, tensor_out) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + # rank 1 + else: + task = pg.all_gather(tensor_y, tensor_out) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) + out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2], + [out_shape[0]]) + # assert np.array_equal(tensor_x, out_1) + # assert np.array_equal(tensor_y, out_2) + print("test allgather api ok\n") + + # test alltoall + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + out1 = np.random.random(self.shape).astype(self.dtype) + out2 = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + tensor_out1 = paddle.to_tensor(out1) + tensor_out2 = paddle.to_tensor(out2) + raw_tensor_x_2 = paddle.slice(tensor_x, [0], [self.shape[0] // 2], + [self.shape[0]]) + raw_tensor_y_1 = paddle.slice(tensor_y, [0], [0], + [self.shape[0] // 2]) + if pg.rank() == 0: + task = pg.alltoall(tensor_x, tensor_out1) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + # rank 1 + else: + task = pg.alltoall(tensor_y, tensor_out2) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + out1_2 = paddle.slice(tensor_out1, [0], [self.shape[0] // 2], + [self.shape[0]]) + out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2]) + # if pg.rank() == 0: + # assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy()) + # else: + # assert np.array_equal(out2_1, raw_tensor_x_2) + print("test alltoall api ok\n") + + # test Reduce + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = pg.reduce(tensor_x, 0) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + # rank 1 + else: + task = pg.reduce(tensor_y, 0) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + # if pg.rank() == 0: + # assert np.array_equal(tensor_x, sum_result) + print("test reduce sum api ok\n") + + # test Scatter + # rank 0 + in_shape = list(self.shape) + in_shape[0] *= 2 + x = np.random.random(in_shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + if pg.rank() == 0: + task = pg.scatter(tensor_x, tensor_y, 0) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + # rank 1 + else: + task = pg.scatter(tensor_x, tensor_y, 0) + task.wait() + # paddle.fluid.core._custom_device_synchronize("custom_cpu", -1) + out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) + out2 = paddle.slice(tensor_x, [0], [self.shape[0]], + [self.shape[0] * 2]) + # if pg.rank() == 0: + # assert np.array_equal(tensor_y, out1) + # else: + # assert np.array_equal(tensor_y, out2) + print("test scatter api ok\n") + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py new file mode 100644 index 0000000000000..f2e22a292fed4 --- /dev/null +++ b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py @@ -0,0 +1,154 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest +import os +import sys +import copy +import subprocess +import time + + +def start_local_trainers(cluster, + pod, + training_script, + training_script_args, + eager_mode=True, + log_dir=None): + from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + + current_env = copy.copy(os.environ.copy()) + #paddle broadcast ncclUniqueId use socket, and + #proxy maybe make trainers unreachable, so delete them. + #if we set them to "", grpc will log error message "bad uri" + #so just delete them. + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + procs = [] + + os.system("rm -rf log && mkdir -p log") + for idx, t in enumerate(pod.trainers): + proc_env = { + "FLAGS_selected_custom_cpus": + "%s" % ",".join([str(g) for g in t.gpus]), + "PADDLE_TRAINER_ID": "%d" % t.rank, + "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, + "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), + "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()), + "PADDLE_DISTRI_CUSTOM_DEVICE_TYPE": "custom_cpu", + } + + if not eager_mode: + proc_env["FLAGS_enable_eager_mode"] = "%d" % 0 + + current_env.update(proc_env) + + print("trainer proc env:{}".format(current_env)) + + if os.getenv('WITH_COVERAGE', 'OFF') == 'ON': + cmd = "python -m coverage run --branch -p " + training_script + else: + cmd = "python -u " + training_script + + print("start trainer proc:{} env:{}".format(cmd, proc_env)) + + fn = open("workerlog.%d" % idx, "a") + proc = subprocess.Popen(cmd.split(" "), + env=current_env, + stdout=fn, + stderr=fn) + + tp = TrainerProc() + tp.proc = proc + tp.rank = t.rank + tp.log_fn = fn + tp.cmd = cmd + + procs.append(tp) + + return procs + + +def get_cluster_from_args(selected_gpus): + from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + + cluster_node_ips = '127.0.0.1' + node_ip = '127.0.0.1' + + node_ips = [x.strip() for x in cluster_node_ips.split(',')] + + node_ips.index(node_ip) + + free_ports = None + + free_ports = find_free_ports(len(selected_gpus)) + if free_ports is not None: + free_ports = list(free_ports) + + trainer_endpoints = [] + for ip in node_ips: + trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports]) + return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus) + + +class TestMultipleCustomCPU(unittest.TestCase): + + def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True): + from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + + selected_devices = [0, 1] + cluster = None + pod = None + + cluster, pod = get_cluster_from_args(selected_devices) + + procs = start_local_trainers(cluster, + pod, + eager_mode=eager_mode, + training_script=target_file_name, + training_script_args=[]) + + while True: + alive = watch_local_trainers(procs, cluster.trainers_endpoints()) + + if not alive: + print("Local procs complete, POD info:{}".format(pod)) + break + time.sleep(3) + + +class TestProcessGroup(TestMultipleCustomCPU): + + def setUp(self): + # compile so and set to current path + cur_dir = os.path.dirname(os.path.abspath(__file__)) + cmd = 'rm -rf PaddleCustomDevice && git clone https://github.com/PaddlePaddle/PaddleCustomDevice.git && cd PaddleCustomDevice/backends/custom_cpu && mkdir build && cd build && cmake .. && make -j8' + os.system(cmd) + + # set environment for loading and registering compiled custom kernels + # only valid in current process + os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( + cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') + + def test_process_group_xccl(self): + from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + + self.run_mnist_2custom_cpu('process_group_xccl.py') + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh new file mode 100644 index 0000000000000..3afb1979905d3 --- /dev/null +++ b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + + + +rm -rf PaddleCustomDevice && git clone https://github.com/PaddlePaddle/PaddleCustomDevice.git && pushd PaddleCustomDevice/backends/custom_cpu && mkdir build && pushd build && cmake .. && make -j8 && popd && popd + +echo "begin test use custom_cpu" + +export FLAGS_selected_custom_cpus=0,1 + +distributed_args="--ips=127.0.0.1 --backend=xccl --custom_device_type=custom_cpu --custom_devices=0,1 --run_mode=collective --log_dir=testlog" +python -m paddle.distributed.fleet.launch ${distributed_args} custom_device_multi_process_collective.py fleetlaunch_custom_cpu