From ab8698bb521970662bcdc1947984c23b0920ebbf Mon Sep 17 00:00:00 2001 From: "Mishin, Ilya" Date: Wed, 10 Nov 2021 22:17:14 +0300 Subject: [PATCH 1/8] Fix limiter_node decrementer Signed-off-by: Mishin, Ilya --- include/oneapi/tbb/flow_graph.h | 22 +++++++++++++------ test/tbb/test_limiter_node.cpp | 38 +++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 91a46577b7..6593f095df 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1898,6 +1898,13 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > spin_mutex::scoped_lock lock(my_mutex); if( delta > 0 && size_t(delta) > my_count ) { my_count = 0; + if( my_tries > 0 ){ + if(size_t(delta) - my_count > my_tries){ + my_tries = 0; + }else{ + my_tries -= (size_t(delta) - my_count); + } + } } else if( delta < 0 && size_t(-delta) > my_threshold - my_count ) { my_count = my_threshold; @@ -1941,7 +1948,8 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - --my_tries; + if( my_tries ) + --my_tries; my_predecessors.try_consume(); if ( check_conditions() ) { if ( is_graph_active(this->my_graph) ) { @@ -1961,7 +1969,8 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > //if we can reserve but can't put, we decrement the tries and release the reservation { spin_mutex::scoped_lock lock(my_mutex); - --my_tries; + if( my_tries ) + --my_tries; if (reserved) my_predecessors.try_release(); if ( check_conditions() ) { if ( is_graph_active(this->my_graph) ) { @@ -2071,10 +2080,10 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > } graph_task* rtask = my_successors.try_put_task(t); - if ( !rtask ) { // try_put_task failed. spin_mutex::scoped_lock lock(my_mutex); - --my_tries; + if( my_tries ) + --my_tries; if (check_conditions() && is_graph_active(this->my_graph)) { small_object_allocator allocator{}; typedef forward_task_bypass> task_type; @@ -2085,8 +2094,9 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > else { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - --my_tries; - } + if( my_tries ) + --my_tries; + } return rtask; } diff --git a/test/tbb/test_limiter_node.cpp b/test/tbb/test_limiter_node.cpp index 398a4280e2..4c60f0417f 100644 --- a/test/tbb/test_limiter_node.cpp +++ b/test/tbb/test_limiter_node.cpp @@ -433,8 +433,7 @@ void test_decrementer() { "Limiter node decrementer's port does not accept message" ); m = 0; - while( limit3.try_put( m ) ){ m++; }; - CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." ); + while( limit3.try_put( m++ ) ){}; actual = -1; m = 0; while( queue.try_get(actual) ){ @@ -517,6 +516,35 @@ void test_deduction_guides() { } #endif +void test_decrement_while_try_put_task() { + constexpr int threshold = 50000; + + tbb::flow::graph graph{}; + std::atomic processed; + tbb::flow::input_node input{ graph, [&](tbb::flow_control & fc) -> int { + static int i = {}; + if (i++ >= threshold) fc.stop(); + return i; + }}; + tbb::flow::limiter_node blockingNode{ graph, 1 }; + tbb::flow::multifunction_node> processing{ graph, tbb::flow::serial, + [&](const int & value, typename decltype(processing)::output_ports_type & out) { + if (value != threshold) + std::get<0>(out).try_put(1); + processed.store(value); + }}; + + tbb::flow::make_edge(input, blockingNode); + tbb::flow::make_edge(blockingNode, processing); + tbb::flow::make_edge(processing, blockingNode.decrementer()); + + input.activate(); + + graph.wait_for_all(); + CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work"); +} + + //! Test puts on limiter_node with decrements and varying parallelism levels //! \brief \ref error_guessing TEST_CASE("Serial and parallel tests") { @@ -537,6 +565,12 @@ TEST_CASE("Test continue_msg reception") { test_continue_msg_reception(); } +//! Test put message on decrementer port does not stop message flow +//! \brief \ref error_guessing +TEST_CASE("Test try_put to decrementer while try_put to limiter_node") { + test_decrement_while_try_put_task(); +} + //! Test multifunction_node connected to limiter_node //! \brief \ref error_guessing TEST_CASE("Multifunction connected to limiter") { From e512424b12deb86d770b9fc1fb13b5ecf3555901 Mon Sep 17 00:00:00 2001 From: "Mishin, Ilya" Date: Wed, 10 Nov 2021 22:19:27 +0300 Subject: [PATCH 2/8] Fix align Signed-off-by: Mishin, Ilya --- include/oneapi/tbb/flow_graph.h | 5 +++-- test/tbb/test_limiter_node.cpp | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 6593f095df..935c60d086 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1899,9 +1899,10 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > if( delta > 0 && size_t(delta) > my_count ) { my_count = 0; if( my_tries > 0 ){ - if(size_t(delta) - my_count > my_tries){ + if( size_t(delta) - my_count > my_tries ){ my_tries = 0; - }else{ + } + else { my_tries -= (size_t(delta) - my_count); } } diff --git a/test/tbb/test_limiter_node.cpp b/test/tbb/test_limiter_node.cpp index 4c60f0417f..e3ece18a1a 100644 --- a/test/tbb/test_limiter_node.cpp +++ b/test/tbb/test_limiter_node.cpp @@ -433,7 +433,8 @@ void test_decrementer() { "Limiter node decrementer's port does not accept message" ); m = 0; - while( limit3.try_put( m++ ) ){}; + while( limit3.try_put( m ) ){ m++; }; + CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." ); actual = -1; m = 0; while( queue.try_get(actual) ){ From 91a7606b77343c74069fa17d47ba14e386690b92 Mon Sep 17 00:00:00 2001 From: "Mishin, Ilya" Date: Thu, 11 Nov 2021 15:17:25 +0300 Subject: [PATCH 3/8] add new variable for check unused decrements Signed-off-by: Mishin, Ilya --- include/oneapi/tbb/flow_graph.h | 51 ++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 935c60d086..6e82fb86d6 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "detail/_config.h" #include "detail/_namespace_injection.h" @@ -1886,6 +1887,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > size_t my_threshold; size_t my_count; // number of successful puts size_t my_tries; // number of active put attempts + size_t my_future_decrement; // number of active decrement reservable_predecessor_cache< T, spin_mutex > my_predecessors; spin_mutex my_mutex; broadcast_cache< T > my_successors; @@ -1894,18 +1896,19 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > threshold_regulator< limiter_node, DecrementType > decrement; graph_task* decrement_counter( long long delta ) { + if (delta > 0 && size_t(delta) > my_threshold ){ + delta = my_threshold; + } + if ( delta < 0 && size_t(-delta) > my_threshold ){ + delta = -my_threshold; + } { spin_mutex::scoped_lock lock(my_mutex); if( delta > 0 && size_t(delta) > my_count ) { - my_count = 0; if( my_tries > 0 ){ - if( size_t(delta) - my_count > my_tries ){ - my_tries = 0; - } - else { - my_tries -= (size_t(delta) - my_count); - } + my_future_decrement += (size_t(delta) - my_count); } + my_count = 0; } else if( delta < 0 && size_t(-delta) > my_threshold - my_count ) { my_count = my_threshold; @@ -1949,8 +1952,16 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - if( my_tries ) - --my_tries; + if(my_future_decrement){ + if(my_count > my_future_decrement){ + my_count -= my_future_decrement; + my_future_decrement = 0; + }else{ + my_future_decrement -= my_count; + my_count = 0; + } + } + --my_tries; my_predecessors.try_consume(); if ( check_conditions() ) { if ( is_graph_active(this->my_graph) ) { @@ -1970,8 +1981,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > //if we can reserve but can't put, we decrement the tries and release the reservation { spin_mutex::scoped_lock lock(my_mutex); - if( my_tries ) - --my_tries; + --my_tries; if (reserved) my_predecessors.try_release(); if ( check_conditions() ) { if ( is_graph_active(this->my_graph) ) { @@ -1998,8 +2008,8 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > public: //! Constructor limiter_node(graph &g, size_t threshold) - : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_predecessors(this) - , my_successors(this), decrement(this) + : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0), + my_predecessors(this) , my_successors(this), decrement(this) { initialize(); } @@ -2083,8 +2093,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > graph_task* rtask = my_successors.try_put_task(t); if ( !rtask ) { // try_put_task failed. spin_mutex::scoped_lock lock(my_mutex); - if( my_tries ) - --my_tries; + --my_tries; if (check_conditions() && is_graph_active(this->my_graph)) { small_object_allocator allocator{}; typedef forward_task_bypass> task_type; @@ -2095,8 +2104,16 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > else { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - if( my_tries ) - --my_tries; + if(my_future_decrement){ + if(my_count > my_future_decrement){ + my_count -= my_future_decrement; + my_future_decrement = 0; + }else{ + my_future_decrement -= my_count; + my_count = 0; + } + } + --my_tries; } return rtask; } From ae2b44146ac37970ee3843b8636cf2450f1add1e Mon Sep 17 00:00:00 2001 From: "Mishin, Ilya" Date: Thu, 11 Nov 2021 15:19:02 +0300 Subject: [PATCH 4/8] remove iostream Signed-off-by: Mishin, Ilya --- include/oneapi/tbb/flow_graph.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 6e82fb86d6..61fa715492 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -20,7 +20,6 @@ #include #include #include -#include #include "detail/_config.h" #include "detail/_namespace_injection.h" From ce046600ff38e8f6e53d83e8ef778f756cd3a432 Mon Sep 17 00:00:00 2001 From: "Mishin, Ilya" Date: Thu, 11 Nov 2021 15:41:57 +0300 Subject: [PATCH 5/8] remove unnesessary cast Signed-off-by: Mishin, Ilya --- include/oneapi/tbb/flow_graph.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 61fa715492..cb399c3bbb 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1898,9 +1898,6 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > if (delta > 0 && size_t(delta) > my_threshold ){ delta = my_threshold; } - if ( delta < 0 && size_t(-delta) > my_threshold ){ - delta = -my_threshold; - } { spin_mutex::scoped_lock lock(my_mutex); if( delta > 0 && size_t(delta) > my_count ) { From f51fb9000eec5428f8f45d08801b6e29d102ecdf Mon Sep 17 00:00:00 2001 From: "Mishin, Ilya" Date: Fri, 12 Nov 2021 14:09:15 +0300 Subject: [PATCH 6/8] align whitespaces Signed-off-by: Mishin, Ilya --- include/oneapi/tbb/flow_graph.h | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index cb399c3bbb..2eec176d11 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1900,13 +1900,13 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > } { spin_mutex::scoped_lock lock(my_mutex); - if( delta > 0 && size_t(delta) > my_count ) { + if ( delta > 0 && size_t(delta) > my_count ) { if( my_tries > 0 ){ my_future_decrement += (size_t(delta) - my_count); } my_count = 0; } - else if( delta < 0 && size_t(-delta) > my_threshold - my_count ) { + else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) { my_count = my_threshold; } else { @@ -1948,11 +1948,12 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - if(my_future_decrement){ - if(my_count > my_future_decrement){ + if ( my_future_decrement ){ + if ( my_count > my_future_decrement ){ my_count -= my_future_decrement; my_future_decrement = 0; - }else{ + } + else{ my_future_decrement -= my_count; my_count = 0; } @@ -2100,11 +2101,12 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > else { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - if(my_future_decrement){ - if(my_count > my_future_decrement){ + if ( my_future_decrement ){ + if ( my_count > my_future_decrement ){ my_count -= my_future_decrement; my_future_decrement = 0; - }else{ + } + else{ my_future_decrement -= my_count; my_count = 0; } From ee3b7b6013f4aa5a8e363c12cac3810d728278d2 Mon Sep 17 00:00:00 2001 From: "Serov, Vladimir" Date: Mon, 22 Nov 2021 09:29:01 +0300 Subject: [PATCH 7/8] Fix whitespace alignment Signed-off-by: Serov, Vladimir --- include/oneapi/tbb/flow_graph.h | 49 +++++++++++++++++---------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 2eec176d11..fe77e7edab 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1895,13 +1895,14 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > threshold_regulator< limiter_node, DecrementType > decrement; graph_task* decrement_counter( long long delta ) { - if (delta > 0 && size_t(delta) > my_threshold ){ + if ( delta > 0 && size_t(delta) > my_threshold ) { delta = my_threshold; } + { spin_mutex::scoped_lock lock(my_mutex); if ( delta > 0 && size_t(delta) > my_count ) { - if( my_tries > 0 ){ + if( my_tries > 0 ) { my_future_decrement += (size_t(delta) - my_count); } my_count = 0; @@ -1931,29 +1932,30 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > input_type v; graph_task* rval = NULL; bool reserved = false; - { - spin_mutex::scoped_lock lock(my_mutex); - if ( check_conditions() ) - ++my_tries; - else - return NULL; - } + + { + spin_mutex::scoped_lock lock(my_mutex); + if ( check_conditions() ) + ++my_tries; + else + return NULL; + } //SUCCESS // if we can reserve and can put, we consume the reservation // we increment the count and decrement the tries - if ( (my_predecessors.try_reserve(v)) == true ){ - reserved=true; - if ( (rval = my_successors.try_put_task(v)) != NULL ){ + if ( (my_predecessors.try_reserve(v)) == true ) { + reserved = true; + if ( (rval = my_successors.try_put_task(v)) != NULL ) { { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - if ( my_future_decrement ){ - if ( my_count > my_future_decrement ){ + if ( my_future_decrement ) { + if ( my_count > my_future_decrement ) { my_count -= my_future_decrement; my_future_decrement = 0; } - else{ + else { my_future_decrement -= my_count; my_count = 0; } @@ -2006,7 +2008,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > //! Constructor limiter_node(graph &g, size_t threshold) : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0), - my_predecessors(this) , my_successors(this), decrement(this) + my_predecessors(this), my_successors(this), decrement(this) { initialize(); } @@ -2101,12 +2103,12 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > else { spin_mutex::scoped_lock lock(my_mutex); ++my_count; - if ( my_future_decrement ){ - if ( my_count > my_future_decrement ){ + if ( my_future_decrement ) { + if ( my_count > my_future_decrement ) { my_count -= my_future_decrement; my_future_decrement = 0; } - else{ + else { my_future_decrement -= my_count; my_count = 0; } @@ -2118,15 +2120,14 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > graph& graph_reference() const override { return my_graph; } - void reset_node( reset_flags f) override { + void reset_node( reset_flags f ) override { my_count = 0; - if(f & rf_clear_edges) { + if ( f & rf_clear_edges ) { my_predecessors.clear(); my_successors.clear(); } - else - { - my_predecessors.reset( ); + else { + my_predecessors.reset(); } decrement.reset_receiver(f); } From f620936cff7573a6d631fc57c152f1f48374463a Mon Sep 17 00:00:00 2001 From: "Serov, Vladimir" Date: Mon, 22 Nov 2021 09:34:41 +0300 Subject: [PATCH 8/8] Revert workaround for GCC Signed-off-by: Serov, Vladimir --- test/tbb/test_limiter_node.cpp | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/test/tbb/test_limiter_node.cpp b/test/tbb/test_limiter_node.cpp index e3ece18a1a..7f499ca52b 100644 --- a/test/tbb/test_limiter_node.cpp +++ b/test/tbb/test_limiter_node.cpp @@ -125,18 +125,6 @@ struct put_dec_body : utils::NoAssign { }; -template< typename Sender, typename Receiver > -void make_edge_impl(Sender& sender, Receiver& receiver){ -#if __GNUC__ < 12 && !TBB_USE_DEBUG - // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03) - // The function pointer to make_edge workarounds the issue for unknown reason - auto make_edge_ptr = tbb::flow::make_edge; - make_edge_ptr(sender, receiver); -#else - tbb::flow::make_edge(sender, receiver); -#endif -} - template< typename T > void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { parallel_receiver r(g); @@ -367,7 +355,7 @@ void test_reserve_release_messages() { broad.try_put(1); //failed message retrieved. g.wait_for_all(); - make_edge_impl(limit, output_queue); //putting the successor back + tbb::flow::make_edge(limit, output_queue); //putting the successor back broad.try_put(1); //drop the count @@ -465,7 +453,7 @@ void test_try_put_without_successors() { } ); - make_edge_impl(ln, fn); + tbb::flow::make_edge(ln, fn); g.wait_for_all(); CHECK((counter == i * try_put_num / 2));