【HETERPS】pipeline adaptive for heterps (#33159)
* pipeline adaptive for heterps;test=develop
* fix finalize hang;test=develop
* add is_compiled_with_heterps for dataset;test=develop
* fix hashtable core when pass ins_num=0;test=develop
danleifeng authored Jul 6, 2021
1 parent 69ffb38 commit bfef7fe
Showing 9 changed files with 234 additions and 34 deletions.
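The heart of this change is replacing the synchronous BuildGPUPS(table_id, feature_dim) entry point with a three-stage producer/consumer pipeline: LoadIntoMemory feeds a CPU build thread, the CPU build thread feeds a GPU build thread, and BeginPass/EndPass bracket training on the finished task, so loading and table building for the next pass overlap with training on the current one. Before the diffs, here is a minimal, runnable Python model of that pattern, with queue.Queue standing in for paddle::framework::ChannelObject; every name in it is an illustrative stand-in, not Paddle API:

import queue
import threading
import time

# Four queues mirror the four ChannelObject members added to PSGPUWrapper.
data_ready = queue.Queue(maxsize=3)      # LoadIntoMemory -> CPU build stage
buildcpu_ready = queue.Queue(maxsize=3)  # CPU build -> GPU build stage
gpu_free = queue.Queue(maxsize=1)        # token: the GPU table slot is free
train_ready = queue.Queue(maxsize=1)     # GPU build -> trainer

gpu_free.put(None)  # mirrors gpu_free_channel_->Put(current_task_) at init

def build_task(task):      # stand-in for BuildTask: shard keys, pull sparse
    time.sleep(0.01)

def build_gpu_task(task):  # stand-in for BuildGPUTask: build GPU hash tables
    time.sleep(0.01)

def build_cpu_thread():
    while True:
        task = data_ready.get()
        build_task(task)
        buildcpu_ready.put(task)

def build_gpu_thread():
    while True:
        gpu_free.get()              # wait until the previous pass has ended
        task = buildcpu_ready.get()
        build_gpu_task(task)
        train_ready.put(task)

threading.Thread(target=build_cpu_thread, daemon=True).start()
threading.Thread(target=build_gpu_thread, daemon=True).start()

for pass_id in range(3):
    data_ready.put({"pass": pass_id})  # LoadIntoMemory(): enqueue a task
    task = train_ready.get()           # BeginPass(): block until build done
    print("training pass", task["pass"])
    gpu_free.put(None)                 # EndPass(): release the GPU slot

The capacities match the C++ setup in InitializeGPU: up to three passes may be loading or CPU-building concurrently, but the size-1 gpu_free and train_ready queues keep at most one task resident in GPU memory at a time.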
127 changes: 115 additions & 12 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -40,8 +40,7 @@ namespace framework {
 std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
 bool PSGPUWrapper::is_initialized_ = false;
 
-void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
-                             uint64_t table_id, int feature_dim) {
+void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
   VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin";
   platform::Timer timeline;
   timeline.Start();
@@ -137,17 +136,16 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
     local_ptr[i].resize(local_keys[i].size());
   }
   timeline.Start();
-  auto ptl_func = [this, &local_keys, &local_ptr, &table_id,
-                   &fleet_ptr](int i) {
+  auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
     size_t key_size = local_keys[i].size();
 #ifdef PADDLE_WITH_PSLIB
     auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
-        reinterpret_cast<char**>(local_ptr[i].data()), table_id,
+        reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
         local_keys[i].data(), key_size);
 #endif
 #ifdef PADDLE_WITH_PSCORE
     auto tt = fleet_ptr->_worker_ptr->pull_sparse_ptr(
-        reinterpret_cast<char**>(local_ptr[i].data()), table_id,
+        reinterpret_cast<char**>(local_ptr[i].data()), this->table_id_,
         local_keys[i].data(), key_size);
 #endif
     tt.wait();
@@ -270,11 +268,8 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
           << " seconds.";
 }
 
-void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
+void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
   int device_num = heter_devices_.size();
-  std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
-  gpu_task->Reset();
-  BuildTask(gpu_task, table_id, feature_dim);
   platform::Timer timeline;
   timeline.Start();
 
@@ -289,6 +284,10 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
     delete HeterPs_;
     HeterPs_ = nullptr;
   }
+  if (size_max <= 0) {
+    VLOG(1) << "Skip build gpu ps cause feasign nums = " << size_max;
+    return;
+  }
   std::vector<std::thread> threads(device_num);
   HeterPs_ = HeterPsBase::get_instance(size_max, resource_);
   HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
@@ -297,7 +296,9 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
     this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
                              gpu_task->device_values_[i].data(),
                              feature_keys_count[i], 500000, 2);
-    HeterPs_->show_one_table(i);
+    if (feature_keys_count[i] > 0) {
+      HeterPs_->show_one_table(i);
+    }
   };
   for (size_t i = 0; i < threads.size(); i++) {
     threads[i] = std::thread(build_func, i);
@@ -308,7 +309,109 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
   timeline.Pause();
   VLOG(1) << "GpuPs build table total costs: " << timeline.ElapsedSec()
           << " s.";
-  gpu_task_pool_.Push(gpu_task);
 }
 
+void PSGPUWrapper::LoadIntoMemory(bool is_shuffle) {
+  platform::Timer timer;
+  VLOG(3) << "Begin LoadIntoMemory(), dataset[" << dataset_ << "]";
+  timer.Start();
+  dataset_->LoadIntoMemory();
+  timer.Pause();
+  VLOG(0) << "LoadIntoMemory cost: " << timer.ElapsedSec() << "s";
+
+  // local shuffle
+  if (is_shuffle) {
+    dataset_->LocalShuffle();
+  }
+
+  std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
+  gpu_task->Reset();
+  data_ready_channel_->Put(gpu_task);
+  VLOG(3) << "End LoadIntoMemory(), dataset[" << dataset_ << "]";
+}
+
+void PSGPUWrapper::start_build_thread() {
+  running_ = true;
+  VLOG(3) << "start build CPU&GPU ps thread.";
+  build_cpu_threads_ = std::thread([this] { build_cpu_thread(); });
+  build_gpu_threads_ = std::thread([this] { build_gpu_thread(); });
+}
+
+void PSGPUWrapper::build_cpu_thread() {
+  while (running_) {
+    std::shared_ptr<HeterContext> gpu_task = nullptr;
+    if (!data_ready_channel_->Get(gpu_task)) {
+      continue;
+    }
+    VLOG(3) << "thread BuildTask start.";
+    platform::Timer timer;
+    timer.Start();
+    // build cpu ps data process
+    BuildTask(gpu_task);
+    timer.Pause();
+    VLOG(1) << "thread BuildTask end, cost time: " << timer.ElapsedSec() << "s";
+    buildcpu_ready_channel_->Put(gpu_task);
+  }
+  VLOG(3) << "build cpu thread end";
+}
+
+void PSGPUWrapper::build_gpu_thread() {
+  while (running_) {
+    std::shared_ptr<HeterContext> gpu_task = nullptr;
+    if (!gpu_free_channel_->Get(gpu_task)) {
+      continue;
+    }
+    if (!buildcpu_ready_channel_->Get(gpu_task)) {
+      continue;
+    }
+    VLOG(3) << "thread BuildGPUTask start.";
+    platform::Timer timer;
+    timer.Start();
+    BuildGPUTask(gpu_task);
+    timer.Pause();
+    VLOG(1) << "thread BuildGPUTask end, cost time: " << timer.ElapsedSec()
+            << "s";
+
+    gpu_task_pool_.Push(gpu_task);
+    train_ready_channel_->Put(gpu_task);
+  }
+  VLOG(3) << "build gpu thread end";
+}
+
+void PSGPUWrapper::BeginPass() {
+  platform::Timer timer;
+  timer.Start();
+  if (current_task_) {
+    PADDLE_THROW(
+        platform::errors::Fatal("[BeginPass] current task is not ended."));
+  }
+  // load+build done
+  if (!train_ready_channel_->Get(current_task_)) {
+    PADDLE_THROW(platform::errors::Fatal("train_ready_channel_ failed."));
+  }
+  timer.Pause();
+  VLOG(1) << "BeginPass end, cost time: " << timer.ElapsedSec() << "s";
+}
+
+void PSGPUWrapper::EndPass() {
+  if (!current_task_) {
+    PADDLE_THROW(
+        platform::errors::Fatal("[EndPass] current task has been ended."));
+  }
+  platform::Timer timer;
+  timer.Start();
+  size_t keysize_max = 0;
+  // in case of feasign_num = 0, skip dump_to_cpu
+  for (size_t i = 0; i < heter_devices_.size(); i++) {
+    keysize_max = std::max(keysize_max, current_task_->device_keys_[i].size());
+  }
+  if (keysize_max != 0) {
+    HeterPs_->end_pass();
+  }
+  current_task_ = nullptr;
+  gpu_free_channel_->Put(current_task_);
+  timer.Pause();
+  VLOG(1) << "EndPass end, cost time: " << timer.ElapsedSec() << "s";
+}
+
 void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
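BeginPass and EndPass now form a strict pair: BeginPass throws if a task is still live, EndPass throws if none is, and EndPass puts back the gpu_free token that unblocks the next BuildGPUTask. A hypothetical Python wrapper (not part of this commit) that keeps the pairing honest, assuming psgpu exposes the begin_pass/end_pass bindings:

from contextlib import contextmanager

@contextmanager
def ps_pass(psgpu):
    # Hypothetical helper: begin_pass blocks on train_ready_channel_ until
    # the loaded and built task is available, and throws if a task is live.
    psgpu.begin_pass()
    try:
        yield
    finally:
        psgpu.end_pass()  # returns the gpu_free token for the next build

A pass body then runs as "with ps_pass(psgpu): run_trainer()", and end_pass fires even if the trainer raises.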
80 changes: 67 additions & 13 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.h
@@ -82,9 +82,33 @@ class PSGPUWrapper {
                     const int hidden_size, const int64_t total_length,
                     const int batch_size);
 
-  void BuildGPUPS(const uint64_t table_id, int feature_dim);
-  void BuildTask(std::shared_ptr<HeterContext> gpu_task, uint64_t table_id,
-                 int feature_dim);
+  void BuildGPUTask(std::shared_ptr<HeterContext> gpu_task);
+  void BuildTask(std::shared_ptr<HeterContext> gpu_task);
+  void LoadIntoMemory(bool is_shuffle);
+  void BeginPass();
+  void EndPass();
+  void start_build_thread();
+  void build_cpu_thread();
+  void build_gpu_thread();
+
+  void Finalize() {
+    VLOG(3) << "PSGPUWrapper Begin Finalize.";
+    if (s_instance_ == nullptr) {
+      return;
+    }
+    data_ready_channel_->Close();
+    buildcpu_ready_channel_->Close();
+    gpu_free_channel_->Close();
+    train_ready_channel_->Close();
+    running_ = false;
+    VLOG(3) << "begin stop build_cpu_threads_";
+    build_cpu_threads_.join();
+    VLOG(3) << "begin stop build_gpu_threads_";
+    build_gpu_threads_.join();
+    s_instance_ = nullptr;
+    VLOG(3) << "PSGPUWrapper Finalize Finished.";
+  }
+
   void InitializeGPU(const std::vector<int>& dev_ids) {
     if (s_instance_ != NULL && is_initialized_ == false) {
       VLOG(3) << "PSGPUWrapper Begin InitializeGPU";
@@ -129,6 +153,24 @@ class PSGPUWrapper {
 #endif
       }
       heter_devices_ = dev_ids;
+      data_ready_channel_->Open();
+      data_ready_channel_->SetCapacity(3);
+      buildcpu_ready_channel_->Open();
+      buildcpu_ready_channel_->SetCapacity(3);
+      gpu_free_channel_->Open();
+      gpu_free_channel_->SetCapacity(1);
+      train_ready_channel_->Open();
+      train_ready_channel_->SetCapacity(1);
+
+      current_task_ = nullptr;
+      gpu_free_channel_->Put(current_task_);
+
+      table_id_ = 1;
+#ifdef PADDLE_WITH_PSLIB
+      table_id_ = 0;
+#endif
+      // start build cpu&gpu ps thread
+      start_build_thread();
     }
   }
 
@@ -206,18 +248,8 @@ class PSGPUWrapper {
     slot_vector_ = slot_vector;
   }
 
-  void EndPass() { HeterPs_->end_pass(); }
   void ShowOneTable(int index) { HeterPs_->show_one_table(index); }
 
-  void Finalize() {
-    VLOG(3) << "PSGPUWrapper Begin Finalize.";
-    if (s_instance_ == nullptr) {
-      return;
-    }
-    s_instance_ = nullptr;
-    VLOG(3) << "PSGPUWrapper Finalize Finished.";
-  }
-
  private:
   static std::shared_ptr<PSGPUWrapper> s_instance_;
   Dataset* dataset_;
@@ -231,6 +263,7 @@ class PSGPUWrapper {
   std::vector<int> slot_vector_;
   int multi_node_{0};
   int node_size_;
+  uint64_t table_id_;
   std::vector<ncclComm_t> inner_comms_;
   std::vector<ncclComm_t> inter_comms_;
   std::vector<ncclUniqueId> inter_ncclids_;
@@ -242,6 +275,27 @@ class PSGPUWrapper {
   int thread_keys_shard_num_ = 37;
   uint64_t max_fea_num_per_pass_ = 5000000000;
 
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      data_ready_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      buildcpu_ready_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      gpu_free_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<
+      paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
+      train_ready_channel_ =
+          paddle::framework::MakeChannel<std::shared_ptr<HeterContext>>();
+  std::shared_ptr<HeterContext> current_task_ = nullptr;
+  std::thread build_cpu_threads_;
+  std::thread build_gpu_threads_;
+  bool running_ = false;
+
  protected:
   static bool is_initialized_;
 };
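The relocated Finalize is the "fix finalize hang" item from the commit message: a worker blocked inside ChannelObject::Get() never observes running_ = false, so Finalize must close all four channels before joining the build threads; closing makes the pending Get() return false, the while (running_) loops exit, and join() completes. A runnable toy of the same ordering in Python, with a sentinel playing the role of channel Close():

import queue
import threading

SHUTDOWN = object()
ch = queue.Queue()

def worker():
    while True:
        item = ch.get()       # would block forever if never woken
        if item is SHUTDOWN:  # plays the role of Get() returning false
            break             # after data_ready_channel_->Close()

t = threading.Thread(target=worker)
t.start()
ch.put(SHUTDOWN)  # "close" the channel *before* joining, or join() hangs
t.join()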
10 changes: 7 additions & 3 deletions paddle/fluid/framework/heter_util.h
@@ -36,7 +36,7 @@ enum HeterTaskState { PULL_SPARSE, OP_RUN, XPU, OP_RUN_END, PUSH_GRAD, DONE };
 class HeterTask {
  public:
   HeterTask() {}
-  virtual ~HeterTask(){};
+  virtual ~HeterTask() {}
 
   void Update() {
     if (state_ == PULL_SPARSE) {
@@ -111,7 +111,7 @@ template <class T>
 class HeterObjectPool {
  public:
   HeterObjectPool() {}
-  virtual ~HeterObjectPool(){};
+  virtual ~HeterObjectPool() {}
   std::shared_ptr<T> Get() {
     std::lock_guard<std::mutex> lock(mutex_);
     if (pool_.empty()) {
@@ -131,6 +131,10 @@ class HeterObjectPool {
     std::lock_guard<std::mutex> lock(mutex_);
     return pool_.size();
   }
+  bool Empty() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return pool_.empty();
+  }
   std::shared_ptr<T>& GetElement(int i) { return pool_[i]; }
 
  private:
@@ -160,7 +164,7 @@ class BtObjectPool {
   virtual ~BtObjectPool() {
     bthread_cond_destroy(&cond_);
     bthread_mutex_destroy(&mutex_);
-  };
+  }
 
   std::shared_ptr<T> Get() {
     BthreadMutextGuard guard(&mutex_);
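For context, HeterObjectPool<T> is how HeterContext objects cycle through the new pipeline: LoadIntoMemory takes one with Get(), build_gpu_thread returns it with Push(), and the new Empty() adds a mutex-protected emptiness check. A compact Python rendering of those semantics (illustrative and simplified, not the Paddle class):

import threading

class HeterObjectPool:
    # Get() reuses a pooled object or builds a fresh one; Push() returns it;
    # empty() mirrors the Empty() accessor added in this commit.
    def __init__(self, factory):
        self._factory = factory
        self._pool = []
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._pool.pop() if self._pool else self._factory()

    def push(self, obj):
        with self._lock:
            self._pool.append(obj)

    def empty(self):
        with self._lock:
            return not self._pool

pool = HeterObjectPool(dict)  # gpu_task_pool_ holds HeterContext in C++
task = pool.get()             # LoadIntoMemory: gpu_task_pool_.Get()
pool.push(task)               # build_gpu_thread: gpu_task_pool_.Push(gpu_task)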
4 changes: 3 additions & 1 deletion paddle/fluid/pybind/ps_gpu_wrapper_py.cc
@@ -47,7 +47,9 @@ void BindPSGPUWrapper(py::module* m) {
            py::call_guard<py::gil_scoped_release>())
       .def("end_pass", &framework::PSGPUWrapper::EndPass,
            py::call_guard<py::gil_scoped_release>())
-      .def("build_gpu_ps", &framework::PSGPUWrapper::BuildGPUPS,
+      .def("begin_pass", &framework::PSGPUWrapper::BeginPass,
+           py::call_guard<py::gil_scoped_release>())
+      .def("load_into_memory", &framework::PSGPUWrapper::LoadIntoMemory,
            py::call_guard<py::gil_scoped_release>())
       .def("finalize", &framework::PSGPUWrapper::Finalize,
            py::call_guard<py::gil_scoped_release>());
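With load_into_memory and begin_pass bound, the per-pass driver can live in Python. The loop below is a hypothetical sketch: it assumes the wrapper is reachable as paddle.fluid.core.PSGPUWrapper() and was configured elsewhere; only the bound method names come from this file's diff.

import paddle.fluid.core as core

num_passes = 2
def train_one_pass():  # placeholder for the user's actual trainer
    pass

psgpu = core.PSGPUWrapper()       # assumed accessor path, not shown here
for _ in range(num_passes):
    psgpu.load_into_memory(True)  # load the dataset and locally shuffle it
    psgpu.begin_pass()            # block until CPU + GPU builds complete
    train_one_pass()
    psgpu.end_pass()              # release GPU tables for the next pass
psgpu.finalize()                  # close channels and join build threads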
9 changes: 9 additions & 0 deletions paddle/fluid/pybind/pybind.cc
@@ -185,6 +185,14 @@ bool IsCompiledWithMKLDNN() {
 #endif
 }
 
+bool IsCompiledWithHETERPS() {
+#ifndef PADDLE_WITH_HETERPS
+  return false;
+#else
+  return true;
+#endif
+}
+
 bool SupportsBfloat16() {
 #ifndef PADDLE_WITH_MKLDNN
   return false;
@@ -1910,6 +1918,7 @@ All parameter, weight, gradient are variables in Paddle.
   m.def("is_compiled_with_npu", IsCompiledWithNPU);
   m.def("is_compiled_with_xpu", IsCompiledWithXPU);
   m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN);
+  m.def("_is_compiled_with_heterps", IsCompiledWithHETERPS);
   m.def("supports_bfloat16", SupportsBfloat16);
   m.def("supports_bfloat16_fast_performance", SupportsBfloat16FastPerformance);
   m.def("op_supported_infos", OpSupportedInfos);
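The new _is_compiled_with_heterps gives the Python dataset layer a runtime check for HeterPS support (the commit message's "add is_compiled_with_heterps for dataset"). A minimal guard; the binding name comes from this diff, while the branch bodies are placeholders:

import paddle.fluid.core as core

if core._is_compiled_with_heterps():
    print("HeterPS available: dataset can use the pipelined PSGPU path")
else:
    print("HeterPS not compiled in: fall back to the default dataset path")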