Skip to content

Commit

Permalink
improving and cleanup (#42)
Browse files Browse the repository at this point in the history
make it more clean
  • Loading branch information
thorstink authored Jun 21, 2023
1 parent 2a41135 commit 0fb5953
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 30 deletions.
6 changes: 5 additions & 1 deletion examples/flight/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#an app
find_package(spdlog REQUIRED)

if(ASAN_BUILD AND NOT TSAN_BUILD)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -fsanitize=address,undefined -fno-omit-frame-pointer -O0")
elseif(TSAN_BUILD AND NOT ASAN_BUILD)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -fsanitize=thread -fno-omit-frame-pointer -O0")
endif()
add_executable(${PROJECT_NAME}_flight flight.cc)
target_link_libraries(${PROJECT_NAME}_flight symmetri spdlog::spdlog)

Expand Down
1 change: 0 additions & 1 deletion examples/flight/flight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,5 @@ int main(int, char *argv[]) {
spdlog::info("EventLog: {0}, {1}, {2}, {3}", caseid, t, printState(s),
c.time_since_epoch().count());
}

return result == symmetri::State::Completed ? 0 : -1;
}
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package format="2">
<name>symmetri</name>
<version>0.7.2</version>
<version>0.7.3</version>
<description>A C++ library for executing Petri nets as programs</description>
<maintainer email="thomas@mainblades.com">Thomas Horstink</maintainer>
<author>Thomas Horstink</author>
Expand Down
2 changes: 2 additions & 0 deletions symmetri/actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ StoppablePool::~StoppablePool() {
for (auto &&t : pool) {
if (t.joinable()) {
t.join();
} else {
t.detach();
}
}
}
Expand Down
65 changes: 38 additions & 27 deletions symmetri/symmetri.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <thread>
#include <tuple>

#include "model.h"
Expand All @@ -17,6 +19,18 @@

namespace symmetri {

/**
* @brief Get the Thread Id object. We go to "great lengths" to get a 32 bit
* representation of the thread id, because in that case the atomic is lock-free
* on (most) 64 bit systems.
*
* @return unsigned int
*/
unsigned int getThreadId() {
return static_cast<unsigned int>(
std::hash<std::thread::id>()(std::this_thread::get_id()));
}

Result fireTransition(const Application &app) { return app.execute(); };

Result cancelTransition(const Application &app) { return app.exitEarly(); }
Expand Down Expand Up @@ -48,7 +62,7 @@ bool areAllTransitionsInStore(const Store &store, const Net &net) noexcept {
*/
struct Petri {
Model m; ///< The Petri net model
std::thread::id
std::atomic<std::optional<unsigned int>>
thread_id_; ///< The id of the thread from which run is called.
const Marking m0_; ///< The initial marking for this instance
const Net net_; ///< The net
Expand All @@ -60,8 +74,6 @@ struct Petri {
const std::shared_ptr<const StoppablePool> stp;
std::shared_ptr<BlockingConcurrentQueue<Reducer>> reducers;
std::string case_id; ///< The case id of this particular Petri instance
std::atomic<bool>
active; ///< The net is active as long as it is still dequeuing reducers
std::atomic<bool> early_exit; ///< once it is true, no more new transitions
///< will be queued and the run will exit.

Expand All @@ -83,6 +95,7 @@ struct Petri {
const Store &store, const PriorityTable &priorities,
const std::string &case_id)
: m(net, store, priorities, m0),
thread_id_(std::nullopt),
m0_(m0),
net_(net),
store_(store),
Expand All @@ -99,11 +112,10 @@ struct Petri {
stp(stp),
reducers(std::make_shared<BlockingConcurrentQueue<Reducer>>(256)),
case_id(case_id),
active(false),
early_exit(false) {}

bool reset(const std::string &new_case_id) {
if (active || new_case_id == case_id) {
if (thread_id_.load().has_value() || new_case_id == case_id) {
return false;
}

Expand All @@ -112,7 +124,6 @@ struct Petri {
case_id = new_case_id;
reducers = std::make_shared<BlockingConcurrentQueue<Reducer>>(256);

active.store(false);
early_exit.store(false);
return true;
}
Expand All @@ -139,9 +150,8 @@ struct Petri {
*/
Result run() {
// we are running!
thread_id_ = std::this_thread::get_id();
thread_id_.store(getThreadId());
early_exit.store(false);
active.store(true);
// reassign it manually to reset.
m.event_log.clear();
m.tokens_n = m.initial_tokens;
Expand Down Expand Up @@ -190,7 +200,7 @@ struct Petri {
}

spdlog::info("[{1}] finished with result {0}", printState(result), case_id);
active.store(false);
thread_id_.store(std::nullopt);
return {m.event_log, result};
}
};
Expand All @@ -217,7 +227,7 @@ create(const Net &net, const Marking &m0, const Marking &final_marking,
priority, case_id);
return {
impl, [=](const Transition &t) {
if (impl->active.load()) {
if (impl->thread_id_.load()) {
impl->reducers->enqueue([=](Model &&m) -> Model & {
const auto t_index = toIndex(m.net.transition, t);
m.active_transitions_n.push_back(t_index);
Expand Down Expand Up @@ -277,10 +287,9 @@ bool Application::tryFireTransition(const Transition &t) const noexcept {

Result Application::execute() const noexcept {
if (impl == nullptr) {
spdlog::error(
"Something went seriously wrong. Please send a bug report.");
spdlog::error("Something went seriously wrong. Please send a bug report.");
return {{}, State::Error};
} else if (impl->active.load()) {
} else if (impl->thread_id_.load()) {
spdlog::warn(
"[{0}] is already active, it can not run it again before it is "
"finished.",
Expand All @@ -295,15 +304,14 @@ Result Application::execute() const noexcept {
Eventlog Application::getEventLog() const noexcept {
std::promise<Eventlog> el;
std::future<Eventlog> el_getter = el.get_future();
if (impl->active.load()) {
const auto maybe_thread_id = impl->thread_id_.load();
if (maybe_thread_id) {
impl->reducers->enqueue([&](Model &&model) -> Model & {
el.set_value(model.event_log.empty() ? model.event_log : Eventlog({}));
return model;
});
}

return impl->thread_id_ != std::this_thread::get_id() &&
impl->active.load() &&
return maybe_thread_id && maybe_thread_id.value() != getThreadId() &&
el_getter.wait_for(std::chrono::milliseconds(1000)) ==
std::future_status::ready
? el_getter.get()
Expand All @@ -314,14 +322,14 @@ std::pair<std::vector<Transition>, std::vector<Place>> Application::getState()
const noexcept {
std::promise<std::pair<std::vector<Transition>, std::vector<Place>>> state;
auto getter = state.get_future();
if (impl->active.load()) {
const auto maybe_thread_id = impl->thread_id_.load();
if (maybe_thread_id) {
impl->reducers->enqueue([&](Model &&model) -> Model & {
state.set_value(model.getState());
return model;
});
}
return impl->thread_id_ != std::this_thread::get_id() &&
impl->active.load() &&
return maybe_thread_id && maybe_thread_id.value() != getThreadId() &&
getter.wait_for(std::chrono::milliseconds(1000)) ==
std::future_status::ready
? getter.get()
Expand All @@ -332,15 +340,14 @@ std::vector<Transition> Application::getFireableTransitions() const noexcept {
std::promise<std::vector<Transition>> transitions;
std::future<std::vector<Transition>> transitions_getter =
transitions.get_future();

if (impl->active.load()) {
const auto maybe_thread_id = impl->thread_id_.load();
if (maybe_thread_id) {
impl->reducers->enqueue([&](Model &&model) -> Model & {
transitions.set_value(model.getFireableTransitions());
return model;
});
}
return impl->thread_id_ != std::this_thread::get_id() &&
impl->active.load() &&
return maybe_thread_id && maybe_thread_id.value() != getThreadId() &&
transitions_getter.wait_for(std::chrono::milliseconds(1000)) ==
std::future_status::ready
? transitions_getter.get()
Expand All @@ -353,7 +360,8 @@ std::function<void()> Application::registerTransitionCallback(
}

Result Application::exitEarly() const noexcept {
if (impl->thread_id_ != std::this_thread::get_id() && impl->active.load() &&
const auto maybe_thread_id = impl->thread_id_.load();
if (maybe_thread_id && maybe_thread_id.value() != getThreadId() &&
!impl->early_exit.load()) {
std::mutex m;
std::condition_variable cv;
Expand All @@ -370,7 +378,10 @@ Result Application::exitEarly() const noexcept {
std::unique_lock lk(m);
cv.wait(lk, [&] { return ready; });
} else {
impl->early_exit.store(true);
impl->reducers->enqueue([=](Model &&model) -> Model & {
impl->early_exit.store(true);
return model;
});
}

return {getEventLog(), State::UserExit};
Expand All @@ -379,7 +390,7 @@ Result Application::exitEarly() const noexcept {
bool Application::reuseApplication(const std::string &new_case_id) {
if (impl->reset(new_case_id)) {
register_functor = [=](const Transition &t) {
if (impl->active.load()) {
if (impl->thread_id_.load()) {
impl->reducers->enqueue([=](Model &&m) -> Model & {
const auto t_index = toIndex(m.net.transition, t);
m.active_transitions_n.push_back(t_index);
Expand Down

0 comments on commit 0fb5953

Please sign in to comment.