Completes basic dtypes for collective api in eager mode #45574

Merged · 8 commits · Sep 6, 2022
13 changes: 11 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -738,14 +738,23 @@ void* GetPointerByOffset(void* raw_pointer,
} else if (type == experimental::DataType::FLOAT64) {
return reinterpret_cast<void*>(reinterpret_cast<double*>(raw_pointer) +
offset);
+ } else if (type == experimental::DataType::FLOAT16) {
+ return reinterpret_cast<void*>(reinterpret_cast<int16_t*>(raw_pointer) +
+ offset);
} else if (type == experimental::DataType::INT32) {
return reinterpret_cast<void*>(reinterpret_cast<int32_t*>(raw_pointer) +
offset);
} else if (type == experimental::DataType::INT64) {
return reinterpret_cast<void*>(reinterpret_cast<int64_t*>(raw_pointer) +
offset);
- } else if (type == experimental::DataType::FLOAT16) {
- return reinterpret_cast<void*>(reinterpret_cast<int16_t*>(raw_pointer) +
+ } else if (type == experimental::DataType::INT8) {
+ return reinterpret_cast<void*>(reinterpret_cast<int8_t*>(raw_pointer) +
offset);
+ } else if (type == experimental::DataType::UINT8) {
+ return reinterpret_cast<void*>(reinterpret_cast<uint8_t*>(raw_pointer) +
+ offset);
+ } else if (type == experimental::DataType::BOOL) {
+ return reinterpret_cast<void*>(reinterpret_cast<bool*>(raw_pointer) +
+ offset);
} else {
PADDLE_THROW(platform::errors::Unimplemented(
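All of these branches implement the same arithmetic: GetPointerByOffset advances a type-erased buffer by offset elements, so only the element width matters. FLOAT16 has no native C++ scalar type, which is why it is reinterpreted through int16_t, a same-width (2-byte) integer. A minimal Python sketch of the stride logic, with pointer_by_offset as a hypothetical stand-in for the C++ helper:

import numpy as np

# Hypothetical stand-in for Paddle's C++ helper: the byte address of element
# `offset` in a raw buffer depends only on the dtype's width.
def pointer_by_offset(base_addr, offset, dtype):
    return base_addr + offset * np.dtype(dtype).itemsize

# float16 and int16 are both 2 bytes wide, so reinterpreting one as the
# other leaves the address arithmetic unchanged.
assert pointer_by_offset(0, 8, np.float16) == pointer_by_offset(0, 8, np.int16)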
2 changes: 2 additions & 0 deletions paddle/phi/kernels/cpu/concat_kernel.cc
@@ -124,6 +124,8 @@ PD_REGISTER_KERNEL(concat,
int64_t,
int,
uint8_t,
int8_t,
phi::dtype::float16,
phi::dtype::bfloat16,
phi::dtype::complex<float>,
phi::dtype::complex<double>) {}
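With int8_t and phi::dtype::float16 registered for the CPU kernel (the GPU kernel below already had float16 and gains int8_t), an eager-mode concat over these dtypes can dispatch directly. A quick sanity check, assuming a Paddle build that includes this change:

import paddle

# int8 concat now resolves to a registered kernel on both CPU and GPU.
x = paddle.to_tensor([[1, 2]], dtype='int8')
y = paddle.to_tensor([[3, 4]], dtype='int8')
out = paddle.concat([x, y], axis=0)
print(out.dtype)  # paddle.int8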
1 change: 1 addition & 0 deletions paddle/phi/kernels/gpu/concat_kernel.cu
@@ -121,6 +121,7 @@ PD_REGISTER_KERNEL(concat,
int64_t,
int,
uint8_t,
int8_t,
phi::dtype::float16,
phi::dtype::bfloat16,
phi::dtype::complex<float>,
379 changes: 183 additions & 196 deletions python/paddle/distributed/collective.py

Large diffs are not rendered by default.

29 changes: 26 additions & 3 deletions python/paddle/fluid/tests/unittests/collective/CMakeLists.txt
@@ -78,7 +78,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_alltoall_api MODULES test_collective_alltoall_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_alltoall_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
bash_test_modules(
@@ -92,6 +92,14 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
)
set_tests_properties(test_collective_alltoall_single PROPERTIES TIMEOUT "350")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_alltoall_single_api MODULES
test_collective_alltoall_single_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_alltoall_single_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_barrier_api MODULES test_collective_barrier_api ENVS
@@ -117,7 +125,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_broadcast_api MODULES test_collective_broadcast_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_broadcast_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
@@ -141,6 +149,13 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
set_tests_properties(test_collective_global_scatter
PROPERTIES TIMEOUT "200" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_isend_irecv_api MODULES test_collective_isend_irecv_api
ENVS "http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_isend_irecv_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_optimizer MODULES test_collective_optimizer ENVS
@@ -186,6 +201,14 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
)
set_tests_properties(test_collective_reduce_scatter PROPERTIES TIMEOUT "350")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_reduce_scatter_api MODULES
test_collective_reduce_scatter_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_reduce_scatter_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_scatter MODULES test_collective_scatter ENVS
@@ -212,7 +235,7 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_collective_sendrecv_api MODULES test_collective_sendrecv_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_sendrecv_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
@@ -45,12 +45,9 @@ def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
tindata = paddle.split(tindata, 2, axis=0)
- tout_data = []
- paddle.distributed.alltoall(tindata, tout_data)
- output_data = []
- for data in tout_data:
- output_data.append(data.numpy())
- return output_data
+ toutdata = []
+ paddle.distributed.alltoall(tindata, toutdata)
+ return [data.numpy() for data in toutdata]


if __name__ == "__main__":
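The rewritten runner passes an empty out_tensor_list and lets alltoall append the received chunks. A hedged standalone sketch of the same exchange on two ranks, launched with python -m paddle.distributed.launch --gpus 0,1 script.py:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
rank = dist.get_rank()

# Each rank splits its input into one chunk per peer: chunk i is sent to
# rank i, and the chunk received from rank i lands at position i.
data = paddle.to_tensor([[rank], [rank + 10]], dtype='float32')
in_list = paddle.split(data, 2, axis=0)
out_list = []  # filled in by alltoall, as in the runner above
dist.alltoall(in_list, out_list)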
@@ -0,0 +1,36 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
import test_collective_api_base as test_base


class TestCollectiveAllToAllSingleAPI(test_base.TestCollectiveAPIRunnerBase):

def __init__(self):
self.global_ring_id = 0

def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
toutdata = paddle.to_tensor(indata)
paddle.distributed.alltoall_single(tindata, toutdata)
return [toutdata.numpy()]


if __name__ == "__main__":
test_base.runtime_main(TestCollectiveAllToAllSingleAPI, "alltoall")
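alltoall_single is the single-tensor variant exercised here: instead of tensor lists it takes one input and one caller-allocated output of the same shape, with the leading dimension implicitly split evenly across ranks. A hedged two-rank sketch:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
rank = dist.get_rank()

# Row i of tin goes to rank i; tout collects row `rank` from every peer.
tin = paddle.full([2, 4], float(rank))
tout = paddle.zeros_like(tin)  # output buffer is supplied by the caller
dist.alltoall_single(tin, tout)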
@@ -0,0 +1,36 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
import unittest
import test_collective_api_base as test_base


class TestCollectiveBroadcastAPI(test_base.TestCollectiveAPIRunnerBase):

def __init__(self):
self.global_ring_id = 0

def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
paddle.distributed.broadcast(tindata, src=1)
return [tindata.numpy()]


if __name__ == "__main__":
test_base.runtime_main(TestCollectiveBroadcastAPI, "broadcast")
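broadcast overwrites every rank's tensor in place with the copy held by src, so the runner returns tindata on all ranks and expects rank 1's data everywhere. A hedged minimal sketch:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
data = paddle.to_tensor([dist.get_rank()], dtype='int32')
dist.broadcast(data, src=1)
# in place: every rank now holds [1]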
@@ -0,0 +1,40 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
import unittest
import test_collective_api_base as test_base


class TestCollectiveIsendIrecvAPI(test_base.TestCollectiveAPIRunnerBase):

def __init__(self):
self.global_ring_id = 0

def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
if rank == 0:
task = paddle.distributed.isend(tindata, dst=1)
else:
task = paddle.distributed.irecv(tindata, src=0)
task.wait()
return [tindata.numpy()]


if __name__ == "__main__":
test_base.runtime_main(TestCollectiveIsendIrecvAPI, "sendrecv")
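isend and irecv are non-blocking: each returns a task handle immediately, and task.wait() blocks until the transfer completes, which is why the runner waits before reading tindata. A hedged two-rank sketch of the same pairing:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
rank = dist.get_rank()

data = paddle.to_tensor([rank], dtype='int32')
# Rank 0 sends its buffer to rank 1; rank 1 receives into its own buffer.
task = dist.isend(data, dst=1) if rank == 0 else dist.irecv(data, src=0)
task.wait()  # after this, rank 1's data is [0]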
@@ -0,0 +1,36 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
import unittest
import test_collective_api_base as test_base


class TestCollectiveReduceAPI(test_base.TestCollectiveAPIRunnerBase):

def __init__(self):
self.global_ring_id = 0

def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
paddle.distributed.reduce(tindata, dst=0)
return [tindata.numpy()]


if __name__ == "__main__":
test_base.runtime_main(TestCollectiveReduceAPI, "reduce")
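reduce combines every rank's tensor (summing by default) and deposits the result on dst only, so the runner's output is meaningful on rank 0 alone. A hedged sketch:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
data = paddle.to_tensor([1], dtype='int32')
dist.reduce(data, dst=0)
# only rank 0 is guaranteed to hold the reduced value, [world_size]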
@@ -0,0 +1,37 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
import unittest
import test_collective_api_base as test_base


class TestCollectiveReduceScatterAPI(test_base.TestCollectiveAPIRunnerBase):

def __init__(self):
self.global_ring_id = 0

def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
subdata1, subdata2 = paddle.split(tindata, 2, axis=0)
paddle.distributed.reduce_scatter(subdata1, [subdata1, subdata2])
return [subdata1.numpy()]


if __name__ == "__main__":
test_base.runtime_main(TestCollectiveReduceScatterAPI, "reduce_scatter")
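reduce_scatter takes one input chunk per rank, reduces (sums) chunk i across all ranks, and leaves rank i holding only reduced chunk i, which is why the runner returns subdata1. A hedged two-rank sketch:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
rank = dist.get_rank()

chunks = [paddle.to_tensor([rank], dtype='int32'),       # reduced onto rank 0
          paddle.to_tensor([rank + 10], dtype='int32')]  # reduced onto rank 1
out = paddle.zeros([1], dtype='int32')
dist.reduce_scatter(out, chunks)
# rank 0: out == [0 + 1]; rank 1: out == [10 + 11]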
@@ -0,0 +1,42 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
import unittest
import test_collective_api_base as test_base


class TestCollectiveScatterAPI(test_base.TestCollectiveAPIRunnerBase):

def __init__(self):
self.global_ring_id = 0

def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.to_tensor(indata)
subdata1, subdata2 = paddle.split(tindata, 2, axis=0)
if rank == 0:
paddle.distributed.scatter(subdata1, src=1)
else:
paddle.distributed.scatter(subdata1,
tensor_list=[subdata1, subdata2],
src=1)
return [subdata1.numpy()]


if __name__ == "__main__":
test_base.runtime_main(TestCollectiveScatterAPI, "scatter")
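With src=1, only rank 1 needs to supply tensor_list; every rank, source included, receives one chunk into the first argument, matching the two call shapes in the runner. A hedged two-rank sketch:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
rank = dist.get_rank()

out = paddle.zeros([1], dtype='int32')
if rank == 1:
    src_chunks = [paddle.to_tensor([10], dtype='int32'),
                  paddle.to_tensor([11], dtype='int32')]
    dist.scatter(out, tensor_list=src_chunks, src=1)
else:
    dist.scatter(out, src=1)
# rank 0 receives [10]; rank 1 keeps [11]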
@@ -31,10 +31,16 @@ def test_alltoall_nccl(self):
self.check_with_place("collective_alltoall_api.py", "alltoall", "nccl")

def test_alltoall_nccl_dygraph(self):
- self.check_with_place("collective_alltoall_api_dygraph.py",
- "alltoall",
- "nccl",
- static_mode="0")
+ dtypes_to_test = [
+ 'float16', 'float32', 'float64', 'int32', 'int64', 'int8', 'uint8',
+ 'bool'
+ ]
+ for dtype in dtypes_to_test:
+ self.check_with_place("collective_alltoall_api_dygraph.py",
+ "alltoall",
+ "nccl",
+ static_mode="0",
+ dtype=dtype)


if __name__ == '__main__':