diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index b630a4b99f..82d57f05f3 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -63,9 +63,9 @@ class function_input_base : public receiver, no_assign { static_assert(!has_policy::value || !has_policy::value, ""); //! Constructor for function_input_base - function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority ) + function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw ) : my_graph_ref(g), my_max_concurrency(max_concurrency) - , my_concurrency(0), my_priority(a_priority) + , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw) , my_queue(!has_policy::value ? new input_queue_type() : NULL) , my_predecessors(this) , forwarder_busy(false) @@ -75,7 +75,7 @@ class function_input_base : public receiver, no_assign { //! Copy constructor function_input_base( const function_input_base& src ) - : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority) {} + : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {} //! Destructor // The queue is allocated by the constructor for {multi}function_node. @@ -87,7 +87,10 @@ class function_input_base : public receiver, no_assign { } graph_task* try_put_task( const input_type& t) override { - return try_put_task_impl(t, has_policy()); + if ( my_is_no_throw ) + return try_put_task_impl(t, has_policy()); + else + return try_put_task_impl(t, std::false_type()); } //! Adds src to the list of cached predecessors. @@ -121,6 +124,7 @@ class function_input_base : public receiver, no_assign { const size_t my_max_concurrency; size_t my_concurrency; node_priority_t my_priority; + const bool my_is_no_throw; input_queue_type *my_queue; predecessor_cache my_predecessors; @@ -357,7 +361,7 @@ class function_input : public function_input_base function_input( graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority ) - : base_type(g, max_concurrency, a_priority) + : base_type(g, max_concurrency, a_priority, noexcept(body(input_type()))) , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) { } @@ -492,7 +496,7 @@ class multifunction_input : public function_input_base multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) - : base_type(g, max_concurrency, a_priority) + : base_type(g, max_concurrency, a_priority, noexcept(body(input_type(), my_output_ports))) , my_body( new multifunction_body_leaf(body) ) , my_init_body( new multifunction_body_leaf(body) ) , my_output_ports(init_output_ports::call(g, my_output_ports)){ diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 91a46577b7..0fbab190b0 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -2820,6 +2820,9 @@ class async_body_base: no_assign { template class async_body: public async_body_base { +private: + Body my_body; + public: typedef async_body_base base_type; typedef Gateway gateway_type; @@ -2827,14 +2830,11 @@ class async_body: public async_body_base { async_body(const Body &body, gateway_type *gateway) : base_type(gateway), my_body(body) { } - void operator()( const Input &v, Ports & ) { + void operator()( const Input &v, Ports & ) noexcept(noexcept(my_body(v, std::declval()))) { my_body(v, *this->my_gateway); } Body get_body() { return my_body; } - -private: - Body my_body; }; //! Implements async node diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index df9a4b1cfb..709a9a5971 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -24,6 +24,7 @@ #include "config.h" #include "oneapi/tbb/flow_graph.h" +#include "oneapi/tbb/task.h" #include "oneapi/tbb/null_rw_mutex.h" #include "oneapi/tbb/concurrent_unordered_set.h" @@ -688,7 +689,7 @@ class native_loop_body { public: native_loop_body(NodeType& node) : my_node(node) {} - void operator()(int) const { + void operator()(int) const noexcept { std::thread::id this_id = std::this_thread::get_id(); my_node.try_put(this_id); } @@ -701,9 +702,9 @@ class concurrency_checker_body { concurrency_checker_body() { g_body_count = 0; } template - void operator()(const std::thread::id& input, gateway_type&) { increase_and_check(input); } + void operator()(const std::thread::id& input, gateway_type&) noexcept { increase_and_check(input); } - output_tuple_type operator()(const std::thread::id& input) { + output_tuple_type operator()(const std::thread::id& input) noexcept { increase_and_check(input); return output_tuple_type(); } @@ -740,7 +741,7 @@ class native_loop_limited_body { public: native_loop_limited_body(NodeType& node, utils::SpinBarrier& barrier): my_node(node), my_barrier(barrier) {} - void operator()(int) const { + void operator()(int) const noexcept { std::thread::id this_id = std::this_thread::get_id(); my_node.try_put(this_id); if(!lightweight_work_processed) { @@ -760,6 +761,7 @@ struct condition_predicate { std::atomic g_lightweight_count; std::atomic g_task_count; +template class limited_lightweight_checker_body { public: limited_lightweight_checker_body() { @@ -770,10 +772,9 @@ class limited_lightweight_checker_body { private: void increase_and_check(const std::thread::id& /*input*/) { ++g_body_count; - // TODO revamp: in order not to rely on scheduler functionality anymore add - // __TBB_EXTRA_DEBUG for counting the number of tasks actually created by the flow graph, - // hence consider moving lightweight testing into whitebox test for the flow graph. - bool is_inside_task = false;/*tbb::task::self().state() == tbb::task::executing;*/ + + bool is_inside_task = oneapi::tbb::task::current_context() != nullptr; + if(is_inside_task) { ++g_task_count; } else { @@ -785,10 +786,10 @@ class limited_lightweight_checker_body { } public: template - void operator()(const std::thread::id& input, gateway_type&) { + void operator()(const std::thread::id& input, gateway_type&) noexcept(NoExcept) { increase_and_check(input); } - output_tuple_type operator()(const std::thread::id& input) { + output_tuple_type operator()(const std::thread::id& input) noexcept(NoExcept) { increase_and_check(input); return output_tuple_type(); } @@ -799,25 +800,102 @@ void test_limited_lightweight_execution(unsigned N, unsigned concurrency) { CHECK_MESSAGE(concurrency != tbb::flow::unlimited, "Test for limited concurrency cannot be called with unlimited concurrency argument"); tbb::flow::graph g; - NodeType node(g, concurrency, limited_lightweight_checker_body()); + NodeType node(g, concurrency, limited_lightweight_checker_body()); // Execute first body as lightweight, then wait for all other threads to fill internal buffer. // Then unblock the lightweight thread and check if other body executions are inside oneTBB task. utils::SpinBarrier barrier(N - concurrency); utils::NativeParallelFor(N, native_loop_limited_body(node, barrier)); g.wait_for_all(); CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times"); - // TODO revamp: enable the following checks once whitebox flow graph testing is ready for it. - // CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once"); - // CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times"); + CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once"); + CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times"); work_submitted = false; lightweight_work_processed = false; } +template +void test_limited_lightweight_execution_with_throwing_body(unsigned N, unsigned concurrency) { + CHECK_MESSAGE(concurrency != tbb::flow::unlimited, + "Test for limited concurrency cannot be called with unlimited concurrency argument"); + tbb::flow::graph g; + NodeType node(g, concurrency, limited_lightweight_checker_body()); + // Body is no noexcept, in this case it must be executed as tasks, instead of lightweight execution + utils::SpinBarrier barrier(N); + utils::NativeParallelFor(N, native_loop_limited_body(node, barrier)); + g.wait_for_all(); + CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times"); + CHECK_MESSAGE(g_lightweight_count == 0, "Body needs to be executed with queueing policy"); + CHECK_MESSAGE(g_task_count == N, "Body needs to be executed as task N times"); + work_submitted = false; + lightweight_work_processed = false; +} + +template +struct throwing_body{ + std::atomic& my_counter; + + throwing_body(std::atomic& counter) : my_counter(counter) {} + + template + void operator()(const input_type&, gateway_type&) { + ++my_counter; + if(my_counter == Threshold) + throw Threshold; + } + + template + output_tuple_type operator()(const input_type&) { + ++my_counter; + if(my_counter == Threshold) + throw Threshold; + return output_tuple_type(); + } +}; + +#if TBB_USE_EXCEPTIONS +//! Test excesption thrown in node with lightweight policy was rethrown by graph +template class NodeType> +void test_exception_ligthweight_policy(){ + std::atomic counter {0}; + constexpr int threshold = 10; + + using IndexerNodeType = oneapi::tbb::flow::indexer_node; + using FuncNodeType = NodeType; + oneapi::tbb::flow::graph g; + + IndexerNodeType indexer(g); + FuncNodeType tested_node(g, oneapi::tbb::flow::serial, throwing_body(counter)); + oneapi::tbb::flow::make_edge(indexer, tested_node); + + utils::NativeParallelFor( threshold * 2, [&](int i){ + if(i % 2) + std::get<1>(indexer.input_ports()).try_put(1); + else + std::get<0>(indexer.input_ports()).try_put(0); + }); + + bool catchException = false; + try + { + g.wait_for_all(); + } + catch (const int& exc) + { + catchException = true; + CHECK_MESSAGE( exc == threshold, "graph.wait_for_all() rethrow current exception" ); + } + CHECK_MESSAGE( catchException, "The exception must be thrown from graph.wait_for_all()" ); + CHECK_MESSAGE( counter == threshold, "Graph must cancel all tasks after exception" ); +} +#endif /* TBB_USE_EXCEPTIONS */ + template void test_lightweight(unsigned N) { test_unlimited_lightweight_execution(N); test_limited_lightweight_execution(N, tbb::flow::serial); test_limited_lightweight_execution(N, (std::min)(std::thread::hardware_concurrency() / 2, N/2)); + + test_limited_lightweight_execution_with_throwing_body(N, tbb::flow::serial); } template class NodeType> @@ -825,6 +903,10 @@ void test(unsigned N) { typedef std::thread::id input_type; typedef NodeType node_type; test_lightweight(N); + +#if TBB_USE_EXCEPTIONS + test_exception_ligthweight_policy(); +#endif /* TBB_USE_EXCEPTIONS */ } } // namespace lightweight_testing diff --git a/test/tbb/test_flow_graph_priorities.cpp b/test/tbb/test_flow_graph_priorities.cpp index e4f22ba270..5c798063ab 100644 --- a/test/tbb/test_flow_graph_priorities.cpp +++ b/test/tbb/test_flow_graph_priorities.cpp @@ -332,7 +332,11 @@ struct DeciderBody { struct AsyncSubmissionBody { AsyncActivity* my_activity; - void operator()(data_type input, async_node_type::gateway_type& gateway) { + // It is important that async_node in the test executes without spawning a TBB task, because + // it passes the work to asynchronous thread, which unlocks the barrier that is waited + // by every execution thread (asynchronous thread and any TBB worker or main thread). + // This is why async_node's body marked noexcept. + void operator()(data_type input, async_node_type::gateway_type& gateway) noexcept { my_activity->submit(input, &gateway); } AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}