diff --git a/README.md b/README.md index 55ff4c6..ffabc69 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ Parameters are provided to configure the behavior of the bridge. These parameter * (ROS 1) __max_update_ms__: The maximum number of milliseconds to wait in between polling `roscore` for new topics, services, or parameters. Defaults to `5000`. * (ROS 2) __num_threads__: The number of threads to use for the ROS node executor. This controls the number of subscriptions that can be processed in parallel. 0 means one thread per CPU core. Defaults to `0`. * (ROS 2) __min_qos_depth__: Minimum depth used for the QoS profile of subscriptions. Defaults to `1`. This is to set a lower limit for a subscriber's QoS depth which is computed by summing up depths of all publishers. See also [#208](https://github.com/foxglove/ros-foxglove-bridge/issues/208). - * (ROS 2) __max_qos_depth__: Maximum depth used for the QoS profile of subscriptions. Defaults to `10`. + * (ROS 2) __max_qos_depth__: Maximum depth used for the QoS profile of subscriptions. Defaults to `25`. * (ROS 2) __include_hidden__: Include hidden topics and services. Defaults to `false`. ## Building from source diff --git a/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp b/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp index daacb8a..ff1483a 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/test/test_client.hpp @@ -22,6 +22,9 @@ std::future waitForServiceResponse(std::shared_ptr waitForService(std::shared_ptr client, const std::string& serviceName); +std::future waitForChannel(std::shared_ptr client, + const std::string& topicName); + extern template class Client; } // namespace foxglove diff --git a/foxglove_bridge_base/src/test/test_client.cpp b/foxglove_bridge_base/src/test/test_client.cpp index 0d8cdb6..019f23d 100644 --- a/foxglove_bridge_base/src/test/test_client.cpp +++ b/foxglove_bridge_base/src/test/test_client.cpp @@ -12,34 +12,29 @@ namespace foxglove { constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(5); std::vector connectClientAndReceiveMsg(const std::string& uri, - const std::string& topic_name) { + const std::string& topicName) { // Set up text message handler to resolve the promise when the topic is advertised - foxglove::Client wsClient; + auto wsClient = std::make_shared>(); std::promise channelPromise; - auto channelFuture = channelPromise.get_future(); - wsClient.setTextMessageHandler([&topic_name, &channelPromise](const std::string& payload) { - const auto msg = nlohmann::json::parse(payload); - const auto& op = msg.at("op").get(); - if (op == "advertise") { - for (const auto& channel : msg.at("channels")) { - if (topic_name == channel.at("topic")) { - channelPromise.set_value(channel); - } - } - } - }); + auto channelFuture = waitForChannel(wsClient, topicName); // Connect the client and wait for the channel future - if (std::future_status::ready != wsClient.connect(uri).wait_for(DEFAULT_TIMEOUT)) { + if (std::future_status::ready != wsClient->connect(uri).wait_for(DEFAULT_TIMEOUT)) { throw std::runtime_error("Client failed to connect"); } else if (std::future_status::ready != channelFuture.wait_for(DEFAULT_TIMEOUT)) { throw std::runtime_error("Client failed to receive channel"); } + const auto channel = channelFuture.get(); + const SubscriptionId subscriptionId = 1; + // Set up binary message handler to resolve when a binary message has been received std::promise> msgPromise; auto msgFuture = msgPromise.get_future(); - wsClient.setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) { + wsClient->setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) { + if (ReadUint32LE(data + 1) != subscriptionId) { + return; + } const size_t offset = 1 + 4 + 8; std::vector dataCopy(dataLength - offset); std::memcpy(dataCopy.data(), data + offset, dataLength - offset); @@ -47,8 +42,7 @@ std::vector connectClientAndReceiveMsg(const std::string& uri, }); // Subscribe to the channel that corresponds to the topic - const auto channelId = channelFuture.get().at("id").get(); - wsClient.subscribe({{1, channelId}}); + wsClient->subscribe({{subscriptionId, channel.id}}); // Wait until we have received a binary message if (std::future_status::ready != msgFuture.wait_for(DEFAULT_TIMEOUT)) { @@ -118,6 +112,30 @@ std::future waitForService(std::shared_ptr client, return future; } +std::future waitForChannel(std::shared_ptr client, + const std::string& topicName) { + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + client->setTextMessageHandler( + [promise = std::move(promise), topicName](const std::string& payload) mutable { + const auto msg = nlohmann::json::parse(payload); + const auto& op = msg["op"].get(); + + if (op == "advertise") { + const auto channels = msg["channels"].get>(); + for (const auto& channel : channels) { + if (channel.topic == topicName) { + promise->set_value(channel); + break; + } + } + } + }); + + return future; +} + // Explicit template instantiation template class Client; diff --git a/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp b/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp index d42bb1b..8a2f45d 100644 --- a/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp +++ b/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp @@ -28,7 +28,7 @@ constexpr int64_t DEFAULT_PORT = 8765; constexpr char DEFAULT_ADDRESS[] = "0.0.0.0"; constexpr int64_t DEFAULT_SEND_BUFFER_LIMIT = 10000000; constexpr int64_t DEFAULT_MIN_QOS_DEPTH = 1; -constexpr int64_t DEFAULT_MAX_QOS_DEPTH = 10; +constexpr int64_t DEFAULT_MAX_QOS_DEPTH = 25; void declareParameters(rclcpp::Node* node); diff --git a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp index a429d8f..4a65b0e 100644 --- a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp +++ b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp @@ -457,10 +457,27 @@ void FoxgloveBridge::subscribe(foxglove::ChannelId channelId, ConnectionHandle c if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { ++durabilityTransientLocalEndpointsCount; } - depth = std::min(_maxQosDepth, depth + qos.depth()); - } - - rclcpp::QoS qos{rclcpp::KeepLast(std::max(depth, _minQosDepth))}; + // Some RMWs do not retrieve history information of the publisher endpoint in which case the + // history depth is 0. We use a lower limit of 1 here, so that the history depth is at least + // equal to the number of publishers. This covers the case where there are multiple + // transient_local publishers with a depth of 1 (e.g. multiple tf_static transform + // broadcasters). See also + // https://github.com/foxglove/ros-foxglove-bridge/issues/238 and + // https://github.com/foxglove/ros-foxglove-bridge/issues/208 + const size_t publisherHistoryDepth = std::max(1ul, qos.depth()); + depth = depth + publisherHistoryDepth; + } + + depth = std::max(depth, _minQosDepth); + if (depth > _maxQosDepth) { + RCLCPP_WARN(this->get_logger(), + "Limiting history depth for topic '%s' to %zu (was %zu). You may want to increase " + "the max_qos_depth parameter value.", + topic.c_str(), _maxQosDepth, depth); + depth = _maxQosDepth; + } + + rclcpp::QoS qos{rclcpp::KeepLast(depth)}; // If all endpoints are reliable, ask for reliable if (reliabilityReliableEndpointsCount == publisherInfo.size()) { diff --git a/ros2_foxglove_bridge/tests/smoke_test.cpp b/ros2_foxglove_bridge/tests/smoke_test.cpp index 172e68c..19624e4 100644 --- a/ros2_foxglove_bridge/tests/smoke_test.cpp +++ b/ros2_foxglove_bridge/tests/smoke_test.cpp @@ -496,6 +496,53 @@ TEST_F(ServiceTest, testCallServiceParallel) { } } +TEST(SmokeTest, receiveMessagesOfMultipleTransientLocalPublishers) { + const std::string topicName = "/latched"; + auto node = rclcpp::Node::make_shared("node"); + rclcpp::QoS qos = rclcpp::QoS(rclcpp::KeepLast(1)); + qos.transient_local(); + qos.reliable(); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + auto spinnerThread = std::thread([&executor]() { + executor.spin(); + }); + + constexpr size_t nPubs = 15; + std::vector::SharedPtr> pubs; + for (size_t i = 0; i < nPubs; ++i) { + auto pub = pubs.emplace_back(node->create_publisher(topicName, qos)); + std_msgs::msg::String msg; + msg.data = "Hello"; + pub->publish(msg); + } + + // Set up a client and subscribe to the channel. + auto client = std::make_shared>(); + auto channelFuture = foxglove::waitForChannel(client, topicName); + ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND)); + ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND)); + const foxglove::Channel channel = channelFuture.get(); + const foxglove::SubscriptionId subscriptionId = 1; + + // Set up binary message handler to resolve the promise when all nPub message have been received + std::promise promise; + size_t nReceivedMessages = 0; + client->setBinaryMessageHandler([&promise, &nReceivedMessages](const uint8_t*, size_t) { + if (++nReceivedMessages == nPubs) { + promise.set_value(); + } + }); + + // Subscribe to the channel and confirm that the promise resolves + client->subscribe({{subscriptionId, channel.id}}); + ASSERT_EQ(std::future_status::ready, promise.get_future().wait_for(ONE_SECOND)); + + executor.cancel(); + spinnerThread.join(); +} + // Run all the tests that were declared with TEST() int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv);