Skip to content

Commit

Permalink
Merge #1635: Fix asynchronous message processing
Browse files Browse the repository at this point in the history
  • Loading branch information
rhaschke committed Jun 18, 2021
2 parents 9b20006 + b7bc6ad commit 1df798c
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 58 deletions.
4 changes: 0 additions & 4 deletions src/rviz/default_plugin/depth_cloud_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,6 @@ void DepthCloudDisplay::onInitialize()
updateUseAutoSize();
updateUseOcclusionCompensation();

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
threaded_nh_.setCallbackQueue(pointcloud_common_->getCallbackQueue());

// Scan for available transport plugins
scanForTransportSubscriberPlugins();

Expand Down
7 changes: 3 additions & 4 deletions src/rviz/default_plugin/fluid_pressure_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ FluidPressureDisplay::FluidPressureDisplay() : point_cloud_common_(new PointClou
" Increasing this is useful if your incoming TF data is delayed significantly "
"from your FluidPressure data, but it can greatly increase memory usage if the messages are big.",
this, SLOT(updateQueueSize()));

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
update_nh_.setCallbackQueue(point_cloud_common_->getCallbackQueue());
}

FluidPressureDisplay::~FluidPressureDisplay()
Expand All @@ -65,6 +61,9 @@ FluidPressureDisplay::~FluidPressureDisplay()

void FluidPressureDisplay::onInitialize()
{
// Use the threaded queue for processing of incoming messages
update_nh_.setCallbackQueue(context_->getThreadedQueue());

MFDClass::onInitialize();
point_cloud_common_->initialize(context_, scene_node_);

Expand Down
7 changes: 3 additions & 4 deletions src/rviz/default_plugin/illuminance_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ IlluminanceDisplay::IlluminanceDisplay() : point_cloud_common_(new PointCloudCom
" Increasing this is useful if your incoming TF data is delayed significantly "
"from your Illuminance data, but it can greatly increase memory usage if the messages are big.",
this, SLOT(updateQueueSize()));

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
update_nh_.setCallbackQueue(point_cloud_common_->getCallbackQueue());
}

IlluminanceDisplay::~IlluminanceDisplay()
Expand All @@ -65,6 +61,9 @@ IlluminanceDisplay::~IlluminanceDisplay()

void IlluminanceDisplay::onInitialize()
{
// Use the threaded queue for processing of incoming messages
update_nh_.setCallbackQueue(context_->getThreadedQueue());

MFDClass::onInitialize();
point_cloud_common_->initialize(context_, scene_node_);

Expand Down
7 changes: 3 additions & 4 deletions src/rviz/default_plugin/laser_scan_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ LaserScanDisplay::LaserScanDisplay()
" Increasing this is useful if your incoming TF data is delayed significantly "
"from your LaserScan data, but it can greatly increase memory usage if the messages are big.",
this, SLOT(updateQueueSize()));

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
update_nh_.setCallbackQueue(point_cloud_common_->getCallbackQueue());
}

LaserScanDisplay::~LaserScanDisplay()
Expand All @@ -68,6 +64,9 @@ LaserScanDisplay::~LaserScanDisplay()

void LaserScanDisplay::onInitialize()
{
// Use the threaded queue for processing of incoming messages
update_nh_.setCallbackQueue(context_->getThreadedQueue());

MFDClass::onInitialize();
point_cloud_common_->initialize(context_, scene_node_);
}
Expand Down
7 changes: 3 additions & 4 deletions src/rviz/default_plugin/point_cloud2_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ PointCloud2Display::PointCloud2Display() : point_cloud_common_(new PointCloudCom
" Increasing this is useful if your incoming TF data is delayed significantly "
"from your PointCloud2 data, but it can greatly increase memory usage if the messages are big.",
this, SLOT(updateQueueSize()));

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
update_nh_.setCallbackQueue(point_cloud_common_->getCallbackQueue());
}

PointCloud2Display::~PointCloud2Display()
Expand All @@ -65,6 +61,9 @@ PointCloud2Display::~PointCloud2Display()

void PointCloud2Display::onInitialize()
{
// Use the threaded queue for processing of incoming messages
update_nh_.setCallbackQueue(context_->getThreadedQueue());

MFDClass::onInitialize();
point_cloud_common_->initialize(context_, scene_node_);
}
Expand Down
10 changes: 1 addition & 9 deletions src/rviz/default_plugin/point_cloud_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ void PointCloudCommon::CloudInfo::clear()

PointCloudCommon::PointCloudCommon(Display* display)
: auto_size_(false)
, spinner_(1, &cbqueue_)
, new_xyz_transformer_(false)
, new_color_transformer_(false)
, needs_retransform_(false)
Expand Down Expand Up @@ -377,18 +376,11 @@ void PointCloudCommon::initialize(DisplayContext* context, Ogre::SceneNode* scen
updateBillboardSize();
updateAlpha();
updateSelectable();

spinner_.start();
}

PointCloudCommon::~PointCloudCommon()
{
spinner_.stop();

if (transformer_class_loader_)
{
delete transformer_class_loader_;
}
delete transformer_class_loader_;
}

void PointCloudCommon::loadTransformers()
Expand Down
11 changes: 0 additions & 11 deletions src/rviz/default_plugin/point_cloud_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
#include <boost/thread/mutex.hpp>
#include <boost/thread/recursive_mutex.hpp>

#include <ros/spinner.h>
#include <ros/callback_queue.h>

#include <message_filters/time_sequencer.h>

#include <pluginlib/class_loader.hpp>
Expand Down Expand Up @@ -130,11 +127,6 @@ class PointCloudCommon : public QObject
void addMessage(const sensor_msgs::PointCloudConstPtr& cloud);
void addMessage(const sensor_msgs::PointCloud2ConstPtr& cloud);

ros::CallbackQueueInterface* getCallbackQueue()
{
return &cbqueue_;
}

Display* getDisplay()
{
return display_;
Expand Down Expand Up @@ -187,9 +179,6 @@ private Q_SLOTS:
void setPropertiesHidden(const QList<Property*>& props, bool hide);
void fillTransformerOptions(EnumProperty* prop, uint32_t mask);

ros::AsyncSpinner spinner_;
ros::CallbackQueue cbqueue_;

D_CloudInfo cloud_infos_;

Ogre::SceneNode* scene_node_;
Expand Down
7 changes: 3 additions & 4 deletions src/rviz/default_plugin/point_cloud_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ PointCloudDisplay::PointCloudDisplay() : point_cloud_common_(new PointCloudCommo
" Increasing this is useful if your incoming TF data is delayed significantly "
"from your PointCloud data, but it can greatly increase memory usage if the messages are big.",
this, SLOT(updateQueueSize()));

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
update_nh_.setCallbackQueue(point_cloud_common_->getCallbackQueue());
}

PointCloudDisplay::~PointCloudDisplay()
Expand All @@ -63,6 +59,9 @@ PointCloudDisplay::~PointCloudDisplay()

void PointCloudDisplay::onInitialize()
{
// Use the threaded queue for processing of incoming messages
update_nh_.setCallbackQueue(context_->getThreadedQueue());

MFDClass::onInitialize();
point_cloud_common_->initialize(context_, scene_node_);
}
Expand Down
7 changes: 3 additions & 4 deletions src/rviz/default_plugin/relative_humidity_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ RelativeHumidityDisplay::RelativeHumidityDisplay() : point_cloud_common_(new Poi
"from your RelativeHumidity data, but it can greatly increase memory usage if the "
"messages are big.",
this, SLOT(updateQueueSize()));

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
update_nh_.setCallbackQueue(point_cloud_common_->getCallbackQueue());
}

RelativeHumidityDisplay::~RelativeHumidityDisplay()
Expand All @@ -66,6 +62,9 @@ RelativeHumidityDisplay::~RelativeHumidityDisplay()

void RelativeHumidityDisplay::onInitialize()
{
// Use the threaded queue for processing of incoming messages
update_nh_.setCallbackQueue(context_->getThreadedQueue());

MFDClass::onInitialize();
point_cloud_common_->initialize(context_, scene_node_);

Expand Down
7 changes: 3 additions & 4 deletions src/rviz/default_plugin/temperature_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ TemperatureDisplay::TemperatureDisplay() : point_cloud_common_(new PointCloudCom
" Increasing this is useful if your incoming TF data is delayed significantly "
"from your Temperature data, but it can greatly increase memory usage if the messages are big.",
this, SLOT(updateQueueSize()));

// PointCloudCommon sets up a callback queue with a thread for each
// instance. Use that for processing incoming messages.
update_nh_.setCallbackQueue(point_cloud_common_->getCallbackQueue());
}

TemperatureDisplay::~TemperatureDisplay()
Expand All @@ -65,6 +61,9 @@ TemperatureDisplay::~TemperatureDisplay()

void TemperatureDisplay::onInitialize()
{
// Use the threaded queue for processing of incoming messages
update_nh_.setCallbackQueue(context_->getThreadedQueue());

MFDClass::onInitialize();
point_cloud_common_->initialize(context_, scene_node_);

Expand Down
1 change: 1 addition & 0 deletions src/rviz/displays_panel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ void DisplaysPanel::onDuplicateDisplay()
QList<Display*> duplicated_displays;
QProgressDialog progress_dlg("Duplicating displays...", "Cancel", 0, displays_to_duplicate.size(),
this);
vis_manager_->stopUpdate();
progress_dlg.setWindowModality(Qt::WindowModal);
progress_dlg.show();
QApplication::processEvents(); // explicitly progress events for update
Expand Down
17 changes: 11 additions & 6 deletions src/rviz/visualization_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,25 +222,26 @@ VisualizationManager::VisualizationManager(RenderPanel* render_panel,

selection_manager_ = new SelectionManager(this);

update_timer_ = new QTimer;
connect(update_timer_, SIGNAL(timeout()), this, SLOT(onUpdate()));

private_->threaded_queue_threads_.create_thread(
boost::bind(&VisualizationManager::threadedQueueThreadFunc, this));

display_factory_ = new DisplayFactory();

ogre_render_queue_clearer_ = new OgreRenderQueueClearer();
Ogre::Root::getSingletonPtr()->addFrameListener(ogre_render_queue_clearer_);

update_timer_ = new QTimer;
connect(update_timer_, SIGNAL(timeout()), this, SLOT(onUpdate()));
}

VisualizationManager::~VisualizationManager()
{
delete update_timer_;

update_timer_->stop();
shutting_down_ = true;
private_->threaded_queue_threads_.join_all();

delete update_timer_;

if (selection_manager_)
{
selection_manager_->setSelection(M_Picked());
Expand Down Expand Up @@ -630,9 +631,13 @@ void VisualizationManager::handleChar(QKeyEvent* event, RenderPanel* panel)

void VisualizationManager::threadedQueueThreadFunc()
{
ros::WallDuration timeout(0.1);
while (!shutting_down_)
{
private_->threaded_queue_.callOne(ros::WallDuration(0.1));
if (update_timer_->isActive())
private_->threaded_queue_.callOne(timeout);
else
timeout.sleep();
}
}

Expand Down

0 comments on commit 1df798c

Please sign in to comment.