feat: lookup_input op (#55)
laipaang authored Dec 15, 2020
1 parent 101fe67 commit 23a73d7
Showing 16 changed files with 608 additions and 30 deletions.
176 changes: 176 additions & 0 deletions paddle/fluid/framework/data_feed.cc
@@ -3189,6 +3189,182 @@ bool SlotPaddleBoxDataFeedWithGpuReplicaCache::ParseOneInstance(
return (uint64_total_slot_num > 0);
}

void InputTableDataFeed::LoadIntoMemoryByLib() {
paddle::framework::ISlotParser* parser =
global_parser_pool().Get(parser_so_path_, all_slots_info_);
CHECK(parser != nullptr);

boxps::PaddleDataReader* reader = nullptr;
if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) {
reader =
boxps::PaddleDataReader::New(BoxWrapper::GetInstance()->GetFileMgr());
}

std::string filename;
BufferedLineFileReader line_reader;
line_reader.set_sample_rate(sample_rate_);

int from_pool_num = 0;
auto box_ptr = paddle::framework::BoxWrapper::GetInstance();
PADDLE_ENFORCE(!box_ptr->input_table_deque_.empty());
while (this->PickOneFile(&filename)) {
VLOG(3) << "PickOneFile, filename=" << filename
<< ", thread_id=" << thread_id_;
std::vector<SlotRecord> record_vec;
platform::Timer timeline;
timeline.Start();
const int max_fetch_num = 10000;
int offset = 0;

SlotRecordPool().get(&record_vec, max_fetch_num);
from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num);
auto func = [this, &box_ptr, &parser, &record_vec, &offset, &max_fetch_num,
&from_pool_num, &filename](const std::string& line) {
int old_offset = offset;
auto GetOffsetFunc = [&box_ptr](std::string& key) -> uint64_t {
return box_ptr->input_table_deque_.back().GetIndexOffset(key);
};

if (!parser->ParseOneInstance(
line, GetOffsetFunc,
[this, &offset, &record_vec, &max_fetch_num, &old_offset](
std::vector<SlotRecord>& vec, int num) {
vec.resize(num);
if (offset + num > max_fetch_num) {
// The probability of show expansion is low, so we skip updating
// STAT here
input_channel_->WriteMove(offset, &record_vec[0]);
SlotRecordPool().get(&record_vec[0], offset);
record_vec.resize(max_fetch_num);
offset = 0;
old_offset = 0;
}
for (int i = 0; i < num; ++i) {
auto& ins = record_vec[offset + i];
ins->reset();
vec[i] = ins;
}
offset = offset + num;
})) {
offset = old_offset;
LOG(WARNING) << "read file:[" << filename << "] item error, line:["
<< line << "]";
}
if (offset >= max_fetch_num) {
input_channel_->Write(std::move(record_vec));
STAT_ADD(STAT_total_feasign_num_in_mem,
GetTotalFeaNum(record_vec, max_fetch_num) - from_pool_num);
record_vec.clear();
SlotRecordPool().get(&record_vec, max_fetch_num);
from_pool_num = GetTotalFeaNum(record_vec, max_fetch_num);
offset = 0;
}
};
int lines = 0;
if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) {
while (reader->open(filename) < 0) {
sleep(1);
}
lines = line_reader.read_api(reader, func);
reader->close();
} else {
if (BoxWrapper::GetInstance()->UseAfsApi()) {
this->fp_ = BoxWrapper::GetInstance()->OpenReadFile(
filename, this->pipe_command_);
} else {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
}
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
lines = line_reader.read_file(this->fp_.get(), func);
}
if (offset > 0) {
input_channel_->WriteMove(offset, &record_vec[0]);
STAT_ADD(STAT_total_feasign_num_in_mem,
GetTotalFeaNum(record_vec, max_fetch_num) - from_pool_num);
if (offset < max_fetch_num) {
SlotRecordPool().put(&record_vec[offset], (max_fetch_num - offset));
}
} else {
SlotRecordPool().put(&record_vec);
}
record_vec.clear();
timeline.Pause();
VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename
<< ", cost time=" << timeline.ElapsedSec()
<< " seconds, thread_id=" << thread_id_ << ", lines=" << lines
<< ", sample lines=" << line_reader.get_sample_line()
<< ", filesize=" << line_reader.file_size() / 1024.0 / 1024.0
<< "MB";
}
if (reader != nullptr) {
delete reader;
}

VLOG(3) << "LoadIntoMemoryByLib() end, thread_id=" << thread_id_
<< ", total size: " << line_reader.file_size();
}

void InputIndexDataFeed::LoadIntoMemory() {
std::vector<AllSlotInfo> slots_info;
paddle::framework::ISlotParser* parser =
global_parser_pool().Get(parser_so_path_, slots_info);
CHECK(parser != nullptr);

boxps::PaddleDataReader* reader = nullptr;
if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) {
reader =
boxps::PaddleDataReader::New(BoxWrapper::GetInstance()->GetFileMgr());
}

std::string filename;
BufferedLineFileReader line_reader;
auto box_ptr = paddle::framework::BoxWrapper::GetInstance();
PADDLE_ENFORCE(!box_ptr->input_table_deque_.empty());
while (this->PickOneFile(&filename)) {
VLOG(3) << "PickOneFile, filename=" << filename
<< ", thread_id=" << thread_id_;

auto func = [this, &box_ptr, &filename, &parser](const std::string& line) {
auto ret = parser->ParseIndexData(
line, [&box_ptr](std::string& key, std::vector<float>& vec) {
box_ptr->input_table_deque_.back().AddIndexData(key, vec);
});
if (!ret) {
LOG(WARNING) << "read file:[" << filename << "] item error, line:["
<< line << "]";
}
};

int lines = 0;
if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) {
while (reader->open(filename) < 0) {
sleep(1);
}
lines = line_reader.read_api(reader, func);
reader->close();
} else {
if (BoxWrapper::GetInstance()->UseAfsApi()) {
this->fp_ = BoxWrapper::GetInstance()->OpenReadFile(
filename, this->pipe_command_);
} else {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
}
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
lines = line_reader.read_file(this->fp_.get(), func);
}

VLOG(3) << "read file:[" << filename << "], lines:[" << lines << "]";
}

if (reader) {
delete reader;
}
}

////////////////////////////// pack ////////////////////////////////////
#if defined(PADDLE_WITH_CUDA) && defined(_LINUX)
static void SetCPUAffinity(int tid) {
44 changes: 44 additions & 0 deletions paddle/fluid/framework/data_feed.h
@@ -1013,6 +1013,17 @@ class ISlotParser {
std::function<void(std::vector<SlotRecord>&, int)> GetInsFunc) {
return true;
}
virtual bool ParseOneInstance(
const std::string& line,
std::function<uint64_t(std::string&)> GetOffsetFunc,
std::function<void(std::vector<SlotRecord>&, int)> GetInsFunc) {
return true;
}
virtual bool ParseIndexData(
const std::string& line,
std::function<void(std::string&, std::vector<float>&)> AddIndexDataFunc) {
return true;
}
};
struct UsedSlotInfo {
int idx;
@@ -1456,6 +1467,39 @@ class SlotPaddleBoxDataFeedWithGpuReplicaCache : public SlotPaddleBoxDataFeed {
int gpu_cache_offset);
};

class InputTableDataFeed : public SlotPaddleBoxDataFeed {
protected:
virtual void LoadIntoMemoryByCommand(void) {
PADDLE_THROW(
"InputTableDataFeed is not implemented LoadIntoMemoryByCommand");
}
virtual void LoadIntoMemoryByLib(void);
};

class InputIndexDataFeed : public DataFeed {
public:
void Init(const DataFeedDesc& data_feed_desc) override {
pipe_command_ = data_feed_desc.index_parser();
parser_so_path_ = paddle::string::erase_spaces(pipe_command_);
VLOG(3) << "InputIndexDataFeed parser: " << parser_so_path_;

size_t pos = pipe_command_.find(".so");
CHECK(pos != std::string::npos);
pipe_command_.clear();

finish_init_ = true;
}
bool Start() override { return true; }
int Next() override { return 0; }
void SetThreadId(int thread_id) { thread_id_ = thread_id; }
void LoadIntoMemory() override;

protected:
int thread_id_ = 0;
std::string parser_so_path_;
std::shared_ptr<FILE> fp_ = nullptr;
};

template <class AR, class T>
paddle::framework::Archive<AR>& operator<<(paddle::framework::Archive<AR>& ar,
const SlotValues<T>& r) {
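The ISlotParser hooks added above are resolved at runtime from the shared object named by the parser path (see global_parser_pool().Get(parser_so_path_, ...) in data_feed.cc). As a rough illustration only, not part of this commit, a plugin implementing the new ParseIndexData hook might look like the sketch below; the class name and the "<key> <v1> <v2> ..." line layout are assumptions, and the remaining ISlotParser hooks are assumed to keep their default implementations.

// Hypothetical parser-plugin sketch (not in this commit). Each parsed
// key/vector pair is handed back through AddIndexDataFunc, which
// InputIndexDataFeed forwards into input_table_deque_.back().AddIndexData().
#include <functional>
#include <sstream>
#include <string>
#include <vector>

#include "paddle/fluid/framework/data_feed.h"

class DemoIndexParser : public paddle::framework::ISlotParser {
 public:
  bool ParseIndexData(const std::string& line,
                      std::function<void(std::string&, std::vector<float>&)>
                          AddIndexDataFunc) override {
    std::istringstream iss(line);
    std::string key;
    if (!(iss >> key)) {
      return false;  // empty or malformed line
    }
    std::vector<float> vec;
    float v = 0.0f;
    while (iss >> v) {
      vec.push_back(v);
    }
    AddIndexDataFunc(key, vec);
    return true;
  }
};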
1 change: 1 addition & 0 deletions paddle/fluid/framework/data_feed.proto
@@ -34,4 +34,5 @@ message DataFeedDesc {
optional int32 pv_batch_size = 7 [ default = 32 ];
optional int32 input_type = 8 [ default = 0 ];
optional float sample_rate = 9 [ default = 1.0 ];
optional string index_parser = 10;
}
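The new index_parser field is what InputIndexDataFeed::Init() reads above. A minimal configuration sketch using the generated protobuf setter; the helper name and the .so path are assumptions, not part of this commit.

#include "paddle/fluid/framework/data_feed.pb.h"

// Hypothetical configuration helper (name and plugin path are assumptions).
paddle::framework::DataFeedDesc MakeIndexFeedDesc() {
  paddle::framework::DataFeedDesc desc;
  desc.set_index_parser("./libdemo_index_parser.so");
  // InputIndexDataFeed::Init() derives parser_so_path_ from this string and
  // CHECKs that it contains ".so" before clearing pipe_command_.
  return desc;
}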
3 changes: 3 additions & 0 deletions paddle/fluid/framework/data_feed_factory.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/data_feed_factory.h"

#include <memory>
#include <string>
#include <unordered_map>
@@ -68,6 +69,8 @@ REGISTER_DATAFEED_CLASS(PaddleBoxDataFeed);
REGISTER_DATAFEED_CLASS(SlotPaddleBoxDataFeedWithGpuReplicaCache);
#ifdef PADDLE_WITH_BOX_PS
REGISTER_DATAFEED_CLASS(SlotPaddleBoxDataFeed);
REGISTER_DATAFEED_CLASS(InputTableDataFeed);
REGISTER_DATAFEED_CLASS(InputIndexDataFeed);
#endif
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
REGISTER_DATAFEED_CLASS(MultiSlotFileInstantDataFeed);
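These registrations make both feeds creatable by name through the factory, which is how InputTableDataset::LoadIndexIntoMemory() in data_set.cc below obtains its index readers. A usage sketch, assuming a PADDLE_WITH_BOX_PS build and a DataFeedDesc populated as in the previous sketch; the function name is hypothetical.

// Sketch: the registered names can now be used with the factory (build with
// PADDLE_WITH_BOX_PS so the registrations above are compiled in).
#include "paddle/fluid/framework/data_feed_factory.h"

void CreateIndexReader(const paddle::framework::DataFeedDesc& desc) {
  auto reader =
      paddle::framework::DataFeedFactory::CreateDataFeed("InputIndexDataFeed");
  reader->Init(desc);  // Init() reads the index_parser field set above
}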
101 changes: 99 additions & 2 deletions paddle/fluid/framework/data_set.cc
@@ -13,22 +13,23 @@
* limitations under the License. */

#include "paddle/fluid/framework/data_set.h"

#include <algorithm>
#include <random>
#include <unordered_map>
#include <unordered_set>

#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h"
#include "google/protobuf/text_format.h"
#include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/fleet/box_wrapper.h"
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/platform/monitor.h"
#include "paddle/fluid/platform/timer.h"
#include "xxhash.h" // NOLINT

#include "paddle/fluid/framework/fleet/box_wrapper.h"

#if defined _WIN32 || defined __APPLE__
#else
#define _LINUX
@@ -2045,6 +2046,102 @@ void PadBoxSlotDataset::PrepareTrain(void) {
}
}
}

void InputTableDataset::LoadIntoMemory() {
VLOG(3) << "InputTableDataset<T>::LoadIntoMemory() begin";

platform::Timer timer;
timer.Start();
LoadIndexIntoMemory();
timer.Pause();
VLOG(1) << "load index into memory cost: " << timer.ElapsedSec();

platform::Timer timeline;
timeline.Start();
std::vector<std::thread> load_threads;
std::vector<std::thread> shuffle_threads;

if (mpi_size_ > 1) {
finished_counter_ = mpi_size_;
mpi_flags_.assign(mpi_size_, 1);
VLOG(3) << "RegisterClientToClientMsgHandler";
data_consumer_ = reinterpret_cast<void*>(new PadBoxSlotDataConsumer(this));
VLOG(3) << "RegisterClientToClientMsgHandler done";
}

std::atomic<int> ref(thread_num_);
for (int64_t i = 0; i < thread_num_; ++i) {
load_threads.push_back(std::thread([this, i, &ref]() {
SetCPUAffinity(i, false);
readers_[i]->LoadIntoMemory();
if (--ref == 0) {
input_channel_->Close();
}
}));
}

// dualbox global data shuffle
if (mpi_size_ > 1) {
ShuffleData(&shuffle_threads, shuffle_thread_num_);
MergeInsKeys(shuffle_channel_);
} else {
MergeInsKeys(input_channel_);
}

for (std::thread& t : load_threads) {
t.join();
}

if (!shuffle_threads.empty()) {
for (std::thread& t : shuffle_threads) {
t.join();
}
}

if (data_consumer_ != nullptr) {
delete reinterpret_cast<PadBoxSlotDataConsumer*>(data_consumer_);
data_consumer_ = nullptr;
}
// shuffle_channel_->Clear();
// input_channel_->Clear();

timeline.Pause();
VLOG(1) << "PadBoxSlotDataset::LoadIntoMemory() end"
<< ", memory data size=" << input_records_.size()
<< ", cost time=" << timeline.ElapsedSec() << " seconds";
}

void InputTableDataset::LoadIndexIntoMemory() {
VLOG(3) << "LoadIndexIntoMemory()";

std::vector<std::shared_ptr<paddle::framework::DataFeed>> readers;
size_t file_idx = 0;
std::mutex mutex_for_pick_file;

for (int i = 0; i < thread_num_; ++i) {
readers.push_back(DataFeedFactory::CreateDataFeed("InputIndexDataFeed"));
readers[i]->Init(data_feed_desc_);
readers[i]->SetThreadId(i);
readers[i]->SetFileListMutex(&mutex_for_pick_file);
readers[i]->SetFileListIndex(&file_idx);
readers[i]->SetFileList(index_filelist_);
}

std::vector<std::thread> threads;
for (int i = 0; i < thread_num_; ++i) {
threads.push_back(std::thread([i, &readers]() {
SetCPUAffinity(i, false);
readers[i]->LoadIntoMemory();
}));
}

for (auto& t : threads) {
t.join();
}

VLOG(3) << "end LoadIndexIntoMemory()";
}

#endif
} // end namespace framework
} // end namespace paddle