diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index ff431c7f260..7be456ddfba 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -353,6 +353,11 @@ ConfigureNVBench(JSON_READER_NVBENCH io/json/nested_json.cpp io/json/json_reader ConfigureNVBench(JSON_READER_OPTION_NVBENCH io/json/json_reader_option.cpp) ConfigureNVBench(JSON_WRITER_NVBENCH io/json/json_writer.cpp) +# ################################################################################################## +# * multi buffer memset benchmark +# ---------------------------------------------------------------------- +ConfigureNVBench(BATCHED_MEMSET_BENCH io/utilities/batched_memset_bench.cpp) + # ################################################################################################## # * io benchmark --------------------------------------------------------------------- ConfigureNVBench(MULTIBYTE_SPLIT_NVBENCH io/text/multibyte_split.cpp) diff --git a/cpp/benchmarks/io/utilities/batched_memset_bench.cpp b/cpp/benchmarks/io/utilities/batched_memset_bench.cpp new file mode 100644 index 00000000000..2905895a63b --- /dev/null +++ b/cpp/benchmarks/io/utilities/batched_memset_bench.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include + +#include + +// Size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks to +// run on most GPUs, but large enough to allow highest throughput +constexpr size_t data_size = 512 << 20; + +void parquet_read_common(cudf::size_type num_rows_to_read, + cudf::size_type num_cols_to_read, + cuio_source_sink_pair& source_sink, + nvbench::state& state) +{ + cudf::io::parquet_reader_options read_opts = + cudf::io::parquet_reader_options::builder(source_sink.make_source_info()); + + auto mem_stats_logger = cudf::memory_stats_logger(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value())); + state.exec( + nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) { + try_drop_l3_cache(); + + timer.start(); + auto const result = cudf::io::read_parquet(read_opts); + timer.stop(); + + CUDF_EXPECTS(result.tbl->num_columns() == num_cols_to_read, "Unexpected number of columns"); + CUDF_EXPECTS(result.tbl->num_rows() == num_rows_to_read, "Unexpected number of rows"); + }); + + auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value"); + state.add_element_count(static_cast(data_size) / time, "bytes_per_second"); + state.add_buffer_size( + mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage"); + state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size"); +} + +template +void bench_batched_memset(nvbench::state& state, nvbench::type_list>) +{ + auto const d_type = get_type_or_group(static_cast(DataType)); + auto const num_cols = static_cast(state.get_int64("num_cols")); + auto const cardinality = static_cast(state.get_int64("cardinality")); + auto const run_length = static_cast(state.get_int64("run_length")); + auto const source_type = retrieve_io_type_enum(state.get_string("io_type")); + auto const compression = cudf::io::compression_type::NONE; + cuio_source_sink_pair source_sink(source_type); + auto const tbl = + create_random_table(cycle_dtypes(d_type, num_cols), + table_size_bytes{data_size}, + data_profile_builder().cardinality(cardinality).avg_run_length(run_length)); + auto const view = tbl->view(); + + cudf::io::parquet_writer_options write_opts = + cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view) + .compression(compression); + cudf::io::write_parquet(write_opts); + auto const num_rows = view.num_rows(); + + parquet_read_common(num_rows, num_cols, source_sink, state); +} + +using d_type_list = nvbench::enum_type_list; + +NVBENCH_BENCH_TYPES(bench_batched_memset, NVBENCH_TYPE_AXES(d_type_list)) + .set_name("batched_memset") + .set_type_axes_names({"data_type"}) + .add_int64_axis("num_cols", {1000}) + .add_string_axis("io_type", {"DEVICE_BUFFER"}) + .set_min_samples(4) + .add_int64_axis("cardinality", {0, 1000}) + .add_int64_axis("run_length", {1, 32}); diff --git a/cpp/include/cudf/io/detail/batched_memset.hpp b/cpp/include/cudf/io/detail/batched_memset.hpp new file mode 100644 index 00000000000..d0922cc64ee --- /dev/null +++ b/cpp/include/cudf/io/detail/batched_memset.hpp @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace CUDF_EXPORT cudf { +namespace io::detail { + +/** + * @brief A helper function that takes in a vector of device spans and memsets them to the + * value provided using batches sent to the GPU. + * + * @param bufs Vector with device spans of data + * @param value Value to memset all device spans to + * @param _stream Stream used for device memory operations and kernel launches + * + * @return The data in device spans all set to value + */ +template +void batched_memset(std::vector> const& bufs, + T const value, + rmm::cuda_stream_view stream) +{ + // define task and bytes parameters + auto const num_bufs = bufs.size(); + + // copy bufs into device memory and then get sizes + auto gpu_bufs = + cudf::detail::make_device_uvector_async(bufs, stream, rmm::mr::get_current_device_resource()); + + // get a vector with the sizes of all buffers + auto sizes = cudf::detail::make_counting_transform_iterator( + static_cast(0), + cuda::proclaim_return_type( + [gpu_bufs = gpu_bufs.data()] __device__(std::size_t i) { return gpu_bufs[i].size(); })); + + // get an iterator with a constant value to memset + auto iter_in = thrust::make_constant_iterator(thrust::make_constant_iterator(value)); + + // get an iterator pointing to each device span + auto iter_out = thrust::make_transform_iterator( + thrust::counting_iterator(0), + cuda::proclaim_return_type( + [gpu_bufs = gpu_bufs.data()] __device__(std::size_t i) { return gpu_bufs[i].data(); })); + + size_t temp_storage_bytes = 0; + + cub::DeviceCopy::Batched(nullptr, temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream); + + rmm::device_buffer d_temp_storage( + temp_storage_bytes, stream, rmm::mr::get_current_device_resource()); + + cub::DeviceCopy::Batched( + d_temp_storage.data(), temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream); +} + +} // namespace io::detail +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index e006cc7d714..557b1a45c1f 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -1494,6 +1495,11 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num // buffers if they are not part of a list hierarchy. mark down // if we have any list columns that need further processing. bool has_lists = false; + // Casting to std::byte since data buffer pointer is void * + std::vector> memset_bufs; + // Validity Buffer is a uint32_t pointer + std::vector> nullmask_bufs; + for (size_t idx = 0; idx < _input_columns.size(); idx++) { auto const& input_col = _input_columns[idx]; size_t const max_depth = input_col.nesting_depth(); @@ -1514,13 +1520,19 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num // we're going to start null mask as all valid and then turn bits off if necessary out_buf.create_with_mask( out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows, - cudf::mask_state::ALL_VALID, + cudf::mask_state::UNINITIALIZED, + false, _stream, _mr); + memset_bufs.push_back(cudf::device_span(static_cast(out_buf.data()), + out_buf.data_size())); + nullmask_bufs.push_back(cudf::device_span( + out_buf.null_mask(), + cudf::util::round_up_safe(out_buf.null_mask_size(), sizeof(cudf::bitmask_type)) / + sizeof(cudf::bitmask_type))); } } } - // compute output column sizes by examining the pages of the -input- columns if (has_lists) { auto h_cols_info = @@ -1593,11 +1605,22 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num // allocate // we're going to start null mask as all valid and then turn bits off if necessary - out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr); + out_buf.create_with_mask(size, cudf::mask_state::UNINITIALIZED, false, _stream, _mr); + memset_bufs.push_back(cudf::device_span( + static_cast(out_buf.data()), out_buf.data_size())); + nullmask_bufs.push_back(cudf::device_span( + out_buf.null_mask(), + cudf::util::round_up_safe(out_buf.null_mask_size(), sizeof(cudf::bitmask_type)) / + sizeof(cudf::bitmask_type))); } } } } + + cudf::io::detail::batched_memset(memset_bufs, static_cast(0), _stream); + // Need to set null mask bufs to all high bits + cudf::io::detail::batched_memset( + nullmask_bufs, std::numeric_limits::max(), _stream); } std::vector reader::impl::calculate_page_string_offsets() diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index 2f4272b0367..8abfb000b94 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -33,7 +33,7 @@ namespace cudf::io::detail { -void gather_column_buffer::allocate_strings_data(rmm::cuda_stream_view stream) +void gather_column_buffer::allocate_strings_data(bool memset_data, rmm::cuda_stream_view stream) { CUDF_EXPECTS(type.id() == type_id::STRING, "allocate_strings_data called for non-string column"); // The contents of _strings will never be directly returned to the user. @@ -56,11 +56,12 @@ std::unique_ptr gather_column_buffer::make_string_column_impl(rmm::cuda_ return make_strings_column(*_strings, stream, _mr); } -void cudf::io::detail::inline_column_buffer::allocate_strings_data(rmm::cuda_stream_view stream) +void cudf::io::detail::inline_column_buffer::allocate_strings_data(bool memset_data, + rmm::cuda_stream_view stream) { CUDF_EXPECTS(type.id() == type_id::STRING, "allocate_strings_data called for non-string column"); // size + 1 for final offset. _string_data will be initialized later. - _data = create_data(data_type{type_id::INT32}, size + 1, stream, _mr); + _data = create_data(data_type{type_to_id()}, size + 1, memset_data, stream, _mr); } void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes, @@ -93,6 +94,7 @@ void copy_buffer_data(string_policy const& buff, string_policy& new_buff) template void column_buffer_base::create_with_mask(size_type _size, cudf::mask_state null_mask_state, + bool memset_data, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -100,16 +102,20 @@ void column_buffer_base::create_with_mask(size_type _size, _mr = mr; switch (type.id()) { - case type_id::STRING: static_cast(this)->allocate_strings_data(stream); break; + case type_id::STRING: + static_cast(this)->allocate_strings_data(memset_data, stream); + break; // list columns store a buffer of int32's as offsets to represent // their individual rows - case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, _mr); break; + case type_id::LIST: + _data = create_data(data_type{type_to_id()}, size, memset_data, stream, _mr); + break; // struct columns store no data themselves. just validity and children. case type_id::STRUCT: break; - default: _data = create_data(type, size, stream, _mr); break; + default: _data = create_data(type, size, memset_data, stream, _mr); break; } if (is_nullable) { _null_mask = @@ -117,12 +123,21 @@ void column_buffer_base::create_with_mask(size_type _size, } } +template +void column_buffer_base::create(size_type _size, + bool memset_data, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + create_with_mask(_size, mask_state::ALL_NULL, memset_data, stream, mr); +} + template void column_buffer_base::create(size_type _size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - create_with_mask(_size, mask_state::ALL_NULL, stream, mr); + create_with_mask(_size, mask_state::ALL_NULL, true, stream, mr); } template diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index ed6bb8bbdca..b2290965bb9 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -44,6 +44,7 @@ namespace detail { * * @param type The intended data type to populate * @param size The number of elements to be represented by the mask + * @param memset_data Defines whether data should be memset to 0 * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned device_buffer * @@ -51,17 +52,25 @@ namespace detail { */ inline rmm::device_buffer create_data(data_type type, size_type size, + bool memset_data, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { std::size_t data_size = size_of(type) * size; rmm::device_buffer data(data_size, stream, mr); - CUDF_CUDA_TRY(cudaMemsetAsync(data.data(), 0, data_size, stream.value())); - + if (memset_data) { CUDF_CUDA_TRY(cudaMemsetAsync(data.data(), 0, data_size, stream.value())); } return data; } +inline rmm::device_buffer create_data(data_type type, + size_type size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return create_data(type, size, true, stream, mr); +} + using string_index_pair = thrust::pair; // forward declare friend functions @@ -113,12 +122,18 @@ class column_buffer_base { // instantiate a column of known type with a specified size. Allows deferred creation for // preprocessing steps such as in the Parquet reader + void create(size_type _size, + bool memset_data, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + void create(size_type _size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); // like create(), but also takes a `cudf::mask_state` to allow initializing the null mask as // something other than `ALL_NULL` void create_with_mask(size_type _size, cudf::mask_state null_mask_state, + bool memset_data, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); @@ -192,7 +207,7 @@ class gather_column_buffer : public column_buffer_base { create(_size, stream, mr); } - void allocate_strings_data(rmm::cuda_stream_view stream); + void allocate_strings_data(bool memset_data, rmm::cuda_stream_view stream); [[nodiscard]] void* data_impl() { return _strings ? _strings->data() : _data.data(); } [[nodiscard]] void const* data_impl() const { return _strings ? _strings->data() : _data.data(); } @@ -226,7 +241,7 @@ class inline_column_buffer : public column_buffer_base { create(_size, stream, mr); } - void allocate_strings_data(rmm::cuda_stream_view stream); + void allocate_strings_data(bool memset_data, rmm::cuda_stream_view stream); void* data_impl() { return _data.data(); } [[nodiscard]] void const* data_impl() const { return _data.data(); } diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 4dffcb41ba2..5e85b3e8adf 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -393,6 +393,7 @@ ConfigureTest( utilities_tests/pinned_memory_tests.cpp utilities_tests/type_check_tests.cpp utilities_tests/type_list_tests.cpp + utilities_tests/batched_memset_tests.cu ) # ################################################################################################## diff --git a/cpp/tests/utilities_tests/batched_memset_tests.cu b/cpp/tests/utilities_tests/batched_memset_tests.cu new file mode 100644 index 00000000000..9fc5baeec97 --- /dev/null +++ b/cpp/tests/utilities_tests/batched_memset_tests.cu @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include + +template +struct MultiBufferTestIntegral : public cudf::test::BaseFixture {}; + +TEST(MultiBufferTestIntegral, BasicTest1) +{ + std::vector const BUF_SIZES{ + 50000, 4, 1000, 0, 250000, 1, 100, 8000, 0, 1, 100, 1000, 10000, 100000, 0, 1, 100000}; + + // Device init + auto stream = cudf::get_default_stream(); + auto mr = rmm::mr::get_current_device_resource(); + + // Creating base vector for data and setting it to all 0xFF + std::vector> expected; + std::transform(BUF_SIZES.begin(), BUF_SIZES.end(), std::back_inserter(expected), [](auto size) { + return std::vector(size + 2000, std::numeric_limits::max()); + }); + + // set buffer region to other value + std::for_each(thrust::make_zip_iterator(thrust::make_tuple(expected.begin(), BUF_SIZES.begin())), + thrust::make_zip_iterator(thrust::make_tuple(expected.end(), BUF_SIZES.end())), + [](auto elem) { + std::fill_n( + thrust::get<0>(elem).begin() + 1000, thrust::get<1>(elem), 0xEEEEEEEEEEEEEEEE); + }); + + // Copy host vector data to device + std::vector> device_bufs; + std::transform(expected.begin(), + expected.end(), + std::back_inserter(device_bufs), + [stream, mr](auto const& vec) { + return cudf::detail::make_device_uvector_async(vec, stream, mr); + }); + + // Initialize device buffers for memset + std::vector> memset_bufs; + std::transform( + thrust::make_zip_iterator(thrust::make_tuple(device_bufs.begin(), BUF_SIZES.begin())), + thrust::make_zip_iterator(thrust::make_tuple(device_bufs.end(), BUF_SIZES.end())), + std::back_inserter(memset_bufs), + [](auto const& elem) { + return cudf::device_span(thrust::get<0>(elem).data() + 1000, thrust::get<1>(elem)); + }); + + // Function Call + cudf::io::detail::batched_memset(memset_bufs, uint64_t{0}, stream); + + // Set all buffer regions to 0 for expected comparison + std::for_each( + thrust::make_zip_iterator(thrust::make_tuple(expected.begin(), BUF_SIZES.begin())), + thrust::make_zip_iterator(thrust::make_tuple(expected.end(), BUF_SIZES.end())), + [](auto elem) { std::fill_n(thrust::get<0>(elem).begin() + 1000, thrust::get<1>(elem), 0UL); }); + + // Compare to see that only given buffers are zeroed out + std::for_each( + thrust::make_zip_iterator(thrust::make_tuple(device_bufs.begin(), expected.begin())), + thrust::make_zip_iterator(thrust::make_tuple(device_bufs.end(), expected.end())), + [stream](auto const& elem) { + auto after_memset = cudf::detail::make_std_vector_async(thrust::get<0>(elem), stream); + EXPECT_TRUE( + std::equal(thrust::get<1>(elem).begin(), thrust::get<1>(elem).end(), after_memset.begin())); + }); +}