Skip to content

Commit

Permalink
Fix deadlock bug and handle shutdown race
Browse files Browse the repository at this point in the history
If we get a SIGINT while querying a topic's QoS, then exit cleanly.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
  • Loading branch information
jacobperron committed Mar 22, 2021
1 parent 4ae1095 commit eac644d
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions src/domain_bridge/wait_for_qos_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ class WaitForQosHandler

~WaitForQosHandler()
{
shutting_down_.store(true);
{
std::lock_guard<std::mutex> lock(waiting_map_mutex_);
shutting_down_ = true;
}
for (const auto & t : waiting_threads_) {
// Notify and join
t.second.second->notify_all();
Expand Down Expand Up @@ -120,38 +123,51 @@ class WaitForQosHandler
auto invoke_callback_when_qos_ready = [this, node, cv]()
{
auto event = node->get_graph_event();
while (!this->shutting_down_.load()) {
while (true) {
// Wait for graph event
// Note, in case new publishers don't trigger a graph event we add a
// timeout so that we can still poll periodically for new publishers
node->wait_for_graph_change(event, std::chrono::seconds(1));
event->check_and_clear();
// Check if QoS is ready for any of the topics

{
std::unique_lock<std::mutex> lock(waiting_map_mutex_);
// If we're shuttdown down, exit the thread
if (this->shutting_down_) {
return;
}

// Check if QoS is ready for any of the topics
auto & topic_callback_vec = waiting_map_.at(node);
for (auto it = topic_callback_vec.begin(); it != topic_callback_vec.end(); ++it) {
const std::string & topic = it->first;
const auto callback = it->second;
auto opt_qos = this->get_topic_qos(topic, node);
std::optional<QosMatchInfo> opt_qos;
try {
opt_qos = this->get_topic_qos(topic, node);
} catch (const rclcpp::exceptions::RCLError & ex) {
// If the context was shutdown, then exit cleanly
// This can happen if we get a SIGINT
const auto context = node->get_node_options().context();
if (!context->is_valid()) {
return;
}
throw ex;
}

if (opt_qos) {
const QosMatchInfo & qos = opt_qos.value();
callback(qos);
topic_callback_vec.erase(it--);
}
}
// If there are no more publishers to wait on,
// remove the entry from the map and terminate the thread
if (topic_callback_vec.size() == 0u) {
waiting_map_.erase(waiting_map_.find(node));
}

// It's only worth continuing if we have callbacks to handle,
// It's only worth continuing if we have callbacks to handle
// or we're shutting down
cv->wait(
lock,
[this, &topic_callback_vec]
{return topic_callback_vec.size() > 0u || this->shutting_down_.load();});
[this, &topic_callback_vec, event]
{return (topic_callback_vec.size() > 0u) || this->shutting_down_;});
}
}
};
Expand Down Expand Up @@ -256,7 +272,7 @@ class WaitForQosHandler
}

/// Flag to tell waiting threads we're shutting down
std::atomic_bool shutting_down_{false};
bool shutting_down_{false};

/// Threads used for waiting on publishers to become available
ThreadMap waiting_threads_;
Expand Down

0 comments on commit eac644d

Please sign in to comment.