Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch exception in lightweight node #623

Merged
merged 17 commits into from
Dec 8, 2021
16 changes: 10 additions & 6 deletions include/oneapi/tbb/detail/_flow_graph_node_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class function_input_base : public receiver<Input>, no_assign {
static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

//! Constructor for function_input_base
function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority )
function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
Iliamish marked this conversation as resolved.
Show resolved Hide resolved
: my_graph_ref(g), my_max_concurrency(max_concurrency)
, my_concurrency(0), my_priority(a_priority)
, my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
, my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
, my_predecessors(this)
, forwarder_busy(false)
Expand All @@ -75,7 +75,7 @@ class function_input_base : public receiver<Input>, no_assign {

//! Copy constructor
function_input_base( const function_input_base& src )
: function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority) {}
: function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}

//! Destructor
// The queue is allocated by the constructor for {multi}function_node.
Expand All @@ -87,7 +87,10 @@ class function_input_base : public receiver<Input>, no_assign {
}

graph_task* try_put_task( const input_type& t) override {
return try_put_task_impl(t, has_policy<lightweight, Policy>());
if ( my_is_no_throw )
return try_put_task_impl(t, has_policy<lightweight, Policy>());
else
return try_put_task_impl(t, std::false_type());
}

//! Adds src to the list of cached predecessors.
Expand Down Expand Up @@ -121,6 +124,7 @@ class function_input_base : public receiver<Input>, no_assign {
const size_t my_max_concurrency;
size_t my_concurrency;
node_priority_t my_priority;
const bool my_is_no_throw;
aleksei-fedotov marked this conversation as resolved.
Show resolved Hide resolved
input_queue_type *my_queue;
predecessor_cache<input_type, null_mutex > my_predecessors;

Expand Down Expand Up @@ -357,7 +361,7 @@ class function_input : public function_input_base<Input, Policy, A, function_inp
template<typename Body>
function_input(
graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
: base_type(g, max_concurrency, a_priority)
: base_type(g, max_concurrency, a_priority, noexcept(body(input_type())))
, my_body( new function_body_leaf< input_type, output_type, Body>(body) )
, my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
}
Expand Down Expand Up @@ -492,7 +496,7 @@ class multifunction_input : public function_input_base<Input, Policy, A, multifu
// constructor
template<typename Body>
multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
: base_type(g, max_concurrency, a_priority)
: base_type(g, max_concurrency, a_priority, noexcept(body(input_type(), my_output_ports)))
, my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
, my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
, my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
Expand Down
8 changes: 4 additions & 4 deletions include/oneapi/tbb/flow_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -2820,21 +2820,21 @@ class async_body_base: no_assign {

template<typename Input, typename Ports, typename Gateway, typename Body>
class async_body: public async_body_base<Gateway> {
private:
Body my_body;

aleksei-fedotov marked this conversation as resolved.
Show resolved Hide resolved
public:
typedef async_body_base<Gateway> base_type;
typedef Gateway gateway_type;

async_body(const Body &body, gateway_type *gateway)
: base_type(gateway), my_body(body) { }

void operator()( const Input &v, Ports & ) {
void operator()( const Input &v, Ports & ) noexcept(noexcept(my_body(v, std::declval<gateway_type&>()))) {
my_body(v, *this->my_gateway);
}

Body get_body() { return my_body; }

private:
Body my_body;
};

//! Implements async node
Expand Down
110 changes: 96 additions & 14 deletions test/common/graph_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "config.h"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/task.h"
#include "oneapi/tbb/null_rw_mutex.h"
#include "oneapi/tbb/concurrent_unordered_set.h"

Expand Down Expand Up @@ -688,7 +689,7 @@ class native_loop_body {
public:
native_loop_body(NodeType& node) : my_node(node) {}

void operator()(int) const {
void operator()(int) const noexcept {
std::thread::id this_id = std::this_thread::get_id();
my_node.try_put(this_id);
}
Expand All @@ -701,9 +702,9 @@ class concurrency_checker_body {
concurrency_checker_body() { g_body_count = 0; }

template<typename gateway_type>
void operator()(const std::thread::id& input, gateway_type&) { increase_and_check(input); }
void operator()(const std::thread::id& input, gateway_type&) noexcept { increase_and_check(input); }

output_tuple_type operator()(const std::thread::id& input) {
output_tuple_type operator()(const std::thread::id& input) noexcept {
increase_and_check(input);
return output_tuple_type();
}
Expand Down Expand Up @@ -740,7 +741,7 @@ class native_loop_limited_body {
public:
native_loop_limited_body(NodeType& node, utils::SpinBarrier& barrier):
my_node(node), my_barrier(barrier) {}
void operator()(int) const {
void operator()(int) const noexcept {
std::thread::id this_id = std::this_thread::get_id();
my_node.try_put(this_id);
if(!lightweight_work_processed) {
Expand All @@ -760,6 +761,7 @@ struct condition_predicate {
std::atomic<unsigned> g_lightweight_count;
std::atomic<unsigned> g_task_count;

template <bool NoExcept>
class limited_lightweight_checker_body {
public:
limited_lightweight_checker_body() {
Expand All @@ -770,10 +772,9 @@ class limited_lightweight_checker_body {
private:
void increase_and_check(const std::thread::id& /*input*/) {
++g_body_count;
// TODO revamp: in order not to rely on scheduler functionality anymore add
// __TBB_EXTRA_DEBUG for counting the number of tasks actually created by the flow graph,
// hence consider moving lightweight testing into whitebox test for the flow graph.
bool is_inside_task = false;/*tbb::task::self().state() == tbb::task::executing;*/

bool is_inside_task = oneapi::tbb::task::current_context() != nullptr;

if(is_inside_task) {
++g_task_count;
} else {
Expand All @@ -785,10 +786,10 @@ class limited_lightweight_checker_body {
}
public:
template<typename gateway_type>
void operator()(const std::thread::id& input, gateway_type&) {
void operator()(const std::thread::id& input, gateway_type&) noexcept(NoExcept) {
increase_and_check(input);
}
output_tuple_type operator()(const std::thread::id& input) {
output_tuple_type operator()(const std::thread::id& input) noexcept(NoExcept) {
increase_and_check(input);
return output_tuple_type();
}
Expand All @@ -799,32 +800,113 @@ void test_limited_lightweight_execution(unsigned N, unsigned concurrency) {
CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
"Test for limited concurrency cannot be called with unlimited concurrency argument");
tbb::flow::graph g;
NodeType node(g, concurrency, limited_lightweight_checker_body());
NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/true>());
// Execute first body as lightweight, then wait for all other threads to fill internal buffer.
// Then unblock the lightweight thread and check if other body executions are inside oneTBB task.
utils::SpinBarrier barrier(N - concurrency);
utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
g.wait_for_all();
CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
// TODO revamp: enable the following checks once whitebox flow graph testing is ready for it.
// CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
// CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
work_submitted = false;
lightweight_work_processed = false;
}

template<typename NodeType>
void test_limited_lightweight_execution_with_throwing_body(unsigned N, unsigned concurrency) {
CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
"Test for limited concurrency cannot be called with unlimited concurrency argument");
tbb::flow::graph g;
NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/false>());
// Body is no noexcept, in this case it must be executed as tasks, instead of lightweight execution
utils::SpinBarrier barrier(N);
utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
g.wait_for_all();
CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
CHECK_MESSAGE(g_lightweight_count == 0, "Body needs to be executed with queueing policy");
CHECK_MESSAGE(g_task_count == N, "Body needs to be executed as task N times");
work_submitted = false;
lightweight_work_processed = false;
}

template <int Threshold>
struct throwing_body{
std::atomic<int>& my_counter;

throwing_body(std::atomic<int>& counter) : my_counter(counter) {}

template<typename input_type, typename gateway_type>
void operator()(const input_type&, gateway_type&) {
++my_counter;
if(my_counter == Threshold)
throw Threshold;
}

template<typename input_type>
output_tuple_type operator()(const input_type&) {
++my_counter;
if(my_counter == Threshold)
throw Threshold;
return output_tuple_type();
}
};

#if TBB_USE_EXCEPTIONS
//! Test excesption thrown in node with lightweight policy was rethrown by graph
template<template<typename, typename, typename> class NodeType>
void test_exception_ligthweight_policy(){
std::atomic<int> counter {0};
constexpr int threshold = 10;

using IndexerNodeType = oneapi::tbb::flow::indexer_node<int, int>;
using FuncNodeType = NodeType<IndexerNodeType::output_type, output_tuple_type, tbb::flow::lightweight>;
oneapi::tbb::flow::graph g;

IndexerNodeType indexer(g);
FuncNodeType tested_node(g, oneapi::tbb::flow::serial, throwing_body<threshold>(counter));
oneapi::tbb::flow::make_edge(indexer, tested_node);

utils::NativeParallelFor( threshold * 2, [&](int i){
if(i % 2)
std::get<1>(indexer.input_ports()).try_put(1);
else
std::get<0>(indexer.input_ports()).try_put(0);
});

bool catchException = false;
try
{
g.wait_for_all();
}
catch (const int& exc)
{
catchException = true;
CHECK_MESSAGE( exc == threshold, "graph.wait_for_all() rethrow current exception" );
}
CHECK_MESSAGE( catchException, "The exception must be thrown from graph.wait_for_all()" );
CHECK_MESSAGE( counter == threshold, "Graph must cancel all tasks after exception" );
}
#endif /* TBB_USE_EXCEPTIONS */

template<typename NodeType>
void test_lightweight(unsigned N) {
test_unlimited_lightweight_execution<NodeType>(N);
test_limited_lightweight_execution<NodeType>(N, tbb::flow::serial);
test_limited_lightweight_execution<NodeType>(N, (std::min)(std::thread::hardware_concurrency() / 2, N/2));

test_limited_lightweight_execution_with_throwing_body<NodeType>(N, tbb::flow::serial);
}

template<template<typename, typename, typename> class NodeType>
void test(unsigned N) {
typedef std::thread::id input_type;
typedef NodeType<input_type, output_tuple_type, tbb::flow::queueing_lightweight> node_type;
test_lightweight<node_type>(N);

#if TBB_USE_EXCEPTIONS
test_exception_ligthweight_policy<NodeType>();
#endif /* TBB_USE_EXCEPTIONS */
}

} // namespace lightweight_testing
Expand Down
6 changes: 5 additions & 1 deletion test/tbb/test_flow_graph_priorities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,11 @@ struct DeciderBody {

struct AsyncSubmissionBody {
AsyncActivity* my_activity;
void operator()(data_type input, async_node_type::gateway_type& gateway) {
// It is important that async_node in the test executes without spawning a TBB task, because
// it passes the work to asynchronous thread, which unlocks the barrier that is waited
// by every execution thread (asynchronous thread and any TBB worker or main thread).
// This is why async_node's body marked noexcept.
void operator()(data_type input, async_node_type::gateway_type& gateway) noexcept {
Iliamish marked this conversation as resolved.
Show resolved Hide resolved
my_activity->submit(input, &gateway);
}
AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
Expand Down