diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index aab0a9b2d49..5fd68bfb26c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -502,6 +502,7 @@ add_library( src/reductions/product.cu src/reductions/reductions.cpp src/reductions/scan/rank_scan.cu + src/reductions/scan/ewm.cu src/reductions/scan/scan.cpp src/reductions/scan/scan_exclusive.cu src/reductions/scan/scan_inclusive.cu diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp index 67705863d41..4bfef9767ca 100644 --- a/cpp/benchmarks/io/text/multibyte_split.cpp +++ b/cpp/benchmarks/io/text/multibyte_split.cpp @@ -85,8 +85,7 @@ static cudf::string_scalar create_random_input(int32_t num_chars, // extract the chars from the returned strings column. auto input_column_contents = input_column->release(); - auto chars_column_contents = input_column_contents.children[1]->release(); - auto chars_buffer = chars_column_contents.data.release(); + auto chars_buffer = input_column_contents.data.release(); // turn the chars in to a string scalar. return cudf::string_scalar(std::move(*chars_buffer)); @@ -218,7 +217,7 @@ NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_TYPE_AXES(source_type_list)) .set_name("multibyte_split_source") .set_min_samples(4) - .add_int64_axis("strip_delimiters", {1}) + .add_int64_axis("strip_delimiters", {0, 1}) .add_int64_axis("delim_size", {1}) .add_int64_axis("delim_percent", {1}) .add_int64_power_of_two_axis("size_approx", {15, 30}) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index d458c831f19..3c1023017be 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,6 +103,7 @@ class aggregation { NUNIQUE, ///< count number of unique elements NTH_ELEMENT, ///< get the nth element ROW_NUMBER, ///< get row-number of current index (relative to rolling window) + EWMA, ///< get exponential weighted moving average at current index RANK, ///< get rank of current index COLLECT_LIST, ///< collect values into a list COLLECT_SET, ///< collect values into a list without duplicate entries @@ -250,6 +251,8 @@ class segmented_reduce_aggregation : public virtual aggregation { enum class udf_type : bool { CUDA, PTX }; /// Type of correlation method. enum class correlation_type : int32_t { PEARSON, KENDALL, SPEARMAN }; +/// Type of treatment of EWM input values' first value +enum class ewm_history : int32_t { INFINITE, FINITE }; /// Factory to create a SUM aggregation /// @return A SUM aggregation object @@ -411,6 +414,42 @@ std::unique_ptr make_nth_element_aggregation( template std::unique_ptr make_row_number_aggregation(); +/** + * @brief Factory to create an EWMA aggregation + * + * `EWMA` returns a non-nullable column with the same type as the input, + * whose values are the exponentially weighted moving average of the input + * sequence. Let these values be known as the y_i. + * + * EWMA aggregations are parameterized by a center of mass (`com`) which + * affects the contribution of the previous values (y_0 ... y_{i-1}) in + * computing the y_i. + * + * EWMA aggregations are also parameterized by a history `cudf::ewm_history`. + * Special considerations have to be given to the mathematical treatment of + * the first value of the input sequence. There are two approaches to this, + * one which considers the first value of the sequence to be the exponential + * weighted moving average of some infinite history of data, and one which + * takes the first value to be the only datapoint known. These assumptions + * lead to two different formulas for the y_i. `ewm_history` selects which. + * + * EWMA aggregations have special null handling. Nulls have two effects. The + * first is to propagate forward the last valid value as far as it has been + * computed. This could be thought of as the nulls not affecting the average + * in any way. The second effect changes the way the y_i are computed. Since + * a moving average is conceptually designed to weight contributing values by + * their recency, nulls ought to count as valid periods even though they do + * not change the average. For example, if the input sequence is {1, NULL, 3} + * then when computing y_2 one should weigh y_0 as if it occurs two periods + * before y_2 rather than just one. + * + * @param center_of_mass the center of mass. + * @param history which assumption to make about the first value + * @return A EWM aggregation object + */ +template +std::unique_ptr make_ewma_aggregation(double const center_of_mass, ewm_history history); + /** * @brief Factory to create a RANK aggregation * diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index edee83783b8..843414817e3 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -76,6 +76,8 @@ class simple_aggregations_collector { // Declares the interface for the simple class nth_element_aggregation const& agg); virtual std::vector> visit(data_type col_type, class row_number_aggregation const& agg); + virtual std::vector> visit(data_type col_type, + class ewma_aggregation const& agg); virtual std::vector> visit(data_type col_type, class rank_aggregation const& agg); virtual std::vector> visit( @@ -141,6 +143,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer virtual void visit(class correlation_aggregation const& agg); virtual void visit(class tdigest_aggregation const& agg); virtual void visit(class merge_tdigest_aggregation const& agg); + virtual void visit(class ewma_aggregation const& agg); }; /** @@ -667,6 +670,40 @@ class row_number_aggregation final : public rolling_aggregation { void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } }; +/** + * @brief Derived class for specifying an ewma aggregation + */ +class ewma_aggregation final : public scan_aggregation { + public: + double const center_of_mass; + cudf::ewm_history history; + + ewma_aggregation(double const center_of_mass, cudf::ewm_history history) + : aggregation{EWMA}, center_of_mass{center_of_mass}, history{history} + { + } + + std::unique_ptr clone() const override + { + return std::make_unique(*this); + } + + std::vector> get_simple_aggregations( + data_type col_type, simple_aggregations_collector& collector) const override + { + return collector.visit(col_type, *this); + } + + bool is_equal(aggregation const& _other) const override + { + if (!this->aggregation::is_equal(_other)) { return false; } + auto const& other = dynamic_cast(_other); + return this->center_of_mass == other.center_of_mass and this->history == other.history; + } + + void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } +}; + /** * @brief Derived class for specifying a rank aggregation */ @@ -1336,6 +1373,11 @@ struct target_type_impl { using type = size_type; }; +template +struct target_type_impl { + using type = double; +}; + // Always use size_type accumulator for RANK template struct target_type_impl { @@ -1536,6 +1578,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind return f.template operator()(std::forward(args)...); case aggregation::MERGE_TDIGEST: return f.template operator()(std::forward(args)...); + case aggregation::EWMA: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index adee9147740..5422304c5cb 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -154,6 +154,12 @@ std::vector> simple_aggregations_collector::visit( return visit(col_type, static_cast(agg)); } +std::vector> simple_aggregations_collector::visit( + data_type col_type, ewma_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + std::vector> simple_aggregations_collector::visit( data_type col_type, rank_aggregation const& agg) { @@ -333,6 +339,11 @@ void aggregation_finalizer::visit(row_number_aggregation const& agg) visit(static_cast(agg)); } +void aggregation_finalizer::visit(ewma_aggregation const& agg) +{ + visit(static_cast(agg)); +} + void aggregation_finalizer::visit(rank_aggregation const& agg) { visit(static_cast(agg)); @@ -665,6 +676,17 @@ std::unique_ptr make_row_number_aggregation() template std::unique_ptr make_row_number_aggregation(); template std::unique_ptr make_row_number_aggregation(); +/// Factory to create an EWMA aggregation +template +std::unique_ptr make_ewma_aggregation(double const com, cudf::ewm_history history) +{ + return std::make_unique(com, history); +} +template std::unique_ptr make_ewma_aggregation(double const com, + cudf::ewm_history history); +template std::unique_ptr make_ewma_aggregation( + double const com, cudf::ewm_history history); + /// Factory to create a RANK aggregation template std::unique_ptr make_rank_aggregation(rank_method method, diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index ca15b532d07..bed4dbc5a66 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -296,19 +296,6 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream) CUDF_FAIL("Unexpected compound type"); } -// checks to see if the given column has a fixed size. This doesn't -// check every row, so assumes string and list columns are not fixed, even -// if each row is the same width. -// TODO: update this if FIXED_LEN_BYTE_ARRAY is ever supported for writes. -bool is_col_fixed_width(column_view const& column) -{ - if (column.type().id() == type_id::STRUCT) { - return std::all_of(column.child_begin(), column.child_end(), is_col_fixed_width); - } - - return is_fixed_width(column.type()); -} - /** * @brief Extends SchemaElement to add members required in constructing parquet_column_view * @@ -946,6 +933,15 @@ struct parquet_column_view { return schema_node.converted_type.value_or(UNKNOWN); } + // Checks to see if the given column has a fixed-width data type. This doesn't + // check every value, so it assumes string and list columns are not fixed-width, even + // if each value has the same size. + [[nodiscard]] bool is_fixed_width() const + { + // lists and strings are not fixed width + return max_rep_level() == 0 and physical_type() != Type::BYTE_ARRAY; + } + std::vector const& get_path_in_schema() { return path_in_schema; } // LIST related member functions @@ -1764,7 +1760,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, // unbalanced in final page sizes, so using 4 which seems to be a good // compromise at smoothing things out without getting fragment sizes too small. auto frag_size_fn = [&](auto const& col, size_t col_size) { - int const target_frags_per_page = is_col_fixed_width(col) ? 1 : 4; + int const target_frags_per_page = col.is_fixed_width() ? 1 : 4; auto const avg_len = target_frags_per_page * util::div_rounding_up_safe(col_size, input.num_rows()); if (avg_len > 0) { @@ -1775,8 +1771,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } }; - std::transform(single_streams_table.begin(), - single_streams_table.end(), + std::transform(parquet_columns.begin(), + parquet_columns.end(), column_sizes.begin(), column_frag_size.begin(), frag_size_fn); diff --git a/cpp/src/io/text/byte_range_info.cpp b/cpp/src/io/text/byte_range_info.cpp index 290e0451839..6a7836ed4e1 100644 --- a/cpp/src/io/text/byte_range_info.cpp +++ b/cpp/src/io/text/byte_range_info.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ std::vector create_byte_range_infos_consecutive(int64_t total_b auto range_size = util::div_rounding_up_safe(total_bytes, range_count); auto ranges = std::vector(); - ranges.reserve(range_size); + ranges.reserve(range_count); for (int64_t i = 0; i < range_count; i++) { auto offset = i * range_size; diff --git a/cpp/src/io/text/data_chunk_source_factories.cpp b/cpp/src/io/text/data_chunk_source_factories.cpp index 596ca3458c8..58faa0ebfe4 100644 --- a/cpp/src/io/text/data_chunk_source_factories.cpp +++ b/cpp/src/io/text/data_chunk_source_factories.cpp @@ -120,7 +120,11 @@ class istream_data_chunk_reader : public data_chunk_reader { { } - void skip_bytes(std::size_t size) override { _datastream->ignore(size); }; + void skip_bytes(std::size_t size) override + { + // 20% faster than _datastream->ignore(size) for large files + _datastream->seekg(_datastream->tellg() + static_cast(size)); + }; std::unique_ptr get_next_chunk(std::size_t read_size, rmm::cuda_stream_view stream) override @@ -265,7 +269,7 @@ class file_data_chunk_source : public data_chunk_source { [[nodiscard]] std::unique_ptr create_reader() const override { return std::make_unique( - std::make_unique(_filename, std::ifstream::in)); + std::make_unique(_filename, std::ifstream::in | std::ifstream::binary)); } private: diff --git a/cpp/src/reductions/scan/ewm.cu b/cpp/src/reductions/scan/ewm.cu new file mode 100644 index 00000000000..3fa2de450ad --- /dev/null +++ b/cpp/src/reductions/scan/ewm.cu @@ -0,0 +1,330 @@ +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "scan.cuh" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace cudf { +namespace detail { + +template +using pair_type = thrust::pair; + +/** + * @brief functor to be summed over in a prefix sum such that + * the recurrence in question is solved. See + * G. E. Blelloch. Prefix sums and their applications. Technical Report + * CMU-CS-90-190, Nov. 1990. S. 1.4 + * for details + */ +template +class recurrence_functor { + public: + __device__ pair_type operator()(pair_type ci, pair_type cj) + { + return {ci.first * cj.first, ci.second * cj.first + cj.second}; + } +}; + +template +struct ewma_functor_base { + T beta; + const pair_type IDENTITY{1.0, 0.0}; +}; + +template +struct ewma_adjust_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(thrust::tuple const data) + { + // Not const to allow for updating the input value + auto [valid, exp, input] = data; + if (!valid) { return this->IDENTITY; } + if constexpr (not is_numerator) { input = 1; } + + // The value is non-null, but nulls preceding it + // must adjust the second element of the pair + T const beta = this->beta; + return {beta * ((exp != 0) ? pow(beta, exp) : 1), input}; + } +}; + +template +struct ewma_adjust_no_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(T const data) + { + T const beta = this->beta; + if constexpr (is_numerator) { + return {beta, data}; + } else { + return {beta, 1.0}; + } + } +}; + +template +struct ewma_noadjust_nulls_functor : public ewma_functor_base { + /* + In the null case, a denominator actually has to be computed. The formula is + y_{i+1} = (1 - alpha)x_{i-1} + alpha x_i, but really there is a "denominator" + which is the sum of the weights: alpha + (1 - alpha) == 1. If a null is + encountered, that means that the "previous" value is downweighted by a + factor (for each missing value). For example with a single null: + data = {x_0, NULL, x_1}, + y_2 = (1 - alpha)**2 x_0 + alpha * x_2 / (alpha + (1-alpha)**2) + + As such, the pairs must be updated before summing like the adjusted case to + properly downweight the previous values. But now but we also need to compute + the normalization factors and divide the results into them at the end. + */ + __device__ pair_type operator()(thrust::tuple const data) + { + T const beta = this->beta; + auto const [input, index, valid, nullcnt] = data; + if (index == 0) { + return {beta, input}; + } else { + if (!valid) { return this->IDENTITY; } + // preceding value is valid, return normal pair + if (nullcnt == 0) { return {beta, (1.0 - beta) * input}; } + // one or more preceding values is null, adjust by how many + T const factor = (1.0 - beta) + pow(beta, nullcnt + 1); + return {(beta * (pow(beta, nullcnt)) / factor), ((1.0 - beta) * input) / factor}; + } + } +}; + +template +struct ewma_noadjust_no_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(thrust::tuple const data) + { + T const beta = this->beta; + auto const [input, index] = data; + if (index == 0) { + return {beta, input}; + } else { + return {beta, (1.0 - beta) * input}; + } + } +}; + +/** +* @brief Return an array whose values y_i are the number of null entries +* in between the last valid entry of the input and the current index. +* Example: {1, NULL, 3, 4, NULL, NULL, 7} + -> {0, 0 1, 0, 0, 1, 2} +*/ +rmm::device_uvector null_roll_up(column_view const& input, + rmm::cuda_stream_view stream) +{ + rmm::device_uvector output(input.size(), stream); + + auto device_view = column_device_view::create(input); + auto invalid_it = thrust::make_transform_iterator( + cudf::detail::make_validity_iterator(*device_view), + cuda::proclaim_return_type([] __device__(int valid) -> int { return 1 - valid; })); + + // valid mask {1, 0, 1, 0, 0, 1} leads to output array {0, 0, 1, 0, 1, 2} + thrust::inclusive_scan_by_key(rmm::exec_policy(stream), + invalid_it, + invalid_it + input.size() - 1, + invalid_it, + std::next(output.begin())); + return output; +} + +template +rmm::device_uvector compute_ewma_adjust(column_view const& input, + T const beta, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + rmm::device_uvector output(input.size(), stream); + rmm::device_uvector> pairs(input.size(), stream); + + if (input.has_nulls()) { + rmm::device_uvector nullcnt = null_roll_up(input, stream); + auto device_view = column_device_view::create(input); + auto valid_it = cudf::detail::make_validity_iterator(*device_view); + auto data = + thrust::make_zip_iterator(thrust::make_tuple(valid_it, nullcnt.begin(), input.begin())); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_adjust_nulls_functor{beta}, + recurrence_functor{}); + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_adjust_nulls_functor{beta}, + recurrence_functor{}); + + } else { + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + input.begin(), + input.end(), + pairs.begin(), + ewma_adjust_no_nulls_functor{beta}, + recurrence_functor{}); + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + auto itr = thrust::make_counting_iterator(0); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + itr, + itr + input.size(), + pairs.begin(), + ewma_adjust_no_nulls_functor{beta}, + recurrence_functor{}); + } + + thrust::transform( + rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + output.begin(), + [] __device__(pair_type pair, T numerator) -> T { return numerator / pair.second; }); + + return output; +} + +template +rmm::device_uvector compute_ewma_noadjust(column_view const& input, + T const beta, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + rmm::device_uvector output(input.size(), stream); + rmm::device_uvector> pairs(input.size(), stream); + rmm::device_uvector nullcnt = + [&input, stream]() -> rmm::device_uvector { + if (input.has_nulls()) { + return null_roll_up(input, stream); + } else { + return rmm::device_uvector(input.size(), stream); + } + }(); + // denominators are all 1 and do not need to be computed + // pairs are all (beta, 1-beta x_i) except for the first one + + if (!input.has_nulls()) { + auto data = thrust::make_zip_iterator( + thrust::make_tuple(input.begin(), thrust::make_counting_iterator(0))); + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_noadjust_no_nulls_functor{beta}, + recurrence_functor{}); + + } else { + auto device_view = column_device_view::create(input); + auto valid_it = detail::make_validity_iterator(*device_view); + + auto data = thrust::make_zip_iterator(thrust::make_tuple( + input.begin(), thrust::make_counting_iterator(0), valid_it, nullcnt.begin())); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_noadjust_nulls_functor{beta}, + recurrence_functor()); + } + + // copy the second elements to the output for now + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + return output; +} + +struct ewma_functor { + template ::value)> + std::unique_ptr operator()(scan_aggregation const& agg, + column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + { + CUDF_FAIL("Unsupported type for EWMA."); + } + + template ::value)> + std::unique_ptr operator()(scan_aggregation const& agg, + column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + { + auto const ewma_agg = dynamic_cast(&agg); + auto const history = ewma_agg->history; + auto const center_of_mass = ewma_agg->center_of_mass; + + // center of mass is easier for the user, but the recurrences are + // better expressed in terms of the derived parameter `beta` + T const beta = center_of_mass / (center_of_mass + 1.0); + + auto result = [&]() { + if (history == cudf::ewm_history::INFINITE) { + return compute_ewma_adjust(input, beta, stream, mr); + } else { + return compute_ewma_noadjust(input, beta, stream, mr); + } + }(); + return std::make_unique(cudf::data_type(cudf::type_to_id()), + input.size(), + result.release(), + rmm::device_buffer{}, + 0); + } +}; + +std::unique_ptr exponentially_weighted_moving_average(column_view const& input, + scan_aggregation const& agg, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return type_dispatcher(input.type(), ewma_functor{}, agg, input, stream, mr); +} + +} // namespace detail +} // namespace cudf diff --git a/cpp/src/reductions/scan/scan.cuh b/cpp/src/reductions/scan/scan.cuh index aeb9e516cd4..6c237741ac3 100644 --- a/cpp/src/reductions/scan/scan.cuh +++ b/cpp/src/reductions/scan/scan.cuh @@ -36,6 +36,12 @@ std::pair mask_scan(column_view const& input_view rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +// exponentially weighted moving average of the input +std::unique_ptr exponentially_weighted_moving_average(column_view const& input, + scan_aggregation const& agg, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + template