From bd45afdaff616586994912307b1366ab50607904 Mon Sep 17 00:00:00 2001 From: qshuihu Date: Wed, 12 Jan 2022 12:05:35 +0800 Subject: [PATCH 1/6] 1. add local ssd cache mode --- paddle/fluid/framework/data_feed.cc | 310 +++++++++++++++++---- paddle/fluid/framework/data_feed.h | 35 ++- paddle/fluid/framework/data_set.cc | 234 ++++++++++++++-- paddle/fluid/framework/data_set.h | 40 ++- paddle/fluid/framework/fleet/box_wrapper.h | 5 +- paddle/fluid/framework/threadpool.h | 10 +- paddle/fluid/pybind/data_set_py.cc | 11 +- python/paddle/fluid/dataset.py | 38 +++ 8 files changed, 605 insertions(+), 78 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index d953f1966e14c..1b17e1db3a092 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -2021,11 +2021,172 @@ void PaddleBoxDataFeed::PutToFeedVec(const std::vector& ins_vec) { //================================ new boxps //============================================= #ifdef PADDLE_WITH_BOX_PS +static const int MAX_FILE_BUFF = 4 * 1024 * 1024; +static const int PAGE_BLOCK_SIZE = 4096; +static const int INT_BYTES = sizeof(int); +BinaryArchiveWriter::BinaryArchiveWriter() : fd_(-1) { + capacity_ = MAX_FILE_BUFF + 64 * 1024; + buff_ = reinterpret_cast(malloc(capacity_)); +} +BinaryArchiveWriter::~BinaryArchiveWriter() { + close(); + if (buff_ != nullptr) { + free(buff_); + buff_ = nullptr; + } +} +bool BinaryArchiveWriter::open(const std::string& path) { + fd_ = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC | O_APPEND | O_DIRECT, + 0777); + if (fd_ < 0) { + VLOG(0) << "open [" << path << "] failed"; + return false; + } + head_ = buff_; + woffset_ = INT_BYTES; + return true; +} +bool BinaryArchiveWriter::write(const SlotRecord& rec) { + thread_local BinaryArchive ar; + mutex_.lock(); + ar.SetWriteBuffer(buff_ + woffset_, capacity_ - woffset_, nullptr); + ar << rec; + woffset_ += ar.Length(); + if (woffset_ < MAX_FILE_BUFF) { + 
mutex_.unlock(); + return true; + } + // set data length + int data_len = woffset_ - (head_ - buff_) - INT_BYTES; + CHECK(data_len > 0 && woffset_ <= capacity_) + << "write offset: " << woffset_ << ", head offset:" << (head_ - buff_) + << ", capacity: " << capacity_; + *(reinterpret_cast(head_)) = data_len; + // dio padding 4k + int left = (woffset_ % PAGE_BLOCK_SIZE); + int write_len = (woffset_ - left); + int ret = ::write(fd_, buff_, write_len); + memmove(buff_, buff_ + write_len, left); + woffset_ = left + INT_BYTES; + head_ = buff_ + left; + mutex_.unlock(); + + return (ret == write_len); +} +void BinaryArchiveWriter::close(void) { + if (fd_ < 0) { + return; + } + mutex_.lock(); + if (woffset_ > INT_BYTES) { + // set data length + int data_len = woffset_ - (head_ - buff_) - INT_BYTES; + CHECK(data_len >= 0) << "write offset: " << woffset_ + << ", head offset: " << (head_ - buff_); + int pad_len = woffset_; + if (data_len == 0) { + pad_len = pad_len - INT_BYTES; + } else { + *(reinterpret_cast(head_)) = data_len; + } + if ((pad_len % PAGE_BLOCK_SIZE) != 0) { + *(reinterpret_cast(&buff_[pad_len])) = 0; + pad_len += (PAGE_BLOCK_SIZE - (pad_len % PAGE_BLOCK_SIZE)); + } + // dio write 4k + CHECK(::write(fd_, buff_, pad_len) == pad_len); + woffset_ = 0; + } + mutex_.unlock(); + ::close(fd_); + fd_ = -1; +} +class BinaryArchiveReader { + public: + BinaryArchiveReader() { + capacity_ = MAX_FILE_BUFF + 64 * 1024; + buff_ = reinterpret_cast(malloc(capacity_)); + } + ~BinaryArchiveReader() { + if (buff_ != nullptr) { + free(buff_); + buff_ = nullptr; + } + } + bool open(const std::string& path) { + fd_ = ::open(path.c_str(), O_RDONLY); + if (fd_ < 0) { + VLOG(0) << "open [" << path << "] failed"; + return false; + } + return true; + } + int read_all(std::function proc_func) { // NOLINT + int lines = 0; + + int ret = 0; + int body_len = 0; + int left_len = 0; + int need_len = 0; + char* ptr = buff_; + + BinaryArchive ar; + while ((ret = ::read(fd_, ptr, capacity_ - 
left_len)) > 0) { + left_len += ret; + ptr = &buff_[0]; + body_len = *(reinterpret_cast(ptr)); + if (body_len <= 0) { + break; + } + need_len = body_len + INT_BYTES; + if (left_len < need_len) { + VLOG(0) << "left length: " << left_len + << " less need length: " << need_len; + break; + } + while (left_len >= need_len) { + ar.SetReadBuffer(ptr + INT_BYTES, body_len, nullptr); + lines += proc_func(ar); + ptr += need_len; + left_len -= need_len; + if (left_len < INT_BYTES) { + break; + } + body_len = *(reinterpret_cast(ptr)); + if (body_len <= 0) { + break; + } + need_len = body_len + INT_BYTES; + } + if (left_len > 0) { + memmove(&buff_[0], ptr, left_len); + ptr = &buff_[0] + left_len; + } else { + ptr = &buff_[0]; + } + } + + return lines; + } + void close(void) { + if (fd_ < 0) { + return; + } + ::close(fd_); + } + + private: + int fd_ = -1; + char* buff_ = nullptr; + size_t capacity_ = 0; +}; void SlotPaddleBoxDataFeed::Init(const DataFeedDesc& data_feed_desc) { finish_init_ = false; finish_set_filelist_ = false; finish_start_ = false; + slot_pool_ = &SlotRecordPool(); + PADDLE_ENFORCE(data_feed_desc.has_multi_slot_desc(), "Multi_slot_desc has not been set."); paddle::framework::MultiSlotDesc multi_slot_desc = @@ -2694,14 +2855,14 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByLine(void) { int offset = 0; int old_offset = 0; - SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + slot_pool_->get(&record_vec, OBJPOOL_BLOCK_SIZE); // get slotrecord object function auto record_func = [this, &offset, &record_vec, &old_offset]( std::vector& vec, int num) { vec.resize(num); if (offset + num > OBJPOOL_BLOCK_SIZE) { input_channel_->WriteMove(offset, &record_vec[0]); - SlotRecordPool().get(&record_vec[0], offset); + slot_pool_->get(&record_vec[0], offset); record_vec.resize(OBJPOOL_BLOCK_SIZE); offset = 0; old_offset = 0; @@ -2724,9 +2885,9 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByLine(void) { return false; } if (offset >= OBJPOOL_BLOCK_SIZE) { - 
input_channel_->Write(std::move(record_vec)); + input_channel_->WriteMove(offset, &record_vec[0]); record_vec.clear(); - SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + slot_pool_->get(&record_vec, OBJPOOL_BLOCK_SIZE); offset = 0; } return true; @@ -2756,11 +2917,10 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByLine(void) { if (offset > 0) { input_channel_->WriteMove(offset, &record_vec[0]); if (offset < OBJPOOL_BLOCK_SIZE) { - SlotRecordPool().put(&record_vec[offset], - (OBJPOOL_BLOCK_SIZE - offset)); + slot_pool_->put(&record_vec[offset], (OBJPOOL_BLOCK_SIZE - offset)); } } else { - SlotRecordPool().put(&record_vec); + slot_pool_->put(&record_vec); } record_vec.clear(); record_vec.shrink_to_fit(); @@ -2796,17 +2956,17 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByFile(void) { if (offset > 0) { input_channel_->WriteMove(offset, &record_vec[0]); if (max_fetch_num > 0) { - SlotRecordPool().get(&record_vec[0], offset); + slot_pool_->get(&record_vec[0], offset); } else { // free all max_fetch_num = static_cast(record_vec.size()); if (max_fetch_num > offset) { - SlotRecordPool().put(&record_vec[offset], (max_fetch_num - offset)); + slot_pool_->put(&record_vec[offset], (max_fetch_num - offset)); } } } else if (max_fetch_num > 0) { - SlotRecordPool().get(&record_vec, max_fetch_num); + slot_pool_->get(&record_vec, max_fetch_num); } else { - SlotRecordPool().put(&record_vec); + slot_pool_->put(&record_vec); } }; @@ -2861,8 +3021,69 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByFile(void) { } } +// load local archive file +void SlotPaddleBoxDataFeed::LoadIntoMemoryByArchive(void) { + BinaryArchiveReader reader; + std::string filename; + while (this->PickOneFile(&filename)) { + VLOG(3) << "LoadIntoMemoryByArchive PickOneFile, filename=" << filename + << ", thread_id=" << thread_id_; + platform::Timer timeline; + timeline.Start(); + + int lines = 0; + while (!reader.open(filename)) { + sleep(1); + } + + int offset = 0; + std::vector data; + slot_pool_->get(&data, 
OBJPOOL_BLOCK_SIZE); + + auto func = [this, &offset, &data](BinaryArchive& ar) { + int lines = 0; + while (ar.Cursor() < ar.Finish()) { + auto& r = data[offset++]; + // CHECK(r != nullptr); + ar >> r; + // r->debug(); + if (offset >= OBJPOOL_BLOCK_SIZE) { + CHECK(input_channel_->WriteMove(offset, &data[0]) == + static_cast(offset)); + data.clear(); + offset = 0; + slot_pool_->get(&data, OBJPOOL_BLOCK_SIZE); + } + ++lines; + } + return lines; + }; + lines = reader.read_all(func); + + if (offset > 0) { + CHECK(input_channel_->WriteMove(offset, &data[0]) == + static_cast(offset)); + if (offset < OBJPOOL_BLOCK_SIZE) { + slot_pool_->put(&data[offset], (OBJPOOL_BLOCK_SIZE - offset)); + } + } else { + slot_pool_->put(&data); + } + + reader.close(); + + timeline.Pause(); + + VLOG(3) << "LoadIntoMemoryByArchive() read all file, file=" << filename + << ", cost time=" << timeline.ElapsedSec() + << " seconds, thread_id=" << thread_id_ << ", lines=" << lines; + } +} + void SlotPaddleBoxDataFeed::LoadIntoMemoryByLib(void) { - if (FLAGS_enable_ins_parser_file) { + if (is_archive_file_) { + LoadIntoMemoryByArchive(); + } else if (FLAGS_enable_ins_parser_file) { // user defined file format analysis LoadIntoMemoryByFile(); } else { @@ -2882,7 +3103,7 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByCommand(void) { std::vector record_vec; platform::Timer timeline; timeline.Start(); - SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + slot_pool_->get(&record_vec, OBJPOOL_BLOCK_SIZE); int offset = 0; do { @@ -2907,9 +3128,9 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByCommand(void) { return false; } if (offset >= OBJPOOL_BLOCK_SIZE) { - input_channel_->Write(std::move(record_vec)); + input_channel_->WriteMove(offset, &record_vec[0]); record_vec.clear(); - SlotRecordPool().get(&record_vec, OBJPOOL_BLOCK_SIZE); + slot_pool_->get(&record_vec, OBJPOOL_BLOCK_SIZE); offset = 0; } return true; @@ -2919,11 +3140,10 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByCommand(void) { if (offset 
> 0) { input_channel_->WriteMove(offset, &record_vec[0]); if (offset < OBJPOOL_BLOCK_SIZE) { - SlotRecordPool().put(&record_vec[offset], - (OBJPOOL_BLOCK_SIZE - offset)); + slot_pool_->put(&record_vec[offset], (OBJPOOL_BLOCK_SIZE - offset)); } } else { - SlotRecordPool().put(&record_vec); + slot_pool_->put(&record_vec); } record_vec.clear(); record_vec.shrink_to_fit(); @@ -3070,7 +3290,7 @@ void SlotPaddleBoxDataFeed::UnrollInstance(std::vector& items) { CHECK(parser != nullptr); if (parser->UnrollInstance(items, items.size(), [this](std::vector& release) { - SlotRecordPool().put(&release); + slot_pool_->put(&release); release.clear(); release.shrink_to_fit(); })) { @@ -3090,7 +3310,6 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByLib(void) { } std::string filename; BufferedLineFileReader line_reader; - int from_pool_num = 0; while (this->PickOneFile(&filename)) { VLOG(3) << "PickOneFile, filename=" << filename << ", thread_id=" << thread_id_; @@ -3100,12 +3319,11 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByLib(void) { const int max_fetch_num = OBJPOOL_BLOCK_SIZE; int offset = 0; - SlotRecordPool().get(&record_vec, max_fetch_num); - from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num); + slot_pool_->get(&record_vec, max_fetch_num); auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); auto& set = box_ptr->gpu_replica_cache.back(); auto func = [this, &parser, &set, &record_vec, &offset, &max_fetch_num, - &from_pool_num, &filename](const std::string& line) { + &filename](const std::string& line) { int old_offset = offset; if (!parser->ParseOneInstance( line, @@ -3119,7 +3337,7 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByLib(void) { // Considering the prob of show expanding is low, so we don't // update STAT here input_channel_->WriteMove(offset, &record_vec[0]); - SlotRecordPool().get(&record_vec[0], offset); + slot_pool_->get(&record_vec[0], offset); record_vec.resize(max_fetch_num); offset = 
0; old_offset = 0; @@ -3137,12 +3355,9 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByLib(void) { return false; } if (offset >= max_fetch_num) { - input_channel_->Write(std::move(record_vec)); - STAT_ADD(STAT_total_feasign_num_in_mem, - GetTotalFeaNum(record_vec, max_fetch_num) - from_pool_num); + input_channel_->WriteMove(offset, &record_vec[0]); record_vec.clear(); - SlotRecordPool().get(&record_vec, max_fetch_num); - from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num); + slot_pool_->get(&record_vec, max_fetch_num); offset = 0; } return true; @@ -3170,13 +3385,11 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByLib(void) { } while (line_reader.is_error()); if (offset > 0) { input_channel_->WriteMove(offset, &record_vec[0]); - STAT_ADD(STAT_total_feasign_num_in_mem, - GetTotalFeaNum(record_vec, max_fetch_num) - from_pool_num); if (offset < max_fetch_num) { - SlotRecordPool().put(&record_vec[offset], (max_fetch_num - offset)); + slot_pool_->put(&record_vec[offset], (max_fetch_num - offset)); } } else { - SlotRecordPool().put(&record_vec); + slot_pool_->put(&record_vec); } record_vec.clear(); record_vec.shrink_to_fit(); @@ -3209,7 +3422,7 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByCommand(void) { int offset = 0; int gpu_cache_offset; int max_fetch_num = OBJPOOL_BLOCK_SIZE; - SlotRecordPool().get(&record_vec, max_fetch_num); + slot_pool_->get(&record_vec, max_fetch_num); do { if (box_ptr->UseAfsApi()) { this->fp_ = box_ptr->OpenReadFile(filename, this->pipe_command_); @@ -3242,9 +3455,9 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByCommand(void) { return false; } if (offset >= max_fetch_num) { - input_channel_->Write(std::move(record_vec)); + input_channel_->WriteMove(offset, &record_vec[0]); record_vec.clear(); - SlotRecordPool().get(&record_vec, max_fetch_num); + slot_pool_->get(&record_vec, max_fetch_num); offset = 0; } return true; @@ -3254,10 +3467,10 @@ void 
SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByCommand(void) { if (offset > 0) { input_channel_->WriteMove(offset, &record_vec[0]); if (offset < max_fetch_num) { - SlotRecordPool().put(&record_vec[offset], (max_fetch_num - offset)); + slot_pool_->put(&record_vec[offset], (max_fetch_num - offset)); } } else { - SlotRecordPool().put(&record_vec); + slot_pool_->put(&record_vec); } record_vec.clear(); record_vec.shrink_to_fit(); @@ -3395,7 +3608,6 @@ void InputTableDataFeed::LoadIntoMemoryByLib() { BufferedLineFileReader line_reader; line_reader.set_sample_rate(sample_rate_); - int from_pool_num = 0; auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); PADDLE_ENFORCE(!box_ptr->input_table_deque_.empty()); while (this->PickOneFile(&filename)) { @@ -3407,10 +3619,9 @@ void InputTableDataFeed::LoadIntoMemoryByLib() { const int max_fetch_num = OBJPOOL_BLOCK_SIZE; int offset = 0; - SlotRecordPool().get(&record_vec, max_fetch_num); - from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num); + slot_pool_->get(&record_vec, max_fetch_num); auto func = [this, &box_ptr, &parser, &record_vec, &offset, &max_fetch_num, - &from_pool_num, &filename](const std::string& line) { + &filename](const std::string& line) { int old_offset = offset; auto GetOffsetFunc = [&box_ptr](std::string& key) -> uint64_t { return box_ptr->input_table_deque_.back().GetIndexOffset(key); @@ -3425,7 +3636,7 @@ void InputTableDataFeed::LoadIntoMemoryByLib() { // Considering the prob of show expanding is low, so we don't // update STAT here input_channel_->WriteMove(offset, &record_vec[0]); - SlotRecordPool().get(&record_vec[0], offset); + slot_pool_->get(&record_vec[0], offset); record_vec.resize(max_fetch_num); offset = 0; old_offset = 0; @@ -3443,12 +3654,9 @@ void InputTableDataFeed::LoadIntoMemoryByLib() { return false; } if (offset >= max_fetch_num) { - input_channel_->Write(std::move(record_vec)); - STAT_ADD(STAT_total_feasign_num_in_mem, - GetTotalFeaNum(record_vec, max_fetch_num) - 
from_pool_num); + input_channel_->WriteMove(offset, &record_vec[0]); record_vec.clear(); - SlotRecordPool().get(&record_vec, max_fetch_num); - from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num); + slot_pool_->get(&record_vec, max_fetch_num); offset = 0; } return true; @@ -3476,13 +3684,11 @@ void InputTableDataFeed::LoadIntoMemoryByLib() { } while (line_reader.is_error()); if (offset > 0) { input_channel_->WriteMove(offset, &record_vec[0]); - STAT_ADD(STAT_total_feasign_num_in_mem, - GetTotalFeaNum(record_vec, max_fetch_num) - from_pool_num); if (offset < max_fetch_num) { - SlotRecordPool().put(&record_vec[offset], (max_fetch_num - offset)); + slot_pool_->put(&record_vec[offset], (max_fetch_num - offset)); } } else { - SlotRecordPool().put(&record_vec); + slot_pool_->put(&record_vec); } record_vec.clear(); record_vec.shrink_to_fit(); diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index aa9259be4f8a9..7c07d935ef950 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -11,7 +11,6 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - #pragma once #if defined _WIN32 || defined __APPLE__ #else @@ -229,6 +228,7 @@ class DataFeed { } virtual const paddle::platform::Place& GetPlace() const { return place_; } virtual void SetSampleRate(float r) { sample_rate_ = r; } + virtual void SetLoadArchiveFile(bool archive) { is_archive_file_ = archive; } protected: // The following three functions are used to check if it is executed in this @@ -289,6 +289,7 @@ class DataFeed { // The input type of pipe reader, 0 for one sample, 1 for one batch int input_type_; float sample_rate_ = 1.0f; + bool is_archive_file_ = false; }; // PrivateQueueDataFeed is the base virtual class for ohther DataFeeds. 
@@ -838,6 +839,11 @@ struct SlotRecordObject { slot_uint64_feasigns_.clear(shrink); slot_float_feasigns_.clear(shrink); } + void debug(void) { + VLOG(0) << "ins:" << ins_id_ + << ", uint64:" << slot_uint64_feasigns_.slot_values.size() + << ", float:" << slot_float_feasigns_.slot_values.size(); + } }; using SlotRecord = SlotRecordObject*; @@ -1049,6 +1055,11 @@ inline SlotObjPool& SlotRecordPool() { static SlotObjPool pool; return pool; } +// down disk pool +inline SlotObjPool& SlotRecordDownPool() { + static SlotObjPool pool; + return pool; +} using FeasignValues = SlotValues; @@ -1584,7 +1595,25 @@ inline MiniBatchGpuPackMgr& BatchGpuPackMgr() { return mgr; } #endif +/** + * @Brief binary archive file + */ +class BinaryArchiveWriter { + public: + BinaryArchiveWriter(); + ~BinaryArchiveWriter(); + bool open(const std::string& path); + bool write(const SlotRecord& rec); + void close(void); + private: + std::mutex mutex_; + int fd_; + char* buff_ = nullptr; + int woffset_ = 0; + int capacity_ = 0; + char* head_ = nullptr; +}; class SlotPaddleBoxDataFeed : public DataFeed { public: SlotPaddleBoxDataFeed() { finish_start_ = false; } @@ -1648,6 +1677,7 @@ class SlotPaddleBoxDataFeed : public DataFeed { bool EnablePvMerge(void); int GetPackInstance(SlotRecord** ins); int GetPackPvInstance(SlotPvInstance** pv_ins); + void SetSlotRecordPool(SlotObjPool* pool) { slot_pool_ = pool; } public: virtual void Init(const DataFeedDesc& data_feed_desc); @@ -1673,6 +1703,8 @@ class SlotPaddleBoxDataFeed : public DataFeed { virtual void LoadIntoMemoryByLine(void); // split all file virtual void LoadIntoMemoryByFile(void); + // load local archive file + virtual void LoadIntoMemoryByArchive(void); private: #if defined(PADDLE_WITH_CUDA) && defined(_LINUX) @@ -1734,6 +1766,7 @@ class SlotPaddleBoxDataFeed : public DataFeed { platform::Timer data_timer_; platform::Timer trans_timer_; platform::Timer copy_timer_; + SlotObjPool* slot_pool_ = nullptr; }; class 
SlotPaddleBoxDataFeedWithGpuReplicaCache : public SlotPaddleBoxDataFeed { diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 3f9156dc19ca2..1a55896163b58 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -35,8 +35,6 @@ #define _LINUX #endif -DECLARE_bool(padbox_dataset_disable_shuffle); -DECLARE_bool(padbox_dataset_disable_polling); DECLARE_bool(padbox_dataset_enable_unrollinstance); namespace paddle { @@ -1426,7 +1424,8 @@ class PadBoxSlotDataConsumer : public boxps::DataConsumer { PadBoxSlotDataset::PadBoxSlotDataset() { mpi_size_ = boxps::MPICluster::Ins().size(); mpi_rank_ = boxps::MPICluster::Ins().rank(); - SlotRecordPool(); + + slot_pool_ = &SlotRecordPool(); auto boxps_ptr = BoxWrapper::GetInstance(); int thread_num = boxps_ptr->GetFeedpassThreadNum(); @@ -1434,7 +1433,6 @@ PadBoxSlotDataset::PadBoxSlotDataset() { thread_num = FLAGS_padbox_dataset_merge_thread_num; } merge_thread_num_ = thread_num; - pass_id_ = boxps_ptr->GetDataSetId(); } PadBoxSlotDataset::~PadBoxSlotDataset() {} // create input channel and output channel @@ -1451,7 +1449,7 @@ void PadBoxSlotDataset::CreateChannel() { // set filelist, file_idx_ will reset to zero. 
void PadBoxSlotDataset::SetFileList(const std::vector& filelist) { VLOG(3) << "filelist size: " << filelist.size(); - if (mpi_size_ > 1 && !FLAGS_padbox_dataset_disable_polling) { + if (mpi_size_ > 1 && !disable_polling_) { // dualbox int num = static_cast(filelist.size()); for (int i = mpi_rank_; i < num; i = i + mpi_size_) { @@ -1488,6 +1486,15 @@ void PadBoxSlotDataset::CheckThreadPool(void) { if (thread_pool_ != nullptr && merge_pool_ != nullptr) { return; } + if (enable_pv_merge_ || FLAGS_enable_shuffle_by_searchid) { // shuffle by pv + VLOG(0) << "pass id=" << pass_id_ << ", use shuffle by search id"; + } else if (merge_by_insid_) { // shuffle by lineid + VLOG(0) << "pass id=" << pass_id_ << ", use shuffle by line id"; + } else { // shuffle + VLOG(0) << "pass id=" << pass_id_ << ", use shuffle by random id"; + } + VLOG(0) << "pass id=" << pass_id_ << ", shuffle disable: " << disable_shuffle_ + << ", polling disable: " << disable_polling_; used_fea_index_.clear(); auto feed_obj = reinterpret_cast(readers_[0].get()); feed_obj->GetUsedSlotIndex(&used_fea_index_); @@ -1497,12 +1504,12 @@ void PadBoxSlotDataset::CheckThreadPool(void) { // merge thread merge_pool_ = GetMergePool(merge_thread_num_ * 2); // shuffle thread - if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { + if (!disable_shuffle_ && mpi_size_ > 1) { shuffle_pool_ = GetShufflePool(shuffle_thread_num_ * 2); } std::vector& cores = boxps::get_readins_cores(); - if (cores.empty()) { + if (cores.empty() || is_archive_file_) { return; } thread_pool_->SetCPUAffinity(cores, false); @@ -1511,12 +1518,199 @@ void PadBoxSlotDataset::CheckThreadPool(void) { shuffle_pool_->SetCPUAffinity(cores, false); } } +inline paddle::framework::ThreadPool* GetDownPool(int thread_num) { + static std::shared_ptr thread_pool = nullptr; + if (thread_pool == nullptr) { + thread_pool.reset(new paddle::framework::ThreadPool(thread_num)); + } + return thread_pool.get(); +} +inline paddle::framework::ThreadPool* 
GetDumpPool(int thread_num) { + static std::shared_ptr thread_pool = nullptr; + if (thread_pool == nullptr) { + thread_pool.reset(new paddle::framework::ThreadPool(thread_num)); + } + return thread_pool.get(); +} +void PadBoxSlotDataset::CheckDownThreadPool(void) { + slot_pool_ = &SlotRecordDownPool(); + wait_futures_.clear(); + if (down_pool_ != nullptr && dump_pool_ != nullptr) { + return; + } + if (enable_pv_merge_ || FLAGS_enable_shuffle_by_searchid) { // shuffle by pv + VLOG(0) << "round id=" << pass_id_ << ", use shuffle by search id"; + } else if (merge_by_insid_) { // shuffle by lineid + VLOG(0) << "round id=" << pass_id_ << ", use shuffle by line id"; + } else { // shuffle + VLOG(0) << "round id=" << pass_id_ << ", use shuffle by random id"; + } + VLOG(0) << "round id=" << pass_id_ + << ", shuffle disable: " << disable_shuffle_ + << ", polling disable: " << disable_polling_; + // read ins thread + down_pool_ = GetDownPool(thread_num_); + CHECK(down_pool_ != nullptr); + // dump thread + dump_pool_ = GetDumpPool(merge_thread_num_ * 2); + CHECK(dump_pool_ != nullptr); + // shuffle thread + if (mpi_size_ > 1) { + shuffle_pool_ = GetShufflePool(shuffle_thread_num_ * 2); + CHECK(shuffle_pool_ != nullptr); + } + + std::vector& cores = boxps::get_readins_cores(); + if (cores.empty()) { + return; + } + down_pool_->SetCPUAffinity(cores, false); + dump_pool_->SetCPUAffinity(cores, false); + if (shuffle_pool_ != nullptr) { + shuffle_pool_->SetCPUAffinity(cores, false); + } +} +void PadBoxSlotDataset::PreLoadIntoDisk(const std::string& path, + const int file_num) { + pass_id_ = BoxWrapper::GetInstance()->GetRoundId(); + + CheckDownThreadPool(); + binary_files_.resize(file_num); + + total_ins_num_ = 0; + + char szpath[1024] = {0}; + for (int k = 0; k < file_num; ++k) { + binary_files_[k] = std::make_shared(); + snprintf(szpath, sizeof(szpath), "%s/%d", path.c_str(), k); + CHECK(binary_files_[k]->open(szpath)) << "open failed, path: " << szpath; + } + // dualbox global 
data shuffle + if (!disable_shuffle_ && mpi_size_ > 1) { + finished_counter_ = mpi_size_; + mpi_flags_.assign(mpi_size_, 1); + VLOG(3) << "RegisterClientToClientMsgHandler"; + data_consumer_ = reinterpret_cast(new PadBoxSlotDataConsumer(this)); + VLOG(3) << "RegisterClientToClientMsgHandler done"; + } + CHECK(slot_pool_ != nullptr) << "slotrecord pool nullptr"; + read_ins_ref_ = thread_num_; + CHECK(down_pool_ != nullptr) << "down_pool nullptr"; + for (int64_t i = 0; i < thread_num_; ++i) { + wait_futures_.emplace_back(down_pool_->Run([this, i]() { + platform::Timer timer; + timer.Start(); + CHECK(readers_[i] != nullptr) << "reader index=" << i << " nullptr"; + reinterpret_cast(readers_[i].get()) + ->SetSlotRecordPool(slot_pool_); + readers_[i]->LoadIntoMemory(); + timer.Pause(); + double span = timer.ElapsedSec(); + if (max_read_ins_span_ < span) { + max_read_ins_span_ = span; + } + if (min_read_ins_span_ == 0 || min_read_ins_span_ > span) { + min_read_ins_span_ = span; + } + if (--read_ins_ref_ == 0) { + input_channel_->Close(); + other_timer_.Start(); + VLOG(0) << "round = " << pass_id_ + << ", read ins thread end, max:" << max_read_ins_span_ + << ", min:" << min_read_ins_span_; + } + })); + } + + // dualbox global data shuffle + if (!disable_shuffle_ && mpi_size_ > 1) { + ShuffleData(shuffle_thread_num_); + DumpIntoDisk(shuffle_channel_, path, file_num); + } else { + DumpIntoDisk(input_channel_, path, file_num); + } +} +void PadBoxSlotDataset::WaitLoadDiskDone(void) { + for (auto& f : wait_futures_) { + f.get(); + } + // close open file fd + for (size_t i = 0; i < binary_files_.size(); ++i) { + binary_files_[i]->close(); + } + binary_files_.clear(); + + if (data_consumer_ != nullptr) { + delete reinterpret_cast(data_consumer_); + data_consumer_ = nullptr; + } + VLOG(0) << "round = " << pass_id_ + << ", PadBoxSlotDataset::WaitLoadDiskDone() end" + << ", load data size=" << total_ins_num_ + << ", cost time=" << max_read_ins_span_ << " seconds"; +} +void 
PadBoxSlotDataset::DumpIntoDisk(const Channel& in, + const std::string& path, + const int file_num) { + CHECK(in != nullptr) << "dump input channel nullptr"; + merge_ins_ref_ = merge_thread_num_; + min_merge_ins_span_ = 1000; + CHECK(dump_pool_ != nullptr) << "dump_pool nullptr"; + for (int tid = 0; tid < merge_thread_num_; ++tid) { + wait_futures_.emplace_back(dump_pool_->Run([this, &in, tid, file_num]() { + // VLOG(0) << "merge thread id: " << tid << "start"; + platform::Timer timer; + size_t num = 0; + int fileid = 0; + std::vector datas; + while ((num = in->ReadOnce(datas, OBJPOOL_BLOCK_SIZE)) > 0) { + timer.Resume(); + for (auto& rec : datas) { + if (enable_pv_merge_ || + FLAGS_enable_shuffle_by_searchid) { // shuffle by pv + fileid = (rec->search_id / mpi_size_) % file_num; + } else if (merge_by_insid_) { // shuffle by lineid + fileid = (XXH64(rec->ins_id_.data(), rec->ins_id_.length(), 0) / + mpi_size_) % + file_num; + } else { // shuffle + fileid = (BoxWrapper::LocalRandomEngine()() / mpi_size_) % file_num; + } + // save to file + CHECK(binary_files_[fileid]->write(rec)); + } + total_ins_num_ += num; + // free allobject + slot_pool_->put(&datas); + datas.clear(); + timer.Pause(); + } + datas.shrink_to_fit(); + + double span = timer.ElapsedSec(); + if (max_merge_ins_span_ < span) { + max_merge_ins_span_ = span; + } + if (min_merge_ins_span_ > span) { + min_merge_ins_span_ = span; + } + // end merge thread + if (--merge_ins_ref_ == 0) { + other_timer_.Pause(); + VLOG(0) << "round = " << pass_id_ << ", dump thread id: " << tid + << ", span time: " << span << ", max:" << max_merge_ins_span_ + << ", min:" << min_merge_ins_span_; + } + })); + } +} // pre load void PadBoxSlotDataset::PreLoadIntoMemory() { + pass_id_ = BoxWrapper::GetInstance()->GetDataSetId(); CheckThreadPool(); LoadIndexIntoMemory(); // dualbox global data shuffle - if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { + if (!disable_shuffle_ && mpi_size_ > 1) { finished_counter_ = 
mpi_size_; mpi_flags_.assign(mpi_size_, 1); VLOG(3) << "RegisterClientToClientMsgHandler"; @@ -1549,7 +1743,7 @@ void PadBoxSlotDataset::PreLoadIntoMemory() { } // dualbox global data shuffle - if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { + if (!disable_shuffle_ && mpi_size_ > 1) { ShuffleData(shuffle_thread_num_); MergeInsKeys(shuffle_channel_); } else { @@ -1575,13 +1769,14 @@ void PadBoxSlotDataset::WaitPreLoadDone() { // load all data into memory void PadBoxSlotDataset::LoadIntoMemory() { VLOG(3) << "DatasetImpl::LoadIntoMemory() begin"; + pass_id_ = BoxWrapper::GetInstance()->GetDataSetId(); CheckThreadPool(); LoadIndexIntoMemory(); platform::Timer timeline; timeline.Start(); // dualbox global data shuffle - if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { + if (!disable_shuffle_ && mpi_size_ > 1) { finished_counter_ = mpi_size_; mpi_flags_.assign(mpi_size_, 1); VLOG(3) << "RegisterClientToClientMsgHandler"; @@ -1600,7 +1795,7 @@ void PadBoxSlotDataset::LoadIntoMemory() { } // dualbox global data shuffle - if (!FLAGS_padbox_dataset_disable_shuffle && mpi_size_ > 1) { + if (!disable_shuffle_ && mpi_size_ > 1) { ShuffleData(shuffle_thread_num_); MergeInsKeys(shuffle_channel_); } else { @@ -1636,6 +1831,7 @@ void PadBoxSlotDataset::MergeInsKeys(const Channel& in) { platform::Timer timer; auto feed_obj = reinterpret_cast(readers_[0].get()); + CHECK(feed_obj != nullptr && in != nullptr); size_t num = 0; std::vector datas; while (in->ReadOnce(datas, OBJPOOL_BLOCK_SIZE)) { @@ -1700,7 +1896,7 @@ void PadBoxSlotDataset::ReleaseMemory() { readers_.clear(); readers_.shrink_to_fit(); - SlotRecordPool().put(&input_records_); + slot_pool_->put(&input_records_); input_records_.clear(); input_records_.shrink_to_fit(); @@ -1714,7 +1910,7 @@ void PadBoxSlotDataset::ReleaseMemory() { timeline.Pause(); VLOG(1) << "DatasetImpl::ReleaseMemory() end, cost time=" << timeline.ElapsedSec() - << " seconds, object pool size=" << SlotRecordPool().capacity(); 
+ << " seconds, object pool size=" << slot_pool_->capacity(); } class ShuffleResultWaitGroup : public boxps::ResultCallback { public: @@ -1789,7 +1985,7 @@ void PadBoxSlotDataset::ShuffleData(int thread_num) { ars[client_id] << t; releases.push_back(t); } - SlotRecordPool().put(&releases); + slot_pool_->put(&releases); releases.clear(); size_t loc_len = loc_datas.size(); CHECK(shuffle_channel_->Write(std::move(loc_datas)) == loc_len); @@ -1898,7 +2094,7 @@ void PadBoxSlotDataset::ReceiveSuffleData(int client_id, const char* buf, static const int max_fetch_num = OBJPOOL_BLOCK_SIZE / mpi_size_; int offset = 0; std::vector data; - SlotRecordPool().get(&data, max_fetch_num); + slot_pool_->get(&data, max_fetch_num); while (ar.Cursor() < ar.Finish()) { ar >> data[offset++]; if (offset >= max_fetch_num) { @@ -1906,7 +2102,7 @@ void PadBoxSlotDataset::ReceiveSuffleData(int client_id, const char* buf, static_cast(offset)); data.clear(); offset = 0; - SlotRecordPool().get(&data, max_fetch_num); + slot_pool_->get(&data, max_fetch_num); } } CHECK(ar.Cursor() == ar.Finish()); @@ -1914,10 +2110,10 @@ void PadBoxSlotDataset::ReceiveSuffleData(int client_id, const char* buf, CHECK(shuffle_channel_->WriteMove(offset, &data[0]) == static_cast(offset)); if (offset < max_fetch_num) { - SlotRecordPool().put(&data[offset], (max_fetch_num - offset)); + slot_pool_->put(&data[offset], (max_fetch_num - offset)); } } else { - SlotRecordPool().put(&data); + slot_pool_->put(&data); } data.clear(); @@ -1955,6 +2151,8 @@ void PadBoxSlotDataset::CreateReaders() { if (input_channel_ != nullptr) { readers_[i]->SetInputChannel(input_channel_.get()); } + // disk archive file + readers_[i]->SetLoadArchiveFile(is_archive_file_); } VLOG(3) << "readers size: " << readers_.size(); } diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 7dcd39f81f58a..74a54fb9fe247 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -30,6 +30,8 @@ 
DECLARE_int32(padbox_dataset_shuffle_thread_num); DECLARE_int32(padbox_dataset_merge_thread_num); DECLARE_int32(padbox_max_shuffle_wait_count); DECLARE_bool(enable_shuffle_by_searchid); +DECLARE_bool(padbox_dataset_disable_shuffle); +DECLARE_bool(padbox_dataset_disable_polling); namespace boxps { class PSAgentBase; } @@ -155,6 +157,13 @@ class Dataset { // set fleet send sleep seconds virtual void SetFleetSendSleepSeconds(int seconds) = 0; + // down load to disk mode + virtual void SetDiablePolling(bool disable) = 0; + virtual void SetDisableShuffle(bool disable) = 0; + virtual void PreLoadIntoDisk(const std::string& path, const int file_num) = 0; + virtual void WaitLoadDiskDone(void) = 0; + virtual void SetLoadArchiveFile(bool archive) = 0; + protected: virtual int ReceiveFromClient(int msg_type, int client_id, const std::string& msg) = 0; @@ -244,6 +253,13 @@ class DatasetImpl : public Dataset { virtual void SetFleetSendSleepSeconds(int seconds); virtual std::vector& GetInputRecord() { return input_records_; } + // disable shuffle + virtual void SetDiablePolling(bool disable) {} + virtual void SetDisableShuffle(bool disable) {} + virtual void PreLoadIntoDisk(const std::string& path, const int file_num) {} + virtual void WaitLoadDiskDone(void) {} + virtual void SetLoadArchiveFile(bool archive) {} + protected: virtual int ReceiveFromClient(int msg_type, int client_id, const std::string& msg); @@ -358,7 +374,12 @@ class PadBoxSlotDataset : public DatasetImpl { virtual void PostprocessInstance(); // prepare train do something virtual void PrepareTrain(void); - virtual int64_t GetMemoryDataSize() { return input_records_.size(); } + virtual int64_t GetMemoryDataSize() { + if (input_records_.empty()) { + return total_ins_num_; + } + return input_records_.size(); + } virtual int64_t GetPvDataSize() { return input_pv_ins_.size(); } virtual int64_t GetShuffleDataSize() { return input_records_.size(); } // merge ins from multiple sources and unroll @@ -370,6 +391,12 @@ 
class PadBoxSlotDataset : public DatasetImpl { virtual void PreLoadIntoMemory(); virtual void WaitPreLoadDone(); + virtual void SetDiablePolling(bool disable) { disable_polling_ = disable; } + virtual void SetDisableShuffle(bool disable) { disable_shuffle_ = disable; } + virtual void PreLoadIntoDisk(const std::string& path, const int file_num); + virtual void WaitLoadDiskDone(void); + virtual void SetLoadArchiveFile(bool archive) { is_archive_file_ = archive; } + protected: // shuffle data virtual void ShuffleData(int thread_num = -1); @@ -403,6 +430,9 @@ class PadBoxSlotDataset : public DatasetImpl { protected: void MergeInsKeys(const Channel& in); void CheckThreadPool(void); + void CheckDownThreadPool(void); + void DumpIntoDisk(const Channel& in, const std::string& path, + const int pass_num); protected: Channel shuffle_channel_ = nullptr; @@ -433,6 +463,14 @@ class PadBoxSlotDataset : public DatasetImpl { uint16_t pass_id_ = 0; double max_shuffle_span_ = 0; double min_shuffle_span_ = 0; + bool disable_shuffle_ = FLAGS_padbox_dataset_disable_shuffle; + bool disable_polling_ = FLAGS_padbox_dataset_disable_polling; + std::vector> binary_files_; + bool is_archive_file_ = false; + std::atomic total_ins_num_{0}; + paddle::framework::ThreadPool* down_pool_ = nullptr; + paddle::framework::ThreadPool* dump_pool_ = nullptr; + SlotObjPool* slot_pool_ = nullptr; }; class InputTableDataset : public PadBoxSlotDataset { diff --git a/paddle/fluid/framework/fleet/box_wrapper.h b/paddle/fluid/framework/fleet/box_wrapper.h index 320ffc9d87c05..eccf16fa506b1 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.h +++ b/paddle/fluid/framework/fleet/box_wrapper.h @@ -522,7 +522,8 @@ class BoxWrapper { } else if (s_instance_->feature_type_ == static_cast(boxps::FEATURE_PCOC)) { s_instance_->cvm_offset_ = 8; - } else if (s_instance_->feature_type_ == static_cast(boxps::FEATURE_CONV)) { + } else if (s_instance_->feature_type_ == + static_cast(boxps::FEATURE_CONV)) { 
s_instance_->cvm_offset_ = 4; } else { s_instance_->cvm_offset_ = 3; @@ -594,6 +595,7 @@ class BoxWrapper { boxps::PaddleFileMgr* GetFileMgr(void) { return file_manager_.get(); } // get dataset id uint16_t GetDataSetId(void) { return dataset_id_.fetch_add(1); } + uint16_t GetRoundId(void) { return round_id_.fetch_add(1); } // this performs better than rand_r, especially large data static std::default_random_engine& LocalRandomEngine() { @@ -771,6 +773,7 @@ class BoxWrapper { std::mutex mutex4random_pool_; std::set slot_eval_set_; std::atomic dataset_id_{0}; + std::atomic round_id_{0}; }; /** * @brief file mgr diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h index 458acaa28299f..1e53b44742c0b 100644 --- a/paddle/fluid/framework/threadpool.h +++ b/paddle/fluid/framework/threadpool.h @@ -77,13 +77,15 @@ class ThreadPool { try { fn(); } catch (platform::EnforceNotMet& ex) { - CHECK(false) << "Unexpected exception is catched in thread pool: " - << ex.what(); + // CHECK(false) << "Unexpected exception is catched in thread + // pool: " + // << ex.what(); return std::unique_ptr( new platform::EnforceNotMet(ex)); } catch (const std::exception& e) { - CHECK(false) << "Unexpected exception is catched in thread pool: " - << e.what(); + // CHECK(false) << "Unexpected exception is catched in thread + // pool: " + // << e.what(); PADDLE_THROW(platform::errors::Fatal( "Unexpected exception is catched in thread pool. All " "throwable exception in Paddle should be an EnforceNotMet." 
diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index 002a65a0e8807..5dca4948dcd4c 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -279,7 +279,6 @@ void BindDataset(py::module *m) { py::call_guard()) .def("set_enable_pv_merge", &framework::Dataset::SetEnablePvMerge, py::call_guard()) - .def("set_merge_by_lineid", &framework::Dataset::SetMergeByInsId, py::call_guard()) .def("merge_by_lineid", &framework::Dataset::MergeByInsId, @@ -311,6 +310,16 @@ void BindDataset(py::module *m) { &framework::Dataset::SetFleetSendSleepSeconds, py::call_guard()) .def("enable_pv_merge", &framework::Dataset::EnablePvMerge, + py::call_guard()) + .def("disable_polling", &framework::Dataset::SetDiablePolling, + py::call_guard()) + .def("disable_shuffle", &framework::Dataset::SetDisableShuffle, + py::call_guard()) + .def("preload_into_disk", &framework::Dataset::PreLoadIntoDisk, + py::call_guard()) + .def("wait_load_disk_done", &framework::Dataset::WaitLoadDiskDone, + py::call_guard()) + .def("set_archivefile", &framework::Dataset::SetLoadArchiveFile, py::call_guard()); py::class_(*m, "IterableDatasetWrapper") diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index e88f844bcacdb..0878746db3eb6 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -1261,6 +1261,44 @@ def load_into_memory(self): self._prepare_to_run() self.boxps.read_ins_into_memory() + def disable_polling(self, disable=False): + """ + disable file polling + """ + self.dataset.disable_polling(disable) + + def disable_shuffle(self, disable=False): + """ + disable data shuffle + """ + self.dataset.disable_shuffle(disable) + + def preload_into_disk(self, path, file_num): + """ + prepare load data to disk + """ + self._prepare_to_run() + self.dataset.preload_into_disk(path, file_num) + + def wait_load_disk_done(self): + """ + wait disk file load done + """ + self.dataset.wait_load_disk_done() 
+ + def load_into_disk(self, path, file_num): + """ + sync load ins to disk + """ + self.preload_into_disk(path, file_num) + self.wait_load_disk_done() + + def set_archivefile(self, archive=False): + """ + is load archive file + """ + self.dataset.set_archivefile(archive) + class InputTableDataset(PadBoxSlotDataset): def __init__(self): From 69c0ba3ab9dbc4c5893fbb8028155f4292b1ebce Mon Sep 17 00:00:00 2001 From: qshuihu Date: Wed, 12 Jan 2022 15:43:40 +0800 Subject: [PATCH 2/6] used default sample_pool and pull push dedup --- paddle/fluid/platform/flags.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc index 096288bec536e..6279e74dd2408 100644 --- a/paddle/fluid/platform/flags.cc +++ b/paddle/fluid/platform/flags.cc @@ -311,7 +311,7 @@ DEFINE_double(memory_fraction_of_eager_deletion, 1.0, #ifdef PADDLE_ON_INFERENCE static constexpr char kDefaultAllocatorStrategy[] = "naive_best_fit"; #else -static constexpr char kDefaultAllocatorStrategy[] = "auto_growth"; +static constexpr char kDefaultAllocatorStrategy[] = "sample_pool"; #endif DEFINE_string( allocator_strategy, kDefaultAllocatorStrategy, @@ -600,7 +600,7 @@ DEFINE_bool(enable_slotrecord_reset_shrink, false, "enable slotrecord obejct reset shrink memory, default false"); DEFINE_bool(enable_slotpool_wait_release, false, "enable slotrecord obejct wait release, default false"); -DEFINE_bool(enable_pullpush_dedup_keys, false, +DEFINE_bool(enable_pullpush_dedup_keys, true, "enable pull push dedup keys, default false"); DEFINE_bool(enable_shuffle_by_searchid, false, "enable dualbox shuffle by searchid, default false"); From d068c4764c1fc795b36d9a3d813e45be907a3ed5 Mon Sep 17 00:00:00 2001 From: qshuihu Date: Fri, 21 Jan 2022 17:16:07 +0800 Subject: [PATCH 3/6] fix add pool cache --- paddle/fluid/framework/data_feed.cc | 28 +++++----- paddle/fluid/framework/data_feed.h | 27 ++++------ .../fluid/framework/details/nan_inf_utils.h | 2 + 
.../framework/details/nan_inf_utils_detail.cc | 32 ++++++++++++ .../framework/details/nan_inf_utils_detail.cu | 8 +-- paddle/fluid/framework/fleet/box_wrapper.cc | 52 ++++++++++--------- paddle/fluid/framework/operator.cc | 46 +++++++++------- paddle/fluid/framework/threadpool.h | 10 ++-- 8 files changed, 122 insertions(+), 83 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 1b17e1db3a092..441a84d1df975 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -2569,18 +2569,17 @@ void SlotPaddleBoxDataFeed::BuildSlotBatchGPU(const int ins_num) { int offset_cols_size = (ins_num + 1); size_t slot_total_num = (use_slot_size_ * offset_cols_size); - pack_->resize_gpu_slot_offsets(slot_total_num * sizeof(size_t)); + pack_->resize_gpu_slot_offsets(slot_total_num); auto& value = pack_->value(); const UsedSlotGpuType* used_slot_gpu_types = static_cast(pack_->get_gpu_slots()); - FillSlotValueOffset(ins_num, use_slot_size_, - reinterpret_cast(pack_->gpu_slot_offsets()), + FillSlotValueOffset(ins_num, use_slot_size_, pack_->gpu_slot_offsets(), value.d_uint64_offset.data(), uint64_use_slot_size_, value.d_float_offset.data(), float_use_slot_size_, used_slot_gpu_types); fill_timer_.Pause(); - size_t* d_slot_offsets = reinterpret_cast(pack_->gpu_slot_offsets()); + size_t* d_slot_offsets = pack_->gpu_slot_offsets(); offset_timer_.Resume(); HostBuffer& offsets = pack_->offsets(); @@ -2626,25 +2625,30 @@ void SlotPaddleBoxDataFeed::BuildSlotBatchGPU(const int ins_num) { feed->ShareDataWith(float_tensor.Slice( static_cast(float_offset), static_cast(float_offset + total_instance))); - feed->Resize({total_instance, 1}); float_offset += total_instance; - h_tensor_ptrs[j] = feed->mutable_data(this->place_); } else { - h_tensor_ptrs[j] = - feed->mutable_data({total_instance, 1}, this->place_); + feed->ShareDataWith( + float_tensor.Slice(static_cast(float_offset), + static_cast(float_offset + 1))); 
+ float_offset += 1; } + feed->Resize({total_instance, 1}); + h_tensor_ptrs[j] = feed->data(); } else if (info.type[0] == 'u') { // uint64 if (total_instance > 0) { feed->ShareDataWith(uint64_tensor.Slice( static_cast(uint64_offset), static_cast(uint64_offset + total_instance))); - feed->Resize({total_instance, 1}); uint64_offset += total_instance; - h_tensor_ptrs[j] = feed->mutable_data(this->place_); } else { - h_tensor_ptrs[j] = - feed->mutable_data({total_instance, 1}, this->place_); + feed->ShareDataWith( + uint64_tensor.Slice(static_cast(uint64_offset), + static_cast(uint64_offset + 1))); + uint64_offset += 1; } + feed->Resize({total_instance, 1}); + // h_tensor_ptrs[j] = feed->mutable_data(this->place_); + h_tensor_ptrs[j] = feed->data(); } if (info.dense) { diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 7c07d935ef950..47f794e92de98 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -1448,13 +1448,13 @@ class MiniBatchGpuPack { // tensor gpu memory reused void resize_tensor(void) { if (used_float_num_ > 0) { - int float_total_len = buf_.h_float_lens.back(); + int float_total_len = buf_.h_float_lens.back() + used_float_num_; if (float_total_len > 0) { float_tensor_.mutable_data({float_total_len, 1}, this->place_); } } if (used_uint64_num_ > 0) { - int uint64_total_len = buf_.h_uint64_lens.back(); + int uint64_total_len = buf_.h_uint64_lens.back() + used_uint64_num_; if (uint64_total_len > 0) { uint64_tensor_.mutable_data({uint64_total_len, 1}, this->place_); @@ -1469,18 +1469,14 @@ class MiniBatchGpuPack { HostBuffer& offsets(void) { return offsets_; } HostBuffer& h_tensor_ptrs(void) { return h_tensor_ptrs_; } - void* gpu_slot_offsets(void) { return gpu_slot_offsets_->ptr(); } + size_t* gpu_slot_offsets(void) { + return reinterpret_cast(gpu_slot_offsets_.data()); + } void* slot_buf_ptr(void) { return slot_buf_ptr_->ptr(); } - void resize_gpu_slot_offsets(const size_t 
slot_total_bytes) { - if (gpu_slot_offsets_ == nullptr) { - gpu_slot_offsets_ = memory::AllocShared(place_, slot_total_bytes); - } else if (gpu_slot_offsets_->size() < slot_total_bytes) { - auto buf = memory::AllocShared(place_, slot_total_bytes); - gpu_slot_offsets_.swap(buf); - buf = nullptr; - } + void resize_gpu_slot_offsets(const int64_t slot_total_num) { + gpu_slot_offsets_.mutable_data({slot_total_num, 1}, this->place_); } const std::string& get_lineid(int idx) { if (enable_pv_) { @@ -1542,9 +1538,8 @@ class MiniBatchGpuPack { // batch HostBuffer offsets_; HostBuffer h_tensor_ptrs_; - - std::shared_ptr gpu_slot_offsets_ = - nullptr; + // slot offset + LoDTensor gpu_slot_offsets_; std::shared_ptr slot_buf_ptr_ = nullptr; // pcoc @@ -1816,7 +1811,7 @@ class InputIndexDataFeed : public DataFeed { template paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, const SlotValues& r) { - uint16_t value_len = static_cast(r.slot_values.size()); + uint32_t value_len = static_cast(r.slot_values.size()); ar << value_len; if (value_len > 0) { ar.Write(&r.slot_values[0], value_len * sizeof(T)); @@ -1833,7 +1828,7 @@ paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, template paddle::framework::Archive& operator>>(paddle::framework::Archive& ar, SlotValues& r) { - uint16_t value_len = 0; + uint32_t value_len = 0; ar >> value_len; if (value_len > 0) { r.slot_values.resize(value_len); diff --git a/paddle/fluid/framework/details/nan_inf_utils.h b/paddle/fluid/framework/details/nan_inf_utils.h index 99fd147f11edd..0e9313685a5ef 100644 --- a/paddle/fluid/framework/details/nan_inf_utils.h +++ b/paddle/fluid/framework/details/nan_inf_utils.h @@ -36,6 +36,8 @@ void CheckOpHasNanOrInf(const framework::OperatorBase& op, bool CheckOpHasNanOrInfRet(const framework::OperatorBase& op, const framework::Scope& scope, const platform::Place& place); +void DumpTensorToFile(const std::string& path, const std::string& prefix, + const std::string& iname, 
const Scope& exec_scope); } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/nan_inf_utils_detail.cc b/paddle/fluid/framework/details/nan_inf_utils_detail.cc index dc5a89318a8f6..9eaa2699c8959 100644 --- a/paddle/fluid/framework/details/nan_inf_utils_detail.cc +++ b/paddle/fluid/framework/details/nan_inf_utils_detail.cc @@ -453,6 +453,38 @@ bool CheckOpHasNanOrInfRet(const framework::OperatorBase& op, return false; } +void DumpTensorToFile(const std::string& path, const std::string& prefix, + const std::string& iname, const Scope& exec_scope) { + auto* var = exec_scope.FindVar(iname); + if (var == nullptr) { + return; + } + if (!var->IsInitialized()) { + return; + } + auto& tensor = var->Get(); + if (!tensor.IsInitialized()) { + return; + } + + std::ostringstream os; + if (var->IsType()) { + os << var->Get(); + } else if (var->IsType()) { + os << var->Get().value(); + } + os << "\n"; + std::string s = os.str(); + + char file_name[2048] = {0}; + snprintf(file_name, sizeof(file_name), "%s/%s_%s", path.c_str(), + prefix.c_str(), iname.c_str()); + + std::ofstream out(file_name, std::ios::binary); + out.write(s.c_str(), s.length()); + out.close(); +} + } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/nan_inf_utils_detail.cu b/paddle/fluid/framework/details/nan_inf_utils_detail.cu index a474e4dd3ab14..c31e17fbcbfef 100644 --- a/paddle/fluid/framework/details/nan_inf_utils_detail.cu +++ b/paddle/fluid/framework/details/nan_inf_utils_detail.cu @@ -205,10 +205,10 @@ __global__ void CountNanInfNumKernel(const size_t len, const T* val, count = atomicAdd(&block_inf, 1); } // for cuda, print in every block - if (count > 0) { - printf("numel:%lu idx:%lu value:%f\n", static_cast(len), - static_cast(i), static_cast(val[i])); - } + // if (count > 0) { + // printf("numel:%lu idx:%lu value:%f\n", static_cast(len), + // static_cast(i), static_cast(val[i])); + 
// } } __syncthreads(); diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index db7c4b5f7ca42..a81f81927f5b2 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -427,8 +427,8 @@ void BoxWrapper::PullSparse(const paddle::platform::Place& place, feature_type_ == static_cast(boxps::FEATURE_SHOWCLK)) { \ PullSparseCase>( \ place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_CONV)) { \ - PullSparseCase>( \ + } else if (feature_type_ == static_cast(boxps::FEATURE_CONV)) { \ + PullSparseCase>( \ place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \ } else if (feature_type_ == static_cast(boxps::FEATURE_VARIABLE)) { \ PullSparseCase>( \ @@ -481,33 +481,33 @@ void BoxWrapper::PushSparseGrad(const paddle::platform::Place& place, } \ } break -#define PUSHSPARSE_CASE(i, ...) \ - case i: { \ - constexpr size_t ExpandDim = i; \ - if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ - PushSparseGradCase< \ - boxps::FeaturePushValueGpuShareEmbedding>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ - PushSparseGradCase< \ - boxps::FeaturePushValueGpuPCOC>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ +#define PUSHSPARSE_CASE(i, ...) 
\ + case i: { \ + constexpr size_t ExpandDim = i; \ + if (feature_type_ == static_cast(boxps::FEATURE_SHARE_EMBEDDING)) { \ + PushSparseGradCase< \ + boxps::FeaturePushValueGpuShareEmbedding>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } else if (feature_type_ == static_cast(boxps::FEATURE_PCOC)) { \ + PushSparseGradCase< \ + boxps::FeaturePushValueGpuPCOC>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ } else if (feature_type_ == static_cast(boxps::FEATURE_VARIABLE)) { \ PushSparseGradCase>( \ place, keys, grad_values, slot_lengths, hidden_size, \ expand_embed_dim, batch_size); \ - } else if (feature_type_ == static_cast(boxps::FEATURE_CONV)) { \ - PushSparseGradCase< \ - boxps::FeaturePushValueGpuConv>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ - } else { \ - PushSparseGradCase>( \ - place, keys, grad_values, slot_lengths, hidden_size, \ - expand_embed_dim, batch_size); \ - } \ + } else if (feature_type_ == static_cast(boxps::FEATURE_CONV)) { \ + PushSparseGradCase< \ + boxps::FeaturePushValueGpuConv>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } else { \ + PushSparseGradCase>( \ + place, keys, grad_values, slot_lengths, hidden_size, \ + expand_embed_dim, batch_size); \ + } \ } break CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim); @@ -580,6 +580,8 @@ void BoxWrapper::FeedPass(int date, void BoxWrapper::BeginFeedPass(int date, boxps::PSAgentBase** agent) { if (FLAGS_enable_force_mem_recyle) { SlotRecordPool().disable_pool(true); + } else { + SlotRecordPool().disable_pool(boxps_ptr_->CheckNeedLimitMem()); } int ret = boxps_ptr_->BeginFeedPass(date, *agent); if (FLAGS_use_gpu_replica_cache) { diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc index cc47fdbecfee7..5c246ee8a1498 100644 --- 
a/paddle/fluid/framework/operator.cc +++ b/paddle/fluid/framework/operator.cc @@ -1149,30 +1149,36 @@ void OperatorWithKernel::RunImpl(const Scope& scope, if (FLAGS_check_nan_inf) { // framework::details::CheckOpHasNanOrInf(*this, exec_scope, place); if (framework::details::CheckOpHasNanOrInfRet(*this, exec_scope, place)) { - std::ostringstream os; + int device_id = 0; +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) + device_id = boost::get(place).GetDeviceId(); +#endif + VLOG(0) << "begin dump scope all tensor data"; + std::string log_path = "./nan_inf"; + if (!PathExists(log_path)) { + MkDirRecursively(log_path.c_str()); + } + + // dump scope all data + char prefix[128] = {0}; + snprintf(prefix, sizeof(prefix), "gpu%d", device_id); + for (auto& iname : exec_scope.LocalVarNames()) { + framework::details::DumpTensorToFile(log_path, prefix, iname, + exec_scope); + } + VLOG(0) << "end dump scope all tensor data"; + + // dump current op data for (auto& iname : InputVars()) { - auto* var = exec_scope.FindVar(iname); - if (var == nullptr) continue; - os << "input name:" << iname << ", "; - if (var->IsType()) { - os << var->Get(); - } else if (var->IsType()) { - os << var->Get().value(); - } - os << "\n"; + snprintf(prefix, sizeof(prefix), "gpu%d_input", device_id); + framework::details::DumpTensorToFile(log_path, prefix, iname, + exec_scope); } for (auto& iname : OutputVars(true)) { - auto* var = exec_scope.FindVar(iname); - if (var == nullptr) continue; - os << "output name:" << iname << ", "; - if (var->IsType()) { - os << var->Get(); - } else if (var->IsType()) { - os << var->Get().value(); - } - os << "\n"; + snprintf(prefix, sizeof(prefix), "gpu%d_output", device_id); + framework::details::DumpTensorToFile(log_path, prefix, iname, + exec_scope); } - printf("%s", os.str().c_str()); PADDLE_ENFORCE(false, "ERROR: check INF and NAN: %s", DebugStringEx(&exec_scope).c_str()); } diff --git a/paddle/fluid/framework/threadpool.h b/paddle/fluid/framework/threadpool.h 
index 1e53b44742c0b..ec4c8fadd5411 100644 --- a/paddle/fluid/framework/threadpool.h +++ b/paddle/fluid/framework/threadpool.h @@ -77,15 +77,13 @@ class ThreadPool { try { fn(); } catch (platform::EnforceNotMet& ex) { - // CHECK(false) << "Unexpected exception is catched in thread - // pool: " - // << ex.what(); + CHECK(false) << "Unexpected exception is catched in thread pool: " + << ex.what(); return std::unique_ptr( new platform::EnforceNotMet(ex)); } catch (const std::exception& e) { - // CHECK(false) << "Unexpected exception is catched in thread - // pool: " - // << e.what(); + CHECK(false) << "Unexpected exception is catched in thread pool: " + << e.what(); PADDLE_THROW(platform::errors::Fatal( "Unexpected exception is catched in thread pool. All " "throwable exception in Paddle should be an EnforceNotMet." From 50b0ab4cde33187b29a2294be65970789e139f5f Mon Sep 17 00:00:00 2001 From: qshuihu Date: Fri, 21 Jan 2022 17:16:07 +0800 Subject: [PATCH 4/6] fix add pool cache --- paddle/fluid/framework/fleet/box_wrapper.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index a81f81927f5b2..2c75dda0dff37 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -1280,9 +1280,8 @@ void BoxWrapper::ReleasePool(void) { timer.Start(); size_t capacity = SlotRecordPool().capacity(); SlotRecordPool().clear(); + SlotRecordPool().disable_pool(false); timer.Pause(); - STAT_RESET(STAT_total_feasign_num_in_mem, 0); - STAT_RESET(STAT_slot_pool_size, 0); LOG(WARNING) << "ReleasePool Size=" << capacity << ", Time=" << timer.ElapsedSec() << "sec"; } From 2f1809ff02a7fd8a2cb68031330481b16a9d4352 Mon Sep 17 00:00:00 2001 From: qshuihu Date: Mon, 24 Jan 2022 16:26:43 +0800 Subject: [PATCH 5/6] add slotpool disable gflag, fix e2eq after save model train slow --- paddle/fluid/framework/data_set.cc | 15 +++++++++++++--
paddle/fluid/framework/fleet/box_wrapper.cc | 9 ++++++--- paddle/fluid/platform/flags.cc | 2 ++ python/paddle/fluid/__init__.py | 1 + 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 1a55896163b58..1234d941710ae 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -1758,13 +1758,19 @@ void PadBoxSlotDataset::WaitPreLoadDone() { delete reinterpret_cast(data_consumer_); data_consumer_ = nullptr; } + + platform::Timer timeline; + timeline.Start(); if (FLAGS_padbox_dataset_enable_unrollinstance) { UnrollInstance(); } + timeline.Pause(); + VLOG(0) << "passid = " << pass_id_ << ", PadBoxSlotDataset::WaitPreLoadDone() end" << ", memory data size=" << input_records_.size() - << ", cost time=" << max_read_ins_span_ << " seconds"; + << ", cost time=" << max_read_ins_span_ << " seconds" + << ", unroll time=" << timeline.ElapsedSec() << " seconds"; } // load all data into memory void PadBoxSlotDataset::LoadIntoMemory() { @@ -1810,6 +1816,10 @@ void PadBoxSlotDataset::LoadIntoMemory() { delete reinterpret_cast(data_consumer_); data_consumer_ = nullptr; } + timeline.Pause(); + double span_time = timeline.ElapsedSec(); + + timeline.Start(); if (FLAGS_padbox_dataset_enable_unrollinstance) { UnrollInstance(); } @@ -1817,7 +1827,8 @@ void PadBoxSlotDataset::LoadIntoMemory() { VLOG(1) << "PadBoxSlotDataset::LoadIntoMemory() end" << ", memory data size=" << input_records_.size() - << ", cost time=" << timeline.ElapsedSec() << " seconds"; + << ", cost time=" << span_time << " seconds" + << ", unroll time=" << timeline.ElapsedSec() << " seconds"; } // add fea keys void PadBoxSlotDataset::MergeInsKeys(const Channel& in) { diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index 2c75dda0dff37..229580b29df55 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ 
b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -28,6 +28,7 @@ DECLARE_bool(use_gpu_replica_cache); DECLARE_int32(gpu_replica_cache_dim); DECLARE_bool(enable_force_hbm_recyle); DECLARE_bool(enable_force_mem_recyle); +DECLARE_bool(enbale_slotpool_auto_clear); namespace paddle { namespace framework { @@ -579,9 +580,10 @@ void BoxWrapper::FeedPass(int date, void BoxWrapper::BeginFeedPass(int date, boxps::PSAgentBase** agent) { if (FLAGS_enable_force_mem_recyle) { - SlotRecordPool().disable_pool(true); + SlotRecordPool().disable_pool(FLAGS_enbale_slotpool_auto_clear); } else { - SlotRecordPool().disable_pool(boxps_ptr_->CheckNeedLimitMem()); + SlotRecordPool().disable_pool(FLAGS_enbale_slotpool_auto_clear && + boxps_ptr_->CheckNeedLimitMem()); } int ret = boxps_ptr_->BeginFeedPass(date, *agent); if (FLAGS_use_gpu_replica_cache) { @@ -619,7 +621,8 @@ void BoxWrapper::BeginPass() { PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( "BeginPass failed in BoxPS.")); // auto disable or enable slotrecord pool recyle memory - SlotRecordPool().disable_pool(boxps_ptr_->CheckNeedLimitMem()); + SlotRecordPool().disable_pool(FLAGS_enbale_slotpool_auto_clear && + boxps_ptr_->CheckNeedLimitMem()); } void BoxWrapper::SetTestMode(bool is_test) const { diff --git a/paddle/fluid/platform/flags.cc b/paddle/fluid/platform/flags.cc index 6279e74dd2408..d7c090371d3a2 100644 --- a/paddle/fluid/platform/flags.cc +++ b/paddle/fluid/platform/flags.cc @@ -606,3 +606,5 @@ DEFINE_bool(enable_shuffle_by_searchid, false, "enable dualbox shuffle by searchid, default false"); DEFINE_bool(enable_pull_box_padding_zero, true, "enable pull box padding zero, default true"); +DEFINE_bool(enbale_slotpool_auto_clear, false, + "slot pool enable auto clear, default false"); diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index b0d18e54139e3..d52a1f7580c6f 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -271,6 +271,7 @@ def 
__bootstrap__(): 'enable_pullpush_dedup_keys', 'enable_shuffle_by_searchid', 'enable_pull_box_padding_zero', + 'enbale_slotpool_auto_clear', ] core.init_gflags(["--tryfromenv=" + ",".join(read_env_flags)]) core.init_glog(sys.argv[0]) From a43c1ab6e5797695eb1b76cc02c2215c7ea77c6f Mon Sep 17 00:00:00 2001 From: qshuihu Date: Mon, 24 Jan 2022 16:45:36 +0800 Subject: [PATCH 6/6] rollback nan inf check print --- paddle/fluid/framework/operator.cc | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc index 5c246ee8a1498..780697bb28adb 100644 --- a/paddle/fluid/framework/operator.cc +++ b/paddle/fluid/framework/operator.cc @@ -1170,14 +1170,30 @@ void OperatorWithKernel::RunImpl(const Scope& scope, // dump current op data for (auto& iname : InputVars()) { - snprintf(prefix, sizeof(prefix), "gpu%d_input", device_id); - framework::details::DumpTensorToFile(log_path, prefix, iname, - exec_scope); + auto* var = exec_scope.FindVar(iname); + if (var == nullptr) continue; + std::ostringstream os; + os << "input name:" << iname << ", "; + if (var->IsType()) { + os << var->Get(); + } else if (var->IsType()) { + os << var->Get().value(); + } + os << "\n"; + printf("%s", os.str().c_str()); } for (auto& iname : OutputVars(true)) { - snprintf(prefix, sizeof(prefix), "gpu%d_output", device_id); - framework::details::DumpTensorToFile(log_path, prefix, iname, - exec_scope); + auto* var = exec_scope.FindVar(iname); + if (var == nullptr) continue; + std::ostringstream os; + os << "output name:" << iname << ", "; + if (var->IsType()) { + os << var->Get(); + } else if (var->IsType()) { + os << var->Get().value(); + } + os << "\n"; + printf("%s", os.str().c_str()); } PADDLE_ENFORCE(false, "ERROR: check INF and NAN: %s", DebugStringEx(&exec_scope).c_str());