Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the barrier logic for threads synchronization #811

Merged
merged 5 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ All notable changes to this project are documented in this file.
### Changed
- 🤖 [ergoCubSN001] Add logging of the wrist and fix the name of the waist imu (https://github.com/ami-iit/bipedal-locomotion-framework/pull/810)


### Fixed
- Fix the barrier logic for threads synchronization (https://github.com/ami-iit/bipedal-locomotion-framework/pull/811)

### Removed

## [0.18.0] - 2024-01-23
Expand Down
28 changes: 14 additions & 14 deletions src/System/include/BipedalLocomotion/System/AdvanceableRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ template <class _Advanceable> class AdvanceableRunner
* @return a thread of containing the running process. If the runner was not correctly
* initialized the thread is invalid.
*/
std::thread run(std::optional<std::reference_wrapper<Barrier>> barrier = {});
std::thread run(std::shared_ptr<Barrier> barrier = nullptr);

/**
* Stop the thread
Expand Down Expand Up @@ -227,8 +227,7 @@ bool AdvanceableRunner<_Advanceable>::setOutputResource(
}

template <class _Advanceable>
std::thread
AdvanceableRunner<_Advanceable>::run(std::optional<std::reference_wrapper<Barrier>> barrier)
std::thread AdvanceableRunner<_Advanceable>::run(std::shared_ptr<Barrier> barrier)
{
constexpr auto logPrefix = "[AdvanceableRunner::run]";

Expand Down Expand Up @@ -273,8 +272,18 @@ AdvanceableRunner<_Advanceable>::run(std::optional<std::reference_wrapper<Barrie

// run the thread
m_isRunning = true;
auto function = [&]() -> bool {
auto function = [&](std::shared_ptr<Barrier> barrier) -> bool {
constexpr auto logPrefix = "[AdvanceableRunner::run]";

// synchronize the threads
if (!(barrier == nullptr))
{
log()->debug("{} - {} This thread is waiting for the other threads.",
logPrefix,
m_info.name);
barrier->wait();
}

auto time = BipedalLocomotion::clock().now();
auto oldTime = time;
auto wakeUpTime = time;
Expand Down Expand Up @@ -363,16 +372,7 @@ AdvanceableRunner<_Advanceable>::run(std::optional<std::reference_wrapper<Barrie
return this->m_advanceable->close();
};

// if the barrier is passed the run method, synchronization is performed
if (barrier.has_value())
{
log()->debug("{} - {} This thread is waiting for the other threads.",
logPrefix,
m_info.name);
barrier.value().get().wait();
}

return std::thread(function);
return std::thread(function, barrier);
}

template <class _Advanceable> void AdvanceableRunner<_Advanceable>::stop()
Expand Down
11 changes: 8 additions & 3 deletions src/System/include/BipedalLocomotion/System/Barrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ class Barrier
{
public:
/**
* Constructor.
* @param counter initial value of the expected counter
* Calls the constructor. It creates a new Barrier with the given counter.
*/
explicit Barrier(const std::size_t counter);
static std::shared_ptr<Barrier> create(const std::size_t counter);

/**
* Blocks this thread at the phase synchronization point until its phase completion step is run
Expand All @@ -40,6 +39,12 @@ class Barrier
std::size_t m_initialCount;
std::size_t m_count;
std::size_t m_generation;

/**
* Constructor.
* @param counter initial value of the expected counter
*/
explicit Barrier(const std::size_t counter);
};
} // namespace System
} // namespace BipedalLocomotion
Expand Down
12 changes: 11 additions & 1 deletion src/System/src/Barrier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
* distributed under the terms of the BSD-3-Clause license.
*/

#include <memory>
#include <mutex>

#include <BipedalLocomotion/System/Barrier.h>
#include <BipedalLocomotion/TextLogging/Logger.h>

using namespace BipedalLocomotion::System;

Expand All @@ -18,11 +20,18 @@ Barrier::Barrier(const std::size_t counter)
{
}

std::shared_ptr<Barrier> Barrier::create(const std::size_t counter)
{
return std::shared_ptr<Barrier>(new Barrier(counter));
}

void Barrier::wait()
LoreMoretti marked this conversation as resolved.
Show resolved Hide resolved
{
constexpr auto logPrefix = "[Barrier::wait]";

std::unique_lock lock{m_mutex};
const auto tempGeneration = m_generation;
if ((--m_count) == 1)
if ((--m_count) == 0)
{
// all threads reached the barrier, so we can consider them synchronized
m_generation++;
Expand All @@ -31,6 +40,7 @@ void Barrier::wait()
m_count = m_initialCount;

// notify the other threads
log()->debug("{} All threads reached the barrier.", logPrefix);
m_cond.notify_all();
} else
{
Expand Down
3 changes: 2 additions & 1 deletion src/System/tests/AdvanceableRunnerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <BipedalLocomotion/System/Barrier.h>
#include <BipedalLocomotion/System/SharedResource.h>
#include <BipedalLocomotion/System/Source.h>
#include <memory>

using namespace BipedalLocomotion::System;
using namespace BipedalLocomotion::ParametersHandler;
Expand Down Expand Up @@ -120,7 +121,7 @@ TEST_CASE("Test Block")
SECTION("With synchronization")
{
constexpr std::size_t numberOfRunners = 2;
Barrier barrier(numberOfRunners);
auto barrier = Barrier::create(numberOfRunners);

// run the block
auto thread0 = runner0.run(barrier);
Expand Down
Loading