gather with doc (#52105)

* gather with doc

* resolve comment

* polish

* polish

* code style

* polish doc

* add_test

* polish

* polish

* add test check

* add test check

* polish

* polish

* polish

* polish

* fix_time_out

* polish

* fix timeout

* fix_timeout

* polish

* polish

* polish

* polish

* polish
liuzhenhai93 authored Mar 31, 2023
1 parent 20ee0d7 commit 77d2485
Showing 20 changed files with 523 additions and 26 deletions.
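For orientation, the user-facing entry point added by this commit is paddle.distributed.gather. The sketch below is a minimal, illustrative use, assuming two ranks started with paddle.distributed.launch (tensor values are arbitrary):

# Hedged sketch of the new API (assumes 2 ranks launched with paddle.distributed.launch).
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
data = paddle.to_tensor([dist.get_rank()])   # each rank contributes its own tensor
gather_list = []                             # populated only on the dst rank
dist.gather(data, gather_list, dst=0)
if dist.get_rank() == 0:
    print(gather_list)                       # e.g. [Tensor([0]), Tensor([1])]
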
24 changes: 24 additions & 0 deletions paddle/fluid/distributed/collective/process_group.h
@@ -332,6 +332,30 @@ class ProcessGroup {
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Gather(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support gather "
"with sync_op and use_calc_stream flag.",
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Gather(
std::vector<phi::DenseTensor>* gather_tensors_ptr,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support gather "
"with sync_op and use_calc_stream flag.",
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
int src_rank,
bool sync_op,
65 changes: 65 additions & 0 deletions paddle/fluid/distributed/collective/process_group_nccl.cc
@@ -475,6 +475,71 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Scatter(
use_calc_stream);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Gather(
phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) {
std::vector<phi::DenseTensor> partial_tensors;
if (rank_ == opts.root_rank) {
partial_tensors.reserve(size_);
size_t offset = 0;
size_t numel = out_tensor->numel() / size_;
for (auto i = 0; i < size_; i++) {
partial_tensors.push_back(GetPartialTensor(*out_tensor, offset, numel));
offset += numel;
}
}
return Gather(&partial_tensors, in_tensor, opts, sync_op, use_calc_stream);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Gather(
std::vector<phi::DenseTensor>* gather_tensors_ptr,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) {
auto& gather_tensors = *gather_tensors_ptr;
PADDLE_ENFORCE_GT(size_,
opts.root_rank,
phi::errors::InvalidArgument(
"root world size [%d] is less than root rank [%d]",
size_,
opts.root_rank));
auto gather_func = [&](ncclComm_t comm, gpuStream_t stream) {
// shape check
if (FLAGS_enable_nccl_dynamic_check) {
phi::distributed::NCCLDynamicCheck::CheckGatherShape(
in_tensor, gather_tensors, opts.root_rank, rank_, size_, comm);
}
GroupStart();
// the root rank receives from every rank
if (rank_ == opts.root_rank) {
for (auto i = 0; i < size_; i++) {
auto& gather_tensor = gather_tensors[i];
NCCL_CHECK(
phi::dynload::ncclRecv(gather_tensor.data(),
gather_tensor.numel(),
phi::ToNCCLDataType(gather_tensor.dtype()),
i,
comm,
stream));
}
}
// send to root
NCCL_CHECK(phi::dynload::ncclSend(in_tensor.data(),
in_tensor.numel(),
phi::ToNCCLDataType(in_tensor.dtype()),
opts.root_rank,
comm,
stream));
GroupEnd();
};
return RunFnInNCCLEnv(
gather_func, in_tensor, CommType::GATHER, sync_op, use_calc_stream);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
phi::DenseTensor* tensor,
int src_rank,
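The NCCL implementation above expresses gather as grouped point-to-point calls: inside a single GroupStart/GroupEnd, the root posts one ncclRecv per rank (including itself) while every rank posts an ncclSend of its input to the root. The sketch below restates the same pattern with Paddle's Python send/recv API; it is illustrative only (the real code runs on the communication stream and relies on NCCL's in-group self send/recv, which the sketch replaces with a local copy):

# Illustrative gather via point-to-point ops (assumes 2+ ranks; the root copies
# its own shard locally instead of sending to itself).
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
rank, world_size, dst = dist.get_rank(), dist.get_world_size(), 0

data = paddle.to_tensor([float(rank)])
if rank == dst:
    gathered = [paddle.zeros_like(data) for _ in range(world_size)]
    for src in range(world_size):
        if src == rank:
            paddle.assign(data, gathered[src])   # root keeps its own shard
        else:
            dist.recv(gathered[src], src=src)    # receive the shard sent by rank `src`
else:
    dist.send(data, dst=dst)                     # every non-root rank sends to the root
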
13 changes: 13 additions & 0 deletions paddle/fluid/distributed/collective/process_group_nccl.h
@@ -136,6 +136,19 @@ class ProcessGroupNCCL final : public ProcessGroupWithStream {
bool sync_op,
bool use_calc_stream) override;

std::shared_ptr<ProcessGroup::Task> Gather(phi::DenseTensor* out_tensor,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) override;

std::shared_ptr<ProcessGroup::Task> Gather(
std::vector<phi::DenseTensor>* gather_tensors_ptr,
const phi::DenseTensor& in_tensor,
const GatherOptions& opts,
bool sync_op,
bool use_calc_stream) override;

std::shared_ptr<ProcessGroup::Task> Recv(phi::DenseTensor* tensor,
int src_rank,
int64_t offset,
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/collective/types.h
@@ -48,6 +48,10 @@ struct ScatterOptions {
int root_rank = 0;
};

struct GatherOptions {
int root_rank = 0;
};

struct ReduceScatterOptions {
ReduceOp reduce_op = ReduceOp::SUM;
};
41 changes: 41 additions & 0 deletions paddle/fluid/pybind/distributed_py.cc
@@ -113,6 +113,10 @@ void BindDistributed(py::module *m) {
.def_readwrite("reduce_op", &distributed::ReduceOptions::reduce_op)
.def_readwrite("source_root", &distributed::ReduceOptions::root_rank);

py::class_<distributed::GatherOptions>(*m, "GatherOptions")
.def(py::init<>())
.def_readwrite("root_rank", &distributed::GatherOptions::root_rank);

auto ProcessGroup =
py::class_<distributed::ProcessGroup,
std::shared_ptr<distributed::ProcessGroup>>(*m, "ProcessGroup")
@@ -521,7 +525,44 @@ void BindDistributed(py::module *m) {
py::arg("src"),
py::arg("sync_op"),
py::call_guard<py::gil_scoped_release>())
.def(
"gather",
[](distributed::ProcessGroup &self,
py::handle py_in_tensor,
py::handle py_gather_tensor_list,
int dst,
bool sync_op,
bool use_calc_stream) {
auto out_tensor_list =
CastPyArg2VectorOfTensor(py_gather_tensor_list.ptr(), 0);
Tensor stack_out_tensor = paddle::stack(out_tensor_list, 0);
auto p_out_tensor = std::dynamic_pointer_cast<phi::DenseTensor>(
stack_out_tensor.impl());
auto *out_dense = p_out_tensor.get();

auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0);
auto p_in_tensor = std::dynamic_pointer_cast<phi::DenseTensor>(
in_tensor.impl());
auto in_dense = *p_in_tensor;

auto *dev_ctx =
self.GetDeviceContext(in_tensor.place(), use_calc_stream);
distributed::GatherOptions gather_ops{dst};
auto task = self.Gather(
out_dense, in_dense, gather_ops, sync_op, use_calc_stream);
SplitTensor(*dev_ctx, *out_dense, &out_tensor_list);
if (!use_calc_stream) {
// the calc stream waits for the comm stream
task->UpdateWaitChain(*dev_ctx);
}
return task;
},
py::arg("in"),
py::arg("out"),
py::arg("dst"),
py::arg("sync_op"),
py::arg("use_calc_stream"),
py::call_guard<py::gil_scoped_release>())
.def(
"barrier",
[](distributed::ProcessGroup &self, int8_t device_id) {
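On the Python side, the binding above first stacks the user's output list into one contiguous tensor, passes that buffer to ProcessGroup::Gather, and then splits the filled buffer back into the list via SplitTensor. A hedged, single-process sketch of that stack/split round trip using public ops (shapes and names are illustrative; the collective itself is elided):

# Illustrative stack -> (gather fills buffer) -> split round trip.
import paddle

world_size = 2
out_list = [paddle.zeros([3]) for _ in range(world_size)]
stacked = paddle.stack(out_list, axis=0)                  # one contiguous [2, 3] buffer
# ... ProcessGroup::Gather would fill `stacked` on the dst rank ...
parts = paddle.split(stacked, num_or_sections=world_size, axis=0)
out_list = [p.squeeze(0) for p in parts]                  # back to a list of [3] tensors
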
44 changes: 40 additions & 4 deletions paddle/phi/core/distributed/check/nccl_dynamic_check.cc
@@ -64,7 +64,7 @@ void NCCLDynamicCheck::CheckDataType(const phi::DenseTensor& tensor,

PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclBroadcast(dtype_device,
dtype_device,
kSize,
1,
ncclInt64,
root_rank,
comm,
@@ -106,7 +106,7 @@ void NCCLDynamicCheck::CheckShape(const phi::DenseTensor& tensor,

PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclBroadcast(shape_device,
shape_device,
kSize,
1,
ncclInt64,
root_rank,
comm,
@@ -141,10 +141,9 @@ void NCCLDynamicCheck::CheckShape(const phi::DenseTensor& out_tensor,
PADDLE_ENFORCE_GPU_SUCCESS(gpuMalloc(&in_shape_device, kSize));
PADDLE_ENFORCE_GPU_SUCCESS(gpuMemcpy(
in_shape_device, &in_shape_host, kSize, gpuMemcpyHostToDevice));

PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclReduce(in_shape_device,
in_shape_device,
kSize,
1,
ncclInt64,
ncclSum,
rank,
@@ -159,5 +158,42 @@ void NCCLDynamicCheck::CheckShape(const phi::DenseTensor& out_tensor,
PADDLE_ENFORCE_GPU_SUCCESS(gpuFree(in_shape_device));
}
}

void NCCLDynamicCheck::CheckGatherShape(
const phi::DenseTensor& in_tensor,
const std::vector<phi::DenseTensor>& out_tensors,
int root_rank,
int cur_rank,
int world_size,
ncclComm_t comm) {
std::vector<int64_t> shapes(world_size, 0);
shapes[cur_rank] = in_tensor.numel();
int64_t* in_shape_device;
PADDLE_ENFORCE_GPU_SUCCESS(
gpuMalloc(&in_shape_device, world_size * sizeof(int64_t)));
PADDLE_ENFORCE_GPU_SUCCESS(gpuMemcpy(in_shape_device,
shapes.data(),
world_size * sizeof(int64_t),
gpuMemcpyHostToDevice));

PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclAllReduce(in_shape_device,
in_shape_device,
world_size,
ncclInt64,
ncclSum,
comm,
kDefaultStream));
PADDLE_ENFORCE_GPU_SUCCESS(gpuMemcpy(shapes.data(),
in_shape_device,
world_size * sizeof(int64_t),
gpuMemcpyDeviceToHost));
PADDLE_ENFORCE_GPU_SUCCESS(gpuFree(in_shape_device));

if (cur_rank == root_rank) {
for (int i = 0; i < world_size; i++) {
CheckShape(out_tensors[i], shapes[i]);
}
}
}
} // namespace distributed
} // namespace phi
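
CheckGatherShape above validates shapes at runtime: every rank writes its input numel into its own slot of a world_size-long int64 buffer, an all-reduce (sum) propagates all slots to all ranks, and the root then compares each expected output tensor against the reported numels. A hedged sketch of the same idea using public Python collectives (tensor shapes are illustrative):

# Illustrative dynamic shape check: report my numel in my slot, all-reduce, compare on root.
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
rank, world_size, root = dist.get_rank(), dist.get_world_size(), 0

in_tensor = paddle.ones([3])                         # what this rank would send
slots = [0] * world_size
slots[rank] = int(paddle.numel(in_tensor))           # my numel goes in my slot
numels = paddle.to_tensor(slots, dtype='int64')
dist.all_reduce(numels)                              # sum: slot i now holds rank i's numel
if rank == root:
    expected = [paddle.zeros([3]) for _ in range(world_size)]   # stand-in for gather outputs
    for i, t in enumerate(expected):
        assert int(paddle.numel(t)) == int(numels[i]), "gather output shape mismatch"
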
8 changes: 8 additions & 0 deletions paddle/phi/core/distributed/check/nccl_dynamic_check.h
@@ -52,6 +52,14 @@ struct NCCLDynamicCheck {
int world_size,
ncclComm_t comm);

// can be used to check output shapes for gather and all_gather
static void CheckGatherShape(const phi::DenseTensor& in_tensor,
const std::vector<phi::DenseTensor>& out_tensors,
int root_rank,
int cur_rank,
int world_size,
ncclComm_t comm);

private:
// `0` represents default stream for both cuda & hip
static constexpr gpuStream_t kDefaultStream = 0;
2 changes: 2 additions & 0 deletions python/paddle/distributed/__init__.py
@@ -46,6 +46,7 @@
reduce,
send,
scatter,
gather,
scatter_object_list,
isend,
recv,
@@ -82,6 +83,7 @@
"spawn",
"launch",
"scatter",
"gather",
"scatter_object_list",
"broadcast",
"broadcast_object_list",
1 change: 1 addition & 0 deletions python/paddle/distributed/communication/__init__.py
@@ -18,6 +18,7 @@
from .send import send, isend
from .recv import recv, irecv
from .scatter import scatter, scatter_object_list
from .gather import gather
from .batch_isend_irecv import batch_isend_irecv, P2POp
from .reduce_scatter import reduce_scatter
from .all_to_all import alltoall, alltoall_single
60 changes: 60 additions & 0 deletions python/paddle/distributed/communication/gather.py
@@ -0,0 +1,60 @@
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from paddle import framework
from paddle.distributed.communication import stream


def gather(tensor, gather_list=None, dst=0, group=None, sync_op=True):
"""
Gather tensors from all participators to the destination rank.
Args:
tensor (Tensor): The input Tensor. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
gather_list (list): A list of Tensors used to hold the gathered tensors; it is only populated on the dst rank. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. Default value is None.
dst (int, optional): The rank id of the destination that receives the gathered tensors. Default value is 0.
group (Group, optional): The group instance returned by new_group, or None for the global default group.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Async work handle, which can be waited on, if sync_op is set to False.
None, otherwise.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
gather_list = []
if dist.get_rank() == 0:
data = paddle.to_tensor([1, 2, 3])
dist.gather(data, gather_list, dst=0)
else:
data = paddle.to_tensor([4, 5, 6])
dist.gather(data, gather_list, dst=0)
print(gather_list)
# [[1, 2, 3], [4, 5, 6]] (2 GPUs, out for rank 0)
# [] (2 GPUs, out for rank 1)
"""
assert (
framework.in_dygraph_mode()
), "gather doesn't support static graph mode yet."
return stream.gather(tensor, gather_list, dst, group, sync_op)
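
The docstring above documents the synchronous form; with sync_op=False the call is expected to return a task that can be waited on before the gathered tensors are read. A hedged sketch (assumes 2 ranks; whether non-dst ranks get a task back is not shown in this diff, hence the None check):

# Illustrative async use of the new gather (assumes 2 ranks).
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
data = paddle.to_tensor([dist.get_rank()])
gather_list = []
task = dist.gather(data, gather_list, dst=0, sync_op=False)
if task is not None:
    task.wait()                              # ensure the gathered tensors are ready
if dist.get_rank() == 0:
    print(gather_list)                       # e.g. [Tensor([0]), Tensor([1])]
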
2 changes: 2 additions & 0 deletions python/paddle/distributed/communication/stream/__init__.py
@@ -21,6 +21,7 @@
from .recv import recv
from .scatter import scatter
from .send import send
from .gather import gather

__all__ = [
"all_gather",
@@ -33,4 +34,5 @@
"recv",
"scatter",
"send",
"gather",
]