Skip to content

Commit

Permalink
Merge pull request #403 from dic-iit/system/barrier
Browse files Browse the repository at this point in the history
Add a synchronization mechanism for the AdvanceableRunner
  • Loading branch information
GiulioRomualdi authored Sep 2, 2021
2 parents 64eafc8 + ae32890 commit 583645d
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ All notable changes to this project are documented in this file.
### Added
- Implement `AdvanceableRunner::isRunning()` method (https://github.com/dic-iit/bipedal-locomotion-framework/pull/395)
- Implement `ContactPhaseList::getPresentPhase()` method (https://github.com/dic-iit/bipedal-locomotion-framework/pull/396)
- Add a synchronization mechanism for the `AdvanceableRunner` class (https://github.com/dic-iit/bipedal-locomotion-framework/pull/403)

### Changed

Expand Down
3 changes: 2 additions & 1 deletion src/System/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ if(FRAMEWORK_COMPILE_System)
${H_PREFIX}/IClock.h ${H_PREFIX}/StdClock.h ${H_PREFIX}/Clock.h
${H_PREFIX}/SharedResource.h ${H_PREFIX}/AdvanceableRunner.h
${H_PREFIX}/QuitHandler.h
${H_PREFIX}/Barrier.h
SOURCES src/VariablesHandler.cpp src/LinearTask.cpp
src/StdClock.cpp src/Clock.cpp src/QuitHandler.cpp
src/StdClock.cpp src/Clock.cpp src/QuitHandler.cpp src/Barrier.cpp
PUBLIC_LINK_LIBRARIES BipedalLocomotion::ParametersHandler Eigen3::Eigen
SUBDIRECTORIES tests YarpImplementation
)
Expand Down
17 changes: 15 additions & 2 deletions src/System/include/BipedalLocomotion/System/AdvanceableRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
#include <BipedalLocomotion/GenericContainer/TemplateHelpers.h>
#include <BipedalLocomotion/ParametersHandler/IParametersHandler.h>
#include <BipedalLocomotion/System/Advanceable.h>
#include <BipedalLocomotion/System/Barrier.h>
#include <BipedalLocomotion/System/Clock.h>
#include <BipedalLocomotion/System/SharedResource.h>
#include <BipedalLocomotion/TextLogging/Logger.h>

#include <atomic>
#include <memory>
#include <optional>
#include <thread>
#include <type_traits>

Expand Down Expand Up @@ -110,10 +112,12 @@ template <class _Advanceable> class AdvanceableRunner
/**
* Run the advanceable runner. The function create a periodic thread running with a period of
* m_dT seconds.
* @param barrier is an optional parameter that can be used to synchronize the startup of all
* the AdvanceableRunner spawned by the process.
* @return a thread of containing the running process. If the runner was not correctly
* initialized the thread is invalid.
*/
std::thread run();
std::thread run(std::optional<std::reference_wrapper<Barrier>> barrier = {});

/**
* Stop the thread
Expand Down Expand Up @@ -221,7 +225,9 @@ bool AdvanceableRunner<_Advanceable>::setOutputResource(
return true;
}

template <class _Advanceable> std::thread AdvanceableRunner<_Advanceable>::run()
template <class _Advanceable>
std::thread
AdvanceableRunner<_Advanceable>::run(std::optional<std::reference_wrapper<Barrier>> barrier)
{
constexpr auto logPrefix = "[AdvanceableRunner::run]";

Expand Down Expand Up @@ -328,6 +334,13 @@ template <class _Advanceable> std::thread AdvanceableRunner<_Advanceable>::run()
return this->m_advanceable->close();
};

// if the barrier is passed the run method, synchronization is performed
if (barrier.has_value())
{
log()->debug("{} This thread is waiting for the other threads.", logPrefix);
barrier.value().get().wait();
}

return std::thread(function);
}

Expand Down
47 changes: 47 additions & 0 deletions src/System/include/BipedalLocomotion/System/Barrier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* @file Barrier.h
* @authors Giulio Romualdi
* @copyright 2021 Istituto Italiano di Tecnologia (IIT). This software may be modified and
* distributed under the terms of the GNU Lesser General Public License v2.1 or any later version.
*/

#ifndef BIPEDAL_LOCOMOTION_SYSTEM_BARRIER_H
#define BIPEDAL_LOCOMOTION_SYSTEM_BARRIER_H

#include <condition_variable>
#include <cstddef>
#include <mutex>

namespace BipedalLocomotion
{
namespace System
{
/**
* Barrier provides a thread-coordination mechanism that allows an expected number of threads to
* block until the expected number of threads arrive at the barrier.
*/
class Barrier
{
public:
/**
* Constructor.
* @param counter initial value of the expected counter
*/
explicit Barrier(const std::size_t counter);

/**
* Blocks this thread at the phase synchronization point until its phase completion step is run
*/
void wait();

private:
std::mutex m_mutex;
std::condition_variable m_cond;
std::size_t m_initialCount;
std::size_t m_count;
std::size_t m_generation;
};
} // namespace System
} // namespace BipedalLocomotion

#endif // BIPEDAL_LOCOMOTION_SYSTEM_BARRIER_H
40 changes: 40 additions & 0 deletions src/System/src/Barrier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* @file Barrier.cpp
* @authors Giulio Romualdi
* @copyright 2021 Istituto Italiano di Tecnologia (IIT). This software may be modified and
* distributed under the terms of the GNU Lesser General Public License v2.1 or any later version.
*/

#include <mutex>

#include <BipedalLocomotion/System/Barrier.h>

using namespace BipedalLocomotion::System;

Barrier::Barrier(const std::size_t counter)
: m_initialCount(counter)
, m_count(counter)
, m_generation(0)
{
}

void Barrier::wait()
{
std::unique_lock lock{m_mutex};
const auto tempGeneration = m_generation;
if ((--m_count) == 1)
{
// all threads reached the barrier, so we can consider them synchronized
m_generation++;

// reset the counter
m_count = m_initialCount;

// notify the other threads
m_cond.notify_all();
} else
{
// waiting for the other threads
m_cond.wait(lock, [this, tempGeneration] { return tempGeneration != m_generation; });
}
}
95 changes: 70 additions & 25 deletions src/System/tests/AdvanceableRunnerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <BipedalLocomotion/ParametersHandler/StdImplementation.h>
#include <BipedalLocomotion/System/AdvanceableRunner.h>
#include <BipedalLocomotion/System/Barrier.h>
#include <BipedalLocomotion/System/SharedResource.h>
#include <BipedalLocomotion/System/Source.h>

Expand Down Expand Up @@ -73,38 +74,82 @@ TEST_CASE("Test Block")
REQUIRE(runner1.setOutputResource(output1));
REQUIRE(runner1.setAdvanceable(std::move(block1)));

// run the block
auto thread0 = runner0.run();
auto thread1 = runner1.run();

while (!output0->get() || !output1->get())
SECTION("Without synchronization")
{
BipedalLocomotion::clock().sleepFor(std::chrono::duration<double>(0.01));
}
// run the block
auto thread0 = runner0.run();
auto thread1 = runner1.run();

while (!output0->get() || !output1->get())
{
BipedalLocomotion::clock().sleepFor(std::chrono::duration<double>(0.01));
}

// close the runner
runner0.stop();
runner1.stop();
// close the runner
runner0.stop();
runner1.stop();

// print some information
BipedalLocomotion::log()->info("Runner0 : Number of deadline miss {}",
runner0.getInfo().deadlineMiss);
BipedalLocomotion::log()->info("Runner1 : Number of deadline miss {}",
runner1.getInfo().deadlineMiss);
// print some information
BipedalLocomotion::log()->info("Runner0 : Number of deadline miss {}",
runner0.getInfo().deadlineMiss);
BipedalLocomotion::log()->info("Runner1 : Number of deadline miss {}",
runner1.getInfo().deadlineMiss);

REQUIRE(output0->get());
REQUIRE(output1->get());
REQUIRE(output0->get());
REQUIRE(output1->get());

// join the treads
if(thread0.joinable())
{
thread0.join();
thread0 = std::thread();
// join the treads
if (thread0.joinable())
{
thread0.join();
thread0 = std::thread();
}

if (thread1.joinable())
{
thread1.join();
thread1 = std::thread();
}
}

if(thread1.joinable())
SECTION("With synchronization")
{
thread1.join();
thread1 = std::thread();
constexpr std::size_t numberOfRunners = 2;
Barrier barrier(numberOfRunners);

// run the block
auto thread0 = runner0.run(barrier);
auto thread1 = runner1.run(barrier);

while (!output0->get() || !output1->get())
{
BipedalLocomotion::clock().sleepFor(std::chrono::duration<double>(0.01));
}

// close the runner
runner0.stop();
runner1.stop();

// print some information
BipedalLocomotion::log()->info("Runner0 : Number of deadline miss {}",
runner0.getInfo().deadlineMiss);
BipedalLocomotion::log()->info("Runner1 : Number of deadline miss {}",
runner1.getInfo().deadlineMiss);

REQUIRE(output0->get());
REQUIRE(output1->get());

// join the treads
if (thread0.joinable())
{
thread0.join();
thread0 = std::thread();
}

if (thread1.joinable())
{
thread1.join();
thread1 = std::thread();
}
}
}

0 comments on commit 583645d

Please sign in to comment.