Skip to content

Commit

Permalink
Merge branch 'develop' into phi_matmul_kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
Silv3S committed Nov 28, 2022
2 parents 6328e85 + ea830d4 commit 65cf6df
Show file tree
Hide file tree
Showing 1,699 changed files with 20,443 additions and 15,205 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ paddle/fluid/operators/generated_op.cc
paddle/fluid/operators/generated_sparse_op.cc
paddle/phi/ops/compat/generated_sig.cc
paddle/phi/ops/compat/generated_sparse_sig.cc
paddle/phi/api/yaml/parsed_apis/
paddle/fluid/operators/generator/parsed_ops/
paddle/fluid/pybind/tmp_eager_op_function_impl.h
paddle/fluid/pybind/eager_op_function_impl.h
Expand Down
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ repos:
hooks:
- id: black
files: (.*\.(py|pyi|bzl)|BUILD|.*\.BUILD|WORKSPACE)$
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
hooks:
Expand Down
2 changes: 1 addition & 1 deletion cmake/external/xpu.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ set(XPU_RT_LIB_NAME "libxpurt.so")
if(NOT DEFINED XPU_BASE_URL)
set(XPU_BASE_URL_WITHOUT_DATE
"https://baidu-kunlun-product.su.bcebos.com/KL-SDK/klsdk-dev")
set(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20221116")
set(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20221124")
else()
set(XPU_BASE_URL "${XPU_BASE_URL}")
endif()
Expand Down
104 changes: 104 additions & 0 deletions paddle/fluid/distributed/collective/NCCLTools.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,109 @@ std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) {
return oss.str();
}

// Validates a tensor used in point-to-point communication: the tensor must
// live in GPU memory and `rank` must name a valid member of the group,
// i.e. 0 <= rank < world_size. Raises InvalidArgument otherwise.
void StaticCheckTensor(const phi::DenseTensor& tensor,
                       int rank,
                       int world_size) {
  // NCCL can only operate on device-resident buffers.
  const bool on_gpu = platform::is_gpu_place(tensor.place());
  PADDLE_ENFORCE_EQ(
      on_gpu,
      true,
      platform::errors::InvalidArgument("Tensor should be in GPU place."));
  // Valid ranks form the half-open range [0, world_size).
  PADDLE_ENFORCE_GE(
      rank,
      0,
      platform::errors::InvalidArgument(
          "Rank should be greater than or equal to 0."));
  PADDLE_ENFORCE_LT(
      rank,
      world_size,
      platform::errors::InvalidArgument("Rank is out of the process group."));
}

// static check for collective
// Validates the operand pair of a collective operation. The size factors
// encode the expected size relation between the two tensors:
//   out_numel * out_size_factor == in_numel * in_size_factor.
// (Passing 0 for both factors makes the relation trivially true, which
// callers use to skip the size check.) Raises InvalidArgument on violation.
void StaticCheckTensors(const phi::DenseTensor& out_tensor,
                        const phi::DenseTensor& in_tensor,
                        int rank,
                        int world_size,
                        int out_size_factor,
                        int in_size_factor) {
  // Both operands must reside in GPU memory.
  const bool out_on_gpu = platform::is_gpu_place(out_tensor.place());
  PADDLE_ENFORCE_EQ(out_on_gpu,
                    true,
                    platform::errors::InvalidArgument(
                        "Output tensor should be in GPU place."));
  const bool in_on_gpu = platform::is_gpu_place(in_tensor.place());
  PADDLE_ENFORCE_EQ(in_on_gpu,
                    true,
                    platform::errors::InvalidArgument(
                        "Input tensor should be in GPU place."));
  // The rank must be a valid member of the group: 0 <= rank < world_size.
  PADDLE_ENFORCE_GE(rank,
                    0,
                    platform::errors::InvalidArgument(
                        "Rank should be greater than or equal to 0."));
  PADDLE_ENFORCE_LT(
      rank,
      world_size,
      platform::errors::InvalidArgument("Rank is out of the process group."));
  // Both tensors must be non-empty and satisfy the factor relation above.
  const int64_t out_numel = out_tensor.numel();
  PADDLE_ENFORCE_GT(out_numel,
                    0,
                    platform::errors::InvalidArgument(
                        "Size of output tensor should be greater than 0."));
  const int64_t in_numel = in_tensor.numel();
  PADDLE_ENFORCE_GT(in_numel,
                    0,
                    platform::errors::InvalidArgument(
                        "Size of input tensor should be greater than 0."));
  PADDLE_ENFORCE_EQ(out_numel * out_size_factor,
                    in_numel * in_size_factor,
                    platform::errors::InvalidArgument(
                        "Input and output tensors should have matching sizes."));
  // Element types must agree on both sides of the collective.
  PADDLE_ENFORCE_EQ(out_tensor.dtype(),
                    in_tensor.dtype(),
                    platform::errors::InvalidArgument(
                        "Input and output tensors should have the same data type."));
}

// Same-shape collectives (e.g. allreduce, broadcast): every rank's input and
// output tensors hold the same number of elements, so neither side is scaled.
void StaticCheckTensorsSameShape(const phi::DenseTensor& out_tensor,
                                 const phi::DenseTensor& in_tensor,
                                 int rank,
                                 int world_size) {
  constexpr int kNoScaling = 1;  // out_numel * 1 == in_numel * 1
  StaticCheckTensors(
      out_tensor, in_tensor, rank, world_size, kNoScaling, kNoScaling);
}

// Scatter-like collectives (e.g. reduce_scatter, scatter): the input is
// world_size times larger than each rank's output, i.e.
//   out_numel * world_size == in_numel.
void StaticCheckTensorsScatterLikeShape(const phi::DenseTensor& out_tensor,
                                        const phi::DenseTensor& in_tensor,
                                        int rank,
                                        int world_size) {
  StaticCheckTensors(out_tensor,
                     in_tensor,
                     rank,
                     world_size,
                     /*out_size_factor=*/world_size,
                     /*in_size_factor=*/1);
}

// Gather-like collectives (e.g. allgather, gather): the output is world_size
// times larger than each rank's input, i.e.
//   out_numel == in_numel * world_size.
void StaticCheckTensorsGatherLikeShape(const phi::DenseTensor& out_tensor,
                                       const phi::DenseTensor& in_tensor,
                                       int rank,
                                       int world_size) {
  StaticCheckTensors(out_tensor,
                     in_tensor,
                     rank,
                     world_size,
                     /*out_size_factor=*/1,
                     /*in_size_factor=*/world_size);
}

} // namespace distributed
} // namespace paddle
27 changes: 27 additions & 0 deletions paddle/fluid/distributed/collective/NCCLTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,32 @@ ncclRedOp_t ToNCCLRedType(ReduceOp reduction);

std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID);

// static check for p2p: the tensor must be in GPU place and `rank` must lie
// in [0, world_size). Raises InvalidArgument otherwise.
void StaticCheckTensor(const phi::DenseTensor& tensor,
                       int rank,
                       int world_size);

// static check for collective: both tensors must be GPU tensors, non-empty,
// of the same dtype, with sizes satisfying
//   out_numel * out_size_factor == in_numel * in_size_factor
// (factors of 0 on both sides skip the size-relation check).
void StaticCheckTensors(const phi::DenseTensor& out_tensor,
                        const phi::DenseTensor& in_tensor,
                        int rank,
                        int world_size,
                        int out_size_factor,
                        int in_size_factor);

// Same-shape collectives (allreduce/broadcast-like): out_numel == in_numel.
void StaticCheckTensorsSameShape(const phi::DenseTensor& out_tensor,
                                 const phi::DenseTensor& in_tensor,
                                 int rank,
                                 int world_size);

// Scatter-like collectives: out_numel * world_size == in_numel.
void StaticCheckTensorsScatterLikeShape(const phi::DenseTensor& out_tensor,
                                        const phi::DenseTensor& in_tensor,
                                        int rank,
                                        int world_size);

// Gather-like collectives: out_numel == in_numel * world_size.
void StaticCheckTensorsGatherLikeShape(const phi::DenseTensor& out_tensor,
                                       const phi::DenseTensor& in_tensor,
                                       int rank,
                                       int world_size);
} // namespace distributed
} // namespace paddle
51 changes: 51 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroupBKCL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,57 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::AllGather(
use_calc_stream);
}

// Reduce for the XPU/BKCL backend. Implemented on top of bkcl_all_reduce:
// every rank computes the full reduction into a scratch tensor, and only the
// root rank publishes that result to `out_tensor`.
std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Reduce(
    phi::DenseTensor* out_tensor,
    const phi::DenseTensor& in_tensor,
    const ReduceOptions& opts,
    bool sync_op,
    bool use_calc_stream) {
  return Collective(
      out_tensor,
      in_tensor,
      [&](phi::DenseTensor* output,
          const phi::DenseTensor& input,
          BKCLContext_t comm,
          const XPUStream& stream) {
        // Scratch tensor copy-constructed from `output` so it carries the
        // same metadata; the Alloc below presumably gives it storage distinct
        // from `output` so non-root ranks never touch the caller's buffer —
        // TODO confirm DenseTensor copy/Alloc aliasing semantics.
        phi::DenseTensor output_t(*output);
        const auto& place = input.place();
        auto* calc_ctx = static_cast<phi::XPUContext*>(
            platform::DeviceContextPool::Instance().Get(place));
        // Allocate the scratch buffer with the element type of the input.
        switch (input.dtype()) {
          case phi::DataType::FLOAT32:
            calc_ctx->template Alloc<float>(&output_t);
            break;
          case phi::DataType::FLOAT16:
            calc_ctx->template Alloc<float16>(&output_t);
            break;
          case phi::DataType::INT32:
            calc_ctx->template Alloc<int>(&output_t);
            break;
          default:
            // NOTE(review): unsupported dtypes only log here and then fall
            // through to bkcl_all_reduce with output_t possibly unallocated —
            // confirm whether this should raise instead.
            VLOG(0) << "Error: type " << input.dtype() << " not supported for "
                    << GetBackendName();
            break;
        }
        // All ranks participate in a full all-reduce on `stream`.
        int ret =
            bkcl_all_reduce(comm,
                            input.data(),
                            output_t.data(),
                            input.numel(),
                            platform::ToBKCLDataType(
                                framework::TransToProtoVarType(input.type())),
                            ToBKCLRedType(opts.reduce_op),
                            stream);
        // Only the root rank exposes the reduced result to the caller.
        if (rank_ == opts.root_rank) {
          *output = output_t;
        }
        return ret;
      },
      // NOTE(review): tagged ALLREDUCE because an all-reduce is what actually
      // runs on the wire — confirm this is intentional for a Reduce op.
      CommType::ALLREDUCE,
      sync_op,
      use_calc_stream);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Barrier(
const BarrierOptions& opts) {
PADDLE_ENFORCE_GE(opts.device_id,
Expand Down
6 changes: 6 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroupBKCL.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ class ProcessGroupBKCL : public ProcessGroupStream {
bool sync_op,
bool use_calc_stream) override;

// Reduce collective for the XPU/BKCL backend: reduces `in_tensor` across the
// group and writes the result to `out_tensor` on `opts.root_rank`.
std::shared_ptr<ProcessGroup::Task> Reduce(phi::DenseTensor* out_tensor,
                                           const phi::DenseTensor& in_tensor,
                                           const ReduceOptions& opts,
                                           bool sync_op,
                                           bool use_calc_stream) override;

std::shared_ptr<ProcessGroup::Task> Barrier(
const BarrierOptions& = BarrierOptions()) override;

Expand Down
21 changes: 21 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"

#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/NCCLTools.h"
#include "paddle/fluid/distributed/collective/utils.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
Expand Down Expand Up @@ -137,6 +138,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather(
// numel > 0 indicates the tensor need to be sliced
const phi::DenseTensor& in_tensor_maybe_partial =
numel > 0 ? GetPartialTensor(in_tensor, offset, numel) : in_tensor;
StaticCheckTensorsGatherLikeShape(
*out_tensor, in_tensor_maybe_partial, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
NCCL_CHECK(platform::dynload::ncclAllGather(
Expand All @@ -159,6 +162,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
const AllreduceOptions& opts,
bool sync_op,
bool use_calc_stream) {
StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
NCCL_CHECK(platform::dynload::ncclAllReduce(
Expand Down Expand Up @@ -207,6 +211,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
CheckSizeOnEachRank(out_dim, out_size_each_rank, size_);
CheckSizeOnEachRank(in_dim, in_size_each_rank, size_);

// NOTE: Since `all_to_all` needs other processes's participation, it cannot
// simply be covered by static checks. Factors are set to 0 here to skip the
// shape check. Its shape check will be done by dynamic checks in debug mode.
StaticCheckTensors(*out_tensor,
in_tensor,
rank_,
size_,
/*out_size_factor*/ 0,
/*in_size_factor*/ 0);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
int64_t in_row_size = in_tensor.numel() / in_dim[0],
Expand Down Expand Up @@ -274,6 +287,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
const BroadcastOptions& opts,
bool sync_op,
bool use_calc_stream) {
StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
int root = opts.source_rank + opts.source_root;
Expand All @@ -298,6 +312,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Reduce(
const ReduceOptions& opts,
bool sync_op,
bool use_calc_stream) {
StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
NCCL_CHECK(platform::dynload::ncclReduce(
Expand All @@ -322,6 +337,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::ReduceScatter(
const ReduceScatterOptions& opts,
bool sync_op,
bool use_calc_stream) {
StaticCheckTensorsScatterLikeShape(*out_tensor, in_tensor, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
NCCL_CHECK(platform::dynload::ncclReduceScatter(
Expand All @@ -345,6 +361,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Scatter(
const ScatterOptions& opts,
bool sync_op,
bool use_calc_stream) {
StaticCheckTensorsScatterLikeShape(*out_tensor, in_tensor, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
int64_t numel = in_tensor.numel() / size_;
Expand Down Expand Up @@ -400,6 +417,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
partial_tensor = GetPartialTensor(*tensor, offset, numel);
tensor = &partial_tensor;
}

StaticCheckTensor(*tensor, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
NCCL_CHECK(platform::dynload::ncclRecv(
Expand All @@ -426,6 +445,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send(
// numel > 0 indicates the tensor need to be sliced
const phi::DenseTensor& tensor_maybe_partial =
numel > 0 ? GetPartialTensor(tensor, offset, numel) : tensor;

StaticCheckTensor(tensor_maybe_partial, rank_, size_);
return RunFnInNCCLEnv(
[&](ncclComm_t comm, gpuStream_t stream) {
NCCL_CHECK(platform::dynload::ncclSend(
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ class ProcessGroupNCCL final : public ProcessGroupStream {

void CreateNCCLEnvCache(const Place& place, const std::string& place_key);

void SyncCalcStream(const Place& place);

std::shared_ptr<ProcessGroup::Task> RunFnInNCCLEnv(
std::function<void(ncclComm_t, gpuStream_t)> fn,
const phi::DenseTensor& tensor,
CommType comm_type,
bool sync_op,
bool use_calc_stream);

void SyncCalcStream(const Place& place);

// TODO(sunyilun): methods below will be removed later
std::shared_ptr<ProcessGroupNCCL::NCCLTask> CreateTask(
std::vector<Place> places,
Expand All @@ -245,6 +245,7 @@ class ProcessGroupNCCL final : public ProcessGroupStream {

private:
std::shared_ptr<Store> store_;

std::unordered_map<std::string, platform::DeviceEvent>
place_to_calc_event_; // event on calc stream
std::unordered_map<std::string, phi::GPUContext*> place_to_calc_ctx_;
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/collective/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace paddle {
namespace distributed {

inline phi::DenseTensor GetPartialTensor(const phi::DenseTensor &tensor,
inline phi::DenseTensor GetPartialTensor(const phi::DenseTensor& tensor,
int64_t offset,
int64_t numel) {
phi::DenseTensor tensor_flattened;
Expand Down
Loading

0 comments on commit 65cf6df

Please sign in to comment.