
Commit

Adapt XPUPS - adapt BKCL comm for XPUPS - 4.24
WorgenZhang committed Apr 24, 2022
1 parent 27633f4 commit 32d5d20
Showing 5 changed files with 80 additions and 7 deletions.
9 changes: 9 additions & 0 deletions paddle/fluid/operators/collective/c_comm_init_op.cc
@@ -98,8 +98,17 @@ class CCommInitOp : public framework::OperatorBase {
if (Attr<int>("device_id") >= 0) {
device_id = Attr<int>("device_id");
}

#if defined(PADDLE_WITH_XPU_BKCL) && defined(PADDLE_WITH_HETERPS) && \
defined(PADDLE_WITH_PSLIB)
// Under XPUPS, rank_id is always 0, so use device_id as the rank instead
CommContext::Instance().CreateComm(comm_id, nranks, device_id, device_id,
rid);
#else
CommContext::Instance().CreateComm(comm_id, nranks, rank_id, device_id,
rid);
#endif

#endif
}
};
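
The guarded branch above is the heart of this change: under XPUPS every trainer reports rank_id == 0, so the XPU device id stands in as the rank passed to CreateComm. A hypothetical Python condensation of that choice (names are illustrative, not Paddle API):

def comm_rank(xpups_enabled, rank_id, device_id):
    # Under XPUPS every process reports rank_id == 0, so the device id is
    # the only value that can serve as a unique rank on the node.
    return device_id if xpups_enabled else rank_id

assert comm_rank(True, 0, 3) == 3   # XPUPS: device id becomes the rank
assert comm_rank(False, 1, 3) == 1  # normal path: rank_id is used as-is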
10 changes: 10 additions & 0 deletions paddle/fluid/operators/collective/c_sync_calc_stream_op.cc
@@ -76,7 +76,15 @@ class CSyncCalcStreamKernel : public framework::OpKernel<T> {
auto dev_ctx = static_cast<platform::MLUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
platform::MLUStreamSync(dev_ctx->stream());
#elif defined(PADDLE_WITH_XPU_BKCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on xpu place only for now."));

auto dev_ctx = static_cast<platform::XPUDeviceContext*>(
platform::DeviceContextPool::Instance().Get(place));
dev_ctx->Wait();
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with GPU."));
@@ -97,3 +105,5 @@
REGISTER_OP_CUDA_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);
REGISTER_OP_NPU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);

REGISTER_OP_MLU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);

REGISTER_OP_XPU_KERNEL(c_sync_calc_stream, ops::CSyncCalcStreamKernel<float>);
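
For context, a minimal sketch of how a c_sync_calc_stream op is typically appended from Python, mirroring the X/Out wiring the collective transpiler uses elsewhere; this snippet is illustrative and not part of the commit:

import paddle.fluid as fluid

prog = fluid.Program()
block = prog.global_block()
var = block.create_var(name='data', shape=[1], dtype='float32')
# The op reads and writes the same variable; its only effect is to block
# until the compute stream (dev_ctx->Wait() on XPU) has drained.
block.append_op(
    type='c_sync_calc_stream',
    inputs={'X': var},
    outputs={'Out': var})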
18 changes: 16 additions & 2 deletions paddle/fluid/operators/collective/c_sync_comm_stream_op.cc
@@ -20,14 +20,17 @@ limitations under the License. */
#endif

#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
#endif

#if defined(PADDLE_WITH_CNCL)
#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
#endif

#if defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/platform/collective_helper.h"
#endif

namespace paddle {
namespace operators {

@@ -94,7 +97,16 @@ class CSyncCommStreamKernel : public framework::OpKernel<T> {
auto stream =
platform::CNCLCommContext::Instance().Get(ring_id, place)->stream();
platform::MLUStreamSync(stream);

#elif defined(PADDLE_WITH_XPU_BKCL)
auto place = ctx.GetPlace();
PADDLE_ENFORCE_EQ(platform::is_xpu_place(place), true,
platform::errors::PreconditionNotMet(
"Sync stream op can run on xpu place only for now."));
int ring_id = ctx.Attr<int>("ring_id");
auto comm_dev_ctx = platform::BKCLCommContext::Instance()
.Get(ring_id, place)
->dev_context();
comm_dev_ctx->Wait();
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"PaddlePaddle should compile with GPU."));
@@ -115,3 +127,5 @@
REGISTER_OP_CUDA_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);
REGISTER_OP_NPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);

REGISTER_OP_MLU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);

REGISTER_OP_XPU_KERNEL(c_sync_comm_stream, ops::CSyncCommStreamKernel<float>);
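
Similarly, a hedged sketch of driving the comm-stream sync from Python; ring_id selects the communicator whose device context the XPU branch above waits on (snippet not from this commit):

import paddle.fluid as fluid

prog = fluid.Program()
block = prog.global_block()
var = block.create_var(name='data', shape=[1], dtype='float32')
block.append_op(
    type='c_sync_comm_stream',
    inputs={'X': var},
    outputs={'Out': var},
    attrs={'ring_id': 0})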
@@ -1139,10 +1139,11 @@ def minimize(self,
from paddle.fluid.transpiler.collective import MultiThread
# check start program
if program_mode not in [
"all_reduce", "fuse_all_reduce", "all_gather"
"all_reduce", "fuse_all_reduce", "all_gather",
"all_reduce_xpu"
]:
raise ValueError("You should set program_mode in [ all_reduce, \
fuse_all_reduce, all_gather ]")
fuse_all_reduce, all_gather, all_reduce_xpu ]")
env = self.get_dist_env()
if not isinstance(losses, list):
startup_programs = [startup_programs]
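The new mode string ultimately reaches the MultiThread transpiler imported above. A sketch of direct usage, assuming MultiThread accepts a trans_mode argument (suggested by the self.trans_mode check in collective.py below); endpoints are placeholders:

import paddle.fluid as fluid
from paddle.fluid.transpiler.collective import MultiThread

startup_prog, main_prog = fluid.Program(), fluid.Program()
# trans_mode and nrings are assumptions about MultiThread's constructor.
transpiler = MultiThread(nrings=1, trans_mode="all_reduce_xpu")
transpiler.transpile(
    startup_program=startup_prog,
    main_program=main_prog,
    rank=0,
    endpoints=["127.0.0.1:6170", "127.0.0.1:6171"],
    current_endpoint="127.0.0.1:6170",
    wait_port=True)
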
45 changes: 42 additions & 3 deletions python/paddle/fluid/transpiler/collective.py
@@ -42,6 +42,7 @@ def __init__(self, nrings):
self.nrings = nrings
self.endpoints = None
self.current_endpoint = None
self.other_endpoints = None
self.nranks = None
self.rank = None
self.startup_program = None
@@ -79,6 +80,12 @@ def transpile(self, startup_program, main_program, rank, endpoints,
self.endpoints = endpoints
self.current_endpoint = current_endpoint

if current_endpoint:
nranks = len(endpoints)
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
self.other_endpoints = other_endpoints

self.wait_port = wait_port

self.startup_program._origin_program = self.startup_program.clone()
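
A worked example of the new other_endpoints bookkeeping (addresses are placeholders):

endpoints = ["10.0.0.1:6170", "10.0.0.2:6170"]
current_endpoint = "10.0.0.1:6170"
other_endpoints = endpoints[:]            # copy, so self.endpoints stays intact
other_endpoints.remove(current_endpoint)  # drop self; only peers remain
assert other_endpoints == ["10.0.0.2:6170"]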
@@ -462,9 +469,41 @@ def _transpile_startup_program(self):
self.rank, ring_id, self.wait_port, True)

else:
print("begin to _transpile_startup_program for single-node")
block = self.startup_program.global_block()
block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})
if "xpu" in self.trans_mode:
print(
"begin to _transpile_startup_program for single-node in XPU")
block = self.startup_program.global_block()
comm_id_var = block.create_var(
name=unique_name.generate('comm_id'),
persistable=True,
type=core.VarDesc.VarType.RAW)
block.append_op(
type='c_gen_bkcl_id',
inputs={},
outputs={'Out': comm_id_var},
attrs={
'rank': self.rank,
'endpoint': self.current_endpoint,
'other_endpoints': self.other_endpoints,
'ring_id': 0,
self.op_role_key: OpRole.Forward
})
block.append_op(
type='c_comm_init',
inputs={'X': comm_id_var},
outputs={},
attrs={
'nranks':
len(os.getenv("FLAGS_selected_gpus").split(",")),
'rank': self.rank,
'ring_id': 0,
self.op_role_key: OpRole.Forward
})

else:
print("begin to _transpile_startup_program for single-node")
block = self.startup_program.global_block()
block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})
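
Note that the XPU branch above counts ranks from FLAGS_selected_gpus rather than an XPU-specific flag; a minimal check of that derivation (values illustrative):

import os

os.environ["FLAGS_selected_gpus"] = "0,1"   # two devices on this node
nranks = len(os.getenv("FLAGS_selected_gpus").split(","))
assert nranks == 2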

def _transpile_main_program(self):
self._insert_scale_loss_grad_ops()

1 comment on commit 32d5d20


paddle-bot-old (bot) commented on 32d5d20, Apr 24, 2022


🕵️ CI failures summary

🔍 PR: #42168 Commit ID: 32d5d20 contains failed CI.

🔹 Failed: PR-CI-ROCM-Compile

