Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gz world: use gz-transport to fix unclosed sockets #3374

Merged
merged 4 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ Deprecated code produces compile-time warnings. These warning serve as
notification to users that their code should be upgraded. The next major
release will remove the deprecated code.

## Gazebo 11.14 to 11.15

### Modifications

The `gz world` tool now publishes a WorldControl message using gz-transport,
and the `physics::World` class and other subscribers now subscribe to both
`gz-transport` (ZeroMQ) and `gazebo_transport` (boost asio) topics.

## Gazebo 11.2 to 11.3

### Modifications
Expand Down
25 changes: 22 additions & 3 deletions gazebo/gui/plot/PlotManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <mutex>

#include <ignition/transport.hh>

#include "gazebo/msgs/msgs.hh"
#include "gazebo/transport/TransportIface.hh"
Expand Down Expand Up @@ -50,6 +51,9 @@ namespace gazebo
/// \brief Subscriber to the world control topic
public: transport::SubscriberPtr worldControlSub;

/// \brief Node for ignition transport communication.
public: ignition::transport::Node ignNode;

/// \brief Handler for updating introspection curves
public: IntrospectionCurveHandler introspectionCurve;

Expand All @@ -73,6 +77,15 @@ PlotManager::PlotManager()
this->dataPtr->worldControlSub =
this->dataPtr->node->Subscribe("~/world_control",
&PlotManager::OnWorldControl, this);
{
// Also subscribe to WorldControl messages over ZeroMQ-based gz-transport
std::string worldControlTopic("/world_control");
if (!this->dataPtr->ignNode.Subscribe(worldControlTopic,
&PlotManager::OnControl, this))
{
gzerr << "Error advertising topic [" << worldControlTopic << "]\n";
}
}
}

/////////////////////////////////////////////////
Expand All @@ -84,10 +97,16 @@ PlotManager::~PlotManager()
/////////////////////////////////////////////////
void PlotManager::OnWorldControl(ConstWorldControlPtr &_data)
{
if (_data->has_reset())
this->OnControl(*_data);
}

/////////////////////////////////////////////////
void PlotManager::OnControl(const msgs::WorldControl &_data)
{
if (_data.has_reset())
{
if ((_data->reset().has_all() && _data->reset().all()) ||
(_data->reset().has_time_only() && _data->reset().time_only()))
if ((_data.reset().has_all() && _data.reset().all()) ||
(_data.reset().has_time_only() && _data.reset().time_only()))
{
std::lock_guard<std::mutex> lock(this->dataPtr->mutex);
for (auto &w : this->dataPtr->windows)
Expand Down
8 changes: 7 additions & 1 deletion gazebo/gui/plot/PlotManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ namespace gazebo
/// \brief Destructor.
public: virtual ~PlotManager();

/// \brief Callback when a world control message is received. It is used
/// \brief Callback when a world control message is received over
/// gazebo_transport using boost asio. It is used
/// to detect simulation resets.
/// \param[in] _data Message data containing world control commands
public: void OnWorldControl(ConstWorldControlPtr &_data);
Expand Down Expand Up @@ -90,6 +91,11 @@ namespace gazebo
/// \brief Returns a pointer to the unique (static) instance
public: static PlotManager* Instance();

/// \brief Callback when a world control message is received over
/// gz-transport using ZeroMQ. It is used to detect simulation resets.
/// \param[in] _data Message data containing world control commands
private: void OnControl(const msgs::WorldControl &_data);

/// \brief This is a singleton class.
private: friend class SingletonT<PlotManager>;

Expand Down
39 changes: 27 additions & 12 deletions gazebo/physics/World.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ void World::Load(sdf::ElementPtr _sdf)
&World::OnFactoryMsg, this);
this->dataPtr->controlSub = this->dataPtr->node->Subscribe("~/world_control",
&World::OnControl, this);
{
// Also subscribe to WorldControl messages over ZeroMQ-based gz-transport
std::string worldControlTopic("/world_control");
if (!this->dataPtr->ignNode.Subscribe(worldControlTopic,
&World::OnWorldControl, this))
{
gzerr << "Error advertising topic [" << worldControlTopic << "]\n";
}
}
this->dataPtr->playbackControlSub = this->dataPtr->node->Subscribe(
"~/playback_control", &World::OnPlaybackControl, this);

Expand Down Expand Up @@ -1609,43 +1618,49 @@ void World::OnFactoryMsg(ConstFactoryPtr &_msg)
//////////////////////////////////////////////////
void World::OnControl(ConstWorldControlPtr &_data)
{
if (_data->has_pause())
this->SetPaused(_data->pause());
this->OnWorldControl(*_data);
}

//////////////////////////////////////////////////
void World::OnWorldControl(const msgs::WorldControl &_data)
{
if (_data.has_pause())
this->SetPaused(_data.pause());

if (_data->has_step())
if (_data.has_step())
this->OnStep();

if (_data->has_multi_step())
if (_data.has_multi_step())
{
// stepWorld is a blocking call so set stepInc directly so that world stats
// will still be published
this->SetPaused(true);
std::lock_guard<std::recursive_mutex> lock(this->dataPtr->worldUpdateMutex);
this->dataPtr->stepInc = _data->multi_step();
this->dataPtr->stepInc = _data.multi_step();
}

if (_data->has_seed())
if (_data.has_seed())
{
ignition::math::Rand::Seed(_data->seed());
this->dataPtr->physicsEngine->SetSeed(_data->seed());
ignition::math::Rand::Seed(_data.seed());
this->dataPtr->physicsEngine->SetSeed(_data.seed());
}

if (_data->has_reset())
if (_data.has_reset())
{
this->dataPtr->needsReset = true;

if (_data->reset().has_all() && _data->reset().all())
if (_data.reset().has_all() && _data.reset().all())
{
this->dataPtr->resetAll = true;
}
else
{
this->dataPtr->resetAll = false;

if (_data->reset().has_time_only() && _data->reset().time_only())
if (_data.reset().has_time_only() && _data.reset().time_only())
this->dataPtr->resetTimeOnly = true;

if (_data->reset().has_model_only() && _data->reset().model_only())
if (_data.reset().has_model_only() && _data.reset().model_only())
this->dataPtr->resetModelOnly = true;
}
}
Expand Down
8 changes: 7 additions & 1 deletion gazebo/physics/World.hh
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,16 @@ namespace gazebo
/// \brief Step callback.
private: void OnStep();

/// \brief Called when a world control message is received.
/// \brief Called when a world control message is received on the
/// gazebo_transport topic using boost asio.
/// \param[in] _data The world control message.
private: void OnControl(ConstWorldControlPtr &_data);

/// \brief Called when a world control message is received on the
/// gz-transport topic using ZeroMQ.
/// \param[in] _data The world control message.
private: void OnWorldControl(const msgs::WorldControl &_data);

/// \brief Called when log playback control message is received.
/// \param[in] _data The log playback control message.
private: void OnPlaybackControl(ConstLogPlaybackControlPtr &_data);
Expand Down
17 changes: 16 additions & 1 deletion gazebo/rendering/OculusCamera.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ OculusCamera::OculusCamera(const std::string &_name, ScenePtr _scene)

this->dataPtr->controlSub = this->dataPtr->node->Subscribe("~/world_control",
&OculusCamera::OnControl, this);
{
// Also subscribe to WorldControl messages over ZeroMQ-based gz-transport
std::string worldControlTopic("/world_control");
if (!this->dataPtr->ignNode.Subscribe(worldControlTopic,
&OculusCamera::OnWorldControl, this))
{
gzerr << "Error advertising topic [" << worldControlTopic << "]\n";
}
}

// Oculus is now ready.
this->dataPtr->ready = true;
Expand Down Expand Up @@ -162,7 +171,13 @@ void OculusCamera::Load(sdf::ElementPtr _sdf)
//////////////////////////////////////////////////
void OculusCamera::OnControl(ConstWorldControlPtr &_data)
{
if (_data->has_reset() && _data->reset().has_all() && _data->reset().all())
this->OnWorldControl(*_data);
}

//////////////////////////////////////////////////
void OculusCamera::OnWorldControl(const msgs::WorldControl &_data)
{
if (_data.has_reset() && _data.reset().has_all() && _data.reset().all())
{
this->ResetSensor();
}
Expand Down
8 changes: 6 additions & 2 deletions gazebo/rendering/OculusCamera.hh
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,14 @@ namespace gazebo
/// \return True if the camera is now tracking the visual.
protected: virtual bool TrackVisualImpl(VisualPtr _visual);

/// \brief Receive world control messages. Used to reset the oculus
/// sensor.
/// \brief Receive world control messages over gazebo_transport using
/// boost asio. Used to reset the oculus sensor.
private: void OnControl(ConstWorldControlPtr &_data);

/// \brief Receive world control messages over gz-transport using
/// ZeroMQ. Used to reset the oculus sensor.
private: void OnWorldControl(const msgs::WorldControl &_data);

/// \brief Apply distorsion to the render target.
private: void Oculus();

Expand Down
4 changes: 4 additions & 0 deletions gazebo/rendering/OculusCameraPrivate.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define _GAZEBO_RENDERING_OCULUS_CAMERA_PRIVATE_HH_

#include <OVR_CAPI.h>
#include <ignition/transport.hh>
#include "gazebo/util/system.hh"

namespace Ogre
Expand Down Expand Up @@ -73,6 +74,9 @@ namespace gazebo
/// \brief Subscriber used to receive updates on world_control topic.
public: transport::SubscriberPtr controlSub;

/// \brief Node for ignition transport communication.
public: ignition::transport::Node ignNode;

/// \brief True when Oculus is connected and ready to use.
public: bool ready;

Expand Down
2 changes: 1 addition & 1 deletion test/performance/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ if (NOT APPLE AND NOT WIN32)
set(tool_tests
gz_stress.cc
)
gz_build_tests(${tool_tests} EXTRA_LIBS gazebo_transport)
gz_build_tests(${tool_tests} EXTRA_LIBS gazebo_transport ${IGNITION-TRANSPORT_LIBRARIES})
endif()
11 changes: 5 additions & 6 deletions test/performance/gz_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/filesystem.hpp>
#include <ignition/transport/Node.hh>

#include <gazebo/common/CommonIface.hh>
#include <gazebo/msgs/msgs.hh>
Expand All @@ -40,10 +41,10 @@ pid_t g_pid = -1;
boost::condition_variable g_msgCondition;

/////////////////////////////////////////////////
void WorldControlCB(ConstWorldControlPtr &_msg)
void WorldControlCB(const msgs::WorldControl &_msg)
{
boost::mutex::scoped_lock lock(g_mutex);
g_msgDebugOut = _msg->DebugString();
g_msgDebugOut = _msg.DebugString();
g_msgCondition.notify_all();
}

Expand Down Expand Up @@ -149,10 +150,8 @@ TEST_F(gzTest, Stress)
{
init();

gazebo::transport::NodePtr node(new gazebo::transport::Node());
node->Init();
gazebo::transport::SubscriberPtr sub =
node->Subscribe("~/world_control", &WorldControlCB, true);
ignition::transport::Node ignNode;
ignNode.Subscribe("/world_control", &WorldControlCB);

// Run the transport loop: starts a new thread
gazebo::transport::run();
Expand Down
29 changes: 23 additions & 6 deletions tools/gz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,25 @@ bool WorldCommand::RunImpl()
if (this->vm.count("world-name"))
worldName = this->vm["world-name"].as<std::string>();

transport::NodePtr node(new transport::Node());
node->Init(worldName);
ignition::transport::Node ignNode;

transport::PublisherPtr pub =
node->Advertise<msgs::WorldControl>("~/world_control");
pub->WaitForConnection();
const std::string topic{"/world_control"};
auto pub = ignNode.Advertise<msgs::WorldControl>(topic);

// Wait for subscribers
unsigned int maxSleep = 30;
unsigned int sleep = 0;
unsigned int mSleep = 100;
for (; sleep < maxSleep && !pub.HasConnections(); ++sleep)
{
common::Time::MSleep(mSleep);
}
if (sleep == maxSleep)
{
gzerr << "No subscribers to topic [" << topic <<"], timed out after " <<
maxSleep * mSleep << "ms." << std::endl;
return false;
}

msgs::WorldControl msg;
bool good = false;
Expand Down Expand Up @@ -285,9 +298,13 @@ bool WorldCommand::RunImpl()
}

if (good)
pub->Publish(msg, true);
{
pub.Publish(msg);
}
else
{
this->Help();
}

return true;
}
Expand Down
10 changes: 4 additions & 6 deletions tools/gz_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ void FactoryCB(ConstFactoryPtr &_msg)
}

/////////////////////////////////////////////////
void WorldControlCB(ConstWorldControlPtr &_msg)
void WorldControlCB(const msgs::WorldControl &_msg)
{
boost::mutex::scoped_lock lock(g_mutex);
g_msgDebugOut = _msg->DebugString();
g_msgDebugOut = _msg.DebugString();
g_msgCondition.notify_all();
}

Expand Down Expand Up @@ -503,10 +503,8 @@ TEST_F(gzTest, World)
std::string helpOutput = custom_exec_str("gz help world");
EXPECT_NE(helpOutput.find("gz world"), std::string::npos);

gazebo::transport::NodePtr node(new gazebo::transport::Node());
node->Init();
gazebo::transport::SubscriberPtr sub =
node->Subscribe("~/world_control", &WorldControlCB);
ignition::transport::Node ignNode;
ignNode.Subscribe("/world_control", &WorldControlCB);

// Test world pause
{
Expand Down
Loading