diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp index ce0d3c494a..ecdb142837 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp @@ -28,6 +28,7 @@ #include "rosbag2_storage/bag_metadata.hpp" #include "rosbag2_storage/metadata_io.hpp" +#include "rosbag2_storage/ros_helper.hpp" #include "rosbag2_storage/topic_metadata.hpp" #include "rosbag2_test_common/temporary_directory_fixture.hpp" @@ -241,7 +242,8 @@ class OrderTestWriter : public rosbag2_cpp::writers::SequentialWriter { public: // makes the method public for manual splitting - void split() { + void split() + { split_bagfile(); } }; @@ -252,20 +254,33 @@ class ReadOrderTest : public TemporaryDirectoryFixture public: ReadOrderTest() { - fake_data = std::make_shared(); + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + fake_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + storage_options.uri = (std::filesystem::path(temporary_dir_path_) / "ordertest").string(); storage_options.storage_id = "sqlite3"; writer.open(storage_options, rosbag2_cpp::ConverterOptions{}); + writer.create_topic( + { + topic_name, + "test_msgs/ByteMultiArray", + "cdr", + "" + }); + for (const auto & file_messages : message_timestamps_by_file) { for (const auto time_stamp : file_messages) { + message_timestamps.push_back(time_stamp); auto msg = std::make_shared(); msg->serialized_data = fake_data; msg->time_stamp = time_stamp; msg->topic_name = topic_name; writer.write(msg); } - writer.split(); + // writer.split(); } writer.close(); } @@ -274,6 +289,7 @@ class ReadOrderTest : public TemporaryDirectoryFixture {100, 300, 200}, {500, 400, 600} }; + std::vector message_timestamps; const std::string topic_name = "testtopic"; OrderTestWriter writer; rosbag2_cpp::readers::SequentialReader reader; @@ -283,18 +299,12 @@ class ReadOrderTest : public TemporaryDirectoryFixture TEST_F(ReadOrderTest, default_read_order) { reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); - auto next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 100); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 200); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 300); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 400); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 500); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 600); + std::sort(message_timestamps.begin(), message_timestamps.end()); + for (const auto time_stamp : message_timestamps) { + ASSERT_TRUE(reader.has_next()); + auto next = reader.read_next(); + ASSERT_EQ(next->time_stamp, time_stamp); + } ASSERT_FALSE(reader.has_next()); } @@ -307,17 +317,27 @@ TEST_F(ReadOrderTest, reverse_received_timestamp_order) { auto metadata = reader.get_metadata(); reader.seek((metadata.starting_time + metadata.duration).time_since_epoch().count()); - auto next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 600); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 500); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 400); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 300); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 200); - next = reader.read_next(); - ASSERT_EQ(next->time_stamp, 100); + std::sort( + message_timestamps.begin(), message_timestamps.end(), [](auto a, auto b) {return a > b;}); + for (const auto time_stamp : message_timestamps) { + ASSERT_TRUE(reader.has_next()); + auto next = reader.read_next(); + ASSERT_EQ(next->time_stamp, time_stamp); + } + ASSERT_FALSE(reader.has_next()); +} + +TEST_F(ReadOrderTest, file_order) { + rosbag2_storage::ReadOrder order; + order.sort_by = rosbag2_storage::ReadOrder::File; + reader.set_read_order(order); + reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); + + // Don't sort timestamps, should be returned in write order + for (const auto time_stamp : message_timestamps) { + ASSERT_TRUE(reader.has_next()); + auto next = reader.read_next(); + ASSERT_EQ(next->time_stamp, time_stamp); + } ASSERT_FALSE(reader.has_next()); } diff --git a/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp b/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp index d16c47c0c4..8c4d556fc9 100644 --- a/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp +++ b/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp @@ -106,6 +106,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage void write_locked(std::shared_ptr message) RCPPUTILS_TSA_REQUIRES(database_write_mutex_); + int get_last_rowid(); + using ReadQueryResult = SqliteStatementWrapper::QueryResult< std::shared_ptr, rcutils_time_point_value_t, std::string, int>; diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp index 28146bc53a..6638e437a3 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp @@ -294,11 +294,15 @@ void SqliteStorage::set_read_order(rosbag2_storage::ReadOrder read_order) if (read_order == read_order_) { return; } + read_order_ = read_order; + read_statement_ = nullptr; + if (read_order_.sort_by == rosbag2_storage::ReadOrder::PublishedTimestamp) { throw std::runtime_error("Not Implemented - PublishedTimestamp read order."); } - read_order_ = read_order; - read_statement_ = nullptr; + if (read_order.reverse && seek_row_id_ == 0) { + seek_row_id_ = get_last_rowid(); + } } bool SqliteStorage::has_next() @@ -403,6 +407,7 @@ void SqliteStorage::prepare_for_reading() { std::string statement_str = "SELECT data, timestamp, topics.name, messages.id " "FROM messages JOIN topics ON messages.topic_id = topics.id WHERE "; + std::vector where_conditions; // add topic filter if (!storage_filter_.topics.empty()) { @@ -414,38 +419,50 @@ void SqliteStorage::prepare_for_reading() topic_list += ","; } } - statement_str += "(topics.name IN (" + topic_list + ")) AND "; + where_conditions.push_back("(topics.name IN (" + topic_list + "))"); } // add topic filter based on regular expression if (!storage_filter_.topics_regex.empty()) { // Construct string for selected topics - statement_str += "(topics.name REGEXP '" + storage_filter_.topics_regex + "')"; - statement_str += " AND "; + where_conditions.push_back("(topics.name REGEXP '" + storage_filter_.topics_regex + "')"); } // exclude topics based on regular expressions if (!storage_filter_.topics_regex_to_exclude.empty()) { // Construct string for selected topics - statement_str += " (topics.name NOT IN "; - statement_str += " (SELECT topics.name FROM topics WHERE topics.name REGEXP '"; - statement_str += storage_filter_.topics_regex_to_exclude + "')"; - statement_str += " ) AND "; + where_conditions.push_back( + "(topics.name NOT IN " + "(SELECT topics.name FROM topics WHERE topics.name REGEXP '" + + storage_filter_.topics_regex_to_exclude + "'))"); } + + const std::string direction_op = read_order_.reverse ? "<" : ">"; + const std::string order_direction = read_order_.reverse ? "DESC" : "ASC"; + // add seek head filter - std::string message_id_filter = "(messages.id >= " + std::to_string(seek_row_id_) + ")"; - std::string timestamp_equal_filter = "(timestamp = " + std::to_string(seek_time_) + ")"; - std::string timestamp_greater_filter = "(timestamp > " + std::to_string(seek_time_) + ")"; if (read_order_.sort_by == rosbag2_storage::ReadOrder::File) { - statement_str += message_id_filter; + where_conditions.push_back( + "(messages.id " + direction_op + "= " + std::to_string(seek_row_id_) + ") "); } else { - statement_str += "((" + message_id_filter + " AND " + timestamp_equal_filter + ") " - "OR " + timestamp_greater_filter + ") "; + where_conditions.push_back( + "(((timestamp = " + std::to_string(seek_time_) + ") " + "AND (messages.id " + direction_op + "= " + std::to_string(seek_row_id_) + ")) " + "OR (timestamp " + direction_op + " " + std::to_string(seek_time_) + ")) "); + } + + for ( + std::vector::const_iterator it = where_conditions.begin(); + it != where_conditions.end(); ++it) + { + statement_str += *it; + if (it != where_conditions.end() - 1) { + statement_str += " AND "; + } } // add order by time then id - std::string order_direction = read_order_.reverse ? "DESC" : "ASC"; switch (read_order_.sort_by) { case rosbag2_storage::ReadOrder::ReceivedTimestamp: - statement_str += "ORDER BY messages.timestamp " + order_direction + + statement_str += "ORDER BY messages.timestamp " + order_direction; statement_str += ", messages.id " + order_direction; break; case rosbag2_storage::ReadOrder::File: @@ -578,9 +595,9 @@ void SqliteStorage::reset_filter() void SqliteStorage::seek(const rcutils_time_point_value_t & timestamp) { - // reset row id to 0 and set start time to input + // reset row id and set start time to input // keep topic filter and reset read statement for re-read - seek_row_id_ = 0; + seek_row_id_ = read_order_.reverse ? get_last_rowid() : 0; seek_time_ = timestamp; read_statement_ = nullptr; } @@ -598,6 +615,13 @@ SqliteWrapper & SqliteStorage::get_sqlite_database_wrapper() return *database_; } +int SqliteStorage::get_last_rowid() +{ + auto statement = database_->prepare_statement("SELECT max(rowid) from messages;"); + auto query_results = statement->execute_query(); + return std::get<0>(*query_results.begin()); +} + } // namespace rosbag2_storage_plugins #include "pluginlib/class_list_macros.hpp" // NOLINT