support multi-node (PaddlePaddle#31)
hutuxian authored Jul 14, 2020
1 parent 9bee597 commit 3d16147
Showing 5 changed files with 185 additions and 18 deletions.
1 change: 1 addition & 0 deletions paddle/fluid/framework/details/multi_devices_helper.cc
@@ -41,6 +41,7 @@ static std::unordered_set<std::string> kMultiDeviceOps{
"c_broadcast",
"c_comm_init",
"c_comm_init_all",
"c_comm_init_multitrainer",
"c_gen_nccl_id",
"c_sync_comm_stream",
"send",
101 changes: 101 additions & 0 deletions paddle/fluid/operators/collective/c_comm_init_multitrainer_op.cc
@@ -0,0 +1,101 @@
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#if defined(PADDLE_WITH_NCCL)
#include <nccl.h>
#endif
#include <stdint.h>
#include <ostream>
#include <string>

#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/operators/distributed/request_handler_impl.h"
#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace operators {

class CCommInitMultiTrainerInferShape : public framework::InferShapeBase {
public:
~CCommInitMultiTrainerInferShape() {}
void operator()(framework::InferShapeContext* ctx) const override{};
};

class CCommInitMultiTrainerOp : public framework::OperatorBase {
public:
CCommInitMultiTrainerOp(const std::string& type,
const framework::VariableNameMap& inputs,
const framework::VariableNameMap& outputs,
const framework::AttributeMap& attrs)
: OperatorBase(type, inputs, outputs, attrs) {}

void RunImpl(const framework::Scope& scope,
const platform::Place& place) const override {
auto var = scope.FindVar(Input("X"));
PADDLE_ENFORCE_NOT_NULL(var);
#if defined(PADDLE_WITH_NCCL)
ncclUniqueId* nccl_id = var->GetMutable<ncclUniqueId>();

int ntrainers = Attr<int>("ntrainers");
int train_id = Attr<int>("trainer_id");
int rid = Attr<int>("ring_id");

std::vector<int> devices = Attr<std::vector<int>>("devices");
if (devices.empty()) {
devices = platform::GetSelectedDevices();
}
platform::NCCLCommContext::Instance().CreateNCCLCommMultiTrainer(
devices, nccl_id, ntrainers, train_id, rid);
#else
PADDLE_THROW("PaddlePaddle should be compiled with GPU.");
#endif
}
};

class CCommInitMultiTrainerOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("X", "Raw variable contains a NCCL UniqueId instaces.");
AddComment(R"DOC(
CCommInitMultiTrainer operator
Initialize collective communication context within this trainer
)DOC");
AddAttr<int>("ntrainers",
"(int) The number of trainers of distributed trainers");
AddAttr<int>("trainer_id",
"(int) The id of the trainer in distributed training.");
AddAttr<std::vector<int>>("devices",
"(std::vector<int>) which devices does the nccl "
"comm initialized on in each trainer")
.SetDefault({});
AddAttr<int>("ring_id", "(int default 0) user specified ring id")
.SetDefault(0);
}
};

} // namespace operators
} // namespace paddle

namespace ops = paddle::operators;

REGISTER_OPERATOR(c_comm_init_multitrainer, ops::CCommInitMultiTrainerOp,
ops::CCommInitMultiTrainerInferShape,
ops::CCommInitMultiTrainerOpMaker);
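
For context, a minimal, hypothetical sketch of how this op could be wired into a startup program from Python, assuming the usual c_gen_nccl_id flow used elsewhere in fluid; variable names and attribute values below are illustrative only and not part of this commit (the transpiler change further down does the same thing):

import paddle.fluid as fluid
from paddle.fluid import core

num_trainers, trainer_id = 2, 0                    # illustrative values
current_endpoint = "127.0.0.1:6170"
other_endpoints = ["127.0.0.1:6171"]

startup_program = fluid.Program()
block = startup_program.global_block()
nccl_id_var = block.create_var(
    name='multitrainer_nccl_id', persistable=True,
    type=core.VarDesc.VarType.RAW)                 # raw var holding the ncclUniqueId
block.append_op(
    type='c_gen_nccl_id',                          # rank 0 generates the id, others fetch it
    inputs={},
    outputs={'Out': nccl_id_var},
    attrs={'rank': trainer_id,
           'endpoint': current_endpoint,
           'other_endpoints': other_endpoints})
block.append_op(
    type='c_comm_init_multitrainer',               # the op added in this commit
    inputs={'X': nccl_id_var},
    outputs={},
    attrs={'ntrainers': num_trainers,              # number of trainer nodes
           'trainer_id': trainer_id,               # this node's index
           'ring_id': 0})                          # 'devices' defaults to all selected GPUs
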
32 changes: 32 additions & 0 deletions paddle/fluid/platform/collective_helper.cc
@@ -101,6 +101,38 @@ void NCCLCommContext::CreateAllNCCLComms(const std::vector<int>& dev_ids,
});
}

void NCCLCommContext::CreateNCCLCommMultiTrainer(
const std::vector<int>& dev_ids, ncclUniqueId* nccl_id, int ntrainers,
int train_id, int ring_id) {
PADDLE_ENFORCE_GT(dev_ids.size(), 0);
const int kDevices = dev_ids.size();
VLOG(0) << "Begin CreateNCCLCommMultiTrainer. device number: " << kDevices
<< ", ntrainers: " << ntrainers << ", train_id: " << train_id
<< ", rind_id: " << ring_id;
ncclComm_t comms[kDevices];
{
PADDLE_ENFORCE_CUDA_SUCCESS(dynload::ncclGroupStart());
for (int i = 0; i < kDevices; i++) {
PADDLE_ENFORCE_CUDA_SUCCESS(cudaSetDevice(dev_ids[i]));
platform::dynload::ncclCommInitRank(comms + i, kDevices * ntrainers,
*nccl_id, train_id * kDevices + i);
}
PADDLE_ENFORCE_CUDA_SUCCESS(dynload::ncclGroupEnd());
}
PADDLE_ENFORCE_EQ(comm_map_.count(ring_id), 0);
for (int i = 0; i < kDevices; ++i) {
AssignNCCLComm(comms[i], kDevices * ntrainers, train_id * kDevices + i,
dev_ids[i], ring_id);
VLOG(0) << "nccl communicator of train_id " << train_id * kDevices + i
<< " in ring " << ring_id << " has been created on device "
<< dev_ids[i];
}

std::call_once(once_flag_, []() {
std::atexit([]() { NCCLCommContext::Instance().ReleaseNCCLComms(); });
});
}

NCCLComm* NCCLCommContext::AssignNCCLComm(ncclComm_t comm, int nranks, int rank,
int dev_id, int ring_id) {
std::unique_ptr<CUDADeviceContext> dev_ctx(
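
The rank layout CreateNCCLCommMultiTrainer sets up: each trainer contributes kDevices consecutive global ranks, so the communicator spans kDevices * ntrainers ranks in total. A small plain-Python illustration (values are examples only):

ndevices = 4                                # GPUs per trainer (kDevices)
ntrainers = 2                               # number of trainer nodes
train_id = 1                                # this trainer's index
world_size = ndevices * ntrainers           # nranks passed to ncclCommInitRank
global_ranks = [train_id * ndevices + i for i in range(ndevices)]
print(world_size, global_ranks)             # 8 [4, 5, 6, 7]
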
4 changes: 4 additions & 0 deletions paddle/fluid/platform/collective_helper.h
@@ -71,6 +71,10 @@ class NCCLCommContext {

void CreateAllNCCLComms(const std::vector<int>& dev_ids, int ring_id = 0);

void CreateNCCLCommMultiTrainer(const std::vector<int>& dev_ids,
ncclUniqueId* nccl_id, int ntrainers, int train_id,
int ring_id);

// a latter comm with the same dev_id and the same ring_id
// will override the former
NCCLComm* AssignNCCLComm(ncclComm_t comm, int nranks, int rank, int dev_id,
65 changes: 47 additions & 18 deletions python/paddle/fluid/transpiler/collective.py
@@ -28,7 +28,7 @@
from ..framework import Program, default_main_program, default_startup_program
from .details import wait_server_ready

__all__ = ['GradAllReduce', 'LocalSGD']
__all__ = ['GradAllReduce', 'LocalSGD', 'MultiThread']

OpRole = core.op_proto_and_checker_maker.OpRole

@@ -96,8 +96,14 @@ def _transpile_startup_program(self):
self.wait_port)
self._broadcast_params()

def _init_communicator(self, program, current_endpoint, endpoints, rank,
ring_id, wait_port):
def _init_communicator(self,
program,
current_endpoint,
endpoints,
rank,
ring_id,
wait_port,
has_multitrainer=False):
nranks = len(endpoints)
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
@@ -119,16 +125,28 @@ def _init_communicator(self, current_endpoint, endpoints, rank,
'other_endpoints': other_endpoints,
self.op_role_key: OpRole.Forward
})
block.append_op(
type='c_comm_init',
inputs={'X': nccl_id_var},
outputs={},
attrs={
'nranks': nranks,
'rank': rank,
'ring_id': ring_id,
self.op_role_key: OpRole.Forward
})
if not has_multitrainer:
block.append_op(
type='c_comm_init',
inputs={'X': nccl_id_var},
outputs={},
attrs={
'nranks': nranks,
'rank': rank,
'ring_id': ring_id,
self.op_role_key: OpRole.Forward
})
else:
block.append_op(
type='c_comm_init_multitrainer',
inputs={'X': nccl_id_var},
outputs={},
attrs={
'ntrainers': nranks,
'trainer_id': rank,
'ring_id': ring_id,
self.op_role_key: OpRole.Forward
})

def _broadcast_params(self):
block = self.startup_program.global_block()
@@ -375,14 +393,25 @@ def _transpile_main_program(self):
attrs={self.op_role_key: OpRole.Optimize})


class SingleProcessMultiThread(GradAllReduce):
class MultiThread(GradAllReduce):
'''
Gradient all-reduce over multiple NCCL rings; covers the original
single-process multi-thread mode and, when multiple endpoints are
given, multi-node training.
'''

def __init__(self):
GradAllReduce.__init__(self, 1)
def __init__(self, nrings=1):
GradAllReduce.__init__(self, nrings)
self.mode = "single_process_multi_thread"

def _transpile_startup_program(self):
block = self.startup_program.global_block()
block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})
if len(self.endpoints) > 1:
print("begin to _transpile_startup_program for multi-node")
print("current_endpoint: ", self.current_endpoint)
print("total endpoints: ", self.endpoints)
print("rank: %d, ring_id: %d" % (self.rank, self.nrings))
for ring_id in range(self.nrings):
self._init_communicator(
self.startup_program, self.current_endpoint, self.endpoints,
self.rank, ring_id, self.wait_port, True)
else:
print("begin to _transpile_startup_program for single-node")
block = self.startup_program.global_block()
block.append_op(type='c_comm_init_all', attrs={'ring_id': 0})
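
A rough usage sketch of the updated transpiler, assuming the base Collective.transpile(startup_program, main_program, rank, endpoints, current_endpoint, wait_port) entry point, which this commit does not touch; the endpoints and rank below are placeholders:

import paddle.fluid as fluid
from paddle.fluid.transpiler.collective import MultiThread

endpoints = ["192.168.0.1:6170", "192.168.0.2:6170"]   # one endpoint per trainer node
rank = 0                                               # this node's index in endpoints

t = MultiThread(nrings=1)
t.transpile(
    startup_program=fluid.default_startup_program(),
    main_program=fluid.default_main_program(),
    rank=rank,
    endpoints=endpoints,
    current_endpoint=endpoints[rank],
    wait_port=True)
# With more than one endpoint, the startup program now gets c_gen_nccl_id
# plus c_comm_init_multitrainer per ring; with a single endpoint it falls
# back to c_comm_init_all, as before.
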
