Skip to content

Commit

Permalink
add multi epoch train & fix train table change ins & save infer embed…
Browse files Browse the repository at this point in the history
…ing (PaddlePaddle#129)

* add multi epoch train & fix train table change ins & save infer embedding

* change epoch finish judge

* change epoch finish change

Co-authored-by: root <root@yq01-inf-hic-k8s-a100-ab2-0009.yq01.baidu.com>
  • Loading branch information
miaoli06 and root authored Sep 28, 2022
1 parent bdd64ef commit c6a07b2
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 7 deletions.
36 changes: 29 additions & 7 deletions paddle/fluid/framework/data_feed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,16 @@ int GraphDataGenerator::FillInferBuf() {
return 0;
}

// Reset this GPU's epoch sampling progress so the next epoch starts from
// scratch: forget which node types already finished and rewind every node
// type's start cursor back to 0.
void GraphDataGenerator::ClearSampleState() {
  auto gpu_graph_ptr = GraphGpuWrapper::GetInstance();
  auto &finish_node_type = gpu_graph_ptr->finish_node_type_[gpuid_];
  auto &node_type_start = gpu_graph_ptr->node_type_start_[gpuid_];
  finish_node_type.clear();
  for (auto &type_entry : node_type_start) {
    type_entry.second = 0;
  }
}

int GraphDataGenerator::FillWalkBuf() {
platform::CUDADeviceGuard guard(gpuid_);
size_t once_max_sample_keynum = walk_degree_ * once_sample_startid_len_;
Expand Down Expand Up @@ -992,9 +1002,12 @@ int GraphDataGenerator::FillWalkBuf() {
int tmp_len = start + once_sample_startid_len_ > device_key_size
? device_key_size - start
: once_sample_startid_len_;
bool update = true;
if (tmp_len == 0) {
finish_node_type.insert(node_type);
if (finish_node_type.size() == node_type_start.size()) {
cursor = 0;
epoch_finish_ = true;
break;
}
cursor += 1;
Expand Down Expand Up @@ -1027,12 +1040,14 @@ int GraphDataGenerator::FillWalkBuf() {

if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) {
if (InsertTable(d_type_keys + start, tmp_len) != 0) {
VLOG(2) << "table is full";
VLOG(2) << "in step 0, insert key stage, table is full";
update = false;
break;
}
if (InsertTable(sample_res.actual_val, sample_res.total_sample_size) !=
0) {
VLOG(2) << "table is full";
VLOG(2) << "in step 0, insert sample res stage, table is full";
update = false;
break;
}
}
Expand Down Expand Up @@ -1075,7 +1090,8 @@ int GraphDataGenerator::FillWalkBuf() {
// d_uniq_node_num, sample_stream_);
if (InsertTable(sample_res.actual_val, sample_res.total_sample_size) !=
0) {
VLOG(2) << "table is full";
VLOG(2) << "in step: " << step << ", table is full";
update = false;
break;
}
}
Expand All @@ -1095,10 +1111,15 @@ int GraphDataGenerator::FillWalkBuf() {
}
}
// Update the global sampling state at this point
node_type_start[node_type] = tmp_len + start;
i += jump_rows_ * walk_len_;
total_row_ += jump_rows_;
cursor += 1;
if (update == true) {
node_type_start[node_type] = tmp_len + start;
i += jump_rows_ * walk_len_;
total_row_ += jump_rows_;
cursor += 1;
} else {
VLOG(2) << "table is full, not update stat!";
break;
}
}
buf_state_.Reset(total_row_);
int *d_random_row = reinterpret_cast<int *>(d_random_row_->ptr());
Expand Down Expand Up @@ -1443,6 +1464,7 @@ void GraphDataGenerator::SetConfig(
once_sample_startid_len_ * walk_len_ * walk_degree_ * repeat_time_;
train_table_cap_ = graph_config.train_table_cap();
infer_table_cap_ = graph_config.infer_table_cap();
epoch_finish_ = false;
VLOG(0) << "Confirm GraphConfig, walk_degree : " << walk_degree_
<< ", walk_len : " << walk_len_ << ", window : " << window_
<< ", once_sample_startid_len : " << once_sample_startid_len_
Expand Down
29 changes: 29 additions & 0 deletions paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -924,12 +924,16 @@ class GraphDataGenerator {
int FillGraphSlotFeature(int total_instance, bool gpu_graph_training);
int MakeInsPair();
int GetPathNum() { return total_row_; }
void ResetPathNum() {total_row_ = 0; }
void ResetEpochFinish() {epoch_finish_ = false; }
void ClearSampleState();
void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
// type_to_index_[type] = h_device_keys_.size();
// h_device_keys_.push_back(device_keys);
}
int InsertTable(const unsigned long* d_keys, unsigned long len);
std::vector<uint64_t>& GetHostVec() { return host_vec_; }
bool get_epoch_finish() {return epoch_finish_; }
void clear_gpu_mem();

protected:
Expand Down Expand Up @@ -984,6 +988,7 @@ class GraphDataGenerator {
int shuffle_seed_;
int debug_mode_;
bool gpu_graph_training_;
bool epoch_finish_;
std::vector<uint64_t> host_vec_;
std::vector<uint64_t> h_device_keys_len_;
uint64_t train_table_cap_;
Expand Down Expand Up @@ -1072,6 +1077,12 @@ class DataFeed {
gpu_graph_data_generator_.clear_gpu_mem();
#endif
}
// Returns true when the GPU-graph data generator reports the current
// epoch as finished; always false when GPU-graph support is not
// compiled in.
// Fix: the original had no return statement outside the #if guard, so
// builds without PADDLE_WITH_GPU_GRAPH/PADDLE_WITH_HETERPS flowed off
// the end of a non-void function (undefined behavior).
virtual bool get_epoch_finish() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
  return gpu_graph_data_generator_.get_epoch_finish();
#else
  return false;
#endif
}

virtual void SetGpuGraphMode(int gpu_graph_mode) {
gpu_graph_mode_ = gpu_graph_mode;
}
Expand All @@ -1095,6 +1106,24 @@ class DataFeed {
return 0;
#endif
}
// Resets the walk-row counter in the GPU-graph data generator;
// no-op when GPU-graph support is not compiled in.
virtual void ResetPathNum() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
  gpu_graph_data_generator_.ResetPathNum();
#endif
}

// Clears the per-GPU sampling progress (finished node types and start
// cursors) in the GPU-graph data generator so the next epoch restarts
// from the beginning; no-op when GPU-graph support is not compiled in.
virtual void ClearSampleState() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
  gpu_graph_data_generator_.ClearSampleState();
#endif
}

// Clears the epoch-finished flag in the GPU-graph data generator;
// no-op when GPU-graph support is not compiled in.
virtual void ResetEpochFinish() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
  gpu_graph_data_generator_.ResetEpochFinish();
#endif
}

virtual bool IsTrainMode() { return train_mode_; }
virtual void LoadIntoMemory() {
PADDLE_THROW(platform::errors::Unimplemented(
Expand Down
29 changes: 29 additions & 0 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#endif

USE_INT_STAT(STAT_total_feasign_num_in_mem);
USE_INT_STAT(STAT_epoch_finish);
DECLARE_bool(graph_get_neighbor_id);
DECLARE_int32(gpugraph_storage_mode);

Expand Down Expand Up @@ -461,6 +462,16 @@ void DatasetImpl<T>::LoadIntoMemory() {
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->SetGpuGraphMode(gpu_graph_mode_);
}

if (STAT_GET(STAT_epoch_finish) == 1) {
VLOG(0) << "get epoch finish true";
STAT_RESET(STAT_epoch_finish, 0);
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->ResetPathNum();
readers_[i]->ResetEpochFinish();
}
return;
}

for (int64_t i = 0; i < thread_num_; ++i) {
load_threads.push_back(
Expand All @@ -482,6 +493,13 @@ void DatasetImpl<T>::LoadIntoMemory() {
}
}

if (GetEpochFinish() == true) {
VLOG(0) << "epoch finish, set stat and clear sample stat!";
STAT_RESET(STAT_epoch_finish, 1);
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->ClearSampleState();
}
}
if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) {
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->clear_gpu_mem();
Expand Down Expand Up @@ -1097,6 +1115,17 @@ int64_t DatasetImpl<T>::GetMemoryDataSize() {
}
}

// Returns true iff every reader reports its epoch as finished.
// In non-graph mode there is nothing to poll, so the answer is
// trivially true.
template <typename T>
bool DatasetImpl<T>::GetEpochFinish() {
  bool all_finished = true;
  if (gpu_graph_mode_) {
    // Stop polling as soon as one reader is still mid-epoch (matches
    // the short-circuit of an && chain over the readers).
    for (int idx = 0; idx < thread_num_ && all_finished; ++idx) {
      all_finished = readers_[idx]->get_epoch_finish();
    }
  }
  return all_finished;
}

template <typename T>
int64_t DatasetImpl<T>::GetPvDataSize() {
if (enable_pv_merge_) {
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/data_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class Dataset {

virtual void SetGpuGraphMode(int is_graph_mode) = 0;
virtual int GetGpuGraphMode() = 0;
virtual bool GetEpochFinish() = 0;

virtual void SetPassId(uint32_t pass_id) = 0;
virtual uint32_t GetPassID() = 0;
Expand Down Expand Up @@ -266,6 +267,7 @@ class DatasetImpl : public Dataset {
virtual void DynamicAdjustReadersNum(int thread_num);
virtual void SetFleetSendSleepSeconds(int seconds);
virtual std::vector<std::string> GetSlots();
virtual bool GetEpochFinish();

std::vector<paddle::framework::Channel<T>>& GetMultiOutputChannel() {
return multi_output_channel_;
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/multi_trainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc,
places_.push_back(place);
}
#endif
user_define_dump_filename_ = trainer_desc.user_define_dump_filename();
// get filelist from trainer_desc here
const std::vector<paddle::framework::DataFeed*> readers =
dataset->GetReaders();
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/platform/monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace platform {} // namespace platform
} // namespace paddle

DEFINE_INT_STATUS(STAT_total_feasign_num_in_mem)
DEFINE_INT_STATUS(STAT_epoch_finish)
DEFINE_INT_STATUS(STAT_gpu0_mem_size)
DEFINE_INT_STATUS(STAT_gpu1_mem_size)
DEFINE_INT_STATUS(STAT_gpu2_mem_size)
Expand Down
3 changes: 3 additions & 0 deletions python/paddle/fluid/trainer_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def _create_trainer(self, opt_info=None):
if opt_info.get("dump_fields_path") is not None and len(
opt_info.get("dump_fields_path")) != 0:
trainer._set_dump_fields_path(opt_info["dump_fields_path"])
if opt_info.get("user_define_dump_filename") is not None and len(
opt_info.get("user_define_dump_filename")) != 0:
trainer._set_user_define_dump_filename(opt_info["user_define_dump_filename"])
if opt_info.get("dump_file_num") is not None:
trainer._set_dump_file_num(opt_info["dump_file_num"])
if opt_info.get("dump_converter") is not None:
Expand Down

0 comments on commit c6a07b2

Please sign in to comment.