Skip to content

Commit

Permalink
gz world: use gz-transport to fix unclosed sockets (#3374)
Browse files Browse the repository at this point in the history
This fixes the issue with unclosed sockets leading to
"too many open files" errors after repeated runs of
`gz world` by adding a gz-transport /world_control
topic alongside the ~/world_control classic topic.
This works because gz-transport does a better job
closing sockets.

Subscribers to /world_control are added alongside
any existing subscribers to ~/world_control and the
`gz world` tool is changed to publish only on
/world_control.

Fixes #3341.

Signed-off-by: Steve Peters <scpeters@openrobotics.org>
  • Loading branch information
scpeters authored Feb 23, 2024
1 parent 9442fa6 commit fa2e5e2
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 39 deletions.
8 changes: 8 additions & 0 deletions Migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ Deprecated code produces compile-time warnings. These warning serve as
notification to users that their code should be upgraded. The next major
release will remove the deprecated code.

## Gazebo 11.14 to 11.15

### Modifications

The `gz world` tool now publishes a WorldControl message using gz-transport,
and the `physics::World` class and other subscribers now subscribe to both
`gz-transport` (ZeroMQ) and `gazebo_transport` (boost asio) topics.

## Gazebo 11.2 to 11.3

### Modifications
Expand Down
25 changes: 22 additions & 3 deletions gazebo/gui/plot/PlotManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <mutex>

#include <ignition/transport.hh>

#include "gazebo/msgs/msgs.hh"
#include "gazebo/transport/TransportIface.hh"
Expand Down Expand Up @@ -50,6 +51,9 @@ namespace gazebo
/// \brief Subscriber to the world control topic
public: transport::SubscriberPtr worldControlSub;

/// \brief Node for ignition transport communication.
public: ignition::transport::Node ignNode;

/// \brief Handler for updating introspection curves
public: IntrospectionCurveHandler introspectionCurve;

Expand All @@ -73,6 +77,15 @@ PlotManager::PlotManager()
this->dataPtr->worldControlSub =
this->dataPtr->node->Subscribe("~/world_control",
&PlotManager::OnWorldControl, this);
{
// Also subscribe to WorldControl messages over ZeroMQ-based gz-transport
std::string worldControlTopic("/world_control");
if (!this->dataPtr->ignNode.Subscribe(worldControlTopic,
&PlotManager::OnControl, this))
{
gzerr << "Error advertising topic [" << worldControlTopic << "]\n";
}
}
}

/////////////////////////////////////////////////
Expand All @@ -84,10 +97,16 @@ PlotManager::~PlotManager()
/////////////////////////////////////////////////
void PlotManager::OnWorldControl(ConstWorldControlPtr &_data)
{
if (_data->has_reset())
this->OnControl(*_data);
}

/////////////////////////////////////////////////
void PlotManager::OnControl(const msgs::WorldControl &_data)
{
if (_data.has_reset())
{
if ((_data->reset().has_all() && _data->reset().all()) ||
(_data->reset().has_time_only() && _data->reset().time_only()))
if ((_data.reset().has_all() && _data.reset().all()) ||
(_data.reset().has_time_only() && _data.reset().time_only()))
{
std::lock_guard<std::mutex> lock(this->dataPtr->mutex);
for (auto &w : this->dataPtr->windows)
Expand Down
8 changes: 7 additions & 1 deletion gazebo/gui/plot/PlotManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ namespace gazebo
/// \brief Destructor.
public: virtual ~PlotManager();

/// \brief Callback when a world control message is received. It is used
/// \brief Callback when a world control message is received over
/// gazebo_transport using boost asio. It is used
/// to detect simulation resets.
/// \param[in] _data Message data containing world control commands
public: void OnWorldControl(ConstWorldControlPtr &_data);
Expand Down Expand Up @@ -90,6 +91,11 @@ namespace gazebo
/// \brief Returns a pointer to the unique (static) instance
public: static PlotManager* Instance();

/// \brief Callback when a world control message is received over
/// gz-transport using ZeroMQ. It is used to detect simulation resets.
/// \param[in] _data Message data containing world control commands
private: void OnControl(const msgs::WorldControl &_data);

/// \brief This is a singleton class.
private: friend class SingletonT<PlotManager>;

Expand Down
39 changes: 27 additions & 12 deletions gazebo/physics/World.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ void World::Load(sdf::ElementPtr _sdf)
&World::OnFactoryMsg, this);
this->dataPtr->controlSub = this->dataPtr->node->Subscribe("~/world_control",
&World::OnControl, this);
{
// Also subscribe to WorldControl messages over ZeroMQ-based gz-transport
std::string worldControlTopic("/world_control");
if (!this->dataPtr->ignNode.Subscribe(worldControlTopic,
&World::OnWorldControl, this))
{
gzerr << "Error advertising topic [" << worldControlTopic << "]\n";
}
}
this->dataPtr->playbackControlSub = this->dataPtr->node->Subscribe(
"~/playback_control", &World::OnPlaybackControl, this);

Expand Down Expand Up @@ -1609,43 +1618,49 @@ void World::OnFactoryMsg(ConstFactoryPtr &_msg)
//////////////////////////////////////////////////
void World::OnControl(ConstWorldControlPtr &_data)
{
if (_data->has_pause())
this->SetPaused(_data->pause());
this->OnWorldControl(*_data);
}

//////////////////////////////////////////////////
void World::OnWorldControl(const msgs::WorldControl &_data)
{
if (_data.has_pause())
this->SetPaused(_data.pause());

if (_data->has_step())
if (_data.has_step())
this->OnStep();

if (_data->has_multi_step())
if (_data.has_multi_step())
{
// stepWorld is a blocking call so set stepInc directly so that world stats
// will still be published
this->SetPaused(true);
std::lock_guard<std::recursive_mutex> lock(this->dataPtr->worldUpdateMutex);
this->dataPtr->stepInc = _data->multi_step();
this->dataPtr->stepInc = _data.multi_step();
}

if (_data->has_seed())
if (_data.has_seed())
{
ignition::math::Rand::Seed(_data->seed());
this->dataPtr->physicsEngine->SetSeed(_data->seed());
ignition::math::Rand::Seed(_data.seed());
this->dataPtr->physicsEngine->SetSeed(_data.seed());
}

if (_data->has_reset())
if (_data.has_reset())
{
this->dataPtr->needsReset = true;

if (_data->reset().has_all() && _data->reset().all())
if (_data.reset().has_all() && _data.reset().all())
{
this->dataPtr->resetAll = true;
}
else
{
this->dataPtr->resetAll = false;

if (_data->reset().has_time_only() && _data->reset().time_only())
if (_data.reset().has_time_only() && _data.reset().time_only())
this->dataPtr->resetTimeOnly = true;

if (_data->reset().has_model_only() && _data->reset().model_only())
if (_data.reset().has_model_only() && _data.reset().model_only())
this->dataPtr->resetModelOnly = true;
}
}
Expand Down
8 changes: 7 additions & 1 deletion gazebo/physics/World.hh
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,16 @@ namespace gazebo
/// \brief Step callback.
private: void OnStep();

/// \brief Called when a world control message is received.
/// \brief Called when a world control message is received on the
/// gazebo_transport topic using boost asio.
/// \param[in] _data The world control message.
private: void OnControl(ConstWorldControlPtr &_data);

/// \brief Called when a world control message is received on the
/// gz-transport topic using ZeroMQ.
/// \param[in] _data The world control message.
private: void OnWorldControl(const msgs::WorldControl &_data);

/// \brief Called when log playback control message is received.
/// \param[in] _data The log playback control message.
private: void OnPlaybackControl(ConstLogPlaybackControlPtr &_data);
Expand Down
17 changes: 16 additions & 1 deletion gazebo/rendering/OculusCamera.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ OculusCamera::OculusCamera(const std::string &_name, ScenePtr _scene)

this->dataPtr->controlSub = this->dataPtr->node->Subscribe("~/world_control",
&OculusCamera::OnControl, this);
{
// Also subscribe to WorldControl messages over ZeroMQ-based gz-transport
std::string worldControlTopic("/world_control");
if (!this->dataPtr->ignNode.Subscribe(worldControlTopic,
&OculusCamera::OnWorldControl, this))
{
gzerr << "Error advertising topic [" << worldControlTopic << "]\n";
}
}

// Oculus is now ready.
this->dataPtr->ready = true;
Expand Down Expand Up @@ -162,7 +171,13 @@ void OculusCamera::Load(sdf::ElementPtr _sdf)
//////////////////////////////////////////////////
void OculusCamera::OnControl(ConstWorldControlPtr &_data)
{
if (_data->has_reset() && _data->reset().has_all() && _data->reset().all())
this->OnWorldControl(*_data);
}

//////////////////////////////////////////////////
void OculusCamera::OnWorldControl(const msgs::WorldControl &_data)
{
if (_data.has_reset() && _data.reset().has_all() && _data.reset().all())
{
this->ResetSensor();
}
Expand Down
8 changes: 6 additions & 2 deletions gazebo/rendering/OculusCamera.hh
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,14 @@ namespace gazebo
/// \return True if the camera is now tracking the visual.
protected: virtual bool TrackVisualImpl(VisualPtr _visual);

/// \brief Receive world control messages. Used to reset the oculus
/// sensor.
/// \brief Receive world control messages over gazebo_transport using
/// boost asio. Used to reset the oculus sensor.
private: void OnControl(ConstWorldControlPtr &_data);

/// \brief Receive world control messages over gz-transport using
/// ZeroMQ. Used to reset the oculus sensor.
private: void OnWorldControl(const msgs::WorldControl &_data);

/// \brief Apply distorsion to the render target.
private: void Oculus();

Expand Down
4 changes: 4 additions & 0 deletions gazebo/rendering/OculusCameraPrivate.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define _GAZEBO_RENDERING_OCULUS_CAMERA_PRIVATE_HH_

#include <OVR_CAPI.h>
#include <ignition/transport.hh>
#include "gazebo/util/system.hh"

namespace Ogre
Expand Down Expand Up @@ -73,6 +74,9 @@ namespace gazebo
/// \brief Subscriber used to receive updates on world_control topic.
public: transport::SubscriberPtr controlSub;

/// \brief Node for ignition transport communication.
public: ignition::transport::Node ignNode;

/// \brief True when Oculus is connected and ready to use.
public: bool ready;

Expand Down
2 changes: 1 addition & 1 deletion test/performance/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ if (NOT APPLE AND NOT WIN32)
set(tool_tests
gz_stress.cc
)
gz_build_tests(${tool_tests} EXTRA_LIBS gazebo_transport)
gz_build_tests(${tool_tests} EXTRA_LIBS gazebo_transport ${IGNITION-TRANSPORT_LIBRARIES})
endif()
11 changes: 5 additions & 6 deletions test/performance/gz_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/filesystem.hpp>
#include <ignition/transport/Node.hh>

#include <gazebo/common/CommonIface.hh>
#include <gazebo/msgs/msgs.hh>
Expand All @@ -40,10 +41,10 @@ pid_t g_pid = -1;
boost::condition_variable g_msgCondition;

/////////////////////////////////////////////////
void WorldControlCB(ConstWorldControlPtr &_msg)
void WorldControlCB(const msgs::WorldControl &_msg)
{
boost::mutex::scoped_lock lock(g_mutex);
g_msgDebugOut = _msg->DebugString();
g_msgDebugOut = _msg.DebugString();
g_msgCondition.notify_all();
}

Expand Down Expand Up @@ -149,10 +150,8 @@ TEST_F(gzTest, Stress)
{
init();

gazebo::transport::NodePtr node(new gazebo::transport::Node());
node->Init();
gazebo::transport::SubscriberPtr sub =
node->Subscribe("~/world_control", &WorldControlCB, true);
ignition::transport::Node ignNode;
ignNode.Subscribe("/world_control", &WorldControlCB);

// Run the transport loop: starts a new thread
gazebo::transport::run();
Expand Down
29 changes: 23 additions & 6 deletions tools/gz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,25 @@ bool WorldCommand::RunImpl()
if (this->vm.count("world-name"))
worldName = this->vm["world-name"].as<std::string>();

transport::NodePtr node(new transport::Node());
node->Init(worldName);
ignition::transport::Node ignNode;

transport::PublisherPtr pub =
node->Advertise<msgs::WorldControl>("~/world_control");
pub->WaitForConnection();
const std::string topic{"/world_control"};
auto pub = ignNode.Advertise<msgs::WorldControl>(topic);

// Wait for subscribers
unsigned int maxSleep = 30;
unsigned int sleep = 0;
unsigned int mSleep = 100;
for (; sleep < maxSleep && !pub.HasConnections(); ++sleep)
{
common::Time::MSleep(mSleep);
}
if (sleep == maxSleep)
{
gzerr << "No subscribers to topic [" << topic <<"], timed out after " <<
maxSleep * mSleep << "ms." << std::endl;
return false;
}

msgs::WorldControl msg;
bool good = false;
Expand Down Expand Up @@ -285,9 +298,13 @@ bool WorldCommand::RunImpl()
}

if (good)
pub->Publish(msg, true);
{
pub.Publish(msg);
}
else
{
this->Help();
}

return true;
}
Expand Down
10 changes: 4 additions & 6 deletions tools/gz_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ void FactoryCB(ConstFactoryPtr &_msg)
}

/////////////////////////////////////////////////
void WorldControlCB(ConstWorldControlPtr &_msg)
void WorldControlCB(const msgs::WorldControl &_msg)
{
boost::mutex::scoped_lock lock(g_mutex);
g_msgDebugOut = _msg->DebugString();
g_msgDebugOut = _msg.DebugString();
g_msgCondition.notify_all();
}

Expand Down Expand Up @@ -503,10 +503,8 @@ TEST_F(gzTest, World)
std::string helpOutput = custom_exec_str("gz help world");
EXPECT_NE(helpOutput.find("gz world"), std::string::npos);

gazebo::transport::NodePtr node(new gazebo::transport::Node());
node->Init();
gazebo::transport::SubscriberPtr sub =
node->Subscribe("~/world_control", &WorldControlCB);
ignition::transport::Node ignNode;
ignNode.Subscribe("/world_control", &WorldControlCB);

// Test world pause
{
Expand Down

0 comments on commit fa2e5e2

Please sign in to comment.