From e5389f28ca09772ba50ca68e416cd28122f8c934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rik=20B=C3=A4hnemann?= Date: Thu, 10 Oct 2019 11:19:11 +0200 Subject: [PATCH 1/2] Thread device process. --- .../piksi_multi_cpp/receiver/receiver.h | 13 +++++-- piksi_multi_cpp/src/piksi_multi_node.cc | 9 ++--- piksi_multi_cpp/src/receiver/receiver.cc | 35 +++++++++++++------ 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h b/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h index c3b79b2f..7bf5a1f5 100644 --- a/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h +++ b/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h @@ -3,8 +3,10 @@ #include #include +#include #include #include +#include #include #include "piksi_multi_cpp/device/device.h" #include "piksi_multi_cpp/receiver/receiver.h" @@ -41,8 +43,6 @@ class Receiver { // Open device. bool init(); - // Read device and process SBP callbacks. - void process(); // Factory methods to create receivers. @@ -75,9 +75,16 @@ class Receiver { static ReceiverType inferType(const std::shared_ptr& dev); static std::string createNameSpace(const ReceiverType type, const size_t id); + // Read device and process SBP callbacks. + void process(); + // Start thread that reads device and processes SBP messages. This thread is + // terminated when the is_running flag ist set false during object + // destruction. + std::thread process_thread_; + std::atomic_bool is_running_; + // The sbp state. std::shared_ptr state_; - // SBP callbacks common for all receivers. std::vector> cb_; }; diff --git a/piksi_multi_cpp/src/piksi_multi_node.cc b/piksi_multi_cpp/src/piksi_multi_node.cc index 9afd2988..f7f20645 100644 --- a/piksi_multi_cpp/src/piksi_multi_node.cc +++ b/piksi_multi_cpp/src/piksi_multi_node.cc @@ -15,7 +15,7 @@ int main(int argc, char** argv) { exit(1); } - // Initialization + // Start all receivers. for (auto rec : receivers) { if (!rec->init()) { ROS_FATAL("Error initializing receiver."); @@ -23,12 +23,7 @@ int main(int argc, char** argv) { } } - // Process incoming data. - // TODO(rikba): Multithreading! - while (ros::ok()) { - for (auto rec : receivers) rec->process(); - ros::spinOnce(); - } + ros::spin(); return 0; } diff --git a/piksi_multi_cpp/src/receiver/receiver.cc b/piksi_multi_cpp/src/receiver/receiver.cc index 614d753c..f5bd6411 100644 --- a/piksi_multi_cpp/src/receiver/receiver.cc +++ b/piksi_multi_cpp/src/receiver/receiver.cc @@ -20,7 +20,7 @@ std::vector Receiver::kTypeVec = Receiver::Receiver(const ros::NodeHandle& nh, const std::shared_ptr& device) - : nh_(nh), device_(device) { + : nh_(nh), device_(device), is_running_(true) { // Initialize SBP state. state_ = std::make_shared(); sbp_state_init(state_.get()); @@ -53,6 +53,10 @@ std::shared_ptr Receiver::create( } Receiver::~Receiver() { + // Close thread. + is_running_.store(false); + process_thread_.join(); + if (device_) device_->close(); } @@ -61,18 +65,29 @@ bool Receiver::init() { ROS_ERROR("Device not set."); return false; } - return device_->open(); + + // Open attached device. + if (!device_->open()) { + ROS_ERROR("Cannot open device."); + return false; + } + + process_thread_ = std::thread(&Receiver::process, this); + + return true; } void Receiver::process() { - if (!device_.get()) return; - // Pass device pointer to process function. - sbp_state_set_io_context(state_.get(), device_.get()); - // Pass device read function to sbp_process. - int result = - sbp_process(state_.get(), &piksi_multi_cpp::Device::read_redirect); - if (result < 0) { - ROS_WARN_STREAM("Error sbp_process: " << result); + while (is_running_.load()) { + if (!device_.get()) return; + // Pass device pointer to process function. + sbp_state_set_io_context(state_.get(), device_.get()); + // Pass device read function to sbp_process. + int result = + sbp_process(state_.get(), &piksi_multi_cpp::Device::read_redirect); + if (result < 0) { + ROS_WARN_STREAM("Error sbp_process: " << result); + } } } From 573e2910d166aa82064d4b3f353cdd5bd308d19f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rik=20B=C3=A4hnemann?= Date: Thu, 10 Oct 2019 15:28:43 +0200 Subject: [PATCH 2/2] Make thread exiting clear. --- .../include/piksi_multi_cpp/receiver/receiver.h | 2 +- piksi_multi_cpp/src/receiver/receiver.cc | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h b/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h index 7bf5a1f5..314acdee 100644 --- a/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h +++ b/piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h @@ -81,7 +81,7 @@ class Receiver { // terminated when the is_running flag ist set false during object // destruction. std::thread process_thread_; - std::atomic_bool is_running_; + std::atomic_bool thread_exit_requested_; // The sbp state. std::shared_ptr state_; diff --git a/piksi_multi_cpp/src/receiver/receiver.cc b/piksi_multi_cpp/src/receiver/receiver.cc index f5bd6411..9ecb6449 100644 --- a/piksi_multi_cpp/src/receiver/receiver.cc +++ b/piksi_multi_cpp/src/receiver/receiver.cc @@ -20,7 +20,7 @@ std::vector Receiver::kTypeVec = Receiver::Receiver(const ros::NodeHandle& nh, const std::shared_ptr& device) - : nh_(nh), device_(device), is_running_(true) { + : nh_(nh), device_(device), thread_exit_requested_(false) { // Initialize SBP state. state_ = std::make_shared(); sbp_state_init(state_.get()); @@ -54,8 +54,8 @@ std::shared_ptr Receiver::create( Receiver::~Receiver() { // Close thread. - is_running_.store(false); - process_thread_.join(); + thread_exit_requested_.store(true); + if (process_thread_.joinable()) process_thread_.join(); if (device_) device_->close(); } @@ -78,7 +78,8 @@ bool Receiver::init() { } void Receiver::process() { - while (is_running_.load()) { + // Setting thread_exit_requested_ will terminate the thread. + while (!thread_exit_requested_.load()) { if (!device_.get()) return; // Pass device pointer to process function. sbp_state_set_io_context(state_.get(), device_.get());