From df88cf5ffccd8a454f17ba686dcb5ec0d7a045b3 Mon Sep 17 00:00:00 2001
From: Bradley Dice
Date: Fri, 28 Jun 2024 15:40:52 -0500
Subject: [PATCH] Use size_t to allow large conditional joins (#16127)

The conditional join kernels were using `cudf::size_type` (a 32-bit signed
integer) where `std::size_t` was needed, so the output counters overflowed
once a join produced more than 2^31 - 1 rows. This PR fixes that bug, which
caused the `cudaErrorIllegalAddress` failures shown in #16115. Closes #16115.

I did not add tests because we typically do not test very large workloads.
However, I committed the test and then reverted it in this PR, so there is a
record of my validation code.

Authors:
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - https://github.com/nvdbaranec
  - Yunsong Wang (https://github.com/PointKernel)

URL: https://github.com/rapidsai/cudf/pull/16127
---
 cpp/src/join/conditional_join.cu          |   5 +-
 cpp/src/join/conditional_join_kernels.cuh | 124 ++++++++++++++++++++--
 cpp/src/join/join_common_utils.cuh        |  95 -----------------
 3 files changed, 117 insertions(+), 107 deletions(-)

diff --git a/cpp/src/join/conditional_join.cu b/cpp/src/join/conditional_join.cu
index 97a06d5a923..d4ef2747c9d 100644
--- a/cpp/src/join/conditional_join.cu
+++ b/cpp/src/join/conditional_join.cu
@@ -95,7 +95,7 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
     join_size = size.value(stream);
   }
 
-  rmm::device_scalar<size_type> write_index(0, stream);
+  rmm::device_scalar<std::size_t> write_index(0, stream);
 
   auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);
 
@@ -232,13 +232,14 @@ conditional_join(table_view const& left,
       std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
   }
 
-  rmm::device_scalar<size_type> write_index(0, stream);
+  rmm::device_scalar<std::size_t> write_index(0, stream);
 
   auto left_indices  = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);
   auto right_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);
 
   auto const& join_output_l = left_indices->data();
   auto const& join_output_r = right_indices->data();
+
   if (has_nulls) {
     conditional_join<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, true>
       <<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
diff --git a/cpp/src/join/conditional_join_kernels.cuh b/cpp/src/join/conditional_join_kernels.cuh
index 1e16c451f5a..62769862f54 100644
--- a/cpp/src/join/conditional_join_kernels.cuh
+++ b/cpp/src/join/conditional_join_kernels.cuh
@@ -29,6 +29,110 @@
 namespace cudf {
 namespace detail {
 
+/**
+ * @brief Adds a pair of indices to the shared memory cache
+ *
+ * @param[in] first The first index in the pair
+ * @param[in] second The second index in the pair
+ * @param[in,out] current_idx_shared Pointer to shared index that determines
+ * where in the shared memory cache the pair will be written
+ * @param[in] warp_id The ID of the warp of the calling thread
+ * @param[out] joined_shared_l Pointer to the shared memory cache for left indices
+ * @param[out] joined_shared_r Pointer to the shared memory cache for right indices
+ */
+__inline__ __device__ void add_pair_to_cache(size_type const first,
+                                             size_type const second,
+                                             std::size_t* current_idx_shared,
+                                             int const warp_id,
+                                             size_type* joined_shared_l,
+                                             size_type* joined_shared_r)
+{
+  cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
+  std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
+  // It's guaranteed to fit into the shared cache
+  joined_shared_l[my_current_idx] = first;
+  joined_shared_r[my_current_idx] = second;
+}
+
+__inline__ __device__ void add_left_to_cache(size_type const first,
+                                             std::size_t* current_idx_shared,
+                                             int const warp_id,
+                                             size_type* joined_shared_l)
+{
+  cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
+  std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
+  joined_shared_l[my_current_idx] = first;
+}
+
+template <int num_warps, cudf::size_type output_cache_size>
+__device__ void flush_output_cache(unsigned int const activemask,
+                                   std::size_t const max_size,
+                                   int const warp_id,
+                                   int const lane_id,
+                                   std::size_t* current_idx,
+                                   std::size_t current_idx_shared[num_warps],
+                                   size_type join_shared_l[num_warps][output_cache_size],
+                                   size_type join_shared_r[num_warps][output_cache_size],
+                                   size_type* join_output_l,
+                                   size_type* join_output_r)
+{
+  // Count how many active threads are participating here, which could be fewer than warp_size
+  int const num_threads = __popc(activemask);
+  std::size_t output_offset = 0;
+
+  if (0 == lane_id) {
+    cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
+    output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
+  }
+
+  // No warp sync is necessary here because we are assuming that ShuffleIndex
+  // is internally using post-CUDA 9.0 synchronization-safe primitives
+  // (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to
+  // be safe by the compiler because it is not required by the standard to
+  // converge divergent branches before executing.
+  output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);
+
+  for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
+       shared_out_idx < current_idx_shared[warp_id];
+       shared_out_idx += num_threads) {
+    std::size_t thread_offset = output_offset + shared_out_idx;
+    if (thread_offset < max_size) {
+      join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
+      join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx];
+    }
+  }
+}
+
+template <int num_warps, cudf::size_type output_cache_size>
+__device__ void flush_output_cache(unsigned int const activemask,
+                                   std::size_t const max_size,
+                                   int const warp_id,
+                                   int const lane_id,
+                                   std::size_t* current_idx,
+                                   std::size_t current_idx_shared[num_warps],
+                                   size_type join_shared_l[num_warps][output_cache_size],
+                                   size_type* join_output_l)
+{
+  int const num_threads = __popc(activemask);
+  std::size_t output_offset = 0;
+
+  if (0 == lane_id) {
+    cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
+    output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
+  }
+
+  output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);
+
+  for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
+       shared_out_idx < current_idx_shared[warp_id];
+       shared_out_idx += num_threads) {
+    std::size_t thread_offset = output_offset + shared_out_idx;
+    if (thread_offset < max_size) {
+      join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
+    }
+  }
+}
+
 /**
  * @brief Computes the output size of joining the left table to the right table.
  *
@@ -103,14 +207,14 @@ CUDF_KERNEL void compute_conditional_join_output_size(
     }
   }
 
-  using BlockReduce = cub::BlockReduce<cudf::size_type, block_size>;
+  using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
   __shared__ typename BlockReduce::TempStorage temp_storage;
   std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter);
 
   // Add block counter to global counter
   if (threadIdx.x == 0) {
     cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*output_size};
-    ref.fetch_add(block_counter, cuda::std::memory_order_relaxed);
+    ref.fetch_add(block_counter, cuda::memory_order_relaxed);
   }
 }
 
@@ -143,13 +247,13 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,
                                   join_kind join_type,
                                   cudf::size_type* join_output_l,
                                   cudf::size_type* join_output_r,
-                                  cudf::size_type* current_idx,
+                                  std::size_t* current_idx,
                                   cudf::ast::detail::expression_device_view device_expression_data,
-                                  cudf::size_type const max_size,
+                                  std::size_t const max_size,
                                   bool const swap_tables)
 {
   constexpr int num_warps = block_size / detail::warp_size;
-  __shared__ cudf::size_type current_idx_shared[num_warps];
+  __shared__ std::size_t current_idx_shared[num_warps];
   __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];
   __shared__ cudf::size_type join_shared_r[num_warps][output_cache_size];
 
@@ -183,7 +287,7 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,
   if (outer_row_index < outer_num_rows) {
     bool found_match = false;
-    for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
+    for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
          ++inner_row_index) {
       auto output_dest          = cudf::ast::detail::value_expression_result<bool, has_nulls>();
       auto const left_row_index = swap_tables ? inner_row_index : outer_row_index;
@@ -277,12 +381,12 @@ CUDF_KERNEL void conditional_join_anti_semi(
   table_device_view right_table,
   join_kind join_type,
   cudf::size_type* join_output_l,
-  cudf::size_type* current_idx,
+  std::size_t* current_idx,
   cudf::ast::detail::expression_device_view device_expression_data,
-  cudf::size_type const max_size)
+  std::size_t const max_size)
 {
   constexpr int num_warps = block_size / detail::warp_size;
-  __shared__ cudf::size_type current_idx_shared[num_warps];
+  __shared__ std::size_t current_idx_shared[num_warps];
   __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];
 
   extern __shared__ char raw_intermediate_storage[];
@@ -310,7 +414,7 @@ CUDF_KERNEL void conditional_join_anti_semi(
   for (cudf::thread_index_type outer_row_index = start_idx; outer_row_index < outer_num_rows;
        outer_row_index += stride) {
     bool found_match = false;
-    for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
+    for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
          ++inner_row_index) {
       auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();
diff --git a/cpp/src/join/join_common_utils.cuh b/cpp/src/join/join_common_utils.cuh
index 31f267d5cfb..3d0f3e4340d 100644
--- a/cpp/src/join/join_common_utils.cuh
+++ b/cpp/src/join/join_common_utils.cuh
@@ -262,101 +262,6 @@ struct valid_range {
   }
 };
 
-/**
- * @brief Adds a pair of indices to the shared memory cache
- *
- * @param[in] first The first index in the pair
- * @param[in] second The second index in the pair
- * @param[in,out] current_idx_shared Pointer to shared index that determines
- * where in the shared memory cache the pair will be written
- * @param[in] warp_id The ID of the warp of the calling the thread
- * @param[out] joined_shared_l Pointer to the shared memory cache for left indices
- * @param[out] joined_shared_r Pointer to the shared memory cache for right indices
- */
-__inline__ __device__ void add_pair_to_cache(size_type const first,
-                                             size_type const second,
-                                             size_type* current_idx_shared,
-                                             int const warp_id,
-                                             size_type* joined_shared_l,
-                                             size_type* joined_shared_r)
-{
-  size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};
-  // its guaranteed to fit into the shared cache
-  joined_shared_l[my_current_idx] = first;
-  joined_shared_r[my_current_idx] = second;
-}
-
-__inline__ __device__ void add_left_to_cache(size_type const first,
-                                             size_type* current_idx_shared,
-                                             int const warp_id,
-                                             size_type* joined_shared_l)
-{
-  size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};
-
-  joined_shared_l[my_current_idx] = first;
-}
-
-template <int num_warps, cudf::size_type output_cache_size>
-__device__ void flush_output_cache(unsigned int const activemask,
-                                   cudf::size_type const max_size,
-                                   int const warp_id,
-                                   int const lane_id,
-                                   cudf::size_type* current_idx,
-                                   cudf::size_type current_idx_shared[num_warps],
-                                   size_type join_shared_l[num_warps][output_cache_size],
-                                   size_type join_shared_r[num_warps][output_cache_size],
-                                   size_type* join_output_l,
-                                   size_type* join_output_r)
-{
-  // count how many active threads participating here which could be less than warp_size
-  int const num_threads = __popc(activemask);
-  cudf::size_type output_offset = 0;
-
-  if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }
-
-  // No warp sync is necessary here because we are assuming that ShuffleIndex
-  // is internally using post-CUDA 9.0 synchronization-safe primitives
-  // (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to
-  // be safe by the compiler because it is not required by the standard to
-  // converge divergent branches before executing.
-  output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);
-
-  for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
-       shared_out_idx += num_threads) {
-    cudf::size_type thread_offset = output_offset + shared_out_idx;
-    if (thread_offset < max_size) {
-      join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
-      join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx];
-    }
-  }
-}
-
-template <int num_warps, cudf::size_type output_cache_size>
-__device__ void flush_output_cache(unsigned int const activemask,
-                                   cudf::size_type const max_size,
-                                   int const warp_id,
-                                   int const lane_id,
-                                   cudf::size_type* current_idx,
-                                   cudf::size_type current_idx_shared[num_warps],
-                                   size_type join_shared_l[num_warps][output_cache_size],
-                                   size_type* join_output_l)
-{
-  int const num_threads = __popc(activemask);
-  cudf::size_type output_offset = 0;
-
-  if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }
-
-  output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);
-
-  for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
-       shared_out_idx += num_threads) {
-    cudf::size_type thread_offset = output_offset + shared_out_idx;
-    if (thread_offset < max_size) {
-      join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
-    }
-  }
-}
-
 }  // namespace detail
 }  // namespace cudf
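
Why the 32-bit counters fail: `cudf::size_type` is `int32_t`, so once a join
produces more than 2,147,483,647 output rows a `size_type` output cursor wraps
negative and subsequent writes land outside the output buffers, surfacing as
`cudaErrorIllegalAddress`. The standalone sketch below shows, at miniature
scale and outside of cudf, the pattern the patch adopts: a device-wide
`std::size_t` cursor reserved through `cuda::atomic_ref`, with every write
bounds-checked against `max_size`. It is illustrative only; the names and
sizes are assumptions, not cudf code, and it uses
`cuda::std::memory_order_relaxed` from libcu++'s `<cuda/atomic>`.

// sketch.cu -- illustrative only; compile with: nvcc -arch=native sketch.cu
#include <cuda/atomic>
#include <cstdio>

// Each matching thread reserves one output slot from a 64-bit device-wide
// cursor, mirroring how flush_output_cache reserves a warp's batch of
// results. A std::size_t cursor cannot wrap negative the way a 32-bit
// size_type cursor does once the running total passes 2^31 - 1.
__global__ void emit_matches(
  int const* input, std::size_t num_rows, int* output, std::size_t max_size, std::size_t* cursor)
{
  std::size_t const row = blockIdx.x * static_cast<std::size_t>(blockDim.x) + threadIdx.x;
  if (row >= num_rows) { return; }
  if (input[row] % 2 != 0) { return; }  // stand-in for the join predicate

  cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*cursor};
  std::size_t const slot = ref.fetch_add(1, cuda::std::memory_order_relaxed);
  // Guard every write, exactly as the kernels guard thread_offset < max_size.
  if (slot < max_size) { output[slot] = input[row]; }
}

int main()
{
  constexpr std::size_t num_rows = 1 << 20;
  int *input, *output;
  std::size_t* cursor;
  cudaMallocManaged(&input, num_rows * sizeof(int));
  cudaMallocManaged(&output, num_rows * sizeof(int));
  cudaMallocManaged(&cursor, sizeof(std::size_t));
  for (std::size_t i = 0; i < num_rows; ++i) { input[i] = static_cast<int>(i); }
  *cursor = 0;

  emit_matches<<<(num_rows + 255) / 256, 256>>>(input, num_rows, output, num_rows, cursor);
  cudaDeviceSynchronize();
  std::printf("matched rows: %zu\n", *cursor);  // expect num_rows / 2

  cudaFree(input);
  cudaFree(output);
  cudaFree(cursor);
  return 0;
}

Reproducing the original failure with a real conditional join requires an
output of more than 2^31 - 1 pairs, e.g. an always-true predicate over roughly
50,000 x 50,000 rows, which is why the PR's validation test was committed and
then reverted rather than left in the regular suite.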