Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
Generate callbacks after updating message_ (ros#274)
Browse files Browse the repository at this point in the history
* Generate callbacks after updating message_

Signed-off-by: ymd-stella <world.applepie@gmail.com>

* Add test for multithread TransformListener to test_message_filter

Signed-off-by: ymd-stella <world.applepie@gmail.com>
  • Loading branch information
ymd-stella authored Jul 14, 2020
1 parent e40c27d commit 7b0187f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 51 deletions.
41 changes: 41 additions & 0 deletions test_tf2/test/test_message_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,47 @@ TEST(MessageFilter, postTransforms)

EXPECT_EQ(1, n.count_);
}

TEST(MessageFilter, concurrentTransforms)
{
const int messages = 30;
const int buffer_size = messages;
auto node = rclcpp::Node::make_shared("tf2_ros_message_filter");
auto create_timer_interface = std::make_shared<tf2_ros::CreateTimerROS>(
node->get_node_base_interface(),
node->get_node_timers_interface());

rclcpp::Clock::SharedPtr clock = std::make_shared<rclcpp::Clock>(RCL_SYSTEM_TIME);

builtin_interfaces::msg::Time stamp = tf2_ros::toMsg(tf2::timeFromSec(1));

std::shared_ptr<geometry_msgs::msg::PointStamped> msg = std::make_shared<geometry_msgs::msg::PointStamped>();
msg->header.stamp = stamp;
msg->header.frame_id = "frame2";

tf2_ros::Buffer buffer(clock);
for (int i = 0; i < 50; i++) {
buffer.setCreateTimerInterface(create_timer_interface);
tf2_ros::MessageFilter<geometry_msgs::msg::PointStamped> filter(buffer, "frame1", buffer_size, node);
Notification n(1);
filter.registerCallback(std::bind(&Notification::notify, &n, std::placeholders::_1));

std::thread t([&](){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
buffer.setTransform(createTransform(tf2::Quaternion(0,0,0,1), tf2::Vector3(1,2,3), stamp, "frame1", "frame2"), "me");
});
for (int j = 0; j < messages; j++) {
filter.add(msg);
}
t.join();
std::this_thread::sleep_for(std::chrono::milliseconds(10));

EXPECT_EQ(messages, n.count_);

buffer.clear();
}
}

// TODO (ahcorde): For some unknown reason message_filters::Connection registerFailureCallback is disable
// with #if 0 https://github.com/ros2/geometry2/blob/ros2/tf2_ros/include/tf2_ros/message_filter.h#L463
// rework this part when this is available
Expand Down
69 changes: 18 additions & 51 deletions tf2_ros/include/tf2_ros/message_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi
return;
}

std::vector<std::tuple<uint64_t, tf2::TimePoint, std::string>> wait_params;
// iterate through the target frames and add requests for each of them
MessageInfo info;
info.handles.reserve(expected_success_count_);
Expand All @@ -353,63 +354,17 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi
V_string::iterator end = target_frames_copy.end();
for (; it != end; ++it) {
const std::string & target_frame = *it;
auto future = buffer_.waitForTransform(
target_frame,
frame_id,
tf2::timeFromSec(stamp.seconds()),
buffer_timeout_,
std::bind(&MessageFilter::transformReadyCallback, this, std::placeholders::_1, next_handle_index_));

try {
const auto status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
future.get();
// Transform is available
++info.success_count;
}
else {
info.handles.push_back(next_handle_index_++);
}
}
catch (const std::exception & e) {
TF2_ROS_MESSAGEFILTER_WARN("Message dropped because: %s", e.what());
messageDropped(evt, filter_failure_reasons::OutTheBack);
return;
}
wait_params.emplace_back(next_handle_index_, tf2::timeFromSec(stamp.seconds()), target_frame);
info.handles.push_back(next_handle_index_++);

if (time_tolerance_.nanoseconds()) {
future = buffer_.waitForTransform(
target_frame,
frame_id,
tf2::timeFromSec((stamp + time_tolerance_).seconds()),
buffer_timeout_,
std::bind(&MessageFilter::transformReadyCallback, this, std::placeholders::_1, next_handle_index_));
try {
const auto status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
future.get();
// Transform is available
++info.success_count;
}
else {
info.handles.push_back(next_handle_index_++);
}
}
catch (const std::exception & e) {
TF2_ROS_MESSAGEFILTER_WARN("Message dropped because: %s", e.what());
messageDropped(evt, filter_failure_reasons::OutTheBack);
return;
}
wait_params.emplace_back(next_handle_index_, tf2::timeFromSec((stamp + time_tolerance_).seconds()), target_frame);
info.handles.push_back(next_handle_index_++);
}
}
}


// We can transform already
if (info.success_count == expected_success_count_) {
messageReady(evt);
} else {

{
// Keep a lock on the messages
std::unique_lock<std::mutex> unique_lock(messages_mutex_);

Expand Down Expand Up @@ -439,6 +394,18 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi
TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d",
frame_id.c_str(), stamp.seconds(), message_count_);
++incoming_message_count_;

for (const auto& param : wait_params) {
const auto& handle = std::get<0>(param);
const auto& stamp = std::get<1>(param);
const auto& target_frame = std::get<2>(param);
buffer_.waitForTransform(
target_frame,
frame_id,
stamp,
buffer_timeout_,
std::bind(&MessageFilter::transformReadyCallback, this, std::placeholders::_1, handle));
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions tf2_ros/src/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,12 @@ Buffer::waitForTransform(const std::string& target_frame, const std::string& sou
// Immediately transformable
geometry_msgs::msg::TransformStamped msg_stamped = lookupTransform(target_frame, source_frame, time);
promise->set_value(msg_stamped);
callback(future);
} else if (0xffffffffffffffffULL == handle) {
// Never transformable
promise->set_exception(std::make_exception_ptr(tf2::LookupException(
"Failed to transform from " + source_frame + " to " + target_frame)));
callback(future);
} else {
std::lock_guard<std::mutex> lock(timer_to_request_map_mutex_);
auto timer_handle = timer_interface_->createTimer(
Expand Down

0 comments on commit 7b0187f

Please sign in to comment.