Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race condition in rmw_wait. #160

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ void EventsManager::add_new_event(
///=============================================================================
void EventsManager::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
Expand All @@ -194,7 +195,8 @@ void EventsManager::attach_event_condition(
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
std::lock_guard<std::mutex> lock(update_event_condition_mutex_);
event_condition_mutexes_[event_id] = condition_mutex;
event_conditions_[event_id] = condition_variable;
}

Expand All @@ -209,7 +211,8 @@ void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id)
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
std::lock_guard<std::mutex> lock(update_event_condition_mutex_);
event_condition_mutexes_[event_id] = nullptr;
event_conditions_[event_id] = nullptr;
}

Expand All @@ -224,8 +227,9 @@ void EventsManager::notify_event(rmw_zenoh_event_type_t event_id)
return;
}

std::lock_guard<std::mutex> lock(event_condition_mutex_);
std::lock_guard<std::mutex> lock(update_event_condition_mutex_);
if (event_conditions_[event_id] != nullptr) {
std::lock_guard<std::mutex> cvlk(*event_condition_mutexes_[event_id]);
event_conditions_[event_id]->notify_one();
}
}
4 changes: 3 additions & 1 deletion rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class EventsManager
/// @param condition_variable to attach.
void attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::mutex * condition_mutex,
std::condition_variable * condition_variable);

/// @brief Detach the condition variable provided by rmw_wait.
Expand All @@ -154,7 +155,8 @@ class EventsManager
/// Mutex to lock when read/writing members.
mutable std::mutex event_mutex_;
/// Mutex to lock for event_condition.
mutable std::mutex event_condition_mutex_;
mutable std::mutex update_event_condition_mutex_;
std::mutex * event_condition_mutexes_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via data_callback_mgr.set_callback().
Expand Down
14 changes: 13 additions & 1 deletion rmw_zenoh_cpp/src/detail/guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,36 @@ void GuardCondition::trigger()
has_triggered_ = true;

if (condition_variable_ != nullptr) {
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_variable_->notify_one();
}
}

///==============================================================================
void GuardCondition::attach_condition(std::condition_variable * condition_variable)
void GuardCondition::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(internal_mutex_);
condition_mutex_ = condition_mutex;
condition_variable_ = condition_variable;
}

///==============================================================================
void GuardCondition::detach_condition()
{
std::lock_guard<std::mutex> lock(internal_mutex_);
condition_mutex_ = nullptr;
condition_variable_ = nullptr;
}

///==============================================================================
bool GuardCondition::get_trigger() const
{
std::lock_guard<std::mutex> lock(internal_mutex_);
return has_triggered_;
}

///==============================================================================
bool GuardCondition::get_and_reset_trigger()
{
Expand Down
7 changes: 5 additions & 2 deletions rmw_zenoh_cpp/src/detail/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ class GuardCondition final
// Sets has_triggered_ to true and calls notify_one() on condition_variable_ if set.
void trigger();

void attach_condition(std::condition_variable * condition_variable);
void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);

void detach_condition();

bool get_trigger() const;

bool get_and_reset_trigger();

private:
mutable std::mutex internal_mutex_;
std::atomic_bool has_triggered_;
std::condition_variable * condition_variable_;
std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_variable_{nullptr};
};

#endif // DETAIL__GUARD_CONDITION_HPP_
45 changes: 33 additions & 12 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,32 @@ size_t rmw_publisher_data_t::get_next_sequence_number()
}

///=============================================================================
void rmw_subscription_data_t::attach_condition(std::condition_variable * condition_variable)
void rmw_subscription_data_t::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = condition_mutex;
condition_ = condition_variable;
}

///=============================================================================
void rmw_subscription_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
if (condition_ != nullptr) {
// We also need to take the mutex for the condition_variable; see the comment
// in rmw_wait for more information
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_->notify_one();
}
}

///=============================================================================
void rmw_subscription_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = nullptr;
condition_ = nullptr;
}

Expand Down Expand Up @@ -150,16 +157,20 @@ bool rmw_service_data_t::query_queue_is_empty() const
}

///=============================================================================
void rmw_service_data_t::attach_condition(std::condition_variable * condition_variable)
void rmw_service_data_t::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = condition_mutex;
condition_ = condition_variable;
}

///=============================================================================
void rmw_service_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = nullptr;
condition_ = nullptr;
}

Expand All @@ -180,8 +191,11 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::pop_next_query()
///=============================================================================
void rmw_service_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
if (condition_ != nullptr) {
// We also need to take the mutex for the condition_variable; see the comment
// in rmw_wait for more information
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_->notify_one();
}
}
Expand Down Expand Up @@ -282,8 +296,11 @@ std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(
///=============================================================================
void rmw_client_data_t::notify()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
if (condition_ != nullptr) {
// We also need to take the mutex for the condition_variable; see the comment
// in rmw_wait for more information
std::lock_guard<std::mutex> cvlk(*condition_mutex_);
condition_->notify_one();
}
}
Expand Down Expand Up @@ -320,16 +337,20 @@ bool rmw_client_data_t::reply_queue_is_empty() const
}

///=============================================================================
void rmw_client_data_t::attach_condition(std::condition_variable * condition_variable)
void rmw_client_data_t::attach_condition(
std::mutex * condition_mutex,
std::condition_variable * condition_variable)
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = condition_mutex;
condition_ = condition_variable;
}

///=============================================================================
void rmw_client_data_t::detach_condition()
{
std::lock_guard<std::mutex> lock(condition_mutex_);
std::lock_guard<std::mutex> lock(update_condition_mutex_);
condition_mutex_ = nullptr;
condition_ = nullptr;
}

Expand Down
15 changes: 9 additions & 6 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class rmw_subscription_data_t final
MessageTypeSupport * type_support;
rmw_context_t * context;

void attach_condition(std::condition_variable * condition_variable);
void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);

void detach_condition();

Expand All @@ -191,8 +191,9 @@ class rmw_subscription_data_t final

void notify();

std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_{nullptr};
std::mutex condition_mutex_;
std::mutex update_condition_mutex_;
};


Expand Down Expand Up @@ -243,7 +244,7 @@ class rmw_service_data_t final

bool query_queue_is_empty() const;

void attach_condition(std::condition_variable * condition_variable);
void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);

void detach_condition();

Expand All @@ -269,8 +270,9 @@ class rmw_service_data_t final
std::unordered_map<size_t, SequenceToQuery> sequence_to_query_map_;
std::mutex sequence_to_query_map_mutex_;

std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_{nullptr};
std::mutex condition_mutex_;
std::mutex update_condition_mutex_;
};

///=============================================================================
Expand Down Expand Up @@ -320,7 +322,7 @@ class rmw_client_data_t final

bool reply_queue_is_empty() const;

void attach_condition(std::condition_variable * condition_variable);
void attach_condition(std::mutex * condition_mutex, std::condition_variable * condition_variable);

void detach_condition();

Expand All @@ -334,8 +336,9 @@ class rmw_client_data_t final
size_t sequence_number_{1};
std::mutex sequence_number_mutex_;

std::mutex * condition_mutex_{nullptr};
std::condition_variable * condition_{nullptr};
std::mutex condition_mutex_;
std::mutex update_condition_mutex_;

std::deque<std::unique_ptr<ZenohReply>> reply_queue_;
mutable std::mutex reply_queue_mutex_;
Expand Down
Loading
Loading