Skip to content

Commit

Permalink
Merge 49dd806 into ad17f89
Browse files Browse the repository at this point in the history
  • Loading branch information
SAtacker authored Feb 2, 2025
2 parents ad17f89 + 49dd806 commit 4cc10fb
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <hpx/functional/invoke.hpp>
#include <hpx/parallel/algorithms/detail/rfa.hpp>
#include <hpx/parallel/util/loop.hpp>
#include <hpx/type_support/pack.hpp>

#include <cstddef>
#include <cstring>
Expand All @@ -35,16 +36,16 @@ namespace hpx::parallel::detail {
{
/// TODO: Put constraint on Reduce to be a binary plus operator
(void) r;
hpx::parallel::detail::rfa::RFA_bins<T> bins;
bins.initialize_bins();
std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins));

// __rfa_bin_host_buffer__ should be initialized by the frontend of
// this method

hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> rfa;
rfa.set_max_abs_val(init);
rfa.unsafe_add(init);
rfa.renorm();
size_t count = 0;
T max_val = std::abs(*first);
T max_val = std::abs(std::numeric_limits<T>::min());
for (auto e = first; e != last; ++e)
{
T temp_max_val = std::abs(static_cast<T>(*e));
Expand All @@ -65,6 +66,69 @@ namespace hpx::parallel::detail {
}
};

/// Customization point object that sequentially folds a fixed number of
/// elements into a reproducible floating-point accumulator.
///
/// Two overloads are provided, selected by the trailing tag argument:
///  - std::true_type:  the elements are plain values of type T; the result is
///                     a freshly built ReproducibleFloatingAccumulator<T>.
///  - std::false_type: T itself is the accumulator type; the elements are
///                     merged into it with operator+=.
///
/// Precondition (both overloads): __rfa_bin_host_buffer__ must already have
/// been initialized by the caller.
template <typename ExPolicy>
struct sequential_reduce_deterministic_rfa_t final
  : hpx::functional::detail::tag_fallback<
        sequential_reduce_deterministic_rfa_t<ExPolicy>>
{
private:
    /// Accumulate \a partition_size values starting at \a first.
    ///
    /// \param first           iterator to the first element to add
    /// \param partition_size  number of elements to consume
    /// \param init            value added to the accumulator before the
    ///                        elements
    /// \returns the accumulator holding init plus all consumed elements
    template <typename InIterB, typename T>
    friend constexpr hpx::parallel::detail::rfa::
        ReproducibleFloatingAccumulator<T>
        tag_fallback_invoke(sequential_reduce_deterministic_rfa_t,
            ExPolicy&&, InIterB first, std::size_t partition_size, T init,
            std::true_type&&)
    {
        hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T>
            accumulator;
        accumulator.zero();
        accumulator += init;

        // Number of unsafe deposits made since the last renormalization.
        std::size_t deposits = 0;
        // Largest magnitude seen so far; the accumulator is told about every
        // new maximum before the corresponding value is deposited.
        T largest_abs = std::abs(std::numeric_limits<T>::min());

        auto it = first;
        for (std::size_t consumed = 0; consumed != partition_size;
             ++consumed, ++it)
        {
            T const magnitude = std::abs(static_cast<T>(*it));
            if (largest_abs < magnitude)
            {
                accumulator.set_max_abs_val(magnitude);
                largest_abs = magnitude;
            }

            accumulator.unsafe_add(*it);
            ++deposits;

            // Renormalize once the accumulator's endurance is reached.
            if (deposits == accumulator.endurance())
            {
                accumulator.renorm();
                deposits = 0;
            }
        }
        return accumulator;
    }

    /// Merge \a partition_size elements starting at \a first into an
    /// accumulator of type T.
    ///
    /// T is expected to provide zero() and operator+= for both T and the
    /// element type of the range.
    ///
    /// \param first           iterator to the first element to merge
    /// \param partition_size  number of elements to consume
    /// \param init            accumulator added before the elements
    /// \returns the combined accumulator
    template <typename InIterB, typename T>
    friend constexpr T tag_fallback_invoke(
        sequential_reduce_deterministic_rfa_t, ExPolicy&&, InIterB first,
        std::size_t partition_size, T init, std::false_type&&)
    {
        T combined;
        combined.zero();
        combined += init;

        auto it = first;
        for (std::size_t consumed = 0; consumed != partition_size;
             ++consumed, ++it)
        {
            combined += (*it);
        }
        return combined;
    }
};

#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_t<ExPolicy>
Expand All @@ -80,4 +144,18 @@ namespace hpx::parallel::detail {
}
#endif

// Entry point for sequential_reduce_deterministic_rfa_t<ExPolicy>.
//
// Host code: exposed as an inline constexpr variable template holding a
// default-constructed instance of the tag type.
#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_rfa_t<ExPolicy>
sequential_reduce_deterministic_rfa =
sequential_reduce_deterministic_rfa_t<ExPolicy>{};
#else
// Device code: exposed as a function template that default-constructs the tag
// type on each call and forwards all arguments to its call operator.
template <typename ExPolicy, typename... Args>
HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic_rfa(
Args&&... args)
{
return sequential_reduce_deterministic_rfa_t<ExPolicy>{}(
std::forward<Args>(args)...);
}
#endif
} // namespace hpx::parallel::detail
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ namespace hpx::parallel::detail::rfa {
///The number of deposits that can be performed before a renorm is necessary.
///Applies also to binned complex double precision.
static constexpr auto ENDURANCE = 1 << (MANT_DIG - BIN_WIDTH - 2);

///Return a binned floating-point reference bin
inline const ftype* binned_bins(const int x) const
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include "detail/reduce_deterministic.hpp"
#if defined(DOXYGEN)

namespace hpx {
Expand Down Expand Up @@ -396,10 +397,67 @@ namespace hpx::parallel {
static constexpr T sequential(ExPolicy&& policy, InIterB first,
InIterE last, T_&& init, Reduce&& r)
{
// TODO: abstract initializing memory
hpx::parallel::detail::rfa::RFA_bins<T_> bins;
bins.initialize_bins();
std::memcpy(hpx::parallel::detail::rfa::__rfa_bin_host_buffer__,
&bins, sizeof(bins));
return hpx::parallel::detail::sequential_reduce_deterministic<
ExPolicy>(HPX_FORWARD(ExPolicy, policy), first, last,
HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r));
}

/// Parallel deterministic reduction.
///
/// The range is split into partitions; each partition is folded into its own
/// ReproducibleFloatingAccumulator, and the per-partition accumulators are
/// then merged (together with \a init) and converted back to T_.
///
/// \param policy  execution policy forwarded to the partitioner
/// \param first   start of the range to reduce
/// \param last    end of the range to reduce
/// \param init    initial value; returned as-is for an empty range
/// \param r       reduction operator (not consulted by this implementation)
template <typename ExPolicy, typename FwdIterB, typename FwdIterE,
    typename T_, typename Reduce>
static util::detail::algorithm_result_t<ExPolicy, T> parallel(
    ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init,
    Reduce&& r)
{
    namespace rfa_ns = hpx::parallel::detail::rfa;
    using accumulator_type = rfa_ns::ReproducibleFloatingAccumulator<T_>;

    (void) r;

    // Nothing to reduce: hand back the initial value.
    if (first == last)
    {
        return util::detail::algorithm_result<ExPolicy, T>::get(
            HPX_FORWARD(T_, init));
    }

    // TODO: abstract initializing memory
    // Build the bin table and publish it through the shared host buffer
    // before any accumulator is used.
    rfa_ns::RFA_bins<T_> bin_table;
    bin_table.initialize_bins();
    std::memcpy(
        rfa_ns::__rfa_bin_host_buffer__, &bin_table, sizeof(bin_table));

    // Per-partition step: seed the accumulator with the first element and
    // fold in the remaining part_size - 1 elements.
    // Assumes that __rfa_bin_host_buffer__ is initialized.
    auto reduce_partition =
        [policy](FwdIterB part_begin,
            std::size_t part_size) -> accumulator_type {
        T_ seed_value = *part_begin;
        return hpx::parallel::detail::sequential_reduce_deterministic_rfa<
            ExPolicy>(HPX_FORWARD(ExPolicy, policy), ++part_begin,
            --part_size, HPX_MOVE(seed_value), std::true_type{});
    };

    // Final step: start from an accumulator holding init, merge every
    // partition accumulator into it and convert the total back to T_.
    // Assumes that __rfa_bin_host_buffer__ is initialized.
    auto combine_partitions = [policy, init](auto&& results) -> T_ {
        accumulator_type total;
        total.zero();
        total += init;
        return hpx::parallel::detail::sequential_reduce_deterministic_rfa<
            ExPolicy>(HPX_FORWARD(ExPolicy, policy),
            hpx::util::begin(results), hpx::util::size(results),
            HPX_MOVE(total), std::false_type{})
            .conv();
    };

    return util::partitioner<ExPolicy, T_, accumulator_type>::call(
        HPX_FORWARD(ExPolicy, policy), first,
        detail::distance(first, last), HPX_MOVE(reduce_partition),
        hpx::unwrapping(HPX_MOVE(combine_partitions)));
}
};
/// \endcond
} // namespace detail
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright (c) 2024 Shreyas Atre
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <cstddef>

#if !defined(HPX_COMPUTE_DEVICE_CODE)
#include <hpx/algorithm.hpp>
#include <hpx/chrono.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>
#include <hpx/parallel/algorithms/reduce.hpp>
#include <hpx/parallel/algorithms/reduce_deterministic.hpp>

#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <numeric>
#include <random>
#include <type_traits>
#include <vector>

// Fixed seed for the benchmark input: it is passed to std::srand() in
// hpx_main and also seeds the Mersenne Twister engine below.
int seed = 1000;
// Engine handed to std::shuffle in hpx_main to permute the input vector.
std::mt19937 gen(seed);

/// Draw a pseudo-random value by linearly mapping the result of std::rand()
/// from [0, RAND_MAX] onto [LO, HI].
///
/// The sequence is controlled by std::srand(). The arithmetic is carried out
/// in T. Note that the default lower bound is std::numeric_limits<T>::min(),
/// which for floating-point types is the smallest positive normalized value,
/// not the most negative one.
///
/// \param LO  lower end of the target interval
/// \param HI  upper end of the target interval
/// \returns LO + rand() / (RAND_MAX / (HI - LO))
template <typename T>
T get_rand(T LO = (std::numeric_limits<T>::min)(),
    T HI = (std::numeric_limits<T>::max)())
{
    T const raw_sample = static_cast<T>(std::rand());
    // How many raw units correspond to one unit of the target interval.
    T const raw_per_unit = static_cast<T>((RAND_MAX)) / (HI - LO);
    return LO + raw_sample / raw_per_unit;
}

///////////////////////////////////////////////////////////////////////////////

// Benchmark body: runs one hpx::reduce_deterministic pass over
// deterministic_shuffled with the given execution policy, initial value
// (val_det) and reduction operator (op). The result is not checked.
void bench_reduce_deterministic(const auto& policy,
const auto& deterministic_shuffled, const auto& val_det, const auto& op)
{
// The reduction result is computed only so that the call can be timed by
// the caller; it is not compared against anything here.

auto r1_shuffled =
hpx::reduce_deterministic(policy, std::begin(deterministic_shuffled),
std::end(deterministic_shuffled), val_det, op);

// Silence the unused-variable warning for the discarded result.
HPX_UNUSED(r1_shuffled);
}

// Benchmark body: runs one hpx::reduce pass over deterministic_shuffled with
// the given execution policy, initial value (val_det) and reduction operator
// (op). This is the baseline the deterministic variant is measured against.
void bench_reduce(const auto& policy, const auto& deterministic_shuffled,
const auto& val_det, const auto& op)
{
auto r = hpx::reduce(policy, (std::begin(deterministic_shuffled)),
(std::end(deterministic_shuffled)), val_det, op);

// Silence the unused-variable warning for the discarded result.
HPX_UNUSED(r);
}

//////////////////////////////////////////////////////////////////////////////
/// HPX entry point of the benchmark.
///
/// Reads "test_count" and "vector-size" from the command line, fills a vector
/// with pseudo-random floats, shuffles a copy of it and times hpx::reduce and
/// hpx::reduce_deterministic on the shuffled data with both the seq and par
/// execution policies.
///
/// \param vm  parsed command-line options
/// \returns -1 if test_count is not positive, otherwise the result of
///          hpx::local::finalize()
int hpx_main(hpx::program_options::variables_map& vm)
{
    std::srand(seed);

    auto const test_count = vm["test_count"].as<int>();
    std::size_t const vector_size = vm["vector-size"].as<std::size_t>();

    hpx::util::perftests_init(vm);

    // verify that input is within domain of program
    if (test_count <= 0)
    {
        std::cerr << "test_count cannot be zero or negative...\n" << std::flush;
        hpx::local::finalize();
        return -1;
    }

    {
        using value_type = float;
        std::size_t const element_count = vector_size;

        // Samples are drawn from [-bound, bound].
        constexpr value_type bound =
            std::is_same_v<value_type, float> ? 1000.0 : 1000000.0;

        std::vector<value_type> deterministic(element_count);
        for (auto& sample : deterministic)
        {
            sample = get_rand<value_type>(-bound, bound);
        }

        // The benchmarks operate on a shuffled copy of the generated data.
        std::vector<value_type> deterministic_shuffled = deterministic;
        std::shuffle(
            deterministic_shuffled.begin(), deterministic_shuffled.end(), gen);

        value_type val_det(41.999);

        auto op = [](value_type lhs, value_type rhs) { return lhs + rhs; };

        hpx::util::perftests_report("reduce", "seq", test_count, [&]() {
            bench_reduce(
                hpx::execution::seq, deterministic_shuffled, val_det, op);
        });

        hpx::util::perftests_report("reduce", "par", test_count, [&]() {
            bench_reduce(
                hpx::execution::par, deterministic_shuffled, val_det, op);
        });

        hpx::util::perftests_report(
            "reduce deterministic", "seq", test_count, [&]() {
                bench_reduce_deterministic(hpx::execution::seq,
                    deterministic_shuffled, val_det, op);
            });

        hpx::util::perftests_report(
            "reduce deterministic", "par", test_count, [&]() {
                bench_reduce_deterministic(hpx::execution::par,
                    deterministic_shuffled, val_det, op);
            });

        hpx::util::perftests_print_times();
    }

    return hpx::local::finalize();
}

///////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
using namespace hpx::program_options;

options_description cmdline("usage: " HPX_APPLICATION_STRING " [options]");

// clang-format off
cmdline.add_options()
("test_count", value<int>()->default_value(100),
"number of tests to be averaged")
("vector-size", value<std::size_t>()->default_value(1000000),
"number of elements to be reduced")
;
// clang-format on

hpx::util::perftests_cfg(cmdline);
hpx::local::init_params init_args;
init_args.desc_cmdline = cmdline;
init_args.cfg = {"hpx.os_threads=all"};

return hpx::local::init(hpx_main, argc, argv, init_args);
}
#endif
4 changes: 4 additions & 0 deletions libs/core/algorithms/tests/unit/algorithms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,7 @@ foreach(test ${tests})
"modules.algorithms.algorithms" ${test} ${${test}_PARAMETERS}
)
endforeach()

# NOTE(review): these two commands enable AddressSanitizer for the
# reduce_deterministic_test target unconditionally. -fsanitize=address is a
# GCC/Clang-style flag; confirm this is intended for every supported
# toolchain and build type, or guard it behind a compiler/option check.
target_compile_options(reduce_deterministic_test PRIVATE -fsanitize=address)

target_link_options(reduce_deterministic_test PRIVATE -fsanitize=address)
Loading

0 comments on commit 4cc10fb

Please sign in to comment.