Merge pull request #14 from qingshui/paddlebox
support share embedding
qingshui authored Jul 29, 2021
2 parents df46df6 + 983918d commit 2146a1a
Showing 10 changed files with 180 additions and 116 deletions.
2 changes: 1 addition & 1 deletion cmake/external/box_ps.cmake
@@ -20,7 +20,7 @@ IF((NOT DEFINED BOX_PS_VER) OR (NOT DEFINED BOX_PS_URL))
SET(BOX_PS_VER "0.1.1" CACHE STRING "" FORCE)
SET(BOX_PS_NAME "box_ps" CACHE STRING "" FORCE)
#SET(BOX_PS_URL "http://box-ps.gz.bcebos.com/box_ps.tar.gz" CACHE STRING "" FORCE)
SET(BOX_PS_URL "data-im.baidu.com:/home/work/var/CI_DATA/im/static/box_ps.tar.gz/box_ps.tar.gz.14" CACHE STRING "" FORCE)
SET(BOX_PS_URL "data-im.baidu.com:/home/work/var/CI_DATA/im/static/box_ps.tar.gz/box_ps.tar.gz.15" CACHE STRING "" FORCE)
ENDIF()
MESSAGE(STATUS "BOX_PS_NAME: ${BOX_PS_NAME}, BOX_PS_URL: ${BOX_PS_URL}")
SET(BOX_PS_SOURCE_DIR "${THIRD_PARTY_PATH}/box_ps")
5 changes: 5 additions & 0 deletions paddle/fluid/framework/data_feed.cc
@@ -2763,6 +2763,7 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByLine(void) {
SlotRecordPool().put(&record_vec);
}
record_vec.clear();
record_vec.shrink_to_fit();
timeline.Pause();
VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename
<< ", cost time=" << timeline.ElapsedSec()
@@ -2925,6 +2926,7 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByCommand(void) {
SlotRecordPool().put(&record_vec);
}
record_vec.clear();
record_vec.shrink_to_fit();
timeline.Pause();
VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename
<< ", lines=" << lines
@@ -3177,6 +3179,7 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByLib(void) {
SlotRecordPool().put(&record_vec);
}
record_vec.clear();
record_vec.shrink_to_fit();
timeline.Pause();
VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename
<< ", cost time=" << timeline.ElapsedSec()
@@ -3257,6 +3260,7 @@ void SlotPaddleBoxDataFeedWithGpuReplicaCache::LoadIntoMemoryByCommand(void) {
SlotRecordPool().put(&record_vec);
}
record_vec.clear();
record_vec.shrink_to_fit();
timeline.Pause();
VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename
<< ", cost time=" << timeline.ElapsedSec()
@@ -3481,6 +3485,7 @@ void InputTableDataFeed::LoadIntoMemoryByLib() {
SlotRecordPool().put(&record_vec);
}
record_vec.clear();
record_vec.shrink_to_fit();
timeline.Pause();
VLOG(3) << "LoadIntoMemoryByLib() read all lines, file=" << filename
<< ", cost time=" << timeline.ElapsedSec()
65 changes: 44 additions & 21 deletions paddle/fluid/framework/data_feed.h
@@ -54,6 +54,8 @@ DECLARE_int32(padbox_record_pool_max_size);
DECLARE_int32(padbox_slotpool_thread_num);
DECLARE_int32(padbox_slotrecord_extend_dim);
DECLARE_bool(padbox_auc_runner_mode);
DECLARE_bool(enable_slotrecord_reset_shrink);
DECLARE_bool(enable_slotpool_wait_release);

namespace paddle {
namespace framework {
@@ -776,10 +778,6 @@ struct SlotValues {
std::vector<T> slot_values;
std::vector<uint32_t> slot_offsets;

~SlotValues() {
slot_values.shrink_to_fit();
slot_offsets.shrink_to_fit();
}
void add_values(const T* values, uint32_t num) {
if (slot_offsets.empty()) {
slot_offsets.push_back(0);
@@ -809,9 +807,13 @@ }
}
slot_offsets[slot_num] = slot_values.size();
}
void clear(void) {
void clear(bool shrink) {
slot_offsets.clear();
slot_values.clear();
if (shrink) {
slot_values.shrink_to_fit();
slot_offsets.shrink_to_fit();
}
}
};

@@ -830,14 +832,11 @@ struct SlotRecordObject {
SlotValues<uint64_t> slot_uint64_feasigns_;
SlotValues<float> slot_float_feasigns_;

~SlotRecordObject() {
slot_uint64_feasigns_.clear();
slot_float_feasigns_.clear();
}

void reset(void) {
slot_uint64_feasigns_.clear();
slot_float_feasigns_.clear();
~SlotRecordObject() { clear(true); }
void reset(void) { clear(FLAGS_enable_slotrecord_reset_shrink); }
void clear(bool shrink) {
slot_uint64_feasigns_.clear(shrink);
slot_float_feasigns_.clear(shrink);
}
};
using SlotRecord = SlotRecordObject*;
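
The two hunks above are one change: SlotValues loses its shrinking destructor in favor of an explicit clear(bool shrink), and SlotRecordObject picks the flag per call site — clear(true) on destruction, FLAGS_enable_slotrecord_reset_shrink (declared at the top of this file) on reset(). Records recycled through the pool therefore keep their buffer capacity by default and only shrink when the flag is switched on. A self-contained sketch of the idiom, with DEFINE_bool standing in for the flag definition that would live in a .cc file:

#include <gflags/gflags.h>

#include <cstdint>
#include <vector>

DEFINE_bool(enable_slotrecord_reset_shrink, false,
            "also release buffer capacity when a pooled record is reset");

struct RecordSketch {
  std::vector<uint64_t> uint64_feasigns;
  std::vector<float> float_feasigns;

  ~RecordSketch() { clear(true); }  // destruction always frees the capacity
  void reset() { clear(FLAGS_enable_slotrecord_reset_shrink); }
  void clear(bool shrink) {
    uint64_feasigns.clear();
    float_feasigns.clear();
    if (shrink) {
      uint64_feasigns.shrink_to_fit();
      float_feasigns.shrink_to_fit();
    }
  }
};
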
@@ -891,15 +890,16 @@ inline int GetTotalFeaNum(const std::vector<SlotRecord>& slot_record,
template <class T>
class SlotObjAllocator {
public:
SlotObjAllocator() : free_nodes_(NULL), capacity_(0) {}
explicit SlotObjAllocator(std::function<void(T*)> deleter)
: free_nodes_(NULL), capacity_(0), deleter_(deleter) {}
~SlotObjAllocator() { clear(); }

void clear(void) {
void clear() {
T* tmp = NULL;
while (free_nodes_ != NULL) {
tmp = reinterpret_cast<T*>(reinterpret_cast<void*>(free_nodes_));
free_nodes_ = free_nodes_->next;
delete tmp;
deleter_(tmp);
--capacity_;
}
CHECK_EQ(capacity_, static_cast<size_t>(0));
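
SlotObjAllocator::clear() no longer hard-codes delete: the allocator is constructed with a deleter and destroys recycled nodes through it (SlotObjPool below passes free_slotrecord), and the default constructor is removed so a deleter is always supplied. A simplified, self-contained sketch of the shape — it stores raw pointers, where the real allocator threads its free list through the recycled objects' own memory:

#include <cstddef>
#include <functional>
#include <vector>

template <class T>
class ObjAllocatorSketch {
 public:
  explicit ObjAllocatorSketch(std::function<void(T*)> deleter)
      : deleter_(deleter) {}
  ~ObjAllocatorSketch() { clear(); }

  void release(T* obj) { free_list_.push_back(obj); }  // recycle an object
  T* acquire() {  // hand back a recycled object, or nullptr if none pooled
    if (free_list_.empty()) return nullptr;
    T* obj = free_list_.back();
    free_list_.pop_back();
    return obj;
  }
  void clear() {  // destroy pooled objects via the injected deleter
    for (T* obj : free_list_) deleter_(obj);
    free_list_.clear();
  }
  size_t capacity() const { return free_list_.size(); }

 private:
  std::vector<T*> free_list_;
  std::function<void(T*)> deleter_;
};
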
@@ -928,17 +928,21 @@ class SlotObjAllocator {
};
Node* free_nodes_; // a list
size_t capacity_;
std::function<void(T*)> deleter_ = nullptr;
};
static const int OBJPOOL_BLOCK_SIZE = 10000;
class SlotObjPool {
public:
SlotObjPool() : max_capacity_(FLAGS_padbox_record_pool_max_size) {
SlotObjPool()
: max_capacity_(FLAGS_padbox_record_pool_max_size),
alloc_(free_slotrecord) {
ins_chan_ = MakeChannel<SlotRecord>();
ins_chan_->SetBlockSize(OBJPOOL_BLOCK_SIZE);
for (int i = 0; i < FLAGS_padbox_slotpool_thread_num; ++i) {
threads_.push_back(std::thread([this]() { run(); }));
}
disable_pool_ = false;
count_ = 0;
}
~SlotObjPool() {
ins_chan_->Close();
@@ -963,6 +967,7 @@ class SlotObjPool {
}
}
mutex_.unlock();
count_ += n;
if (size == n) {
return;
}
@@ -983,19 +988,23 @@ }
}
void run(void) {
std::vector<SlotRecord> input;
while (ins_chan_->Read(input)) {
while (ins_chan_->ReadOnce(input, OBJPOOL_BLOCK_SIZE)) {
if (input.empty()) {
continue;
}
// over max capacity
if (disable_pool_ || input.size() + capacity() > max_capacity_) {
size_t n = input.size();
count_ -= n;
if (disable_pool_ || n + capacity() > max_capacity_) {
for (auto& t : input) {
free_slotrecord(t);
}
} else {
mutex_.lock();
for (auto& t : input) {
t->reset();
}
mutex_.lock();
for (auto& t : input) {
alloc_.release(t);
}
mutex_.unlock();
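
The run() loop changes in three ways: Read(input) becomes ReadOnce(input, OBJPOOL_BLOCK_SIZE), bounding how many records each iteration pulls from the channel; the new atomic count_ is decremented here to balance the count_ += n added in the hunk above; and, as the interleaved old/new lines suggest, the mutex now covers only the release loop, with the potentially expensive reset() calls moved outside it. A condensed, self-contained sketch of the recycle step under those assumptions (Record and the free list are stand-ins):

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

struct Record {
  std::vector<uint64_t> feasigns;
  void reset() { feasigns.clear(); }
};

class RecyclerSketch {
 public:
  void consume(std::vector<Record*>& input) {
    size_t n = input.size();
    count_ -= n;  // these records are no longer checked out
    if (disable_pool_ || n + pooled() > max_capacity_) {
      for (Record* r : input) delete r;    // over budget: actually free
    } else {
      for (Record* r : input) r->reset();  // reset outside the lock
      std::lock_guard<std::mutex> guard(mutex_);
      for (Record* r : input) free_list_.push_back(r);
    }
  }

 private:
  size_t pooled() {
    std::lock_guard<std::mutex> guard(mutex_);
    return free_list_.size();
  }
  std::atomic<long> count_{0};
  bool disable_pool_ = false;
  size_t max_capacity_ = 10000;
  std::mutex mutex_;
  std::vector<Record*> free_list_;
};
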
@@ -1004,9 +1013,20 @@ }
}
}
void clear(void) {
platform::Timer timeline;
timeline.Start();
mutex_.lock();
alloc_.clear();
mutex_.unlock();
// wait for channel data to be released
if (FLAGS_enable_slotpool_wait_release) {
while (!ins_chan_->Empty()) {
sleep(1);
}
}
timeline.Pause();
LOG(WARNING) << "clear slot pool data size=" << count_.load()
<< ", span=" << timeline.ElapsedSec();
}
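// Note on clear() above: with FLAGS_enable_slotpool_wait_release set it now
// sleeps in one-second steps until the recycle channel drains, so the
// WARNING line reports a quiesced pool. count_ is the new atomic tracking
// checked-out records: incremented when records leave the pool (hunk above)
// and decremented when run() takes them back off the channel.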
size_t capacity(void) {
mutex_.lock();
@@ -1022,6 +1042,7 @@
std::mutex mutex_;
SlotObjAllocator<SlotRecordObject> alloc_;
bool disable_pool_;
std::atomic<long> count_; // NOLINT
};

inline SlotObjPool& SlotRecordPool() {
@@ -1570,7 +1591,9 @@ class SlotPaddleBoxDataFeed : public DataFeed {
virtual ~SlotPaddleBoxDataFeed() {
#if defined(PADDLE_WITH_CUDA) && defined(_LINUX)
if (pack_ != nullptr) {
LOG(WARNING) << "pack batch total time: " << batch_timer_.ElapsedSec()
LOG(WARNING) << "gpu: "
<< boost::get<platform::CUDAPlace>(place_).GetDeviceId()
<< ", pack batch total time: " << batch_timer_.ElapsedSec()
<< "[copy:" << pack_->trans_time_span()
<< ",fill:" << fill_timer_.ElapsedSec()
<< ",memory:" << offset_timer_.ElapsedSec()
4 changes: 4 additions & 0 deletions paddle/fluid/framework/data_set.cc
@@ -1808,6 +1808,10 @@ void PadBoxSlotDataset::ShuffleData(int thread_num) {
wg.wait();
timer.Pause();

data.shrink_to_fit();
loc_datas.shrink_to_fit();
releases.shrink_to_fit();

double span = timer.ElapsedSec();
if (span > max_shuffle_span_) {
max_shuffle_span_ = span;
124 changes: 63 additions & 61 deletions paddle/fluid/framework/fleet/box_wrapper.cc
@@ -31,10 +31,6 @@ namespace framework {
std::shared_ptr<BoxWrapper> BoxWrapper::s_instance_ = nullptr;
std::shared_ptr<boxps::PaddleShuffler> BoxWrapper::data_shuffle_ = nullptr;
cudaStream_t BoxWrapper::stream_list_[MAX_GPU_NUM];
// int BoxWrapper::embedx_dim_ = 8;
// int BoxWrapper::expand_embed_dim_ = 0;
// int BoxWrapper::feature_type_ = 0;
// float BoxWrapper::pull_embedx_scale_ = 1.0;

void BasicAucCalculator::add_unlock_data(double pred, int label) {
PADDLE_ENFORCE_GE(pred, 0.0, platform::errors::PreconditionNotMet(
@@ -372,24 +368,24 @@

void BoxWrapper::CheckEmbedSizeIsValid(int embedx_dim, int expand_embed_dim) {
if (feature_type_ == static_cast<int>(boxps::FEATURE_SHARE_EMBEDDING)) {
PADDLE_ENFORCE_EQ(
(embedx_dim % boxps::SHARE_EMBEDDING_NUM), 0,
platform::errors::InvalidArgument(
"SetInstance(): invalid embedx_dim. "
"embedx_dim % boxps::SHARE_EMBEDDING_NUM shoule be 0"));
PADDLE_ENFORCE_EQ((embedx_dim % expand_embed_dim), 0,
platform::errors::InvalidArgument(
"SetInstance(): invalid embedx_dim. "
"embedx_dim % expand_embed_dim shoule be 0"));

embedx_dim = embedx_dim / boxps::SHARE_EMBEDDING_NUM;
embedx_dim = embedx_dim / expand_embed_dim;
} else {
PADDLE_ENFORCE_EQ(expand_embed_dim_, expand_embed_dim,
platform::errors::InvalidArgument(
"SetInstance(): invalid expand_embed_dim. When "
"expand_embed_dim = %d, but got %d.",
expand_embed_dim_, expand_embed_dim));
}
PADDLE_ENFORCE_EQ(
embedx_dim_, embedx_dim,
platform::errors::InvalidArgument("SetInstance(): invalid embedx_dim. "
"When embedx_dim = %d, but got %d.",
embedx_dim_, embedx_dim));
PADDLE_ENFORCE_EQ(expand_embed_dim_, expand_embed_dim,
platform::errors::InvalidArgument(
"SetInstance(): invalid expand_embed_dim. When "
"expand_embed_dim = %d, but got %d.",
expand_embed_dim_, expand_embed_dim));
}
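
// Note on CheckEmbedSizeIsValid above: for FEATURE_SHARE_EMBEDDING the
// number of shared embeddings is now carried in expand_embed_dim instead of
// the compile-time constant boxps::SHARE_EMBEDDING_NUM, so both the
// divisibility check and the division use the runtime value, and the
// expand_embed_dim_ equality check moves into the non-share branch. A
// hypothetical standalone equivalent of the share branch:
//
//   int ShareEmbeddingDim(int embedx_dim, int expand_embed_dim) {
//     assert(expand_embed_dim > 0);
//     assert(embedx_dim % expand_embed_dim == 0);  // "should be 0" above
//     return embedx_dim / expand_embed_dim;        // width of one share
//   }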

void BoxWrapper::PullSparse(const paddle::platform::Place& place,
@@ -408,33 +404,36 @@
} \
} break

#define PULLSPARSE_CASE(i, ...) \
case i: { \
constexpr size_t ExpandDim = i; \
if (feature_type_ == static_cast<int>(boxps::FEATURE_SHARE_EMBEDDING)) { \
constexpr size_t SingleEmbedxDim = \
EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \
PullSparseCase<boxps::FeaturePullValueGpuShareEmbedding<SingleEmbedxDim, \
ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_PCOC)) { \
PullSparseCase<boxps::FeaturePullValueGpuPCOC<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_QUANT) || \
feature_type_ == static_cast<int>(boxps::FEATURE_SHOWCLK)) { \
PullSparseCase<boxps::FeaturePullValueGpuQuant<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} else { \
PullSparseCase<boxps::FeaturePullValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} \
#define PULLSPARSE_CASE(i, ...) \
case i: { \
constexpr size_t ExpandDim = i; \
if (feature_type_ == static_cast<int>(boxps::FEATURE_SHARE_EMBEDDING)) { \
PullSparseCase< \
boxps::FeaturePullValueGpuShareEmbedding<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_PCOC)) { \
PullSparseCase<boxps::FeaturePullValueGpuPCOC<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_QUANT) || \
feature_type_ == static_cast<int>(boxps::FEATURE_SHOWCLK)) { \
PullSparseCase<boxps::FeaturePullValueGpuQuant<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} else { \
PullSparseCase<boxps::FeaturePullValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
} \
} break

CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim);
switch (hidden_size - cvm_offset_) {
EMBEDX_CASE(8, PULLSPARSE_CASE(0); PULLSPARSE_CASE(8);
switch (embedx_dim_) {
EMBEDX_CASE(8, PULLSPARSE_CASE(0); PULLSPARSE_CASE(1); PULLSPARSE_CASE(2);
PULLSPARSE_CASE(3); PULLSPARSE_CASE(4); PULLSPARSE_CASE(5);
PULLSPARSE_CASE(6); PULLSPARSE_CASE(7); PULLSPARSE_CASE(8);
PULLSPARSE_CASE(64););
EMBEDX_CASE(16, PULLSPARSE_CASE(0); PULLSPARSE_CASE(1); PULLSPARSE_CASE(2);
PULLSPARSE_CASE(3); PULLSPARSE_CASE(4); PULLSPARSE_CASE(5);
PULLSPARSE_CASE(6); PULLSPARSE_CASE(7); PULLSPARSE_CASE(8);
PULLSPARSE_CASE(64););
EMBEDX_CASE(16, PULLSPARSE_CASE(0); PULLSPARSE_CASE(64););
EMBEDX_CASE(32, PULLSPARSE_CASE(0););
EMBEDX_CASE(64, PULLSPARSE_CASE(0););
EMBEDX_CASE(256, PULLSPARSE_CASE(0););
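
The dispatch switch now keys on the configured embedx_dim_ instead of hidden_size - cvm_offset_, and the share-embedding branch instantiates FeaturePullValueGpuShareEmbedding<EmbedxDim, ExpandDim> directly rather than pre-dividing EmbedxDim by boxps::SHARE_EMBEDDING_NUM; to support that, expand dims 0 through 8 and 64 are enumerated for the 8- and 16-wide embeddings. PushSparseGrad below mirrors the same macro structure. A minimal illustration of the pattern — macro-generated switch cases binding a runtime dimension to a compile-time template parameter (all names hypothetical):

#include <cstdio>

template <int EmbedxDim, int ExpandDim>
void PullCase() {
  std::printf("pull: embedx=%d expand=%d\n", EmbedxDim, ExpandDim);
}

#define EXPAND_CASE(d)         \
  case d:                      \
    PullCase<kEmbedxDim, d>(); \
    break

void Dispatch(int expand_dim) {
  constexpr int kEmbedxDim = 8;  // one template instantiation per case
  switch (expand_dim) {
    EXPAND_CASE(0);
    EXPAND_CASE(8);
    EXPAND_CASE(64);
    default:
      std::printf("unsupported expand dim %d\n", expand_dim);
  }
}
#undef EXPAND_CASE
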
@@ -466,33 +465,36 @@ void BoxWrapper::PushSparseGrad(const paddle::platform::Place& place,
} \
} break

#define PUSHSPARSE_CASE(i, ...) \
case i: { \
constexpr size_t ExpandDim = i; \
if (feature_type_ == static_cast<int>(boxps::FEATURE_SHARE_EMBEDDING)) { \
constexpr size_t SingleEmbedxDim = \
EmbedxDim / boxps::SHARE_EMBEDDING_NUM; \
PushSparseGradCase<boxps::FeaturePushValueGpuShareEmbedding< \
SingleEmbedxDim, ExpandDim>>(place, keys, grad_values, slot_lengths, \
hidden_size, expand_embed_dim, \
batch_size); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_PCOC)) { \
PushSparseGradCase< \
boxps::FeaturePushValueGpuPCOC<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
} else { \
PushSparseGradCase<boxps::FeaturePushValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
} \
#define PUSHSPARSE_CASE(i, ...) \
case i: { \
constexpr size_t ExpandDim = i; \
if (feature_type_ == static_cast<int>(boxps::FEATURE_SHARE_EMBEDDING)) { \
PushSparseGradCase< \
boxps::FeaturePushValueGpuShareEmbedding<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_PCOC)) { \
PushSparseGradCase< \
boxps::FeaturePushValueGpuPCOC<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
} else { \
PushSparseGradCase<boxps::FeaturePushValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
} \
} break

CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim);
switch (hidden_size - cvm_offset_) {
EMBEDX_CASE(8, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(8);
switch (embedx_dim_) {
EMBEDX_CASE(8, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(1); PUSHSPARSE_CASE(2);
PUSHSPARSE_CASE(3); PUSHSPARSE_CASE(4); PUSHSPARSE_CASE(5);
PUSHSPARSE_CASE(6); PUSHSPARSE_CASE(7); PUSHSPARSE_CASE(8);
PUSHSPARSE_CASE(64););
EMBEDX_CASE(16, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(1); PUSHSPARSE_CASE(2);
PUSHSPARSE_CASE(3); PUSHSPARSE_CASE(4); PUSHSPARSE_CASE(5);
PUSHSPARSE_CASE(6); PUSHSPARSE_CASE(7); PUSHSPARSE_CASE(8);
PUSHSPARSE_CASE(64););
EMBEDX_CASE(16, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(64););
EMBEDX_CASE(32, PUSHSPARSE_CASE(0););
EMBEDX_CASE(64, PUSHSPARSE_CASE(0););
EMBEDX_CASE(256, PUSHSPARSE_CASE(0););