Optimize cpu sketch allreduce for sparse data. (#6009)
* Bypass the RABIT serialization reducer and use custom allgather-based merging.
trivialfis authored Aug 19, 2020
1 parent 90355b4 commit 29b7fea
Showing 10 changed files with 356 additions and 86 deletions.
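In outline, the patch replaces the per-column rabit::SerializeReducer with a gather-then-merge scheme: each worker prunes its sketches, all workers exchange per-column size scans, the raw sketch entries are gathered into one flat buffer, and every worker merges the per-column segments locally. The following single-process simulation sketches that flow; the worker buffers, scan offsets, and float "entries" are hypothetical stand-ins for the rabit collectives and WQSketch::Entry.

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  int const world = 2, n_columns = 3;
  // Hypothetical pruned sketch entries per worker, stored column-major,
  // with a per-worker exclusive scan over column sizes.
  std::vector<std::vector<float>> worker_sketches = {
      {/*col0*/ 1.f, 2.f, /*col1*/ 3.f, /*col2*/ 4.f, 5.f},
      {/*col0*/ 6.f, /*col1*/ 7.f, 8.f, /*col2*/ 9.f}};
  std::vector<std::vector<std::size_t>> scans = {{0, 2, 3, 5}, {0, 1, 3, 4}};

  // "Allgather": concatenate every worker's buffer. rabit does this
  // collectively; the commit emulates it with a zero-padded allreduce.
  std::vector<float> global_sketches;
  std::vector<std::size_t> segments{0};
  for (auto const &w : worker_sketches) {
    global_sketches.insert(global_sketches.end(), w.cbegin(), w.cend());
    segments.push_back(segments.back() + w.size());
  }
  // Merge column by column across workers; the real code calls
  // WQSummary::Reduce here instead of printing.
  for (int c = 0; c < n_columns; ++c) {
    std::cout << "column " << c << ":";
    for (int w = 0; w < world; ++w) {
      for (std::size_t i = segments[w] + scans[w][c];
           i < segments[w] + scans[w][c + 1]; ++i) {
        std::cout << " " << global_sketches[i];
      }
    }
    std::cout << "\n";
  }
  return 0;
}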
24 changes: 6 additions & 18 deletions src/common/hist_util.h
@@ -116,26 +116,14 @@ inline HistogramCuts SketchOnDMatrix(DMatrix *m, int32_t max_bins) {
for (auto& column : column_sizes) {
column.resize(info.num_col_, 0);
}
for (auto const& page : m->GetBatches<SparsePage>()) {
page.data.HostVector();
page.offset.HostVector();
ParallelFor(page.Size(), threads, [&](size_t i) {
auto &local_column_sizes = column_sizes.at(omp_get_thread_num());
auto row = page[i];
auto const *p_row = row.data();
for (size_t j = 0; j < row.size(); ++j) {
local_column_sizes.at(p_row[j].index)++;
}
});
}
std::vector<bst_row_t> reduced(info.num_col_, 0);

ParallelFor(info.num_col_, threads, [&](size_t i) {
for (auto const &thread : column_sizes) {
reduced[i] += thread[i];
for (auto const& page : m->GetBatches<SparsePage>()) {
auto const &entries_per_column =
HostSketchContainer::CalcColumnSize(page, info.num_col_, threads);
for (size_t i = 0; i < entries_per_column.size(); ++i) {
reduced[i] += entries_per_column[i];
}
});

}
HostSketchContainer container(reduced, max_bins,
HostSketchContainer::UseGroup(info));
for (auto const &page : m->GetBatches<SparsePage>()) {
225 changes: 179 additions & 46 deletions src/common/quantile.cc
@@ -25,34 +25,67 @@ HostSketchContainer::HostSketchContainer(std::vector<bst_row_t> columns_size,
}
}

std::vector<bst_feature_t> LoadBalance(SparsePage const &page,
std::vector<size_t> columns_size,
size_t const nthreads) {
/* Some sparse datasets have their mass concentrated on a small
 * number of features. To avoid waiting for a few threads running
 * forever, we distribute different numbers of columns to
 * different threads according to the number of entries. */
size_t const total_entries = page.data.Size();
std::vector<bst_row_t>
HostSketchContainer::CalcColumnSize(SparsePage const &batch,
bst_feature_t const n_columns,
size_t const nthreads) {
auto page = batch.GetView();
std::vector<std::vector<bst_row_t>> column_sizes(nthreads);
for (auto &column : column_sizes) {
column.resize(n_columns, 0);
}

ParallelFor(page.Size(), nthreads, [&](size_t i) {
auto &local_column_sizes = column_sizes.at(omp_get_thread_num());
auto row = page[i];
auto const *p_row = row.data();
for (size_t j = 0; j < row.size(); ++j) {
local_column_sizes.at(p_row[j].index)++;
}
});
std::vector<bst_row_t> entries_per_columns(n_columns, 0);
ParallelFor(n_columns, nthreads, [&](size_t i) {
for (auto const &thread : column_sizes) {
entries_per_columns[i] += thread[i];
}
});
return entries_per_columns;
}
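CalcColumnSize counts entries per column without atomics: each OpenMP thread accumulates into a private buffer, and a second parallel pass reduces the buffers column by column, with each thread owning a disjoint set of columns. A standalone sketch of the same pattern over a toy CSR page (names and layout are illustrative, not the XGBoost API):

#include <cstddef>
#include <cstdint>
#include <vector>
#include <omp.h>

std::vector<std::size_t> CalcColumnSizeToy(std::vector<std::size_t> const &col_idx,
                                           std::vector<std::size_t> const &row_ptr,
                                           std::size_t n_columns, int nthreads) {
  std::vector<std::vector<std::size_t>> local(
      nthreads, std::vector<std::size_t>(n_columns, 0));
  std::int64_t const n_rows = static_cast<std::int64_t>(row_ptr.size()) - 1;
#pragma omp parallel for num_threads(nthreads)
  for (std::int64_t i = 0; i < n_rows; ++i) {
    auto &mine = local[omp_get_thread_num()];  // thread-private, no atomics
    for (std::size_t j = row_ptr[i]; j < row_ptr[i + 1]; ++j) {
      mine[col_idx[j]]++;
    }
  }
  std::vector<std::size_t> reduced(n_columns, 0);
#pragma omp parallel for num_threads(nthreads)
  for (std::int64_t c = 0; c < static_cast<std::int64_t>(n_columns); ++c) {
    for (auto const &buf : local) {
      reduced[c] += buf[c];  // each thread reduces a disjoint set of columns
    }
  }
  return reduced;
}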

std::vector<bst_feature_t> HostSketchContainer::LoadBalance(
SparsePage const &batch, bst_feature_t n_columns, size_t const nthreads) {
/* Some sparse datasets have their mass concentrated on a small number of features. To
 * avoid waiting for a few threads running forever, we distribute different numbers
 * of columns to different threads according to the number of entries.
 */
auto page = batch.GetView();
size_t const total_entries = page.data.size();
size_t const entries_per_thread = common::DivRoundUp(total_entries, nthreads);

std::vector<bst_feature_t> cols_ptr(nthreads+1, 0);
std::vector<std::vector<bst_row_t>> column_sizes(nthreads);
for (auto& column : column_sizes) {
column.resize(n_columns, 0);
}
std::vector<bst_row_t> entries_per_columns =
CalcColumnSize(batch, n_columns, nthreads);
std::vector<bst_feature_t> cols_ptr(nthreads + 1, 0);
size_t count {0};
size_t current_thread {1};

for (auto col : columns_size) {
cols_ptr[current_thread]++; // add one column to thread
for (auto col : entries_per_columns) {
cols_ptr.at(current_thread)++; // add one column to thread
count += col;
if (count > entries_per_thread + 1) {
CHECK_LE(count, total_entries);
if (count > entries_per_thread) {
current_thread++;
count = 0;
cols_ptr[current_thread] = cols_ptr[current_thread-1];
cols_ptr.at(current_thread) = cols_ptr[current_thread-1];
}
}
// Idle threads.
for (; current_thread < cols_ptr.size() - 1; ++current_thread) {
cols_ptr[current_thread+1] = cols_ptr[current_thread];
}

return cols_ptr;
}
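The returned cols_ptr is a CSC-style partition: thread t owns columns [cols_ptr[t], cols_ptr[t+1]). A minimal standalone version shows why this helps skewed data: a heavy column ends up alone on a thread instead of being bundled into a fixed-width slice (input sizes are made up):

#include <cstddef>
#include <iostream>
#include <vector>

std::vector<int> LoadBalanceToy(std::vector<std::size_t> const &entries_per_column,
                                int nthreads) {
  std::size_t total = 0;
  for (auto e : entries_per_column) total += e;
  std::size_t const per_thread = (total + nthreads - 1) / nthreads;  // DivRoundUp
  std::vector<int> cols_ptr(nthreads + 1, 0);
  std::size_t count = 0;
  int thread = 1;
  for (auto e : entries_per_column) {
    cols_ptr[thread]++;  // assign this column to the current thread
    count += e;
    if (count > per_thread) {  // thread is full, open the next one
      ++thread;
      count = 0;
      cols_ptr[thread] = cols_ptr[thread - 1];
    }
  }
  for (; thread < nthreads; ++thread) {  // idle threads get empty ranges
    cols_ptr[thread + 1] = cols_ptr[thread];
  }
  return cols_ptr;
}

int main() {
  // One heavy column and five light ones: the heavy column gets a thread
  // to itself instead of a fixed two-column slice.
  for (auto p : LoadBalanceToy({100, 1, 1, 1, 1, 1}, 3)) std::cout << p << " ";
  std::cout << "\n";  // prints: 0 1 6 6
}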

@@ -67,11 +100,10 @@ void HostSketchContainer::PushRowPage(SparsePage const &page,
// Use group index for weights?
auto batch = page.GetView();
dmlc::OMPException exec;
// Parallel over columns. Assuming the data is dense, each thread owns a set of
// consecutive columns.
// Parallel over columns. Each thread owns a set of consecutive columns.
auto const ncol = static_cast<uint32_t>(info.num_col_);
auto const is_dense = info.num_nonzero_ == info.num_col_ * info.num_row_;
auto thread_columns_ptr = LoadBalance(page, columns_size_, nthread);
auto thread_columns_ptr = LoadBalance(page, info.num_col_, nthread);

#pragma omp parallel num_threads(nthread)
{
@@ -112,58 +144,158 @@ void HostSketchContainer::PushRowPage(SparsePage const &page,
monitor_.Stop(__func__);
}

void AddCutPoint(WQuantileSketch<float, float>::SummaryContainer const &summary,
int max_bin, HistogramCuts *cuts) {
size_t required_cuts = std::min(summary.size, static_cast<size_t>(max_bin));
auto& cut_values = cuts->cut_values_.HostVector();
for (size_t i = 1; i < required_cuts; ++i) {
bst_float cpt = summary.data[i].value;
if (i == 1 || cpt > cuts->cut_values_.ConstHostVector().back()) {
cut_values.push_back(cpt);
}
void HostSketchContainer::GatherSketchInfo(
std::vector<WQSketch::SummaryContainer> const &reduced,
std::vector<size_t> *p_worker_segments,
std::vector<bst_row_t> *p_sketches_scan,
std::vector<WQSketch::Entry> *p_global_sketches) {
auto& worker_segments = *p_worker_segments;
worker_segments.resize(1, 0);
auto world = rabit::GetWorldSize();
auto rank = rabit::GetRank();
auto n_columns = sketches_.size();

std::vector<bst_row_t> sketch_size;
for (auto const& sketch : reduced) {
sketch_size.push_back(sketch.size);
}
std::vector<bst_row_t>& sketches_scan = *p_sketches_scan;
sketches_scan.resize((n_columns + 1) * world, 0);
size_t beg_scan = rank * (n_columns + 1);
std::partial_sum(sketch_size.cbegin(), sketch_size.cend(),
sketches_scan.begin() + beg_scan + 1);
// Gather all column pointers
rabit::Allreduce<rabit::op::Sum>(sketches_scan.data(), sketches_scan.size());

for (int32_t i = 0; i < world; ++i) {
size_t back = (i + 1) * (n_columns + 1) - 1;
auto n_entries = sketches_scan.at(back);
worker_segments.push_back(n_entries);
}
// Offset of sketch from each worker.
std::partial_sum(worker_segments.begin(), worker_segments.end(),
worker_segments.begin());
CHECK_GE(worker_segments.size(), 1);
auto total = worker_segments.back();

auto& global_sketches = *p_global_sketches;
global_sketches.resize(total, WQSketch::Entry{0, 0, 0, 0});
auto worker_sketch = Span<WQSketch::Entry>{global_sketches}.subspan(
worker_segments[rank], worker_segments[rank + 1] - worker_segments[rank]);
size_t cursor = 0;
for (auto const &sketch : reduced) {
std::copy(sketch.data, sketch.data + sketch.size,
worker_sketch.begin() + cursor);
cursor += sketch.size;
}

static_assert(sizeof(WQSketch::Entry) / 4 == sizeof(float), "");
rabit::Allreduce<rabit::op::Sum>(
reinterpret_cast<float *>(global_sketches.data()),
global_sketches.size() * sizeof(WQSketch::Entry) / sizeof(float));
}
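The final Allreduce<Sum> here doubles as an allgather: every worker writes only into its own segment of a zero-initialized buffer, so element-wise summation across workers reproduces the concatenation, and the static_assert guarantees an Entry can be summed as four floats. A two-worker simulation of the idea, with std::array standing in for WQSketch::Entry:

#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

int main() {
  using Entry = std::array<float, 4>;  // stand-in for WQSketch::Entry (4 floats)
  std::size_t const total = 3;  // worker 0 owns slots [0, 2), worker 1 owns [2, 3)
  std::vector<Entry> buf0(total, Entry{0, 0, 0, 0});
  std::vector<Entry> buf1(total, Entry{0, 0, 0, 0});
  buf0[0] = {1, 1, 1, 1};  // worker 0 writes only its own segment
  buf0[1] = {2, 2, 2, 2};
  buf1[2] = {3, 3, 3, 3};  // worker 1 writes only its own segment
  // Element-wise float sum is exactly what rabit::Allreduce<Sum> computes
  // after the reinterpret_cast in GatherSketchInfo.
  std::vector<Entry> global_sketches(total);
  for (std::size_t i = 0; i < total; ++i) {
    for (int k = 0; k < 4; ++k) {
      global_sketches[i][k] = buf0[i][k] + buf1[i][k];
    }
  }
  assert((global_sketches[2] == Entry{3, 3, 3, 3}));  // data arrives unmodified
  return 0;
}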

void HostSketchContainer::MakeCuts(HistogramCuts* cuts) {
void HostSketchContainer::AllReduce(
std::vector<WQSketch::SummaryContainer> *p_reduced,
std::vector<int32_t>* p_num_cuts) {
monitor_.Start(__func__);
rabit::Allreduce<rabit::op::Sum>(columns_size_.data(), columns_size_.size());
std::vector<WQSketch::SummaryContainer> reduced(sketches_.size());
std::vector<int32_t> num_cuts;
size_t nbytes = 0;
auto& num_cuts = *p_num_cuts;
CHECK_EQ(num_cuts.size(), 0);
auto &reduced = *p_reduced;
reduced.resize(sketches_.size());

size_t n_columns = sketches_.size();
rabit::Allreduce<rabit::op::Max>(&n_columns, 1);
CHECK_EQ(n_columns, sketches_.size()) << "Number of columns differs across workers";

// Prune the intermediate num cuts for synchronization.
std::vector<bst_row_t> global_column_size(columns_size_);
rabit::Allreduce<rabit::op::Sum>(global_column_size.data(), global_column_size.size());

size_t nbytes = 0;
for (size_t i = 0; i < sketches_.size(); ++i) {
int32_t intermediate_num_cuts = static_cast<int32_t>(std::min(
columns_size_[i], static_cast<size_t>(max_bins_ * WQSketch::kFactor)));
if (columns_size_[i] != 0) {
global_column_size[i], static_cast<size_t>(max_bins_ * WQSketch::kFactor)));
if (global_column_size[i] != 0) {
WQSketch::SummaryContainer out;
sketches_[i].GetSummary(&out);
reduced[i].Reserve(intermediate_num_cuts);
CHECK(reduced[i].data);
reduced[i].SetPrune(out, intermediate_num_cuts);
nbytes = std::max(
WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts),
nbytes);
}

num_cuts.push_back(intermediate_num_cuts);
nbytes = std::max(
WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts), nbytes);
}
auto world = rabit::GetWorldSize();
if (world == 1) {
return;
}

std::vector<size_t> worker_segments(1, 0); // CSC pointer to sketches.
std::vector<bst_row_t> sketches_scan((n_columns + 1) * world, 0);

std::vector<WQSketch::Entry> global_sketches;
this->GatherSketchInfo(reduced, &worker_segments, &sketches_scan,
&global_sketches);

std::vector<WQSketch::SummaryContainer> final_sketches(n_columns);
ParallelFor(n_columns, omp_get_max_threads(), [&](size_t fidx) {
int32_t intermediate_num_cuts = num_cuts[fidx];
auto nbytes =
WQSketch::SummaryContainer::CalcMemCost(intermediate_num_cuts);

for (int32_t i = 1; i < world + 1; ++i) {
auto size = worker_segments.at(i) - worker_segments[i - 1];
auto worker_sketches = Span<WQSketch::Entry>{global_sketches}.subspan(
worker_segments[i - 1], size);
auto worker_scan =
Span<bst_row_t>(sketches_scan)
.subspan((i - 1) * (n_columns + 1), (n_columns + 1));

auto worker_feature = worker_sketches.subspan(
worker_scan[fidx], worker_scan[fidx + 1] - worker_scan[fidx]);
CHECK(worker_feature.data());
WQSummary<float, float> summary(worker_feature.data(),
worker_feature.size());
auto &out = final_sketches.at(fidx);
out.Reduce(summary, nbytes);
}

reduced.at(fidx).Reserve(intermediate_num_cuts);
reduced.at(fidx).SetPrune(final_sketches.at(fidx), intermediate_num_cuts);
});
monitor_.Stop(__func__);
}
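The payoff over the removed SerializeReducer path (see the FIXME in the deleted code below) is memory: the old reducer reserved a worst-case slot for every column, while the allgather moves only the entries that actually exist. A back-of-the-envelope comparison with hypothetical, skewed column sizes:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <numeric>
#include <vector>

int main() {
  // Hypothetical pruned sketch sizes for six columns of a sparse dataset.
  std::vector<std::size_t> cuts_per_column{2048, 3, 1, 7, 2, 5};
  std::size_t const entry_bytes = 16;  // WQSketch::Entry is four 4-byte floats
  // Old path: SerializeReducer reserves the worst-case size for every column.
  std::size_t const old_cost =
      cuts_per_column.size() * entry_bytes *
      *std::max_element(cuts_per_column.cbegin(), cuts_per_column.cend());
  // New path: the allgather moves only the entries that actually exist.
  std::size_t const new_cost =
      entry_bytes * std::accumulate(cuts_per_column.cbegin(),
                                    cuts_per_column.cend(), std::size_t{0});
  std::printf("serialize-reducer: %zu bytes, allgather: %zu bytes\n",
              old_cost, new_cost);
  return 0;
}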

if (rabit::IsDistributed()) {
// FIXME(trivialfis): This call will allocate nbytes * num_columns on rabit, which
// may generate an OOM error when data is sparse. To fix it, we need to:
// - gather the column offsets over all workers.
// - run rabit::allgather on sketch data to collect all data.
// - merge all gathered sketches based on worker offsets and column offsets of data
// from each worker.
// See GPU implementation for details.
rabit::SerializeReducer<WQSketch::SummaryContainer> sreducer;
sreducer.Allreduce(dmlc::BeginPtr(reduced), nbytes, reduced.size());
void AddCutPoint(WQuantileSketch<float, float>::SummaryContainer const &summary,
int max_bin, HistogramCuts *cuts) {
size_t required_cuts = std::min(summary.size, static_cast<size_t>(max_bin));
auto& cut_values = cuts->cut_values_.HostVector();
for (size_t i = 1; i < required_cuts; ++i) {
bst_float cpt = summary.data[i].value;
if (i == 1 || cpt > cuts->cut_values_.ConstHostVector().back()) {
cut_values.push_back(cpt);
}
}
}
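AddCutPoint copies summary values starting from index 1 (index 0 is the column minimum) and keeps a candidate only when it is strictly greater than the last accepted cut, so duplicates produced by pruning collapse. A simplified illustration that ignores the max_bin cap (values are made up):

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  std::vector<float> summary_values{0.f, 1.f, 1.f, 2.f, 5.f};  // hypothetical
  std::vector<float> cut_values;
  for (std::size_t i = 1; i < summary_values.size(); ++i) {
    float const cpt = summary_values[i];
    if (i == 1 || cpt > cut_values.back()) {
      cut_values.push_back(cpt);  // skip duplicates left by pruning
    }
  }
  for (float v : cut_values) std::cout << v << " ";  // prints: 1 2 5
  return 0;
}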

void HostSketchContainer::MakeCuts(HistogramCuts* cuts) {
monitor_.Start(__func__);
std::vector<WQSketch::SummaryContainer> reduced;
std::vector<int32_t> num_cuts;
this->AllReduce(&reduced, &num_cuts);

cuts->min_vals_.HostVector().resize(sketches_.size(), 0.0f);

for (size_t fid = 0; fid < reduced.size(); ++fid) {
WQSketch::SummaryContainer a;
size_t max_num_bins = std::min(num_cuts[fid], max_bins_);
a.Reserve(max_num_bins + 1);
CHECK(a.data);
if (columns_size_[fid] != 0) {
if (num_cuts[fid] != 0) {
a.SetPrune(reduced[fid], max_num_bins + 1);
CHECK(a.data && reduced[fid].data);
const bst_float mval = a.data[0].value;
@@ -173,6 +305,7 @@ void HostSketchContainer::MakeCuts(HistogramCuts* cuts) {
const float mval = 1e-5f;
cuts->min_vals_.HostVector()[fid] = mval;
}

AddCutPoint(a, max_num_bins, cuts);
// push a value that is greater than anything
const bst_float cpt
26 changes: 26 additions & 0 deletions src/common/quantile.h
@@ -166,6 +166,16 @@ struct WQSummary {
* \param src source sketch
*/
inline void CopyFrom(const WQSummary &src) {
if (!src.data) {
CHECK_EQ(src.size, 0);
size = 0;
return;
}
if (!data) {
CHECK_EQ(this->size, 0);
CHECK_EQ(src.size, 0);
return;
}
size = src.size;
std::memcpy(data, src.data, sizeof(Entry) * size);
}
@@ -721,6 +731,14 @@ class HostSketchContainer {
return use_group_ind;
}

static std::vector<bst_row_t> CalcColumnSize(SparsePage const &page,
bst_feature_t const n_columns,
size_t const nthreads);

static std::vector<bst_feature_t> LoadBalance(SparsePage const &page,
bst_feature_t n_columns,
size_t const nthreads);

static uint32_t SearchGroupIndFromRow(std::vector<bst_uint> const &group_ptr,
size_t const base_rowid) {
CHECK_LT(base_rowid, group_ptr.back())
@@ -730,6 +748,14 @@
group_ptr.cbegin() - 1;
return group_ind;
}
// Gather sketches from all workers.
void GatherSketchInfo(std::vector<WQSketch::SummaryContainer> const &reduced,
std::vector<bst_row_t> *p_worker_segments,
std::vector<bst_row_t> *p_sketches_scan,
std::vector<WQSketch::Entry> *p_global_sketches);
// Merge sketches from all workers.
void AllReduce(std::vector<WQSketch::SummaryContainer> *p_reduced,
std::vector<int32_t>* p_num_cuts);

/* \brief Push a CSR matrix. */
void PushRowPage(SparsePage const& page, MetaInfo const& info);
10 changes: 5 additions & 5 deletions tests/cpp/c_api/test_c_api.cc
@@ -23,9 +23,9 @@ TEST(CAPI, XGDMatrixCreateFromMatDT) {
std::shared_ptr<xgboost::DMatrix> *dmat =
static_cast<std::shared_ptr<xgboost::DMatrix> *>(handle);
xgboost::MetaInfo &info = (*dmat)->Info();
ASSERT_EQ(info.num_col_, 2);
ASSERT_EQ(info.num_row_, 3);
ASSERT_EQ(info.num_nonzero_, 6);
ASSERT_EQ(info.num_col_, 2ul);
ASSERT_EQ(info.num_row_, 3ul);
ASSERT_EQ(info.num_nonzero_, 6ul);

for (const auto &batch : (*dmat)->GetBatches<xgboost::SparsePage>()) {
ASSERT_EQ(batch[0][0].fvalue, 0.0f);
@@ -38,9 +38,9 @@
}

TEST(CAPI, XGDMatrixCreateFromMatOmp) {
std::vector<int> num_rows = {100, 11374, 15000};
std::vector<bst_ulong> num_rows = {100, 11374, 15000};
for (auto row : num_rows) {
int num_cols = 50;
bst_ulong num_cols = 50;
int num_missing = 5;
DMatrixHandle handle;
std::vector<float> data(num_cols * row, 1.5);
