From d4e607f6b543604fa770584c8101c1b69cd176a1 Mon Sep 17 00:00:00 2001 From: Mike Purvis Date: Fri, 10 Feb 2017 11:06:04 -0500 Subject: [PATCH] Implemented rate-control-topic and rate-control-max-delay. (#947) --- tools/rosbag/include/rosbag/player.h | 10 +++ tools/rosbag/src/play.cpp | 13 +++- tools/rosbag/src/player.cpp | 112 ++++++++++++++++++++++++++- 3 files changed, 130 insertions(+), 5 deletions(-) diff --git a/tools/rosbag/include/rosbag/player.h b/tools/rosbag/include/rosbag/player.h index 0dd249f15d..09dd897d74 100644 --- a/tools/rosbag/include/rosbag/player.h +++ b/tools/rosbag/include/rosbag/player.h @@ -51,6 +51,8 @@ #include "rosbag/bag.h" +#include + #include "rosbag/time_translator.h" #include "rosbag/macros.h" @@ -90,6 +92,8 @@ struct ROSBAG_DECL PlayerOptions float duration; bool keep_alive; bool wait_for_subscribers; + std::string rate_control_topic; + float rate_control_max_delay; ros::Duration skip_empty; std::vector bags; @@ -173,6 +177,8 @@ class ROSBAG_DECL Player void setupTerminal(); void restoreTerminal(); + void updateRateTopicTime(const ros::MessageEvent& msg_event); + void doPublish(rosbag::MessageInstance const& m); void doKeepAlive(); @@ -195,6 +201,7 @@ class ROSBAG_DECL Player ros::ServiceServer pause_service_; bool paused_; + bool delayed_; bool pause_for_topics_; @@ -202,6 +209,9 @@ class ROSBAG_DECL Player bool requested_pause_state_; + ros::Subscriber rate_control_sub_; + ros::Time last_rate_control_; + ros::WallTime paused_time_; std::vector > bags_; diff --git a/tools/rosbag/src/play.cpp b/tools/rosbag/src/play.cpp index 77ec8183dc..ed5ddb307f 100644 --- a/tools/rosbag/src/play.cpp +++ b/tools/rosbag/src/play.cpp @@ -62,8 +62,11 @@ rosbag::PlayerOptions parseOptions(int argc, char** argv) { ("topics", po::value< std::vector >()->multitoken(), "topics to play back") ("pause-topics", po::value< std::vector >()->multitoken(), "topics to pause playback on") ("bags", po::value< std::vector >(), "bag files to play back from") - ("wait-for-subscribers", "wait for at least one subscriber on each topic before publishing"); - + ("wait-for-subscribers", "wait for at least one subscriber on each topic before publishing") + ("rate-control-topic", po::value(), "watch the given topic, and if the last publish was more than ago, wait until the topic publishes again to continue playback") + ("rate-control-max-delay", po::value()->default_value(1.0f), "maximum time difference from before pausing") + ; + po::positional_options_description p; p.add("bags", -1); @@ -140,6 +143,12 @@ rosbag::PlayerOptions parseOptions(int argc, char** argv) { opts.pause_topics.push_back(*i); } + if (vm.count("rate-control-topic")) + opts.rate_control_topic = vm["rate-control-topic"].as(); + + if (vm.count("rate-control-max-delay")) + opts.rate_control_max_delay = vm["rate-control-max-delay"].as(); + if (vm.count("bags")) { std::vector bags = vm["bags"].as< std::vector >(); diff --git a/tools/rosbag/src/player.cpp b/tools/rosbag/src/player.cpp index c7e191da3d..32570bdef1 100644 --- a/tools/rosbag/src/player.cpp +++ b/tools/rosbag/src/player.cpp @@ -88,6 +88,8 @@ PlayerOptions::PlayerOptions() : duration(0.0f), keep_alive(false), wait_for_subscribers(false), + rate_control_topic(""), + rate_control_max_delay(1.0f), skip_empty(ros::DURATION_MAX) { } @@ -208,6 +210,26 @@ void Player::publish() { } } + if (options_.rate_control_topic != "") + { + std::cout << "Creating rate control topic subscriber..." << std::flush; + + boost::shared_ptr sub(boost::make_shared()); + ros::SubscribeOptions ops; + ops.topic = options_.rate_control_topic; + ops.queue_size = 10; + ops.md5sum = ros::message_traits::md5sum(); + ops.datatype = ros::message_traits::datatype(); + ops.helper = boost::make_shared &> >( + boost::bind(&Player::updateRateTopicTime, this, _1)); + + rate_control_sub_ = node_handle_.subscribe(ops); + + std::cout << " done." << std::endl; + } + + std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush; options_.advertise_sleep.sleep(); std::cout << " done." << std::endl; @@ -230,6 +252,9 @@ void Player::publish() { time_translator_.setRealStartTime(start_time_); bag_length_ = view.getEndTime() - view.getBeginTime(); + // Set the last rate control to now, so the program doesn't start delayed. + last_rate_control_ = start_time_; + time_publisher_.setTime(start_time_); ros::WallTime now_wt = ros::WallTime::now(); @@ -269,6 +294,42 @@ void Player::publish() { ros::shutdown(); } +void Player::updateRateTopicTime(const ros::MessageEvent& msg_event) +{ + boost::shared_ptr const &ssmsg = msg_event.getConstMessage(); + std::string def = ssmsg->getMessageDefinition(); + size_t length = ros::serialization::serializationLength(*ssmsg); + + // Check the message definition. + std::istringstream f(def); + std::string s; + bool flag = false; + while(std::getline(f, s, '\n')) { + if (s.find("#") != 0) { + // Does not start with #, is not a comment. + if(s == "Header header") { + flag = true; + } + } + } + // If the header is not the first element in the message according to the definition, throw an error. + if (!flag) { + std::cout << std::endl << "WARNING: Rate control topic is bad, header is not first. MSG may be malformed." << std::endl; + return; + } + + std::vector buffer(length); + ros::serialization::OStream ostream(&buffer[0], length); + ros::serialization::Serializer::write(ostream, *ssmsg); + + // Assuming that the header is the first several bytes of the message. + //uint32_t header_sequence_id = buffer[0] | (uint32_t)buffer[1] << 8 | (uint32_t)buffer[2] << 16 | (uint32_t)buffer[3] << 24; + int32_t header_timestamp_sec = buffer[4] | (uint32_t)buffer[5] << 8 | (uint32_t)buffer[6] << 16 | (uint32_t)buffer[7] << 24; + int32_t header_timestamp_nsec = buffer[8] | (uint32_t)buffer[9] << 8 | (uint32_t)buffer[10] << 16 | (uint32_t)buffer[11] << 24; + + last_rate_control_ = ros::Time(header_timestamp_sec, header_timestamp_nsec); +} + void Player::printTime() { if (!options_.quiet) { @@ -276,13 +337,19 @@ void Player::printTime() ros::Time current_time = time_publisher_.getTime(); ros::Duration d = current_time - start_time_; + if (paused_) { - printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec()); + printf("\r [PAUSED ] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec()); + } + else if (delayed_) + { + ros::Duration time_since_rate = std::max(ros::Time::now() - last_rate_control_, ros::Duration(0)); + printf("\r [DELAYED] Bag Time: %13.6f Duration: %.6f / %.6f Delay: %.2f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec(), time_since_rate.toSec()); } else { - printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec()); + printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec()); } fflush(stdout); } @@ -317,6 +384,7 @@ void Player::processPause(const bool paused, ros::WallTime &horizon) } else { + // Make sure time doesn't shift after leaving pause. ros::WallDuration shift = ros::WallTime::now() - paused_time_; paused_time_ = ros::WallTime::now(); @@ -357,6 +425,9 @@ void Player::doPublish(MessageInstance const& m) { map::iterator pub_iter = publishers_.find(callerid_topic); ROS_ASSERT(pub_iter != publishers_.end()); + // Update subscribers. + ros::spinOnce(); + // If immediate specified, play immediately if (options_.at_once) { time_publisher_.stepClock(); @@ -393,7 +464,16 @@ void Player::doPublish(MessageInstance const& m) { } } - while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok()) + // Check if the rate control topic has posted recently enough to continue, or if a delay is needed. + // Delayed is separated from paused to allow more verbose printing. + if (rate_control_sub_ != NULL) { + if ((time_publisher_.getTime() - last_rate_control_).toSec() > options_.rate_control_max_delay) { + delayed_ = true; + paused_time_ = ros::WallTime::now(); + } + } + + while ((paused_ || delayed_ || !time_publisher_.horizonReached()) && node_handle_.ok()) { bool charsleftorpaused = true; while (charsleftorpaused && node_handle_.ok()) @@ -436,6 +516,25 @@ void Player::doPublish(MessageInstance const& m) { { printTime(); time_publisher_.runStalledClock(ros::WallDuration(.1)); + ros::spinOnce(); + } + else if (delayed_) + { + printTime(); + time_publisher_.runStalledClock(ros::WallDuration(.1)); + ros::spinOnce(); + // You need to check the rate here too. + if(rate_control_sub_ == NULL || (time_publisher_.getTime() - last_rate_control_).toSec() <= options_.rate_control_max_delay) { + delayed_ = false; + // Make sure time doesn't shift after leaving delay. + ros::WallDuration shift = ros::WallTime::now() - paused_time_; + paused_time_ = ros::WallTime::now(); + + time_translator_.shift(ros::Duration(shift.sec, shift.nsec)); + + horizon += shift; + time_publisher_.setWCHorizon(horizon); + } } else charsleftorpaused = false; @@ -444,6 +543,7 @@ void Player::doPublish(MessageInstance const& m) { printTime(); time_publisher_.runClock(ros::WallDuration(.1)); + ros::spinOnce(); } pub_iter->second.publish(m); @@ -464,6 +564,9 @@ void Player::doKeepAlive() { return; } + // If we're done and just staying alive, don't watch the rate control topic. We aren't publishing anyway. + delayed_ = false; + while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok()) { bool charsleftorpaused = true; @@ -477,6 +580,7 @@ void Player::doKeepAlive() { } else { + // Make sure time doesn't shift after leaving pause. ros::WallDuration shift = ros::WallTime::now() - paused_time_; paused_time_ = ros::WallTime::now(); @@ -491,6 +595,7 @@ void Player::doKeepAlive() { { printTime(); time_publisher_.runStalledClock(ros::WallDuration(.1)); + ros::spinOnce(); } else charsleftorpaused = false; @@ -499,6 +604,7 @@ void Player::doKeepAlive() { printTime(); time_publisher_.runClock(ros::WallDuration(.1)); + ros::spinOnce(); } }