From c6a07b203524ad772b4821bf876f546cdcab8111 Mon Sep 17 00:00:00 2001 From: miaoli06 <106585574+miaoli06@users.noreply.github.com> Date: Wed, 28 Sep 2022 19:32:27 +0800 Subject: [PATCH] add multi epoch train & fix train table change ins & save infer embeding (#129) * add multi epoch train & fix train table change ins & save infer embedding * change epoch finish judge * change epoch finish change Co-authored-by: root --- paddle/fluid/framework/data_feed.cu | 36 ++++++++++++++++++++----- paddle/fluid/framework/data_feed.h | 29 ++++++++++++++++++++ paddle/fluid/framework/data_set.cc | 29 ++++++++++++++++++++ paddle/fluid/framework/data_set.h | 2 ++ paddle/fluid/framework/multi_trainer.cc | 1 + paddle/fluid/platform/monitor.cc | 1 + python/paddle/fluid/trainer_factory.py | 3 +++ 7 files changed, 94 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cu b/paddle/fluid/framework/data_feed.cu index 1bc34de231a45..319ea8671c2d3 100644 --- a/paddle/fluid/framework/data_feed.cu +++ b/paddle/fluid/framework/data_feed.cu @@ -933,6 +933,16 @@ int GraphDataGenerator::FillInferBuf() { return 0; } +void GraphDataGenerator::ClearSampleState() { + auto gpu_graph_ptr = GraphGpuWrapper::GetInstance(); + auto &finish_node_type = gpu_graph_ptr->finish_node_type_[gpuid_]; + auto &node_type_start = gpu_graph_ptr->node_type_start_[gpuid_]; + finish_node_type.clear(); + for (auto iter = node_type_start.begin(); iter != node_type_start.end(); iter++) { + iter->second = 0; + } +} + int GraphDataGenerator::FillWalkBuf() { platform::CUDADeviceGuard guard(gpuid_); size_t once_max_sample_keynum = walk_degree_ * once_sample_startid_len_; @@ -992,9 +1002,12 @@ int GraphDataGenerator::FillWalkBuf() { int tmp_len = start + once_sample_startid_len_ > device_key_size ? device_key_size - start : once_sample_startid_len_; + bool update = true; if (tmp_len == 0) { finish_node_type.insert(node_type); if (finish_node_type.size() == node_type_start.size()) { + cursor = 0; + epoch_finish_ = true; break; } cursor += 1; @@ -1027,12 +1040,14 @@ int GraphDataGenerator::FillWalkBuf() { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { if (InsertTable(d_type_keys + start, tmp_len) != 0) { - VLOG(2) << "table is full"; + VLOG(2) << "in step 0, insert key stage, table is full"; + update = false; break; } if (InsertTable(sample_res.actual_val, sample_res.total_sample_size) != 0) { - VLOG(2) << "table is full"; + VLOG(2) << "in step 0, insert sample res stage, table is full"; + update = false; break; } } @@ -1075,7 +1090,8 @@ int GraphDataGenerator::FillWalkBuf() { // d_uniq_node_num, sample_stream_); if (InsertTable(sample_res.actual_val, sample_res.total_sample_size) != 0) { - VLOG(2) << "table is full"; + VLOG(2) << "in step: " << step << ", table is full"; + update = false; break; } } @@ -1095,10 +1111,15 @@ int GraphDataGenerator::FillWalkBuf() { } } // 此时更新全局采样状态 - node_type_start[node_type] = tmp_len + start; - i += jump_rows_ * walk_len_; - total_row_ += jump_rows_; - cursor += 1; + if (update == true) { + node_type_start[node_type] = tmp_len + start; + i += jump_rows_ * walk_len_; + total_row_ += jump_rows_; + cursor += 1; + } else { + VLOG(2) << "table is full, not update stat!"; + break; + } } buf_state_.Reset(total_row_); int *d_random_row = reinterpret_cast(d_random_row_->ptr()); @@ -1443,6 +1464,7 @@ void GraphDataGenerator::SetConfig( once_sample_startid_len_ * walk_len_ * walk_degree_ * repeat_time_; train_table_cap_ = graph_config.train_table_cap(); infer_table_cap_ = graph_config.infer_table_cap(); + epoch_finish_ = false; VLOG(0) << "Confirm GraphConfig, walk_degree : " << walk_degree_ << ", walk_len : " << walk_len_ << ", window : " << window_ << ", once_sample_startid_len : " << once_sample_startid_len_ diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 35774170d13f0..2268a2c3efa28 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -924,12 +924,16 @@ class GraphDataGenerator { int FillGraphSlotFeature(int total_instance, bool gpu_graph_training); int MakeInsPair(); int GetPathNum() { return total_row_; } + void ResetPathNum() {total_row_ = 0; } + void ResetEpochFinish() {epoch_finish_ = false; } + void ClearSampleState(); void SetDeviceKeys(std::vector* device_keys, int type) { // type_to_index_[type] = h_device_keys_.size(); // h_device_keys_.push_back(device_keys); } int InsertTable(const unsigned long* d_keys, unsigned long len); std::vector& GetHostVec() { return host_vec_; } + bool get_epoch_finish() {return epoch_finish_; } void clear_gpu_mem(); protected: @@ -984,6 +988,7 @@ class GraphDataGenerator { int shuffle_seed_; int debug_mode_; bool gpu_graph_training_; + bool epoch_finish_; std::vector host_vec_; std::vector h_device_keys_len_; uint64_t train_table_cap_; @@ -1072,6 +1077,12 @@ class DataFeed { gpu_graph_data_generator_.clear_gpu_mem(); #endif } + virtual bool get_epoch_finish() { +#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS) + return gpu_graph_data_generator_.get_epoch_finish(); +#endif + } + virtual void SetGpuGraphMode(int gpu_graph_mode) { gpu_graph_mode_ = gpu_graph_mode; } @@ -1095,6 +1106,24 @@ class DataFeed { return 0; #endif } + virtual void ResetPathNum() { +#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS) + gpu_graph_data_generator_.ResetPathNum(); +#endif + } + + virtual void ClearSampleState() { +#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS) + gpu_graph_data_generator_.ClearSampleState(); +#endif + } + + virtual void ResetEpochFinish() { +#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS) + gpu_graph_data_generator_.ResetEpochFinish(); +#endif +} + virtual bool IsTrainMode() { return train_mode_; } virtual void LoadIntoMemory() { PADDLE_THROW(platform::errors::Unimplemented( diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 789c9139400c5..e22136fdaf707 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -36,6 +36,7 @@ #endif USE_INT_STAT(STAT_total_feasign_num_in_mem); +USE_INT_STAT(STAT_epoch_finish); DECLARE_bool(graph_get_neighbor_id); DECLARE_int32(gpugraph_storage_mode); @@ -461,6 +462,16 @@ void DatasetImpl::LoadIntoMemory() { for (size_t i = 0; i < readers_.size(); i++) { readers_[i]->SetGpuGraphMode(gpu_graph_mode_); } + + if (STAT_GET(STAT_epoch_finish) == 1) { + VLOG(0) << "get epoch finish true"; + STAT_RESET(STAT_epoch_finish, 0); + for (size_t i = 0; i < readers_.size(); i++) { + readers_[i]->ResetPathNum(); + readers_[i]->ResetEpochFinish(); + } + return; + } for (int64_t i = 0; i < thread_num_; ++i) { load_threads.push_back( @@ -482,6 +493,13 @@ void DatasetImpl::LoadIntoMemory() { } } + if (GetEpochFinish() == true) { + VLOG(0) << "epoch finish, set stat and clear sample stat!"; + STAT_RESET(STAT_epoch_finish, 1); + for (size_t i = 0; i < readers_.size(); i++) { + readers_[i]->ClearSampleState(); + } + } if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { for (size_t i = 0; i < readers_.size(); i++) { readers_[i]->clear_gpu_mem(); @@ -1097,6 +1115,17 @@ int64_t DatasetImpl::GetMemoryDataSize() { } } +template +bool DatasetImpl::GetEpochFinish() { + bool is_epoch_finish = true; + if (gpu_graph_mode_) { + for (int i = 0; i < thread_num_; i++) { + is_epoch_finish = is_epoch_finish && readers_[i]->get_epoch_finish(); + } + } + return is_epoch_finish; +} + template int64_t DatasetImpl::GetPvDataSize() { if (enable_pv_merge_) { diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 57c42780de9c7..9e1998a35fd64 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -169,6 +169,7 @@ class Dataset { virtual void SetGpuGraphMode(int is_graph_mode) = 0; virtual int GetGpuGraphMode() = 0; + virtual bool GetEpochFinish() = 0; virtual void SetPassId(uint32_t pass_id) = 0; virtual uint32_t GetPassID() = 0; @@ -266,6 +267,7 @@ class DatasetImpl : public Dataset { virtual void DynamicAdjustReadersNum(int thread_num); virtual void SetFleetSendSleepSeconds(int seconds); virtual std::vector GetSlots(); + virtual bool GetEpochFinish(); std::vector>& GetMultiOutputChannel() { return multi_output_channel_; diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc index ceed8cb6bfa63..96a473be1aa8c 100755 --- a/paddle/fluid/framework/multi_trainer.cc +++ b/paddle/fluid/framework/multi_trainer.cc @@ -48,6 +48,7 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc, places_.push_back(place); } #endif + user_define_dump_filename_ = trainer_desc.user_define_dump_filename(); // get filelist from trainer_desc here const std::vector readers = dataset->GetReaders(); diff --git a/paddle/fluid/platform/monitor.cc b/paddle/fluid/platform/monitor.cc index ea6240b649cad..dd38ce7956309 100644 --- a/paddle/fluid/platform/monitor.cc +++ b/paddle/fluid/platform/monitor.cc @@ -19,6 +19,7 @@ namespace platform {} // namespace platform } // namespace paddle DEFINE_INT_STATUS(STAT_total_feasign_num_in_mem) +DEFINE_INT_STATUS(STAT_epoch_finish) DEFINE_INT_STATUS(STAT_gpu0_mem_size) DEFINE_INT_STATUS(STAT_gpu1_mem_size) DEFINE_INT_STATUS(STAT_gpu2_mem_size) diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index 3ba9f9eea46d1..945b28aac88de 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -73,6 +73,9 @@ def _create_trainer(self, opt_info=None): if opt_info.get("dump_fields_path") is not None and len( opt_info.get("dump_fields_path")) != 0: trainer._set_dump_fields_path(opt_info["dump_fields_path"]) + if opt_info.get("user_define_dump_filename") is not None and len( + opt_info.get("user_define_dump_filename")) != 0: + trainer._set_user_define_dump_filename(opt_info["user_define_dump_filename"]) if opt_info.get("dump_file_num") is not None: trainer._set_dump_file_num(opt_info["dump_file_num"]) if opt_info.get("dump_converter") is not None: