Skip to content

Commit

Permalink
Support both use_calc_stream and sync_op in send recv APIs (#46023)
Browse files Browse the repository at this point in the history
  • Loading branch information
HermitSun authored Sep 15, 2022
1 parent 92e1f64 commit ae00f42
Show file tree
Hide file tree
Showing 14 changed files with 922 additions and 40 deletions.
50 changes: 41 additions & 9 deletions paddle/fluid/distributed/collective/ProcessGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,56 @@ class ProcessGroup {
"ProcessGroup%s does not support send", GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>&, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send with sync_op flag",
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, int) { // NOLINT
std::vector<phi::DenseTensor>&, int) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support receive", GetBackendName()));
"ProcessGroup%s does not support recv", GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Send_Partial(phi::DenseTensor&,
int,
int,
int) { // NOLINT
virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>&, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send", GetBackendName()));
"ProcessGroup%s does not support recv with sync_op flag",
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Send_Partial(
phi::DenseTensor&, // NOLINT
int,
int,
int) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send_partial", GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Send_Partial(
phi::DenseTensor&, int, int, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send_partial with sync_op flag",
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Recv_Partial(
phi::DenseTensor& tensors, int, int, int) { // NOLINT
phi::DenseTensor&, // NOLINT
int,
int,
int) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support receive", GetBackendName()));
"ProcessGroup%s does not support recv_partial", GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> Recv_Partial(
phi::DenseTensor&, int, int, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support recv_partial with sync_op flag",
GetBackendName()));
}

virtual std::shared_ptr<ProcessGroup::Task> AllGather(
Expand Down
226 changes: 216 additions & 10 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ std::shared_ptr<ProcessGroupNCCL::NCCLTask> ProcessGroupNCCL::CreateTask(
places, rank, comm_type, inputs);
}

std::shared_ptr<ProcessGroupNCCL::NCCLTask> ProcessGroupNCCL::CreateTask(
const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool is_sync,
bool use_calc_stream) {
return std::make_shared<ProcessGroupNCCL::NCCLTask>(
places, rank, comm_type, inputs, is_sync, use_calc_stream);
}

ProcessGroupNCCL::NCCLTask::NCCLTask(
const std::vector<Place>& places,
int rank,
Expand Down Expand Up @@ -264,10 +275,12 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(

auto& nccl_comms = places_to_ncclcomm_[key];

SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);
if (!use_calc_stream) {
SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);
}

auto task = std::make_shared<ProcessGroupNCCL::NCCLTask>(
places, rank_, comm_type, inputs, sync_op, use_calc_stream);
auto task =
CreateTask(places, rank_, comm_type, inputs, sync_op, use_calc_stream);

platform::CUDADeviceGuard cuda_guard;

Expand Down Expand Up @@ -406,6 +419,78 @@ void ProcessGroupNCCL::Collective(const phi::DenseTensor* in,
cuda_guard.SetDevice(places[0]);
}

template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::PointToPoint(
std::vector<phi::DenseTensor>& tensors,
Fn fn,
int dst_rank,
CommType op_type,
bool sync_op,
bool use_calc_stream) {
const auto& places = GetPlaceList(tensors);
const auto& key = GetKeyFromPlaces(places);

{
std::lock_guard<std::mutex> lock(mutex_);
if (places_to_ncclcomm_.find(key) == places_to_ncclcomm_.end()) {
CreateNCCLManagerCache(key, places);
}
}

auto& nccl_comms = places_to_ncclcomm_[key];

if (!use_calc_stream) {
SyncDefaultStream(places, places_to_events_[key], places_to_ctx_[key]);
}

auto task =
CreateTask(places, rank_, op_type, tensors, sync_op, use_calc_stream);

platform::CUDADeviceGuard cuda_guard;

if (FLAGS_use_stream_safe_cuda_allocator) {
for (size_t i = 0; i < tensors.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_[key][i]->stream();
}
memory::RecordStream(tensors[i].Holder(), nccl_stream);
}
}

{
platform::NCCLGroupGuard nccl_guard;
for (size_t i = 0; i < tensors.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_[key][i]->stream();
}
fn(tensors[i], nccl_comms[i]->GetNcclComm(), nccl_stream, dst_rank);
}
}

if (!use_calc_stream) {
for (size_t i = 0; i < tensors.size(); ++i) {
cuda_guard.SetDevice(places[i]);
task->control_events_[i].Record(*places_to_ctx_[key][i]);
}
}

return task;
}

template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::PointToPoint(
std::vector<phi::DenseTensor>& tensors,
Expand Down Expand Up @@ -617,6 +702,34 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send(
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send(
std::vector<phi::DenseTensor>& tensors,
int dst_rank,
bool sync_op,
bool use_calc_stream) {
CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));

auto task = PointToPoint(
tensors,
[&](phi::DenseTensor& input,
ncclComm_t comm,
const gpuStream_t& stream,
int dst_rank) {
return platform::dynload::ncclSend(
input.data(),
input.numel(),
platform::ToNCCLDataType(input.dtype()),
dst_rank,
comm,
stream);
},
dst_rank,
CommType::SEND,
sync_op,
use_calc_stream);
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
std::vector<phi::DenseTensor>& tensors, int src_rank) {
CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));
Expand All @@ -640,17 +753,43 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
std::vector<phi::DenseTensor>& tensors,
int src_rank,
bool sync_op,
bool use_calc_stream) {
CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));

auto task = PointToPoint(
tensors,
[&](phi::DenseTensor& output,
ncclComm_t comm,
const gpuStream_t& stream,
int src_rank) {
return platform::dynload::ncclRecv(
output.data(),
output.numel(),
platform::ToNCCLDataType(output.dtype()),
src_rank,
comm,
stream);
},
src_rank,
CommType::RECV,
sync_op,
use_calc_stream);
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send_Partial(
phi::DenseTensor& tensors, int dst_rank, int offset, int length) {
// CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));

phi::DenseTensor flatten_tensor;
flatten_tensor.ShareDataWith(tensors).Resize({tensors.numel()});

phi::DenseTensor shared_input = flatten_tensor.Slice(offset, offset + length);

std::vector<phi::DenseTensor> shared_tensors;
shared_tensors.push_back(shared_input);
std::vector<phi::DenseTensor> shared_tensors{
flatten_tensor.Slice(offset, offset + length)};

auto task = PointToPoint(
shared_tensors,
Expand All @@ -671,16 +810,49 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send_Partial(
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send_Partial(
phi::DenseTensor& tensors,
int dst_rank,
int offset,
int length,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor flatten_tensor;
flatten_tensor.ShareDataWith(tensors).Resize({tensors.numel()});

std::vector<phi::DenseTensor> shared_tensors{
flatten_tensor.Slice(offset, offset + length)};

auto task = PointToPoint(
shared_tensors,
[&](phi::DenseTensor& input,
ncclComm_t comm,
const gpuStream_t& stream,
int dst_rank) {
return platform::dynload::ncclSend(
input.data(),
input.numel(),
platform::ToNCCLDataType(input.dtype()),
dst_rank,
comm,
stream);
},
dst_rank,
CommType::SEND,
sync_op,
use_calc_stream);
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv_Partial(
phi::DenseTensor& tensors, int src_rank, int offset, int length) {
// phi::DenseTensor shared_input = tensors.Slice(offset, offset+length);

phi::DenseTensor flatten_tensor;
flatten_tensor.ShareDataWith(tensors).Resize({tensors.numel()});
phi::DenseTensor shared_input = flatten_tensor.Slice(offset, offset + length);

std::vector<phi::DenseTensor> shared_tensors;
shared_tensors.push_back(shared_input);
std::vector<phi::DenseTensor> shared_tensors{
flatten_tensor.Slice(offset, offset + length)};

auto task = PointToPoint(
shared_tensors,
Expand All @@ -701,6 +873,40 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv_Partial(
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv_Partial(
phi::DenseTensor& tensors,
int src_rank,
int offset,
int length,
bool sync_op,
bool use_calc_stream) {
phi::DenseTensor flatten_tensor;
flatten_tensor.ShareDataWith(tensors).Resize({tensors.numel()});

std::vector<phi::DenseTensor> shared_tensors{
flatten_tensor.Slice(offset, offset + length)};

auto task = PointToPoint(
shared_tensors,
[&](phi::DenseTensor& output,
ncclComm_t comm,
const gpuStream_t& stream,
int src_rank) {
return platform::dynload::ncclRecv(
output.data(),
output.numel(),
platform::ToNCCLDataType(output.dtype()),
src_rank,
comm,
stream);
},
src_rank,
CommType::RECV,
sync_op,
use_calc_stream);
return task;
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors) {
Expand Down
Loading

0 comments on commit ae00f42

Please sign in to comment.