Skip to content

Commit

Permalink
Avoid omp reduction in coordinate descent and aft metrics. (#7316)
Browse files Browse the repository at this point in the history
Aside from the omp issue, parameter configuration for aft metric is simplified.
  • Loading branch information
trivialfis authored Oct 17, 2021
1 parent f56e2e9 commit fb1a9e6
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 71 deletions.
37 changes: 21 additions & 16 deletions src/linear/coordinate_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,32 @@ inline std::pair<double, double> GetGradient(int group_idx, int num_group, int f
*
* \return The gradient and diagonal Hessian entry for a given feature.
*/
inline std::pair<double, double> GetGradientParallel(int group_idx, int num_group, int fidx,
const std::vector<GradientPair> &gpair,
DMatrix *p_fmat) {
double sum_grad = 0.0, sum_hess = 0.0;
inline std::pair<double, double>
GetGradientParallel(GenericParameter const *ctx, int group_idx, int num_group,
int fidx, const std::vector<GradientPair> &gpair,
DMatrix *p_fmat) {
std::vector<double> sum_grad_tloc(ctx->Threads(), 0.0);
std::vector<double> sum_hess_tloc(ctx->Threads(), 0.0);

for (const auto &batch : p_fmat->GetBatches<CSCPage>()) {
auto page = batch.GetView();
auto col = page[fidx];
const auto ndata = static_cast<bst_omp_uint>(col.size());
dmlc::OMPException exc;
#pragma omp parallel for schedule(static) reduction(+ : sum_grad, sum_hess)
for (bst_omp_uint j = 0; j < ndata; ++j) {
exc.Run([&]() {
const bst_float v = col[j].fvalue;
auto &p = gpair[col[j].index * num_group + group_idx];
if (p.GetHess() < 0.0f) return;
sum_grad += p.GetGrad() * v;
sum_hess += p.GetHess() * v * v;
});
}
exc.Rethrow();
common::ParallelFor(ndata, ctx->Threads(), [&](size_t j) {
const bst_float v = col[j].fvalue;
auto &p = gpair[col[j].index * num_group + group_idx];
if (p.GetHess() < 0.0f) {
return;
}
auto t_idx = omp_get_thread_num();
sum_grad_tloc[t_idx] += p.GetGrad() * v;
sum_hess_tloc[t_idx] += p.GetHess() * v * v;
});
}
double sum_grad =
std::accumulate(sum_grad_tloc.cbegin(), sum_grad_tloc.cend(), 0.0);
double sum_hess =
std::accumulate(sum_hess_tloc.cbegin(), sum_hess_tloc.cend(), 0.0);
return std::make_pair(sum_grad, sum_hess);
}

Expand Down
4 changes: 2 additions & 2 deletions src/linear/updater_coordinate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class CoordinateUpdater : public LinearUpdater {
DMatrix *p_fmat, gbm::GBLinearModel *model) {
const int ngroup = model->learner_model_param->num_output_group;
bst_float &w = (*model)[fidx][group_idx];
auto gradient =
GetGradientParallel(group_idx, ngroup, fidx, *in_gpair, p_fmat);
auto gradient = GetGradientParallel(learner_param_, group_idx, ngroup, fidx,
*in_gpair, p_fmat);
auto dw = static_cast<float>(
tparam_.learning_rate *
CoordinateDelta(gradient.first, gradient.second, w, tparam_.reg_alpha_denorm,
Expand Down
104 changes: 51 additions & 53 deletions src/metric/survival_metric.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "metric_common.h"
#include "../common/math.h"
#include "../common/survival_util.h"
#include "../common/threading_utils.h"

#if defined(XGBOOST_USE_CUDA)
#include <thrust/execution_policy.h> // thrust::cuda::par
Expand All @@ -42,11 +43,12 @@ class ElementWiseSurvivalMetricsReduction {
policy_ = policy;
}

PackedReduceResult CpuReduceMetrics(
const HostDeviceVector<bst_float>& weights,
const HostDeviceVector<bst_float>& labels_lower_bound,
const HostDeviceVector<bst_float>& labels_upper_bound,
const HostDeviceVector<bst_float>& preds) const {
PackedReduceResult
CpuReduceMetrics(const HostDeviceVector<bst_float> &weights,
const HostDeviceVector<bst_float> &labels_lower_bound,
const HostDeviceVector<bst_float> &labels_upper_bound,
const HostDeviceVector<bst_float> &preds,
int32_t n_threads) const {
size_t ndata = labels_lower_bound.Size();
CHECK_EQ(ndata, labels_upper_bound.Size());

Expand All @@ -55,22 +57,24 @@ class ElementWiseSurvivalMetricsReduction {
const auto& h_weights = weights.HostVector();
const auto& h_preds = preds.HostVector();

double residue_sum = 0;
double weights_sum = 0;

dmlc::OMPException exc;
#pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static)
for (omp_ulong i = 0; i < ndata; ++i) {
exc.Run([&]() {
const double wt = h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
residue_sum += policy_.EvalRow(
static_cast<double>(h_labels_lower_bound[i]),
static_cast<double>(h_labels_upper_bound[i]),
static_cast<double>(h_preds[i])) * wt;
weights_sum += wt;
});
}
exc.Rethrow();
std::vector<double> score_tloc(n_threads, 0.0);
std::vector<double> weight_tloc(n_threads, 0.0);

common::ParallelFor(ndata, n_threads, [&](size_t i) {
const double wt =
h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
auto t_idx = omp_get_thread_num();
score_tloc[t_idx] +=
policy_.EvalRow(static_cast<double>(h_labels_lower_bound[i]),
static_cast<double>(h_labels_upper_bound[i]),
static_cast<double>(h_preds[i])) *
wt;
weight_tloc[t_idx] += wt;
});

double residue_sum = std::accumulate(score_tloc.cbegin(), score_tloc.cend(), 0.0);
double weights_sum = std::accumulate(weight_tloc.cbegin(), weight_tloc.cend(), 0.0);

PackedReduceResult res{residue_sum, weights_sum};
return res;
}
Expand Down Expand Up @@ -119,25 +123,25 @@ class ElementWiseSurvivalMetricsReduction {
#endif // XGBOOST_USE_CUDA

PackedReduceResult Reduce(
int device,
const GenericParameter &ctx,
const HostDeviceVector<bst_float>& weights,
const HostDeviceVector<bst_float>& labels_lower_bound,
const HostDeviceVector<bst_float>& labels_upper_bound,
const HostDeviceVector<bst_float>& preds) {
PackedReduceResult result;

if (device < 0) {
result = CpuReduceMetrics(weights, labels_lower_bound, labels_upper_bound, preds);
if (ctx.gpu_id < 0) {
result = CpuReduceMetrics(weights, labels_lower_bound, labels_upper_bound,
preds, ctx.Threads());
}
#if defined(XGBOOST_USE_CUDA)
else { // NOLINT
device_ = device;
preds.SetDevice(device_);
labels_lower_bound.SetDevice(device_);
labels_upper_bound.SetDevice(device_);
weights.SetDevice(device_);
preds.SetDevice(ctx.gpu_id);
labels_lower_bound.SetDevice(ctx.gpu_id);
labels_upper_bound.SetDevice(ctx.gpu_id);
weights.SetDevice(ctx.gpu_id);

dh::safe_cuda(cudaSetDevice(device_));
dh::safe_cuda(cudaSetDevice(ctx.gpu_id));
result = DeviceReduceMetrics(weights, labels_lower_bound, labels_upper_bound, preds);
}
#endif // defined(XGBOOST_USE_CUDA)
Expand All @@ -146,9 +150,6 @@ class ElementWiseSurvivalMetricsReduction {

private:
EvalRow policy_;
#if defined(XGBOOST_USE_CUDA)
int device_{-1};
#endif // defined(XGBOOST_USE_CUDA)
};

struct EvalIntervalRegressionAccuracy {
Expand Down Expand Up @@ -193,28 +194,27 @@ struct EvalAFTNLogLik {
AFTParam param_;
};

template<typename Policy>
struct EvalEWiseSurvivalBase : public Metric {
template <typename Policy> struct EvalEWiseSurvivalBase : public Metric {
explicit EvalEWiseSurvivalBase(GenericParameter const *ctx) {
tparam_ = ctx;
}
EvalEWiseSurvivalBase() = default;

void Configure(const Args& args) override {
policy_.Configure(args);
for (const auto& e : args) {
if (e.first == "gpu_id") {
device_ = dmlc::ParseSignedInt<int>(e.second.c_str(), nullptr, 10);
}
}
reducer_.Configure(policy_);
CHECK(tparam_);
}

bst_float Eval(const HostDeviceVector<bst_float>& preds,
const MetaInfo& info,
bool distributed) override {
CHECK_EQ(preds.Size(), info.labels_lower_bound_.Size());
CHECK_EQ(preds.Size(), info.labels_upper_bound_.Size());

auto result = reducer_.Reduce(
device_, info.weights_, info.labels_lower_bound_, info.labels_upper_bound_, preds);
CHECK(tparam_);
auto result =
reducer_.Reduce(*tparam_, info.weights_, info.labels_lower_bound_,
info.labels_upper_bound_, preds);

double dat[2] {result.Residue(), result.Weights()};

Expand Down Expand Up @@ -252,24 +252,22 @@ struct AFTNLogLikDispatcher : public Metric {
param_.UpdateAllowUnknown(args);
switch (param_.aft_loss_distribution) {
case common::ProbabilityDistributionType::kNormal:
metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::NormalDistribution>>());
metric_.reset(
new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::NormalDistribution>>(
tparam_));
break;
case common::ProbabilityDistributionType::kLogistic:
metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::LogisticDistribution>>());
metric_.reset(new EvalEWiseSurvivalBase<
EvalAFTNLogLik<common::LogisticDistribution>>(tparam_));
break;
case common::ProbabilityDistributionType::kExtreme:
metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::ExtremeDistribution>>());
metric_.reset(new EvalEWiseSurvivalBase<
EvalAFTNLogLik<common::ExtremeDistribution>>(tparam_));
break;
default:
LOG(FATAL) << "Unknown probability distribution";
}
Args new_args{args};
// tparam_ doesn't get propagated to the inner metric object because we didn't use
// Metric::Create(). I don't think it's a good idea to pollute the metric registry with
// specialized versions of the AFT metric, so as a work-around, manually pass the GPU ID
// into the inner metric via configuration.
new_args.emplace_back("gpu_id", std::to_string(tparam_->gpu_id));
metric_->Configure(new_args);
metric_->Configure(args);
}

void SaveConfig(Json* p_out) const override {
Expand Down
39 changes: 39 additions & 0 deletions tests/cpp/metric/test_survival_metric.cu
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,41 @@

namespace xgboost {
namespace common {
namespace {
inline void CheckDeterministicMetricElementWise(StringView name, int32_t device) {
auto lparam = CreateEmptyGenericParam(device);
std::unique_ptr<Metric> metric{Metric::Create(name.c_str(), &lparam)};
metric->Configure(Args{});

HostDeviceVector<float> predts;
MetaInfo info;
auto &h_predts = predts.HostVector();

SimpleLCG lcg;
SimpleRealUniformDistribution<float> dist{0.0f, 1.0f};

size_t n_samples = 2048;
h_predts.resize(n_samples);

for (size_t i = 0; i < n_samples; ++i) {
h_predts[i] = dist(&lcg);
}

auto &h_upper = info.labels_upper_bound_.HostVector();
auto &h_lower = info.labels_lower_bound_.HostVector();
h_lower.resize(n_samples);
h_upper.resize(n_samples);
for (size_t i = 0; i < n_samples; ++i) {
h_lower[i] = 1;
h_upper[i] = 10;
}

auto result = metric->Eval(predts, info, false);
for (size_t i = 0; i < 8; ++i) {
ASSERT_EQ(metric->Eval(predts, info, false), result);
}
}
} // anonymous namespace

TEST(Metric, DeclareUnifiedTest(AFTNegLogLik)) {
auto lparam = xgboost::CreateEmptyGenericParam(GPUIDX);
Expand Down Expand Up @@ -61,6 +96,8 @@ TEST(Metric, DeclareUnifiedTest(IntervalRegressionAccuracy)) {
EXPECT_FLOAT_EQ(metric->Eval(preds, info, false), 0.50f);
info.labels_lower_bound_.HostVector()[0] = 70.0f;
EXPECT_FLOAT_EQ(metric->Eval(preds, info, false), 0.25f);

CheckDeterministicMetricElementWise(StringView{"interval-regression-accuracy"}, GPUIDX);
}

// Test configuration of AFT metric
Expand All @@ -75,6 +112,8 @@ TEST(AFTNegLogLikMetric, DeclareUnifiedTest(Configuration)) {
auto aft_param_json = j_obj["aft_loss_param"];
EXPECT_EQ(get<String>(aft_param_json["aft_loss_distribution"]), "normal");
EXPECT_EQ(get<String>(aft_param_json["aft_loss_distribution_scale"]), "10");

CheckDeterministicMetricElementWise(StringView{"aft-nloglik"}, GPUIDX);
}

} // namespace common
Expand Down

0 comments on commit fb1a9e6

Please sign in to comment.