[cherry-pick] Adapt BKCL comm for XPUPS #42266

Merged · 10 commits · Apr 26, 2022
7 changes: 6 additions & 1 deletion paddle/fluid/framework/device_worker.h
@@ -522,7 +522,8 @@ class HeterCpuWorker : public HogwildWorker {
};
#endif

#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \
defined PADDLE_WITH_XPU_BKCL) && \
(defined PADDLE_WITH_PSLIB)
class PSGPUWorker : public HogwildWorker {
public:
@@ -537,8 +538,10 @@ class PSGPUWorker : public HogwildWorker {
new (&program_) ProgramDesc(main_program);
}
void ProduceTasks() override;
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
virtual void SetStream(const gpuStream_t stream) { copy_stream_ = stream; }
virtual void SetEvent(const gpuEvent_t event) { event_ = event; }
#endif
void ResetStat();

protected:
@@ -588,8 +591,10 @@ class PSGPUWorker : public HogwildWorker {
std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
paddle::framework::Channel<std::shared_ptr<HeterTask>> pull_queue_;
paddle::framework::Channel<std::shared_ptr<HeterTask>> push_queue_;
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
gpuEvent_t event_;
gpuStream_t copy_stream_;
#endif
int batch_cnt_{0};
std::atomic<int> done_cnt_{0};

3 changes: 2 additions & 1 deletion paddle/fluid/framework/device_worker_factory.cc
@@ -75,7 +75,8 @@ REGISTER_DEVICE_WORKER_CLASS(HeterSectionWorker);
REGISTER_DEVICE_WORKER_CLASS(HeterCpuWorker);
#endif

#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \
defined PADDLE_WITH_XPU_BKCL) && \
(defined PADDLE_WITH_PSLIB)
REGISTER_DEVICE_WORKER_CLASS(PSGPUWorker);
#endif
3 changes: 2 additions & 1 deletion paddle/fluid/framework/ps_gpu_trainer.cc
@@ -23,7 +23,8 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#include "paddle/fluid/framework/trainer.h"
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \
defined PADDLE_WITH_XPU_BKCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_device_guard.h"
13 changes: 12 additions & 1 deletion paddle/fluid/framework/ps_gpu_worker.cc
@@ -18,7 +18,8 @@ limitations under the License. */
#include "paddle/fluid/platform/lodtensor_printer.h"
#include "paddle/fluid/string/string_helper.h"

#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \
defined PADDLE_WITH_XPU_BKCL) && \
(defined PADDLE_WITH_PSLIB)
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_device_guard.h"
@@ -131,6 +132,11 @@ void PSGPUWorker::TrainFiles() {
device_reader_->Start();
int cur_batch;
int batch_cnt = 0;
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
platform::SetDeviceId(thread_id_);
#elif defined(PADDLE_WITH_XPU_BKCL)
platform::SetXPUDeviceId(thread_id_);
#endif
while ((cur_batch = device_reader_->Next()) > 0) {
total_ins_num += cur_batch;
for (auto& op : ops_) {
@@ -227,6 +233,11 @@ void PSGPUWorker::TrainFilesWithProfiler() {
int total_ins_num = 0;
int cur_batch;
timeline.Start();
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
platform::SetDeviceId(thread_id_);
#elif defined(PADDLE_WITH_XPU_BKCL)
platform::SetXPUDeviceId(thread_id_);
#endif
while ((cur_batch = device_reader_->Next()) > 0) {
total_ins_num += cur_batch;
timeline.Pause();
3 changes: 2 additions & 1 deletion paddle/fluid/framework/trainer.h
@@ -248,7 +248,8 @@ class HeterXpuTrainer : public TrainerBase {

#endif

#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \
defined PADDLE_WITH_XPU_BKCL) && \
(defined PADDLE_WITH_PSLIB)
class PSGPUTrainer : public TrainerBase {
public:
3 changes: 2 additions & 1 deletion paddle/fluid/framework/trainer_factory.cc
@@ -76,7 +76,8 @@ REGISTER_TRAINER_CLASS(HeterPipelineTrainer);
(defined PADDLE_WITH_PSLIB) && (!defined(PADDLE_WITH_HETERPS))
REGISTER_TRAINER_CLASS(HeterXpuTrainer);
#endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL || \
defined PADDLE_WITH_XPU_BKCL) && \
(defined PADDLE_WITH_PSLIB)
REGISTER_TRAINER_CLASS(PSGPUTrainer);
#endif
11 changes: 10 additions & 1 deletion paddle/fluid/operators/collective/c_comm_init_op.cc
@@ -83,7 +83,6 @@ class CCommInitOp : public framework::OperatorBase {
UniqueId* comm_id = var->GetMutable<UniqueId>();

int nranks = Attr<int>("nranks");
int rank_id = Attr<int>("rank");
int rid = Attr<int>("ring_id");

#if defined(PADDLE_WITH_XPU_BKCL)
@@ -98,8 +97,18 @@
if (Attr<int>("device_id") >= 0) {
device_id = Attr<int>("device_id");
}

#if defined(PADDLE_WITH_XPU_BKCL) && defined(PADDLE_WITH_HETERPS) && \
defined(PADDLE_WITH_PSLIB)
// XPUPS rank_id only equals 0, so replace rank_id with device_id
CommContext::Instance().CreateComm(comm_id, nranks, device_id, device_id,
rid);
#else
int rank_id = Attr<int>("rank");
CommContext::Instance().CreateComm(comm_id, nranks, rank_id, device_id,
rid);
#endif

#endif
}
};
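For reference, `c_comm_init` is driven from Python by appending the op to a startup block with the attributes read above (`nranks`, `rank`, `device_id`, `ring_id`). A minimal sketch, assuming a build where the op is registered; the attribute values are illustrative, and under the XPUPS build the `rank` attribute is ignored in favor of `device_id`:

```python
# Sketch only: appending c_comm_init, mirroring the attrs read by CCommInitOp.
import paddle.fluid as fluid
from paddle.fluid import core, unique_name

startup = fluid.Program()
block = startup.global_block()

# In a real program this RAW var is filled by c_gen_bkcl_id / c_gen_nccl_id.
comm_id_var = block.create_var(
    name=unique_name.generate('comm_id'),
    persistable=True,
    type=core.VarDesc.VarType.RAW)

block.append_op(
    type='c_comm_init',
    inputs={'X': comm_id_var},
    outputs={},
    attrs={
        'nranks': 2,      # total number of participants in the ring
        'rank': 0,        # ignored on the XPUPS path, which uses device_id
        'device_id': 0,   # -1 falls back to the current device
        'ring_id': 0,
    })
```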
10 changes: 10 additions & 0 deletions paddle/fluid/operators/collective/c_sync_calc_stream_op.cc
@@ -76,7 +76,15 @@ class CSyncCalcStreamKernel : public framework::OpKernel<T> {
auto dev_ctx = static_cast<platform::MLUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
platform::MLUStreamSync(dev_ctx->stream());
#elif defined(PADDLE_WITH_XPU_BKCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on xpu place only for now."));

auto dev_ctx = static_cast<platform::XPUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
dev_ctx->Wait();
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with GPU."));
@@ -97,3 +105,5 @@ REGISTER_OP_CUDA_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);
REGISTER_OP_NPU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);

REGISTER_OP_MLU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);

REGISTER_OP_XPU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);
18 changes: 16 additions & 2 deletions paddle/fluid/operators/collective/c_sync_comm_stream_op.cc
@@ -20,14 +20,17 @@ limitations under the License. */
#endif

#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
#endif

#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
#endif

#if defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#endif

namespace paddle {
namespace operators {

@@ -94,7 +97,16 @@ class CSyncCommStreamKernel : public framework::OpKernel<T> {
auto stream =
platform::CNCLCommContext::Instance().Get(ring_id, place)->stream();
platform::MLUStreamSync(stream);

#elif defined(PADDLE_WITH_XPU_BKCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on xpu place only for now."));
int ring_id = ctx.Attr<int>("ring_id");
auto comm_dev_ctx = platform::BKCLCommContext::Instance()
.Get(ring_id, place)
->dev_context();
comm_dev_ctx->Wait();
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with GPU."));
@@ -115,3 +127,5 @@ REGISTER_OP_CUDA_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);
REGISTER_OP_NPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);

REGISTER_OP_MLU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);

REGISTER_OP_XPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);
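The two sync ops above are normally inserted from Python around a collective op, the way the collective transpiler does. A minimal sketch of that pattern, which the XPU kernel registrations make valid on XPU places as well; the program and tensor here are illustrative:

```python
# Sketch: wrap a collective with calc-stream / comm-stream synchronization.
import paddle.fluid as fluid
from paddle.fluid import layers

main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
    t = layers.fill_constant(shape=[2, 2], dtype='float32', value=1.0)
    block = main.global_block()
    # Wait for the computation stream before communicating.
    block.append_op(type='c_sync_calc_stream',
                    inputs={'X': t}, outputs={'Out': t})
    block.append_op(type='c_allreduce_sum',
                    inputs={'X': t}, outputs={'Out': t},
                    attrs={'ring_id': 0, 'use_calc_stream': False})
    # Wait for the communication stream before using the result.
    block.append_op(type='c_sync_comm_stream',
                    inputs={'X': t}, outputs={'Out': t},
                    attrs={'ring_id': 0})
```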
@@ -1139,10 +1139,11 @@ def minimize(self,
from paddle.fluid.transpiler.collective import MultiThread
# check start program
if program_mode not in [
"all_reduce", "fuse_all_reduce", "all_gather"
"all_reduce", "fuse_all_reduce", "all_gather",
"all_reduce_xpu"
]:
raise ValueError("You should set program_mode in [ all_reduce, \
fuse_all_reduce, all_gather ]")
fuse_all_reduce, all_gather, all_reduce_xpu ]")
env = self.get_dist_env()
if not isinstance(losses, list):
startup_programs = [startup_programs]
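A standalone sketch of the mode check above, with the newly accepted "all_reduce_xpu" value; the helper name is hypothetical:

```python
# Hypothetical standalone version of the program_mode validation shown above.
VALID_PROGRAM_MODES = ("all_reduce", "fuse_all_reduce", "all_gather",
                       "all_reduce_xpu")

def check_program_mode(program_mode):
    if program_mode not in VALID_PROGRAM_MODES:
        raise ValueError("You should set program_mode in [ all_reduce, "
                         "fuse_all_reduce, all_gather, all_reduce_xpu ]")
    return program_mode
```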
45 changes: 42 additions & 3 deletions python/paddle/fluid/transpiler/collective.py
@@ -42,6 +42,7 @@ def __init__(self, nrings):
self.nrings = nrings
self.endpoints = None
self.current_endpoint = None
self.other_endpoints = None
self.nranks = None
self.rank = None
self.startup_program = None
@@ -79,6 +80,12 @@ def transpile(self, startup_program, main_program, rank, endpoints,
self.endpoints = endpoints
self.current_endpoint = current_endpoint

if current_endpoint:
nranks = len(endpoints)
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
self.other_endpoints = other_endpoints

self.wait_port = wait_port

self.startup_program._origin_program = self.startup_program.clone()
@@ -462,9 +469,41 @@ def _transpile_startup_program(self):
self.rank, ring_id, self.wait_port, True)

else:
print("begin to _transpile_startup_program for single-node")
block = self.startup_program.global_block()
block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})
if "xpu" in self.trans_mode:
print(
"begin to _transpile_startup_program for single-node in XPU")
block = self.startup_program.global_block()
comm_id_var = block.create_var(
name=unique_name.generate('comm_id'),
persistable=True,
type=core.VarDesc.VarType.RAW)
block.append_op(
type='c_gen_bkcl_id',
inputs={},
outputs={'Out': comm_id_var},
attrs={
'rank': self.rank,
'endpoint': self.current_endpoint,
'other_endpoints': self.other_endpoints,
'ring_id': 0,
self.op_role_key: OpRole.Forward
})
block.append_op(
type='c_comm_init',
inputs={'X': comm_id_var},
outputs={},
attrs={
'nranks':
len(os.getenv("FLAGS_selected_gpus").split(",")),
'rank': self.rank,
'ring_id': 0,
self.op_role_key: OpRole.Forward
})

else:
print("begin to _transpile_startup_program for single-node")
block = self.startup_program.global_block()
block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})

def _transpile_main_program(self):
self._insert_scale_loss_grad_ops()
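A usage sketch of the single-node XPU path added above, assuming a BKCL-enabled build and that `MultiThread` accepts a `trans_mode` argument (implied by the `"xpu" in self.trans_mode` check); the endpoints and device list are illustrative:

```python
# Sketch: with an XPU build and a trans_mode containing "xpu", the startup
# program is expected to receive c_gen_bkcl_id + c_comm_init ops instead of
# the c_comm_init_all op used on the non-XPU single-node path.
import os
import paddle.fluid as fluid
from paddle.fluid.transpiler.collective import MultiThread

# nranks for c_comm_init is derived from FLAGS_selected_gpus in the code above.
os.environ.setdefault("FLAGS_selected_gpus", "0,1")

startup_prog = fluid.Program()
main_prog = fluid.Program()

transpiler = MultiThread(trans_mode="all_reduce_xpu")  # assumed constructor arg
transpiler.transpile(startup_program=startup_prog,
                     main_program=main_prog,
                     rank=0,
                     endpoints=["127.0.0.1:6170"],
                     current_endpoint="127.0.0.1:6170",
                     wait_port=False)

print([op.type for op in startup_prog.global_block().ops])
```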