diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp index 76f093837de..b2de030eed8 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/detail/reduce_deterministic.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -62,6 +63,84 @@ namespace hpx::parallel::detail { } }; + template + struct sequential_reduce_deterministic_rfa_t final + : hpx::functional::detail::tag_fallback< + sequential_reduce_deterministic_rfa_t> + { + private: + template + friend constexpr hpx::parallel::detail::rfa:: + ReproducibleFloatingAccumulator + tag_fallback_invoke(sequential_reduce_deterministic_rfa_t, + ExPolicy&&, InIterB first, InIterE last, T init, Reduce&& r) + { + hpx::parallel::detail::rfa::RFA_bins bins; + bins.initialize_bins(); + std::memcpy(rfa::bin_host_buffer, &bins, sizeof(bins)); + + hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator rfa; + + for (auto e = first; e != last; ++e) + { + rfa += *e; + } + return rfa; + } + + template + friend constexpr hpx::parallel::detail::rfa:: + ReproducibleFloatingAccumulator + tag_fallback_invoke(sequential_reduce_deterministic_rfa_t, + ExPolicy&&, InIterB first, std::size_t size, T init, Reduce&& r) + { + hpx::parallel::detail::rfa::RFA_bins bins; + bins.initialize_bins(); + std::memcpy(rfa::bin_host_buffer, &bins, sizeof(bins)); + + hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator rfa; + auto e = first; + for (std::size_t i = 0; i < size; ++i, ++e) + { + rfa += *e; + } + return rfa; + } + + // template , + // // hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator< + // // double>>::value> + // > + // friend constexpr T tag_fallback_invoke( + // sequential_reduce_deterministic_rfa_t, ExPolicy&&, InIterB first, + // InIterE last, T init, Reduce&& r) 
+ // { + // static_assert(hpx::util::contains, + // hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator< + // double>>::value); + // hpx::parallel::detail::rfa::RFA_bins bins; + // bins.initialize_bins(); + // std::memcpy(rfa::bin_host_buffer, &bins, sizeof(bins)); + + // hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator rfa; + // rfa.set_max_abs_val(init); + // rfa.unsafe_add(init); + // rfa.renorm(); + // for (auto e = first; e != last; ++e) + // { + // rfa += *e; + // } + // return rfa.conv(); + // } + }; + #if !defined(HPX_COMPUTE_DEVICE_CODE) template inline constexpr sequential_reduce_deterministic_t @@ -77,4 +156,18 @@ namespace hpx::parallel::detail { } #endif +#if !defined(HPX_COMPUTE_DEVICE_CODE) + template + inline constexpr sequential_reduce_deterministic_rfa_t + sequential_reduce_deterministic_rfa = + sequential_reduce_deterministic_rfa_t{}; +#else + template + HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic_rfa( + Args&&... args) + { + return sequential_reduce_deterministic_rfa_t{}( + std::forward(args)...); + } +#endif } // namespace hpx::parallel::detail diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp index 56c8d334989..b8435eec9a0 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce_deterministic.hpp @@ -10,6 +10,7 @@ #pragma once +#include "detail/reduce_deterministic.hpp" #if defined(DOXYGEN) namespace hpx { @@ -396,42 +397,49 @@ namespace hpx::parallel { static constexpr T sequential(ExPolicy&& policy, InIterB first, InIterE last, T_&& init, Reduce&& r) { - // hpx::parallel::detail::sequential_reduce_deterministic_t seq; - // return seq(policy,first,last,0.0f,r); - return hpx::parallel::detail::sequential_reduce_deterministic( - HPX_FORWARD(ExPolicy, policy), first, last, + return 
hpx::parallel::detail::sequential_reduce_deterministic< + ExPolicy>(HPX_FORWARD(ExPolicy, policy), first, last, HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r)); } - // template - // static util::detail::algorithm_result_t parallel( - // ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init, - // Reduce&& r) - // { - // if (first == last) - // { - // return util::detail::algorithm_result::get( - // HPX_FORWARD(T_, init)); - // } - - // auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T { - // T val = *part_begin; - // return detail::sequential_reduce( - // ++part_begin, --part_size, HPX_MOVE(val), r); - // }; - - // return util::partitioner::call( - // HPX_FORWARD(ExPolicy, policy), first, - // detail::distance(first, last), HPX_MOVE(f1), - // hpx::unwrapping( - // [init = HPX_FORWARD(T_, init), - // r = HPX_FORWARD(Reduce, r)](auto&& results) -> T { - // return detail::sequential_reduce( - // hpx::util::begin(results), - // hpx::util::size(results), init, r); - // })); - // } + template + static util::detail::algorithm_result_t parallel( + ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init, + Reduce&& r) + { + if (first == last) + { + return util::detail::algorithm_result::get( + HPX_FORWARD(T_, init)); + } + + auto f1 = [r, policy]( + FwdIterB part_begin, std::size_t part_size) + -> hpx::parallel::detail::rfa:: + ReproducibleFloatingAccumulator { + T val = *part_begin; + return hpx::parallel::detail:: + sequential_reduce_deterministic_rfa( + HPX_FORWARD(ExPolicy, policy), ++part_begin, + --part_size, HPX_MOVE(val), r); + }; + + return util::partitioner>::call(HPX_FORWARD(ExPolicy, policy), first, + detail::distance(first, last), HPX_MOVE(f1), + hpx::unwrapping([init = HPX_FORWARD(T_, init), + r = HPX_FORWARD(Reduce, r), + policy](auto&& results) -> T { + return hpx::parallel::detail:: + sequential_reduce_deterministic_rfa( + HPX_FORWARD(ExPolicy, policy), + hpx::util::begin(results), + hpx::util::size(results), init, r) + .conv(); + })); 
+ } }; /// \endcond } // namespace detail diff --git a/libs/core/algorithms/tests/unit/algorithms/reduce_deterministic.cpp b/libs/core/algorithms/tests/unit/algorithms/reduce_deterministic.cpp index 0a67dea0302..694b50cfb76 100644 --- a/libs/core/algorithms/tests/unit/algorithms/reduce_deterministic.cpp +++ b/libs/core/algorithms/tests/unit/algorithms/reduce_deterministic.cpp @@ -79,6 +79,49 @@ void test_reduce1(IteratorTag) HPX_TEST_EQ(r2, r3); } +template +void test_reduce_parallel1(IteratorTag) +{ + // check if different type for deterministic and nondeterministic + // and same result i.e. correct computation + using base_iterator_det = std::vector::iterator; + using iterator_det = test::test_iterator; + + using base_iterator_ndet = std::vector::iterator; + using iterator_ndet = test::test_iterator; + + std::vector deterministic(LEN); + std::vector nondeterministic(LEN); + + std::iota( + deterministic.begin(), deterministic.end(), FloatTypeDeterministic(0)); + + std::iota(nondeterministic.begin(), nondeterministic.end(), + FloatTypeNonDeterministic(0)); + + FloatTypeDeterministic val_det(0); + FloatTypeNonDeterministic val_non_det(0); + auto op = [](FloatTypeNonDeterministic v1, FloatTypeNonDeterministic v2) { + return v1 + v2; + }; + + FloatTypeDeterministic r1 = hpx::reduce_deterministic(hpx::execution::par, + iterator_det(std::begin(deterministic)), + iterator_det(std::end(deterministic)), val_det, op); + + // verify values + // FloatTypeNonDeterministic r2 = hpx::reduce(hpx::execution::par, + // iterator_ndet(std::begin(nondeterministic)), + // iterator_ndet(std::end(nondeterministic)), val_non_det, op); + + FloatTypeNonDeterministic r3 = std::accumulate( + nondeterministic.begin(), nondeterministic.end(), val_non_det); + + HPX_TEST_EQ(r1, r3); + // HPX_TEST_EQ(r2, r3); +} + template void test_reduce_determinism(IteratorTag) @@ -178,6 +221,7 @@ void test_reduce1() test_reduce1(IteratorTag()); test_reduce1(IteratorTag()); test_reduce1(IteratorTag()); +
test_reduce_parallel1(IteratorTag()); } template