Skip to content

Commit

Permalink
Merge pull request #106 from ethz-asl/feature/multithreading
Browse files Browse the repository at this point in the history
Feature/multithreading
  • Loading branch information
rikba authored Oct 10, 2019
2 parents 976c55f + 573e291 commit 35fc411
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 20 deletions.
13 changes: 10 additions & 3 deletions piksi_multi_cpp/include/piksi_multi_cpp/receiver/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

#include <libsbp/sbp.h>
#include <ros/ros.h>
#include <atomic>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "piksi_multi_cpp/device/device.h"
#include "piksi_multi_cpp/receiver/receiver.h"
Expand Down Expand Up @@ -41,8 +43,6 @@ class Receiver {

// Open device.
bool init();
// Read device and process SBP callbacks.
void process();

// Factory methods to create receivers.

Expand Down Expand Up @@ -75,9 +75,16 @@ class Receiver {
static ReceiverType inferType(const std::shared_ptr<Device>& dev);
static std::string createNameSpace(const ReceiverType type, const size_t id);

// Read device and process SBP callbacks.
void process();
// Start thread that reads device and processes SBP messages. This thread is
// terminated when the is_running flag ist set false during object
// destruction.
std::thread process_thread_;
std::atomic_bool thread_exit_requested_;

// The sbp state.
std::shared_ptr<sbp_state_t> state_;

// SBP callbacks common for all receivers.
std::vector<std::shared_ptr<SBPCallback>> cb_;
};
Expand Down
9 changes: 2 additions & 7 deletions piksi_multi_cpp/src/piksi_multi_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,15 @@ int main(int argc, char** argv) {
exit(1);
}

// Initialization
// Start all receivers.
for (auto rec : receivers) {
if (!rec->init()) {
ROS_FATAL("Error initializing receiver.");
exit(1);
}
}

// Process incoming data.
// TODO(rikba): Multithreading!
while (ros::ok()) {
for (auto rec : receivers) rec->process();
ros::spinOnce();
}
ros::spin();

return 0;
}
36 changes: 26 additions & 10 deletions piksi_multi_cpp/src/receiver/receiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ std::vector<Receiver::ReceiverType> Receiver::kTypeVec =

Receiver::Receiver(const ros::NodeHandle& nh,
const std::shared_ptr<Device>& device)
: nh_(nh), device_(device) {
: nh_(nh), device_(device), thread_exit_requested_(false) {
// Initialize SBP state.
state_ = std::make_shared<sbp_state_t>();
sbp_state_init(state_.get());
Expand Down Expand Up @@ -53,6 +53,10 @@ std::shared_ptr<Receiver> Receiver::create(
}

Receiver::~Receiver() {
// Close thread.
thread_exit_requested_.store(true);
if (process_thread_.joinable()) process_thread_.join();

if (device_) device_->close();
}

Expand All @@ -61,18 +65,30 @@ bool Receiver::init() {
ROS_ERROR("Device not set.");
return false;
}
return device_->open();

// Open attached device.
if (!device_->open()) {
ROS_ERROR("Cannot open device.");
return false;
}

process_thread_ = std::thread(&Receiver::process, this);

return true;
}

void Receiver::process() {
if (!device_.get()) return;
// Pass device pointer to process function.
sbp_state_set_io_context(state_.get(), device_.get());
// Pass device read function to sbp_process.
int result =
sbp_process(state_.get(), &piksi_multi_cpp::Device::read_redirect);
if (result < 0) {
ROS_WARN_STREAM("Error sbp_process: " << result);
// Setting thread_exit_requested_ will terminate the thread.
while (!thread_exit_requested_.load()) {
if (!device_.get()) return;
// Pass device pointer to process function.
sbp_state_set_io_context(state_.get(), device_.get());
// Pass device read function to sbp_process.
int result =
sbp_process(state_.get(), &piksi_multi_cpp::Device::read_redirect);
if (result < 0) {
ROS_WARN_STREAM("Error sbp_process: " << result);
}
}
}

Expand Down

0 comments on commit 35fc411

Please sign in to comment.