
Implement GK sketching on GPU. #5846

Merged
merged 15 commits on Jul 7, 2020
1 change: 1 addition & 0 deletions Jenkinsfile
@@ -313,6 +313,7 @@ def TestPythonGPU(args) {
nodeReq = (args.multi_gpu) ? 'linux && mgpu' : 'linux && gpu'
node(nodeReq) {
unstash name: 'xgboost_whl_cuda10'
+unstash name: 'xgboost_cpp_tests'
unstash name: 'srcs'
echo "Test Python GPU: CUDA ${args.cuda_version}"
def container_type = "gpu"
9 changes: 4 additions & 5 deletions include/xgboost/span.h
@@ -573,18 +573,17 @@ class Span {
XGBOOST_DEVICE auto subspan() const -> // NOLINT
Span<element_type,
detail::ExtentValue<Extent, Offset, Count>::value> {
-SPAN_CHECK(Offset < size() || size() == 0);
-SPAN_CHECK(Count == dynamic_extent || (Offset + Count <= size()));
+SPAN_CHECK((Count == dynamic_extent) ?
+           (Offset <= size()) : (Offset + Count <= size()));

return {data() + Offset, Count == dynamic_extent ? size() - Offset : Count};
}

XGBOOST_DEVICE Span<element_type, dynamic_extent> subspan( // NOLINT
index_type _offset,
index_type _count = dynamic_extent) const {
-SPAN_CHECK(_offset < size() || size() == 0);
-SPAN_CHECK((_count == dynamic_extent) || (_offset + _count <= size()));
-
+SPAN_CHECK((_count == dynamic_extent) ?
+           (_offset <= size()) : (_offset + _count <= size()));
return {data() + _offset, _count ==
dynamic_extent ? size() - _offset : _count};
}
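The relaxed check matters at the right-hand boundary: with the old assertion, a subspan taken at an offset equal to size() on a non-empty span tripped SPAN_CHECK, whereas the new form treats it as a valid empty subspan. A minimal illustration, not part of this diff, assuming the xgboost headers are on the include path:

#include <xgboost/span.h>
#include <iostream>

int main() {
  int data[4] = {0, 1, 2, 3};
  xgboost::common::Span<int> s{data, 4};
  auto tail = s.subspan(4);          // offset == size(): now a valid empty span
  std::cout << tail.size() << "\n";  // prints 0
  return 0;
}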
27 changes: 27 additions & 0 deletions src/common/device_helpers.cu
@@ -78,6 +78,33 @@ void AllReducer::Init(int _device_ordinal) {
#endif // XGBOOST_USE_NCCL
}

void AllReducer::AllGather(void const *data, size_t length_bytes,
std::vector<size_t> *segments,
dh::caching_device_vector<char> *recvbuf) {
#ifdef XGBOOST_USE_NCCL
CHECK(initialised_);
dh::safe_cuda(cudaSetDevice(device_ordinal_));
size_t world = rabit::GetWorldSize();
segments->clear();
segments->resize(world, 0);
segments->at(rabit::GetRank()) = length_bytes;
rabit::Allreduce<rabit::op::Max>(segments->data(), segments->size());
auto total_bytes = std::accumulate(segments->cbegin(), segments->cend(), 0);
recvbuf->resize(total_bytes);

size_t offset = 0;
safe_nccl(ncclGroupStart());
for (int32_t i = 0; i < world; ++i) {
size_t as_bytes = segments->at(i);
safe_nccl(
ncclBroadcast(data, recvbuf->data().get() + offset,
as_bytes, ncclChar, i, comm_, stream_));
offset += as_bytes;
}
safe_nccl(ncclGroupEnd());
#endif // XGBOOST_USE_NCCL
}

AllReducer::~AllReducer() {
#ifdef XGBOOST_USE_NCCL
if (initialised_) {
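For orientation, a hedged usage sketch of the new variable-length AllGather above; the names are illustrative and it assumes rabit plus the NCCL communicator are already set up and the reducer has been initialised via Init:

// Hypothetical helper: gather serialised sketches of differing sizes from all workers.
void GatherBytes(dh::AllReducer* reducer,
                 dh::caching_device_vector<char> const& local_bytes) {
  std::vector<size_t> segments;             // filled with per-worker byte counts
  dh::caching_device_vector<char> recvbuf;  // receives the concatenated payloads
  reducer->AllGather(local_bytes.data().get(), local_bytes.size(),
                     &segments, &recvbuf);
  reducer->Synchronize();
  // Worker i's payload starts at segments[0] + ... + segments[i - 1] and spans
  // segments[i] bytes of recvbuf.
}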
177 changes: 173 additions & 4 deletions src/common/device_helpers.cuh
@@ -5,10 +5,16 @@
#include <thrust/device_ptr.h>
#include <thrust/device_vector.h>
#include <thrust/device_malloc_allocator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/system/cuda/error.h>
#include <thrust/system_error.h>
#include <thrust/execution_policy.h>

#include <thrust/transform_scan.h>
#include <thrust/logical.h>
#include <thrust/gather.h>
#include <thrust/unique.h>
#include <thrust/binary_search.h>

#include <rabit/rabit.h>
@@ -53,6 +59,36 @@ __device__ __forceinline__ double atomicAdd(double* address, double val) { // N
}
#endif

namespace dh {
namespace detail {
template <size_t size>
struct AtomicDispatcher;

template <>
struct AtomicDispatcher<sizeof(uint32_t)> {
using Type = unsigned int; // NOLINT
static_assert(sizeof(Type) == sizeof(uint32_t), "Unsigned should be of size 32 bits.");
};

template <>
struct AtomicDispatcher<sizeof(uint64_t)> {
using Type = unsigned long long; // NOLINT
static_assert(sizeof(Type) == sizeof(uint64_t), "Unsigned long long should be of size 64 bits.");
};
} // namespace detail
} // namespace dh

// atomicAdd is not defined for size_t.
template <typename T = size_t,
std::enable_if_t<std::is_same<size_t, T>::value &&
!std::is_same<size_t, unsigned long long>::value> * = // NOLINT
nullptr>
T __device__ __forceinline__ atomicAdd(T *addr, T v) { // NOLINT
using Type = typename dh::detail::AtomicDispatcher<sizeof(T)>::Type;
Type ret = ::atomicAdd(reinterpret_cast<Type *>(addr), static_cast<Type>(v));
return static_cast<T>(ret);
}
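As a sanity check on what this overload enables, a minimal kernel, not part of the PR, that bumps a size_t counter from device code; it assumes device_helpers.cuh is included:

// Hypothetical kernel: count how many values exceed a threshold.
__global__ void CountAbove(float const* values, size_t n, float threshold,
                           size_t* out_count) {
  size_t i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n && values[i] > threshold) {
    atomicAdd(out_count, static_cast<size_t>(1));  // dispatches to the overload above
  }
}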

namespace dh {

#define HOST_DEV_INLINE XGBOOST_DEVICE __forceinline__
@@ -291,10 +327,12 @@ public:
safe_cuda(cudaGetDevice(&current_device));
stats_.RegisterDeallocation(ptr, n, current_device);
}
-size_t PeakMemory()
-{
+size_t PeakMemory() const {
return stats_.peak_allocated_bytes;
}
size_t CurrentlyAllocatedBytes() const {
return stats_.currently_allocated_bytes;
}
void Clear()
{
stats_ = DeviceStats();
@@ -529,7 +567,6 @@ class AllReducer {
bool initialised_ {false};
size_t allreduce_bytes_ {0}; // Keep statistics of the number of bytes communicated
size_t allreduce_calls_ {0}; // Keep statistics of the number of reduce calls
-std::vector<size_t> host_data_; // Used for all reduce on host
#ifdef XGBOOST_USE_NCCL
ncclComm_t comm_;
cudaStream_t stream_;
@@ -569,6 +606,27 @@ class AllReducer {
#endif
}

/**
* \brief Allgather implemented as grouped calls to Broadcast, so that workers can
* contribute different sizes of data.
* \param data Pointer to the input data on the current worker.
* \param length_bytes Size of the input data in bytes.
* \param segments Output: size of the data on each worker, in bytes.
* \param recvbuf Output buffer storing the concatenated data from all workers.
*/
void AllGather(void const* data, size_t length_bytes,
std::vector<size_t>* segments, dh::caching_device_vector<char>* recvbuf);

void AllGather(uint32_t const* data, size_t length,
dh::caching_device_vector<uint32_t>* recvbuf) {
#ifdef XGBOOST_USE_NCCL
CHECK(initialised_);
size_t world = rabit::GetWorldSize();
recvbuf->resize(length * world);
safe_nccl(ncclAllGather(data, recvbuf->data().get(), length, ncclUint32,
comm_, stream_));
#endif // XGBOOST_USE_NCCL
}

/**
* \brief Allreduce. Use in exactly the same way as NCCL but without needing
* streams or comms.
@@ -607,6 +665,40 @@ class AllReducer {
#endif
}

void AllReduceSum(const uint32_t *sendbuff, uint32_t *recvbuff, int count) {
#ifdef XGBOOST_USE_NCCL
CHECK(initialised_);

dh::safe_cuda(cudaSetDevice(device_ordinal_));
dh::safe_nccl(ncclAllReduce(sendbuff, recvbuff, count, ncclUint32, ncclSum, comm_, stream_));
#endif
}

void AllReduceSum(const uint64_t *sendbuff, uint64_t *recvbuff, int count) {
#ifdef XGBOOST_USE_NCCL
CHECK(initialised_);

dh::safe_cuda(cudaSetDevice(device_ordinal_));
dh::safe_nccl(ncclAllReduce(sendbuff, recvbuff, count, ncclUint64, ncclSum, comm_, stream_));
#endif
}

// Specialization for size_t, whose underlying type is implementation defined: it may or
// may not be the same as one of uint64_t/uint32_t/unsigned long long/unsigned long.
template <typename T = size_t,
std::enable_if_t<std::is_same<size_t, T>::value &&
!std::is_same<size_t, unsigned long long>::value> // NOLINT
* = nullptr>
void AllReduceSum(const T *sendbuff, T *recvbuff, int count) { // NOLINT
#ifdef XGBOOST_USE_NCCL
CHECK(initialised_);

dh::safe_cuda(cudaSetDevice(device_ordinal_));
static_assert(sizeof(unsigned long long) == sizeof(uint64_t), ""); // NOLINT
dh::safe_nccl(ncclAllReduce(sendbuff, recvbuff, count, ncclUint64, ncclSum, comm_, stream_));
#endif
}

/**
* \fn void Synchronize()
*
@@ -886,9 +978,86 @@ DEV_INLINE void AtomicAddGpair(OutputGradientT* dest,

// The Thrust version of this function causes an error on Windows.
template <typename ReturnT, typename IterT, typename FuncT>
-thrust::transform_iterator<FuncT, IterT, ReturnT> MakeTransformIterator(
+XGBOOST_DEVICE thrust::transform_iterator<FuncT, IterT, ReturnT> MakeTransformIterator(
IterT iter, FuncT func) {
return thrust::transform_iterator<FuncT, IterT, ReturnT>(iter, func);
}

template <typename It>
size_t XGBOOST_DEVICE SegmentId(It first, It last, size_t idx) {
size_t segment_id = thrust::upper_bound(thrust::seq, first, last, idx) -
1 - first;
return segment_id;
}

template <typename T>
size_t XGBOOST_DEVICE SegmentId(xgboost::common::Span<T> segments_ptr, size_t idx) {
return SegmentId(segments_ptr.cbegin(), segments_ptr.cend(), idx);
}
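A small illustration of SegmentId with CSR-style offsets, not from the PR; it assumes device_helpers.cuh, <vector> and <cassert> are available:

void SegmentIdExample() {
  std::vector<size_t> ptrs{0, 3, 5, 9};  // three segments: [0, 3), [3, 5), [5, 9)
  size_t seg = dh::SegmentId(ptrs.cbegin(), ptrs.cend(), 4);
  assert(seg == 1);                      // element index 4 falls inside [3, 5)
}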

namespace detail {
template <typename Key, typename KeyOutIt>
struct SegmentedUniqueReduceOp {
KeyOutIt key_out;
__device__ Key const& operator()(Key const& key) const {
auto constexpr kOne = static_cast<std::remove_reference_t<decltype(*(key_out + key.first))>>(1);
atomicAdd(&(*(key_out + key.first)), kOne);
return key;
}
};
} // namespace detail

/* \brief Segmented unique function. Keys are pointers to segments with key_segments_last -
* key_segments_first = n_segments + 1.
*
* \pre Input segment and output segment must not overlap.
*
* \param key_segments_first Beginning iterator of segments.
* \param key_segments_last End iterator of segments.
* \param val_first Beginning iterator of values.
* \param val_last End iterator of values.
* \param key_segments_out Output iterator of segments.
* \param val_out Output iterator of values.
*
* \return Number of unique values in total.
*/
template <typename KeyInIt, typename KeyOutIt, typename ValInIt,
typename ValOutIt, typename Comp>
size_t
SegmentedUnique(KeyInIt key_segments_first, KeyInIt key_segments_last, ValInIt val_first,
ValInIt val_last, KeyOutIt key_segments_out, ValOutIt val_out,
Comp comp) {
using Key = thrust::pair<size_t, typename thrust::iterator_traits<ValInIt>::value_type>;
dh::XGBCachingDeviceAllocator<char> alloc;
auto unique_key_it = dh::MakeTransformIterator<Key>(
thrust::make_counting_iterator(static_cast<size_t>(0)),
[=] __device__(size_t i) {
size_t seg = dh::SegmentId(key_segments_first, key_segments_last, i);
return thrust::make_pair(seg, *(val_first + i));
});
size_t segments_len = key_segments_last - key_segments_first;
thrust::fill(thrust::device, key_segments_out, key_segments_out + segments_len, 0);
size_t n_inputs = std::distance(val_first, val_last);
// Count the number of unique elements in each segment on the fly, avoiding an
// intermediate array for `reduce_by_key`. It's limited by the types that atomicAdd
// supports. For example, size_t is not supported as of CUDA 10.2.
auto reduce_it = thrust::make_transform_output_iterator(
thrust::make_discard_iterator(),
detail::SegmentedUniqueReduceOp<Key, KeyOutIt>{key_segments_out});
auto uniques_ret = thrust::unique_by_key_copy(
thrust::cuda::par(alloc), unique_key_it, unique_key_it + n_inputs,
val_first, reduce_it, val_out,
[=] __device__(Key const &l, Key const &r) {
if (l.first == r.first) {
// In the same segment.
return comp(l.second, r.second);
}
return false;
});
auto n_uniques = uniques_ret.second - val_out;
CHECK_LE(n_uniques, n_inputs);
thrust::exclusive_scan(thrust::cuda::par(alloc), key_segments_out,
key_segments_out + segments_len, key_segments_out, 0);
return n_uniques;
}
} // namespace dh
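To make the contract above concrete, a hedged usage sketch with illustrative names only, not from this PR; values must already be sorted within each segment, and the comparator decides equality of adjacent values:

#include <thrust/device_vector.h>
#include <thrust/functional.h>
#include <vector>

void SegmentedUniqueExample() {
  // Two segments described by CSR-style offsets: [0, 4) and [4, 7).
  thrust::device_vector<size_t> segs_in(std::vector<size_t>{0, 4, 7});
  thrust::device_vector<float> vals_in(std::vector<float>{1, 1, 2, 3, 2, 2, 2});
  thrust::device_vector<size_t> segs_out(segs_in.size());
  thrust::device_vector<float> vals_out(vals_in.size());
  size_t n_uniques = dh::SegmentedUnique(
      segs_in.data().get(), segs_in.data().get() + segs_in.size(),
      vals_in.data().get(), vals_in.data().get() + vals_in.size(),
      segs_out.data().get(), vals_out.data().get(),
      thrust::equal_to<float>{});
  // n_uniques == 4; vals_out starts with {1, 2, 3, 2} and segs_out becomes {0, 3, 4}.
}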
10 changes: 4 additions & 6 deletions src/common/hist_util.cc
@@ -158,7 +158,6 @@ void SparseCuts::SingleThreadBuild(SparsePage const& page, MetaInfo const& info,
uint32_t beg_col, uint32_t end_col,
uint32_t thread_id) {
CHECK_GE(end_col, beg_col);
-constexpr float kFactor = 8;

// Data groups, used in ranking.
std::vector<bst_uint> const& group_ptr = info.group_ptr_;
@@ -175,11 +174,12 @@ void SparseCuts::SingleThreadBuild(SparsePage const& page, MetaInfo const& info,
max_num_bins);
if (n_bins == 0) {
// cut_ptrs_ is initialized with a zero, so there's always an element at the back
+CHECK_GE(local_ptrs.size(), 1);
local_ptrs.emplace_back(local_ptrs.back());
continue;
}

-sketch.Init(info.num_row_, 1.0 / (n_bins * kFactor));
+sketch.Init(info.num_row_, 1.0 / (n_bins * WQSketch::kFactor));
for (auto const& entry : column) {
uint32_t weight_ind = 0;
if (use_group_ind) {
@@ -329,7 +329,6 @@ void DenseCuts::Build(DMatrix* p_fmat, uint32_t max_num_bins) {
const MetaInfo& info = p_fmat->Info();

// safe factor for better accuracy
-constexpr int kFactor = 8;
std::vector<WQSketch> sketchs;

const int nthread = omp_get_max_threads();
@@ -339,7 +338,7 @@ void DenseCuts::Build(DMatrix* p_fmat, uint32_t max_num_bins) {
unsigned const ncol = static_cast<unsigned>(info.num_col_);
sketchs.resize(info.num_col_);
for (auto& s : sketchs) {
-s.Init(info.num_row_, 1.0 / (max_num_bins * kFactor));
+s.Init(info.num_row_, 1.0 / (max_num_bins * WQSketch::kFactor));
}

// Data groups, used in ranking.
@@ -410,9 +409,8 @@ void DenseCuts::Init
// This allows efficient training on wide data
size_t global_max_rows = max_rows;
rabit::Allreduce<rabit::op::Sum>(&global_max_rows, 1);
-constexpr int kFactor = 8;
size_t intermediate_num_cuts =
-std::min(global_max_rows, static_cast<size_t>(max_num_bins * kFactor));
+std::min(global_max_rows, static_cast<size_t>(max_num_bins * WQSketch::kFactor));
// gather the histogram data
rabit::SerializeReducer<WQSketch::SummaryContainer> sreducer;
std::vector<WQSketch::SummaryContainer> summary_array;
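For context, and not part of the diff: assuming WQSketch::kFactor keeps the previous constant of 8, a run with max_num_bins = 256 initialises each sketch with eps = 1 / (256 * 8), roughly 4.9e-4, and intermediate_num_cuts becomes min(global_max_rows, 2048).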