From eac644dbba9e619f882f062f59a43bd534df508b Mon Sep 17 00:00:00 2001 From: Jacob Perron Date: Mon, 22 Mar 2021 16:17:50 -0700 Subject: [PATCH] Fix deadlock bug and handle shutdown race If we get a SIGINT while querying a topic's QoS, then exit cleanly. Signed-off-by: Jacob Perron --- src/domain_bridge/wait_for_qos_handler.hpp | 42 +++++++++++++++------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/src/domain_bridge/wait_for_qos_handler.hpp b/src/domain_bridge/wait_for_qos_handler.hpp index 16ce645..28cc02a 100644 --- a/src/domain_bridge/wait_for_qos_handler.hpp +++ b/src/domain_bridge/wait_for_qos_handler.hpp @@ -64,7 +64,10 @@ class WaitForQosHandler ~WaitForQosHandler() { - shutting_down_.store(true); + { + std::lock_guard lock(waiting_map_mutex_); + shutting_down_ = true; + } for (const auto & t : waiting_threads_) { // Notify and join t.second.second->notify_all(); @@ -120,38 +123,51 @@ class WaitForQosHandler auto invoke_callback_when_qos_ready = [this, node, cv]() { auto event = node->get_graph_event(); - while (!this->shutting_down_.load()) { + while (true) { // Wait for graph event // Note, in case new publishers don't trigger a graph event we add a // timeout so that we can still poll periodically for new publishers node->wait_for_graph_change(event, std::chrono::seconds(1)); event->check_and_clear(); - // Check if QoS is ready for any of the topics + { std::unique_lock lock(waiting_map_mutex_); + // If we're shuttdown down, exit the thread + if (this->shutting_down_) { + return; + } + + // Check if QoS is ready for any of the topics auto & topic_callback_vec = waiting_map_.at(node); for (auto it = topic_callback_vec.begin(); it != topic_callback_vec.end(); ++it) { const std::string & topic = it->first; const auto callback = it->second; - auto opt_qos = this->get_topic_qos(topic, node); + std::optional opt_qos; + try { + opt_qos = this->get_topic_qos(topic, node); + } catch (const rclcpp::exceptions::RCLError & ex) { + // If the context was shutdown, then exit cleanly + // This can happen if we get a SIGINT + const auto context = node->get_node_options().context(); + if (!context->is_valid()) { + return; + } + throw ex; + } + if (opt_qos) { const QosMatchInfo & qos = opt_qos.value(); callback(qos); topic_callback_vec.erase(it--); } } - // If there are no more publishers to wait on, - // remove the entry from the map and terminate the thread - if (topic_callback_vec.size() == 0u) { - waiting_map_.erase(waiting_map_.find(node)); - } - // It's only worth continuing if we have callbacks to handle, + // It's only worth continuing if we have callbacks to handle // or we're shutting down cv->wait( lock, - [this, &topic_callback_vec] - {return topic_callback_vec.size() > 0u || this->shutting_down_.load();}); + [this, &topic_callback_vec, event] + {return (topic_callback_vec.size() > 0u) || this->shutting_down_;}); } } }; @@ -256,7 +272,7 @@ class WaitForQosHandler } /// Flag to tell waiting threads we're shutting down - std::atomic_bool shutting_down_{false}; + bool shutting_down_{false}; /// Threads used for waiting on publishers to become available ThreadMap waiting_threads_;