diff --git a/CHANGELOG.md b/CHANGELOG.md index 0935f849fb..0f0bc0edec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ All notable changes to this project are documented in this file. ### Added - Implement `AdvanceableRunner::isRunning()` method (https://github.com/dic-iit/bipedal-locomotion-framework/pull/395) - Implement `ContactPhaseList::getPresentPhase()` method (https://github.com/dic-iit/bipedal-locomotion-framework/pull/396) +- Add a synchronization mechanism for the `AdvanceableRunner` class (https://github.com/dic-iit/bipedal-locomotion-framework/pull/403) ### Changed diff --git a/src/System/CMakeLists.txt b/src/System/CMakeLists.txt index f8bb778100..86b95b93dc 100644 --- a/src/System/CMakeLists.txt +++ b/src/System/CMakeLists.txt @@ -15,8 +15,9 @@ if(FRAMEWORK_COMPILE_System) ${H_PREFIX}/IClock.h ${H_PREFIX}/StdClock.h ${H_PREFIX}/Clock.h ${H_PREFIX}/SharedResource.h ${H_PREFIX}/AdvanceableRunner.h ${H_PREFIX}/QuitHandler.h + ${H_PREFIX}/Barrier.h SOURCES src/VariablesHandler.cpp src/LinearTask.cpp - src/StdClock.cpp src/Clock.cpp src/QuitHandler.cpp + src/StdClock.cpp src/Clock.cpp src/QuitHandler.cpp src/Barrier.cpp PUBLIC_LINK_LIBRARIES BipedalLocomotion::ParametersHandler Eigen3::Eigen SUBDIRECTORIES tests YarpImplementation ) diff --git a/src/System/include/BipedalLocomotion/System/AdvanceableRunner.h b/src/System/include/BipedalLocomotion/System/AdvanceableRunner.h index 22e5761781..63265ec611 100644 --- a/src/System/include/BipedalLocomotion/System/AdvanceableRunner.h +++ b/src/System/include/BipedalLocomotion/System/AdvanceableRunner.h @@ -11,12 +11,14 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include @@ -110,10 +112,12 @@ template class AdvanceableRunner /** * Run the advanceable runner. The function create a periodic thread running with a period of * m_dT seconds. + * @param barrier is an optional parameter that can be used to synchronize the startup of all + * the AdvanceableRunner spawned by the process. * @return a thread of containing the running process. If the runner was not correctly * initialized the thread is invalid. */ - std::thread run(); + std::thread run(std::optional> barrier = {}); /** * Stop the thread @@ -221,7 +225,9 @@ bool AdvanceableRunner<_Advanceable>::setOutputResource( return true; } -template std::thread AdvanceableRunner<_Advanceable>::run() +template +std::thread +AdvanceableRunner<_Advanceable>::run(std::optional> barrier) { constexpr auto logPrefix = "[AdvanceableRunner::run]"; @@ -328,6 +334,13 @@ template std::thread AdvanceableRunner<_Advanceable>::run() return this->m_advanceable->close(); }; + // if the barrier is passed the run method, synchronization is performed + if (barrier.has_value()) + { + log()->debug("{} This thread is waiting for the other threads.", logPrefix); + barrier.value().get().wait(); + } + return std::thread(function); } diff --git a/src/System/include/BipedalLocomotion/System/Barrier.h b/src/System/include/BipedalLocomotion/System/Barrier.h new file mode 100644 index 0000000000..f53537d563 --- /dev/null +++ b/src/System/include/BipedalLocomotion/System/Barrier.h @@ -0,0 +1,47 @@ +/** + * @file Barrier.h + * @authors Giulio Romualdi + * @copyright 2021 Istituto Italiano di Tecnologia (IIT). This software may be modified and + * distributed under the terms of the GNU Lesser General Public License v2.1 or any later version. + */ + +#ifndef BIPEDAL_LOCOMOTION_SYSTEM_BARRIER_H +#define BIPEDAL_LOCOMOTION_SYSTEM_BARRIER_H + +#include +#include +#include + +namespace BipedalLocomotion +{ +namespace System +{ +/** + * Barrier provides a thread-coordination mechanism that allows an expected number of threads to + * block until the expected number of threads arrive at the barrier. + */ +class Barrier +{ +public: + /** + * Constructor. + * @param counter initial value of the expected counter + */ + explicit Barrier(const std::size_t counter); + + /** + * Blocks this thread at the phase synchronization point until its phase completion step is run + */ + void wait(); + +private: + std::mutex m_mutex; + std::condition_variable m_cond; + std::size_t m_initialCount; + std::size_t m_count; + std::size_t m_generation; +}; +} // namespace System +} // namespace BipedalLocomotion + +#endif // BIPEDAL_LOCOMOTION_SYSTEM_BARRIER_H diff --git a/src/System/src/Barrier.cpp b/src/System/src/Barrier.cpp new file mode 100644 index 0000000000..480e3789a1 --- /dev/null +++ b/src/System/src/Barrier.cpp @@ -0,0 +1,40 @@ +/** + * @file Barrier.cpp + * @authors Giulio Romualdi + * @copyright 2021 Istituto Italiano di Tecnologia (IIT). This software may be modified and + * distributed under the terms of the GNU Lesser General Public License v2.1 or any later version. + */ + +#include + +#include + +using namespace BipedalLocomotion::System; + +Barrier::Barrier(const std::size_t counter) + : m_initialCount(counter) + , m_count(counter) + , m_generation(0) +{ +} + +void Barrier::wait() +{ + std::unique_lock lock{m_mutex}; + const auto tempGeneration = m_generation; + if ((--m_count) == 1) + { + // all threads reached the barrier, so we can consider them synchronized + m_generation++; + + // reset the counter + m_count = m_initialCount; + + // notify the other threads + m_cond.notify_all(); + } else + { + // waiting for the other threads + m_cond.wait(lock, [this, tempGeneration] { return tempGeneration != m_generation; }); + } +} diff --git a/src/System/tests/AdvanceableRunnerTest.cpp b/src/System/tests/AdvanceableRunnerTest.cpp index 3ff9770471..152bc227f0 100644 --- a/src/System/tests/AdvanceableRunnerTest.cpp +++ b/src/System/tests/AdvanceableRunnerTest.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -73,38 +74,82 @@ TEST_CASE("Test Block") REQUIRE(runner1.setOutputResource(output1)); REQUIRE(runner1.setAdvanceable(std::move(block1))); - // run the block - auto thread0 = runner0.run(); - auto thread1 = runner1.run(); - - while (!output0->get() || !output1->get()) + SECTION("Without synchronization") { - BipedalLocomotion::clock().sleepFor(std::chrono::duration(0.01)); - } + // run the block + auto thread0 = runner0.run(); + auto thread1 = runner1.run(); + + while (!output0->get() || !output1->get()) + { + BipedalLocomotion::clock().sleepFor(std::chrono::duration(0.01)); + } - // close the runner - runner0.stop(); - runner1.stop(); + // close the runner + runner0.stop(); + runner1.stop(); - // print some information - BipedalLocomotion::log()->info("Runner0 : Number of deadline miss {}", - runner0.getInfo().deadlineMiss); - BipedalLocomotion::log()->info("Runner1 : Number of deadline miss {}", - runner1.getInfo().deadlineMiss); + // print some information + BipedalLocomotion::log()->info("Runner0 : Number of deadline miss {}", + runner0.getInfo().deadlineMiss); + BipedalLocomotion::log()->info("Runner1 : Number of deadline miss {}", + runner1.getInfo().deadlineMiss); - REQUIRE(output0->get()); - REQUIRE(output1->get()); + REQUIRE(output0->get()); + REQUIRE(output1->get()); - // join the treads - if(thread0.joinable()) - { - thread0.join(); - thread0 = std::thread(); + // join the treads + if (thread0.joinable()) + { + thread0.join(); + thread0 = std::thread(); + } + + if (thread1.joinable()) + { + thread1.join(); + thread1 = std::thread(); + } } - if(thread1.joinable()) + SECTION("With synchronization") { - thread1.join(); - thread1 = std::thread(); + constexpr std::size_t numberOfRunners = 2; + Barrier barrier(numberOfRunners); + + // run the block + auto thread0 = runner0.run(barrier); + auto thread1 = runner1.run(barrier); + + while (!output0->get() || !output1->get()) + { + BipedalLocomotion::clock().sleepFor(std::chrono::duration(0.01)); + } + + // close the runner + runner0.stop(); + runner1.stop(); + + // print some information + BipedalLocomotion::log()->info("Runner0 : Number of deadline miss {}", + runner0.getInfo().deadlineMiss); + BipedalLocomotion::log()->info("Runner1 : Number of deadline miss {}", + runner1.getInfo().deadlineMiss); + + REQUIRE(output0->get()); + REQUIRE(output1->get()); + + // join the treads + if (thread0.joinable()) + { + thread0.join(); + thread0 = std::thread(); + } + + if (thread1.joinable()) + { + thread1.join(); + thread1 = std::thread(); + } } }