diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index f41147851c2df..cea2ee79ea336 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -3189,6 +3189,182 @@ bool SlotPaddleBoxDataFeedWithGpuReplicaCache::ParseOneInstance( return (uint64_total_slot_num > 0); } +void InputTableDataFeed::LoadIntoMemoryByLib() { + paddle::framework::ISlotParser* parser = + global_parser_pool().Get(parser_so_path_, all_slots_info_); + CHECK(parser != nullptr); + + boxps::PaddleDataReader* reader = nullptr; + if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) { + reader = + boxps::PaddleDataReader::New(BoxWrapper::GetInstance()->GetFileMgr()); + } + + std::string filename; + BufferedLineFileReader line_reader; + line_reader.set_sample_rate(sample_rate_); + + int from_pool_num = 0; + auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); + PADDLE_ENFORCE(!box_ptr->input_table_deque_.empty()); + while (this->PickOneFile(&filename)) { + VLOG(3) << "PickOneFile, filename=" << filename + << ", thread_id=" << thread_id_; + std::vector record_vec; + platform::Timer timeline; + timeline.Start(); + const int max_fetch_num = 10000; + int offset = 0; + + SlotRecordPool().get(&record_vec, max_fetch_num); + from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num); + auto func = [this, &box_ptr, &parser, &record_vec, &offset, &max_fetch_num, + &from_pool_num, &filename](const std::string& line) { + int old_offset = offset; + auto GetOffsetFunc = [&box_ptr](std::string& key) -> uint64_t { + return box_ptr->input_table_deque_.back().GetIndexOffset(key); + }; + + if (!parser->ParseOneInstance( + line, GetOffsetFunc, + [this, &offset, &record_vec, &max_fetch_num, &old_offset]( + std::vector& vec, int num) { + vec.resize(num); + if (offset + num > max_fetch_num) { + // Considering the prob of show expanding is low, so we don't + // update STAT here + input_channel_->WriteMove(offset, &record_vec[0]); + SlotRecordPool().get(&record_vec[0], offset); + record_vec.resize(max_fetch_num); + offset = 0; + old_offset = 0; + } + for (int i = 0; i < num; ++i) { + auto& ins = record_vec[offset + i]; + ins->reset(); + vec[i] = ins; + } + offset = offset + num; + })) { + offset = old_offset; + LOG(WARNING) << "read file:[" << filename << "] item error, line:[" + << line << "]"; + } + if (offset >= max_fetch_num) { + input_channel_->Write(std::move(record_vec)); + STAT_ADD(STAT_total_feasign_num_in_mem, + GetTotalFeaNum(record_vec, max_fetch_num) - from_pool_num); + record_vec.clear(); + SlotRecordPool().get(&record_vec, max_fetch_num); + from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num); + offset = 0; + } + }; + int lines = 0; + if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) { + while (reader->open(filename) < 0) { + sleep(1); + } + lines = line_reader.read_api(reader, func); + reader->close(); + } else { + if (BoxWrapper::GetInstance()->UseAfsApi()) { + this->fp_ = BoxWrapper::GetInstance()->OpenReadFile( + filename, this->pipe_command_); + } else { + int err_no = 0; + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + } + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + lines = line_reader.read_file(this->fp_.get(), func); + } + if (offset > 0) { + input_channel_->WriteMove(offset, &record_vec[0]); + STAT_ADD(STAT_total_feasign_num_in_mem, + GetTotalFeaNum(record_vec, max_fetch_num) - from_pool_num); + if (offset < max_fetch_num) { + SlotRecordPool().put(&record_vec[offset], (max_fetch_num - offset)); + } + } else { + SlotRecordPool().put(&record_vec); + } + record_vec.clear(); + timeline.Pause(); + VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename + << ", cost time=" << timeline.ElapsedSec() + << " seconds, thread_id=" << thread_id_ << ", lines=" << lines + << ", sample lines=" << line_reader.get_sample_line() + << ", filesize=" << line_reader.file_size() / 1024.0 / 1024.0 + << "MB"; + } + if (reader != nullptr) { + delete reader; + } + + VLOG(3) << "LoadIntoMemoryByLib() end, thread_id=" << thread_id_ + << ", total size: " << line_reader.file_size(); +} + +void InputIndexDataFeed::LoadIntoMemory() { + std::vector slots_info; + paddle::framework::ISlotParser* parser = + global_parser_pool().Get(parser_so_path_, slots_info); + CHECK(parser != nullptr); + + boxps::PaddleDataReader* reader = nullptr; + if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) { + reader = + boxps::PaddleDataReader::New(BoxWrapper::GetInstance()->GetFileMgr()); + } + + std::string filename; + BufferedLineFileReader line_reader; + auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); + PADDLE_ENFORCE(!box_ptr->input_table_deque_.empty()); + while (this->PickOneFile(&filename)) { + VLOG(3) << "PickOneFile, filename=" << filename + << ", thread_id=" << thread_id_; + + auto func = [this, &box_ptr, &filename, &parser](const std::string& line) { + auto ret = parser->ParseIndexData( + line, [&box_ptr](std::string& key, std::vector& vec) { + box_ptr->input_table_deque_.back().AddIndexData(key, vec); + }); + if (!ret) { + LOG(WARNING) << "read file:[" << filename << "] item error, line:[" + << line << "]"; + } + }; + + int lines = 0; + if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) { + while (reader->open(filename) < 0) { + sleep(1); + } + lines = line_reader.read_api(reader, func); + reader->close(); + } else { + if (BoxWrapper::GetInstance()->UseAfsApi()) { + this->fp_ = BoxWrapper::GetInstance()->OpenReadFile( + filename, this->pipe_command_); + } else { + int err_no = 0; + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + } + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + lines = line_reader.read_file(this->fp_.get(), func); + } + + VLOG(3) << "read file:[" << filename << "], lines:[" << lines << "]"; + } + + if (reader) { + delete reader; + } +} + ////////////////////////////// pack //////////////////////////////////// #if defined(PADDLE_WITH_CUDA) && defined(_LINUX) static void SetCPUAffinity(int tid) { diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 9e1e1c39f4fdf..c74d5b3be08bd 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -1013,6 +1013,17 @@ class ISlotParser { std::function&, int)> GetInsFunc) { return true; } + virtual bool ParseOneInstance( + const std::string& line, + std::function GetOffsetFunc, + std::function&, int)> GetInsFunc) { + return true; + } + virtual bool ParseIndexData( + const std::string& line, + std::function&)> AddIndexDataFunc) { + return true; + } }; struct UsedSlotInfo { int idx; @@ -1456,6 +1467,39 @@ class SlotPaddleBoxDataFeedWithGpuReplicaCache : public SlotPaddleBoxDataFeed { int gpu_cache_offset); }; +class InputTableDataFeed : public SlotPaddleBoxDataFeed { + protected: + virtual void LoadIntoMemoryByCommand(void) { + PADDLE_THROW( + "InputTableDataFeed is not implemented LoadIntoMemoryByCommand"); + } + virtual void LoadIntoMemoryByLib(void); +}; + +class InputIndexDataFeed : public DataFeed { + public: + void Init(const DataFeedDesc& data_feed_desc) override { + pipe_command_ = data_feed_desc.index_parser(); + parser_so_path_ = paddle::string::erase_spaces(pipe_command_); + VLOG(3) << "InputIndexDataFeed parser: " << parser_so_path_; + + size_t pos = pipe_command_.find(".so"); + CHECK(pos != std::string::npos); + pipe_command_.clear(); + + finish_init_ = true; + } + bool Start() override { return true; } + int Next() override { return 0; } + void SetThreadId(int thread_id) { thread_id_ = thread_id; } + void LoadIntoMemory() override; + + protected: + int thread_id_ = 0; + std::string parser_so_path_; + std::shared_ptr fp_ = nullptr; +}; + template paddle::framework::Archive& operator<<(paddle::framework::Archive& ar, const SlotValues& r) { diff --git a/paddle/fluid/framework/data_feed.proto b/paddle/fluid/framework/data_feed.proto index 7256ca891a465..f2e8a13d73e9f 100644 --- a/paddle/fluid/framework/data_feed.proto +++ b/paddle/fluid/framework/data_feed.proto @@ -34,4 +34,5 @@ message DataFeedDesc { optional int32 pv_batch_size = 7 [ default = 32 ]; optional int32 input_type = 8 [ default = 0 ]; optional float sample_rate = 9 [ default = 1.0 ]; + optional string index_parser = 10; } diff --git a/paddle/fluid/framework/data_feed_factory.cc b/paddle/fluid/framework/data_feed_factory.cc index 60f9eb08be419..d333a929edbb2 100644 --- a/paddle/fluid/framework/data_feed_factory.cc +++ b/paddle/fluid/framework/data_feed_factory.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/data_feed_factory.h" + #include #include #include @@ -68,6 +69,8 @@ REGISTER_DATAFEED_CLASS(PaddleBoxDataFeed); REGISTER_DATAFEED_CLASS(SlotPaddleBoxDataFeedWithGpuReplicaCache); #ifdef PADDLE_WITH_BOX_PS REGISTER_DATAFEED_CLASS(SlotPaddleBoxDataFeed); +REGISTER_DATAFEED_CLASS(InputTableDataFeed); +REGISTER_DATAFEED_CLASS(InputIndexDataFeed); #endif #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) REGISTER_DATAFEED_CLASS(MultiSlotFileInstantDataFeed); diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 5d35a3281f039..4d07ef292d9e4 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -13,22 +13,23 @@ * limitations under the License. */ #include "paddle/fluid/framework/data_set.h" + #include #include #include #include + #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" #include "paddle/fluid/framework/data_feed_factory.h" +#include "paddle/fluid/framework/fleet/box_wrapper.h" #include "paddle/fluid/framework/fleet/fleet_wrapper.h" #include "paddle/fluid/framework/io/fs.h" #include "paddle/fluid/platform/monitor.h" #include "paddle/fluid/platform/timer.h" #include "xxhash.h" // NOLINT -#include "paddle/fluid/framework/fleet/box_wrapper.h" - #if defined _WIN32 || defined __APPLE__ #else #define _LINUX @@ -2045,6 +2046,102 @@ void PadBoxSlotDataset::PrepareTrain(void) { } } } + +void InputTableDataset::LoadIntoMemory() { + VLOG(3) << "InputTableDataset::LoadIntoMemory() begin"; + + platform::Timer timer; + timer.Start(); + LoadIndexIntoMemory(); + timer.Pause(); + VLOG(1) << "load index into memory cost: " << timer.ElapsedSec(); + + platform::Timer timeline; + timeline.Start(); + std::vector load_threads; + std::vector shuffle_threads; + + if (mpi_size_ > 1) { + finished_counter_ = mpi_size_; + mpi_flags_.assign(mpi_size_, 1); + VLOG(3) << "RegisterClientToClientMsgHandler"; + data_consumer_ = reinterpret_cast(new PadBoxSlotDataConsumer(this)); + VLOG(3) << "RegisterClientToClientMsgHandler done"; + } + + std::atomic ref(thread_num_); + for (int64_t i = 0; i < thread_num_; ++i) { + load_threads.push_back(std::thread([this, i, &ref]() { + SetCPUAffinity(i, false); + readers_[i]->LoadIntoMemory(); + if (--ref == 0) { + input_channel_->Close(); + } + })); + } + + // dualbox global data shuffle + if (mpi_size_ > 1) { + ShuffleData(&shuffle_threads, shuffle_thread_num_); + MergeInsKeys(shuffle_channel_); + } else { + MergeInsKeys(input_channel_); + } + + for (std::thread& t : load_threads) { + t.join(); + } + + if (!shuffle_threads.empty()) { + for (std::thread& t : shuffle_threads) { + t.join(); + } + } + + if (data_consumer_ != nullptr) { + delete reinterpret_cast(data_consumer_); + data_consumer_ = nullptr; + } + // shuffle_channel_->Clear(); + // input_channel_->Clear(); + + timeline.Pause(); + VLOG(1) << "PadBoxSlotDataset::LoadIntoMemory() end" + << ", memory data size=" << input_records_.size() + << ", cost time=" << timeline.ElapsedSec() << " seconds"; +} + +void InputTableDataset::LoadIndexIntoMemory() { + VLOG(3) << "LoadIndexIntoMemory()"; + + std::vector> readers; + size_t file_idx = 0; + std::mutex mutex_for_pick_file; + + for (int i = 0; i < thread_num_; ++i) { + readers.push_back(DataFeedFactory::CreateDataFeed("InputIndexDataFeed")); + readers[i]->Init(data_feed_desc_); + readers[i]->SetThreadId(i); + readers[i]->SetFileListMutex(&mutex_for_pick_file); + readers[i]->SetFileListIndex(&file_idx); + readers[i]->SetFileList(index_filelist_); + } + + std::vector threads; + for (int i = 0; i < thread_num_; ++i) { + threads.push_back(std::thread([i, &readers]() { + SetCPUAffinity(i, false); + readers[i]->LoadIntoMemory(); + })); + } + + for (auto& t : threads) { + t.join(); + } + + VLOG(3) << "end LoadIndexIntoMemory()"; +} + #endif } // end namespace framework } // end namespace paddle diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 86e83eb85cb01..10b64c7fed2d1 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -15,6 +15,7 @@ #pragma once #include + #include #include #include // NOLINT @@ -48,6 +49,7 @@ class Dataset { virtual ~Dataset() {} // set file list virtual void SetFileList(const std::vector& filelist) = 0; + virtual void SetIndexFileList(const std::vector& filelist) {} // set readers' num virtual void SetThreadNum(int thread_num) = 0; // set workers' num @@ -363,10 +365,10 @@ class PadBoxSlotDataset : public DatasetImpl { public: virtual void ReceiveSuffleData(const int client_id, const char* msg, int len); - private: + protected: void MergeInsKeys(const Channel& in); - private: + protected: Channel shuffle_channel_ = nullptr; std::vector mpi_flags_; std::atomic finished_counter_{0}; @@ -378,6 +380,19 @@ class PadBoxSlotDataset : public DatasetImpl { void* data_consumer_ = nullptr; std::atomic receiver_cnt_{0}; }; + +class InputTableDataset : public PadBoxSlotDataset { + public: + virtual void LoadIntoMemory(); + virtual void SetIndexFileList(const std::vector& filelist) { + index_filelist_ = filelist; + } + + private: + void LoadIndexIntoMemory(); + + std::vector index_filelist_; +}; #endif } // end namespace framework diff --git a/paddle/fluid/framework/dataset_factory.cc b/paddle/fluid/framework/dataset_factory.cc index 048b4adc52134..9f1fd6273994d 100644 --- a/paddle/fluid/framework/dataset_factory.cc +++ b/paddle/fluid/framework/dataset_factory.cc @@ -64,6 +64,7 @@ std::unique_ptr DatasetFactory::CreateDataset( REGISTER_DATASET_CLASS(MultiSlotDataset); #ifdef PADDLE_WITH_BOX_PS REGISTER_DATASET_CLASS(PadBoxSlotDataset); +REGISTER_DATASET_CLASS(InputTableDataset); #endif } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index d9c92c8a145ab..cbe5eb7e093d7 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -13,10 +13,12 @@ // limitations under the License. #ifdef PADDLE_WITH_BOX_PS #include "paddle/fluid/framework/fleet/box_wrapper.h" + #include #include #include #include + #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/gpu_info.h" @@ -35,10 +37,12 @@ int BoxWrapper::feature_type_ = 0; float BoxWrapper::pull_embedx_scale_ = 1.0; void BasicAucCalculator::add_unlock_data(double pred, int label) { - PADDLE_ENFORCE_GE(pred, 0.0, platform::errors::PreconditionNotMet( - "pred should be greater than 0")); - PADDLE_ENFORCE_LE(pred, 1.0, platform::errors::PreconditionNotMet( - "pred should be lower than 1")); + PADDLE_ENFORCE_GE( + pred, 0.0, + platform::errors::PreconditionNotMet("pred should be greater than 0")); + PADDLE_ENFORCE_LE( + pred, 1.0, + platform::errors::PreconditionNotMet("pred should be lower than 1")); PADDLE_ENFORCE_EQ( label * label, label, platform::errors::PreconditionNotMet( @@ -464,8 +468,9 @@ void BasicAucCalculator::calculate_bucket_error() { void BoxWrapper::FeedPass(int date, const std::vector& feasgin_to_box) { int ret = boxps_ptr_->FeedPass(date, feasgin_to_box); - PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( - "FeedPass failed in BoxPS.")); + PADDLE_ENFORCE_EQ( + ret, 0, + platform::errors::PreconditionNotMet("FeedPass failed in BoxPS.")); } void BoxWrapper::BeginFeedPass(int date, boxps::PSAgentBase** agent) { @@ -475,17 +480,27 @@ void BoxWrapper::BeginFeedPass(int date, boxps::PSAgentBase** agent) { VLOG(3) << "gpu cache dim:" << dim; gpu_replica_cache.emplace_back(dim); } - PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( - "BeginFeedPass failed in BoxPS.")); + if (dataset_name_ == "InputTableDataset") { + VLOG(3) << "lookup input dim: " << input_table_dim_; + input_table_deque_.emplace_back(input_table_dim_); + } + PADDLE_ENFORCE_EQ( + ret, 0, + platform::errors::PreconditionNotMet("BeginFeedPass failed in BoxPS.")); } void BoxWrapper::EndFeedPass(boxps::PSAgentBase* agent) { if (FLAGS_use_gpu_replica_cache) { gpu_replica_cache.back().ToHBM(); } + if (dataset_name_ == "InputTableDataset") { + auto& t = input_table_deque_.back(); + VLOG(3) << "input table size: " << t.size() << " miss: " << t.miss(); + } int ret = boxps_ptr_->EndFeedPass(agent); - PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( - "EndFeedPass failed in BoxPS.")); + PADDLE_ENFORCE_EQ( + ret, 0, + platform::errors::PreconditionNotMet("EndFeedPass failed in BoxPS.")); } void BoxWrapper::BeginPass() { @@ -495,8 +510,9 @@ void BoxWrapper::BeginPass() { dev.ResetTimer(); } int ret = boxps_ptr_->BeginPass(); - PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( - "BeginPass failed in BoxPS.")); + PADDLE_ENFORCE_EQ( + ret, 0, + platform::errors::PreconditionNotMet("BeginPass failed in BoxPS.")); } void BoxWrapper::SetTestMode(bool is_test) const { @@ -507,6 +523,9 @@ void BoxWrapper::EndPass(bool need_save_delta) { if (FLAGS_use_gpu_replica_cache) { gpu_replica_cache.pop_front(); } + if (dataset_name_ == "InputTableDataset") { + input_table_deque_.pop_front(); + } int ret = boxps_ptr_->EndPass(need_save_delta); PADDLE_ENFORCE_EQ( ret, 0, platform::errors::PreconditionNotMet("EndPass failed in BoxPS.")); diff --git a/paddle/fluid/framework/fleet/box_wrapper.h b/paddle/fluid/framework/fleet/box_wrapper.h index 0f9d5abac93a5..c38ddfd71f797 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.h +++ b/paddle/fluid/framework/fleet/box_wrapper.h @@ -23,6 +23,7 @@ limitations under the License. */ #include #endif #include + #include #include #include @@ -33,8 +34,10 @@ limitations under the License. */ #include #include #include +#include #include #include + #include "paddle/fluid/framework/data_feed.h" #include "paddle/fluid/framework/data_set.h" #include "paddle/fluid/framework/lod_tensor.h" @@ -170,6 +173,64 @@ class GpuReplicaCache { std::vector h_emb_; }; +class InputTable { + public: + explicit InputTable(uint64_t dim) : dim_(dim), miss_(0) { + // add default vec 0 => [0, 0, ...] + std::vector vec(dim_, 0); + AddIndexData("-", vec); + } + + void AddIndexData(const std::string& key, const std::vector& vec) { + PADDLE_ENFORCE_EQ(vec.size(), dim_); + + table_mutex_.lock(); + key_offset_.emplace(key, table_.size()); + table_.insert(table_.end(), vec.begin(), vec.end()); + table_mutex_.unlock(); + } + + uint64_t GetIndexOffset(const std::string& key) { + auto it = key_offset_.find(key); + if (it == key_offset_.end()) { + ++miss_; + return 0; + } + + return it->second; + } + + void LookupInput(uint64_t* keys, float* values, uint64_t num, + size_t device_id) { + std::vector d_keys; + std::vector d_values; + d_keys.resize(num); + d_values.resize(num * dim_); + + cudaSetDevice(device_id); + cudaMemcpy(d_keys.data(), keys, d_keys.size() * sizeof(uint64_t), + cudaMemcpyDeviceToHost); + for (size_t i = 0; i < num; ++i) { + memcpy(&d_values[i * dim_], &table_[d_keys[i]], dim_ * sizeof(float)); + } + cudaMemcpy(values, d_values.data(), d_values.size() * sizeof(float), + cudaMemcpyHostToDevice); + } + + size_t size() const { return key_offset_.size(); } + + size_t miss() const { return miss_; } + + size_t dim() const { return dim_; } + + protected: + uint64_t dim_; + std::mutex table_mutex_; + std::unordered_map key_offset_; + std::vector table_; + std::atomic miss_; +}; + class BoxWrapper { struct DeviceBoxData { LoDTensor keys_tensor; @@ -204,6 +265,8 @@ class BoxWrapper { public: std::deque gpu_replica_cache; + std::deque input_table_deque_; + virtual ~BoxWrapper() { if (file_manager_ != nullptr) { file_manager_->destory(); @@ -226,7 +289,8 @@ class BoxWrapper { fprintf(stdout, "init box wrapper\n"); boxps::MPICluster::Ins(); } - + void SetDatasetName(const std::string& name) { dataset_name_ = name; } + void SetInputTableDim(size_t dim) { input_table_dim_ = dim; } void FeedPass(int date, const std::vector& feasgin_to_box); void BeginFeedPass(int date, boxps::PSAgentBase** agent); void EndFeedPass(boxps::PSAgentBase* agent); @@ -370,8 +434,9 @@ class BoxWrapper { std::string ret_str; int ret = boxps_ptr_->SaveBase(batch_model_path, xbox_model_path, ret_str, seconds_from_1970 / 86400); - PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( - "SaveBase failed in BoxPS.")); + PADDLE_ENFORCE_EQ( + ret, 0, + platform::errors::PreconditionNotMet("SaveBase failed in BoxPS.")); return ret_str; } @@ -379,8 +444,9 @@ class BoxWrapper { VLOG(3) << "Begin SaveDelta"; std::string ret_str; int ret = boxps_ptr_->SaveDelta(xbox_model_path, ret_str); - PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( - "SaveDelta failed in BoxPS.")); + PADDLE_ENFORCE_EQ( + ret, 0, + platform::errors::PreconditionNotMet("SaveDelta failed in BoxPS.")); return ret_str; } @@ -455,8 +521,9 @@ class BoxWrapper { std::string user = fs_ugi.substr(0, split); std::string pwd = fs_ugi.substr(split + 1); bool ret = file_manager_->initialize(fs_name, user, pwd, conf_path); - PADDLE_ENFORCE_EQ(ret, true, platform::errors::PreconditionNotMet( - "Called AFSAPI Init Interface Failed.")); + PADDLE_ENFORCE_EQ(ret, true, + platform::errors::PreconditionNotMet( + "Called AFSAPI Init Interface Failed.")); use_afs_api_ = true; } @@ -861,6 +928,8 @@ class BoxWrapper { // box device cache DeviceBoxData* device_caches_ = nullptr; std::map lr_map_; + std::string dataset_name_; + size_t input_table_dim_ = 0; public: static std::shared_ptr data_shuffle_; diff --git a/paddle/fluid/operators/pull_box_sparse_op.cc b/paddle/fluid/operators/pull_box_sparse_op.cc index 8d453a9f74c2c..1c13011b97292 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.cc +++ b/paddle/fluid/operators/pull_box_sparse_op.cc @@ -160,6 +160,44 @@ class PushBoxSparseOp : public framework::OperatorWithKernel { ctx.device_context()); } }; + +class LookupInputOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + void InferShape(framework::InferShapeContext* ctx) const override { + auto input_dim = ctx->GetInputDim("Id"); + auto size = static_cast(ctx->Attrs().Get("size")); + ctx->SetOutputDim("Out", {input_dim[0], size}); + ctx->ShareLoD("Id", "Out"); + } + + protected: + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext& ctx) const override { + return framework::OpKernelType(framework::proto::VarType::FP32, + ctx.device_context()); + } +}; + +class LookupInputOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddInput("Id", + "Input tensors with type int32 or int64 " + "contains the ids to be lookup input. " + "The last dimension size must be 1."); + AddOutput("Out", "The lookup results tensors."); + AddAttr("size", "(int, the input hidden size").SetDefault(1); + AddComment(R"DOC( +Lookup Input Operator. +This operator is used to lookup input by index, +then concatenated into a dense tensor. +The input Ids can carry the LoD (Level of Details) information, +or not. And the output only shares the LoD information with input Ids. +)DOC"); + } +}; + } // namespace operators } // namespace paddle @@ -177,3 +215,6 @@ REGISTER_OPERATOR(pull_cache_value, ops::PullCacheValuesOp, ops::PushCacheValuesOpMaker); REGISTER_OP_CPU_KERNEL(pull_cache_value, ops::PullCacheValuesCPUKernel) REGISTER_OPERATOR(push_cache_value, ops::PushBoxSparseOp); + +REGISTER_OPERATOR(lookup_input, ops::LookupInputOp, ops::LookupInputOpMaker); +REGISTER_OP_CPU_KERNEL(lookup_input, ops::LookupInputCPUKernel) diff --git a/paddle/fluid/operators/pull_box_sparse_op.cu b/paddle/fluid/operators/pull_box_sparse_op.cu index 8ad45330472b3..5de2c612f20c3 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.cu +++ b/paddle/fluid/operators/pull_box_sparse_op.cu @@ -45,6 +45,14 @@ class PullCacheValuesCUDAKernel: public framework::OpKernel { } }; +template +class LookupInputCUDAKernel: public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + LookupInputFunctor(ctx); + } +}; + } // namespace operators } // namespace paddle @@ -52,3 +60,4 @@ namespace ops = paddle::operators; REGISTER_OP_CUDA_KERNEL(pull_box_sparse, ops::PullBoxSparseCUDAKernel) REGISTER_OP_CUDA_KERNEL(push_box_sparse, ops::PushBoxSparseCUDAKernel) REGISTER_OP_CUDA_KERNEL(pull_cache_value, ops::PullCacheValuesCUDAKernel) +REGISTER_OP_CUDA_KERNEL(lookup_input, ops::LookupInputCUDAKernel) diff --git a/paddle/fluid/operators/pull_box_sparse_op.h b/paddle/fluid/operators/pull_box_sparse_op.h index 1b52e29a216f4..5f3c58308725b 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.h +++ b/paddle/fluid/operators/pull_box_sparse_op.h @@ -15,6 +15,7 @@ #pragma once #include #include + #include "paddle/fluid/framework/eigen.h" #include "paddle/fluid/framework/fleet/box_wrapper.h" #include "paddle/fluid/framework/op_registry.h" @@ -45,19 +46,41 @@ static void PaddingZeros(const framework::ExecutionContext &ctx, template static void PullCacheValuesFunctor(const framework::ExecutionContext &ctx) { - const auto* input = ctx.Input("Id"); - auto* output = ctx.Output("Out"); + const auto *input = ctx.Input("Id"); + auto *output = ctx.Output("Out"); auto batch_size = input->dims()[0]; - uint64_t* input_data = reinterpret_cast(const_cast(input->data())); - float* output_data = const_cast(output->mutable_data(ctx.GetPlace())); + uint64_t *input_data = reinterpret_cast( + const_cast(input->data())); + float *output_data = + const_cast(output->mutable_data(ctx.GetPlace())); #ifdef PADDLE_WITH_BOX_PS auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); int i = BOOST_GET_CONST(platform::CUDAPlace, ctx.GetPlace()).GetDeviceId(); - box_ptr->gpu_replica_cache.front().PullCacheValue(input_data, output_data, batch_size, i); + box_ptr->gpu_replica_cache.front().PullCacheValue(input_data, output_data, + batch_size, i); +#endif +} + +template +static void LookupInputFunctor(const framework::ExecutionContext &ctx) { + const auto *input = ctx.Input("Id"); + auto *output = ctx.Output("Out"); + auto batch_size = input->dims()[0]; + uint64_t *input_data = reinterpret_cast( + const_cast(input->data())); + float *output_data = + const_cast(output->mutable_data(ctx.GetPlace())); + +#ifdef PADDLE_WITH_BOX_PS + auto box_ptr = paddle::framework::BoxWrapper::GetInstance(); + size_t device_id = + BOOST_GET_CONST(platform::CUDAPlace, ctx.GetPlace()).GetDeviceId(); + box_ptr->input_table_deque_.front().LookupInput(input_data, output_data, + batch_size, device_id); #endif } @@ -178,5 +201,14 @@ class PushBoxSparseCPUKernel : public framework::OpKernel { PushBoxSparseFunctor(ctx); } }; + +template +class LookupInputCPUKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + LookupInputFunctor(ctx); + } +}; + } // namespace operators } // namespace paddle diff --git a/paddle/fluid/pybind/box_helper_py.cc b/paddle/fluid/pybind/box_helper_py.cc index 92784608d3935..01e1ab382914d 100644 --- a/paddle/fluid/pybind/box_helper_py.cc +++ b/paddle/fluid/pybind/box_helper_py.cc @@ -65,9 +65,11 @@ void BindBoxHelper(py::module* m) { void BindBoxWrapper(py::module* m) { py::class_>( *m, "BoxWrapper") - .def(py::init([](int embedx_dim, int expand_embed_dim, bool is_quant, float pull_embedx_scale) { + .def(py::init([](int embedx_dim, int expand_embed_dim, bool is_quant, + float pull_embedx_scale) { // return std::make_shared(dataset); - return framework::BoxWrapper::SetInstance(embedx_dim, expand_embed_dim, is_quant, pull_embedx_scale); + return framework::BoxWrapper::SetInstance(embedx_dim, expand_embed_dim, + is_quant, pull_embedx_scale); })) .def("save_base", &framework::BoxWrapper::SaveBase, py::call_guard()) @@ -95,6 +97,10 @@ void BindBoxWrapper(py::module* m) { .def("finalize", &framework::BoxWrapper::Finalize, py::call_guard()) .def("release_pool", &framework::BoxWrapper::ReleasePool, + py::call_guard()) + .def("set_dataset_name", &framework::BoxWrapper::SetDatasetName, + py::call_guard()) + .def("set_input_table_dim", &framework::BoxWrapper::SetInputTableDim, py::call_guard()); } // end BoxWrapper #endif diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc index 7a32d8729fc6c..002a65a0e8807 100644 --- a/paddle/fluid/pybind/data_set_py.cc +++ b/paddle/fluid/pybind/data_set_py.cc @@ -204,6 +204,8 @@ void BindDataset(py::module *m) { })) .def("set_filelist", &framework::Dataset::SetFileList, py::call_guard()) + .def("set_index_filelist", &framework::Dataset::SetIndexFileList, + py::call_guard()) .def("set_thread_num", &framework::Dataset::SetThreadNum, py::call_guard()) .def("set_trainer_num", &framework::Dataset::SetTrainerNum, diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index 75a0eb5de14f5..125e6e0c5b67e 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -1168,3 +1168,50 @@ def load_into_memory(self): """ self._prepare_to_run() self.boxps.read_ins_into_memory() + + +class InputTableDataset(PadBoxSlotDataset): + def __init__(self): + """ + InputTableDataset: derived from PadBoxSlotDataset. + + Examples: + .. code-block:: python + + import paddle.fluid as fluid + dataset = fluid.DatasetFactory().create_dataset("InputTableDataset") + """ + self.proto_desc = data_feed_pb2.DataFeedDesc() + self.proto_desc.pipe_command = "cat" + self.dataset = core.Dataset("InputTableDataset") + self.thread_num = 1 + self.filelist = [] + self.boxps = core.BoxPS(self.dataset) + self.proto_desc.name = "InputTableDataFeed" + self.fleet_send_batch_size = None + self.is_user_set_queue_num = False + self.queue_num = None + self.parse_ins_id = False + self.parse_content = False + self.parse_logkey = False + self.merge_by_sid = True + self.enable_pv_merge = False + self.merge_by_lineid = False + self.fleet_send_sleep_seconds = None + + def set_index_parser(self, index_parser): + """ set index parser + """ + self.proto_desc.index_parser = index_parser + + def set_index_filelist(self, filelist): + """ set index filelist + """ + self.dataset.set_index_filelist(filelist) + + def set_feed_type(self, data_feed_type): + """ + Set data_feed_desc + """ + assert data_feed_type == 'InputTableDataFeed', 'InputTableDataset must use InputTableDataFeed' + self.proto_desc.name = data_feed_type diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index aea7c780cb281..fbf13894e1c9c 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -647,6 +647,7 @@ def _pull_sparse_v2(input, return outs[0] return outs + def _pull_cache_value(input, size, dtype='float32'): """ **Pull Box Sparse Layer** @@ -660,6 +661,7 @@ def _pull_cache_value(input, size, dtype='float32'): attrs={'size': size}) return out + def _pull_box_sparse(input, size, dtype='float32'): """ **Pull Box Sparse Layer** @@ -708,6 +710,20 @@ def _pull_box_sparse(input, size, dtype='float32'): return outs +def lookup_input(input, size): + """ + lookup_input + """ + helper = LayerHelper('lookup_input', **locals()) + out = helper.create_variable_for_type_inference('float32') + helper.append_op( + type='lookup_input', + inputs={'Id': [input]}, + outputs={'Out': [out]}, + attrs={'size': size}) + return out + + @templatedoc() def linear_chain_crf(input, label, param_attr=None, length=None): """