diff --git a/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp b/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp index 413f69706c..fdad4d2ed3 100644 --- a/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp +++ b/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp @@ -35,6 +35,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag)); MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &)); MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &)); + MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &)); MOCK_METHOD0(has_next, bool()); MOCK_METHOD0(read_next, std::shared_ptr()); MOCK_METHOD1(write, void(std::shared_ptr)); diff --git a/rosbag2_cpp/CMakeLists.txt b/rosbag2_cpp/CMakeLists.txt index 8ddddf2239..af1f5998b3 100644 --- a/rosbag2_cpp/CMakeLists.txt +++ b/rosbag2_cpp/CMakeLists.txt @@ -170,7 +170,7 @@ if(BUILD_TESTING) endif() ament_add_gmock(test_sequential_reader - test/rosbag2_cpp/test_sequential_reader.cpp) + test/rosbag2_cpp/test_sequential_reader.cpp test/rosbag2_cpp/fake_data.cpp) if(TARGET test_sequential_reader) ament_target_dependencies(test_sequential_reader rosbag2_storage rosbag2_test_common test_msgs) target_link_libraries(test_sequential_reader ${PROJECT_NAME}) @@ -221,7 +221,7 @@ if(BUILD_TESTING) endif() ament_add_gmock(test_sequential_writer - test/rosbag2_cpp/test_sequential_writer.cpp) + test/rosbag2_cpp/test_sequential_writer.cpp test/rosbag2_cpp/fake_data.cpp) if(TARGET test_sequential_writer) ament_target_dependencies(test_sequential_writer rosbag2_storage rosbag2_test_common test_msgs) target_link_libraries(test_sequential_writer ${PROJECT_NAME}) diff --git a/rosbag2_cpp/include/rosbag2_cpp/reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/reader.hpp index aaf9416124..3fc289be4b 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/reader.hpp @@ -52,7 +52,7 @@ class BaseReaderInterface; /** * The Reader allows opening and reading messages of a bag. */ -class ROSBAG2_CPP_PUBLIC Reader final +class ROSBAG2_CPP_PUBLIC Reader { public: explicit Reader( @@ -99,6 +99,16 @@ class ROSBAG2_CPP_PUBLIC Reader final */ void close(); + /** + * Set the read order for continued iteration of messages, without changing the current + * read head timestamp. + * + * \param read_order Sorting criterion and direction to read messages in + * \note Calling set_read_order(order) concurrently with has_next(), seek(t), has_next_file() + * or load_next_file() will cause undefined behavior + */ + void set_read_order(const rosbag2_storage::ReadOrder & read_order); + /** * Ask whether the underlying bagfile contains at least one more message. * diff --git a/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp index 5497edd274..5d6c4ec99c 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp @@ -26,6 +26,7 @@ #include "rosbag2_storage/bag_metadata.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_storage/storage_filter.hpp" +#include "rosbag2_storage/storage_interfaces/base_read_interface.hpp" #include "rosbag2_storage/storage_options.hpp" #include "rosbag2_storage/topic_metadata.hpp" @@ -45,6 +46,8 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface virtual void close() = 0; + virtual void set_read_order(const rosbag2_storage::ReadOrder &) = 0; + virtual bool has_next() = 0; virtual std::shared_ptr read_next() = 0; diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index f056c79021..c0d8c217d7 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -66,6 +66,12 @@ class ROSBAG2_CPP_PUBLIC SequentialReader void close() override; + /** + * \note Calling set_read_order(order) concurrently with has_next(), seek(t), has_next_file() + * or load_next_file() will cause undefined behavior + */ + void set_read_order(const rosbag2_storage::ReadOrder & order) override; + bool has_next() override; std::shared_ptr read_next() override; @@ -141,7 +147,6 @@ class ROSBAG2_CPP_PUBLIC SequentialReader const std::vector & topics); /** - * Checks if the serialization format of the converter factory is the same as that of the storage * factory. * If not, changes the serialization format of the converter factory to use the serialization @@ -187,6 +192,7 @@ class ROSBAG2_CPP_PUBLIC SequentialReader std::shared_ptr converter_factory_{}; bag_events::EventCallbackManager callback_manager_; + rosbag2_storage::ReadOrder read_order_{}; }; } // namespace readers diff --git a/rosbag2_cpp/src/rosbag2_cpp/reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/reader.cpp index 64d30a9176..91523a96ad 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/reader.cpp @@ -57,6 +57,11 @@ void Reader::close() reader_impl_->close(); } +void Reader::set_read_order(const rosbag2_storage::ReadOrder & order) +{ + return reader_impl_->set_read_order(order); +} + bool Reader::has_next() { return reader_impl_->has_next(); diff --git a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp index 042966358c..1bac318483 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp @@ -106,6 +106,7 @@ void SequentialReader::open( if (!storage_) { throw std::runtime_error{"No storage could be initialized from the inputs."}; } + storage_->set_read_order(read_order_); metadata_ = storage_->get_metadata(); if (metadata_.relative_file_paths.empty()) { ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata."); @@ -128,6 +129,14 @@ void SequentialReader::open( topics[0].topic_metadata.serialization_format); } +void SequentialReader::set_read_order(const rosbag2_storage::ReadOrder & order) +{ + if (storage_) { + storage_->set_read_order(order); + } + read_order_ = order; +} + bool SequentialReader::has_next() { if (storage_) { @@ -194,7 +203,11 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp) seek_time_ = timestamp; if (storage_) { // reset to the first file - current_file_iterator_ = file_paths_.begin(); + if (read_order_.reverse) { + current_file_iterator_ = std::prev(file_paths_.end()); + } else { + current_file_iterator_ = file_paths_.begin(); + } load_current_file(); return; } @@ -204,7 +217,11 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp) bool SequentialReader::has_next_file() const { - return current_file_iterator_ + 1 != file_paths_.end(); + if (read_order_.reverse) { + return current_file_iterator_ != file_paths_.begin(); + } else { + return (current_file_iterator_ + 1) != file_paths_.end(); + } } void SequentialReader::load_current_file() @@ -222,6 +239,7 @@ void SequentialReader::load_current_file() throw std::runtime_error{"No storage could be initialized. Abort"}; } // set filters + storage_->set_read_order(read_order_); storage_->seek(seek_time_); set_filter(topics_filter_); } @@ -231,7 +249,11 @@ void SequentialReader::load_next_file() assert(current_file_iterator_ != file_paths_.end()); auto info = std::make_shared(); info->closed_file = get_current_file(); - current_file_iterator_++; + if (read_order_.reverse) { + current_file_iterator_--; + } else { + current_file_iterator_++; + } info->opened_file = get_current_file(); load_current_file(); callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info); diff --git a/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp b/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp new file mode 100644 index 0000000000..da5f3af8f4 --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp @@ -0,0 +1,56 @@ +// Copyright 2022, Foxglove Technologies. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "fake_data.hpp" + +#include "rosbag2_storage/ros_helper.hpp" + +void write_sample_split_bag( + const rosbag2_storage::StorageOptions & storage_options, + const std::vector> & fake_messages, + size_t split_every) +{ + std::string topic_name = "testtopic"; + + ManualSplitSequentialWriter writer{}; + writer.open(storage_options, rosbag2_cpp::ConverterOptions{}); + writer.create_topic( + { + topic_name, + "test_msgs/ByteMultiArray", + "cdr", + "" + }); + for (size_t i = 0; i < fake_messages.size(); i++) { + if (i > 0 && (i % split_every == 0)) { + writer.split_bagfile(); + } + + const auto message = fake_messages[i]; + rcutils_time_point_value_t time_stamp = message.first; + uint32_t value = message.second; + + auto msg = std::make_shared(); + msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value)); + msg->time_stamp = time_stamp; + msg->topic_name = topic_name; + writer.write(msg); + } + writer.close(); +} diff --git a/rosbag2_cpp/test/rosbag2_cpp/fake_data.hpp b/rosbag2_cpp/test/rosbag2_cpp/fake_data.hpp new file mode 100644 index 0000000000..9a01d822b1 --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/fake_data.hpp @@ -0,0 +1,35 @@ +// Copyright 2022, Foxglove Technologies. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__FAKE_DATA_HPP_ +#define ROSBAG2_CPP__FAKE_DATA_HPP_ + +#include +#include + +#include "rosbag2_cpp/writers/sequential_writer.hpp" + +class ManualSplitSequentialWriter : public rosbag2_cpp::writers::SequentialWriter +{ +public: + using rosbag2_cpp::writers::SequentialWriter::split_bagfile; +}; + +// Write vector of pairs to bag files, splitting every N messages +void write_sample_split_bag( + const rosbag2_storage::StorageOptions & storage_options, + const std::vector> & fake_messages, + size_t split_every); + +#endif // ROSBAG2_CPP__FAKE_DATA_HPP_ diff --git a/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp b/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp index 3005cef366..27ec5fbf3f 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp +++ b/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp @@ -36,6 +36,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag)); MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &)); MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &)); + MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &)); MOCK_METHOD0(has_next, bool()); MOCK_METHOD0(has_next_file, bool()); MOCK_METHOD0(read_next, std::shared_ptr()); diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp index 0bf6a12886..e3cafff33d 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp @@ -27,12 +27,14 @@ #include "rosbag2_storage/bag_metadata.hpp" #include "rosbag2_storage/metadata_io.hpp" +#include "rosbag2_storage/ros_helper.hpp" #include "rosbag2_storage/topic_metadata.hpp" #include "rosbag2_test_common/temporary_directory_fixture.hpp" #include "test_msgs/msg/basic_types.hpp" +#include "fake_data.hpp" #include "mock_converter.hpp" #include "mock_converter_factory.hpp" #include "mock_metadata_io.hpp" @@ -234,3 +236,142 @@ TEST_F(TemporaryDirectoryFixture, reader_accepts_bare_file) { EXPECT_TRUE(reader.has_next()); EXPECT_THAT(reader.get_metadata().topics_with_message_count, SizeIs(1)); } + + +class ReadOrderTest : public TemporaryDirectoryFixture +{ +public: + ReadOrderTest() + { + storage_options.uri = (rcpputils::fs::path(temporary_dir_path_) / "ordertest").string(); + storage_options.storage_id = "sqlite3"; + write_sample_split_bag(storage_options, fake_messages, split_every); + } + + void sort_expected(rosbag2_storage::ReadOrder order) + { + sorted_messages.clear(); + for (const auto & message : fake_messages) { + sorted_messages.push_back(message); + } + + switch (order.sort_by) { + case rosbag2_storage::ReadOrder::ReceivedTimestamp: { + if (order.reverse) { + std::sort( + sorted_messages.begin(), sorted_messages.end(), [](auto a, auto b) { + return a.first > b.first || (a.first == b.first && a.second > b.second); + }); + } else { + std::sort( + sorted_messages.begin(), sorted_messages.end(), [](auto a, auto b) { + return a.first < b.first || (a.first == b.first && a.second < b.second); + }); + } + } break; + case rosbag2_storage::ReadOrder::File: { + if (order.reverse) { + std::reverse(sorted_messages.begin(), sorted_messages.end()); + } else { + // Already in forward file order + } + } break; + case rosbag2_storage::ReadOrder::PublishedTimestamp: + throw std::runtime_error("PublishedTimestamp not implemented."); + break; + } + } + + void check_against_sorted(bool do_reset) + { + // If do_reset - try to reset the storage internal iterator every time, to test its ability + // to track order when the query changes. + // If not, do a single chain of uninterrupted read_next, which likely uses the same iterator + for (const auto & expect_message : sorted_messages) { + auto expect_timestamp = expect_message.first; + uint32_t expect_value = expect_message.second; + + // Check both timestamp and value to uniquely identify messages in expected order + ASSERT_TRUE(reader.has_next()); + auto next = reader.read_next(); + EXPECT_EQ(next->time_stamp, expect_timestamp); + + ASSERT_EQ(next->serialized_data->buffer_length, 4u); + uint32_t value = *reinterpret_cast(next->serialized_data->buffer); + EXPECT_EQ(value, expect_value); + + if (do_reset) { + reader.reset_filter(); + } + } + ASSERT_FALSE(reader.has_next()); + } + + const std::vector> fake_messages { + {100, 1}, + {100, 2}, + {300, 3}, + {200, 4}, + {300, 5}, + {500, 6}, + {400, 7}, + {600, 8} + }; + const size_t split_every = 5; + std::vector> sorted_messages; + + rosbag2_cpp::readers::SequentialReader reader{}; + rosbag2_storage::StorageOptions storage_options{}; +}; + +TEST_F(ReadOrderTest, received_timestamp_order) { + rosbag2_storage::ReadOrder order(rosbag2_storage::ReadOrder::ReceivedTimestamp, false); + sort_expected(order); + reader.set_read_order(order); + + for (bool do_reset : {false, true}) { + reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); + check_against_sorted(do_reset); + reader.close(); + } +} + +TEST_F(ReadOrderTest, reverse_received_timestamp_order) { + rosbag2_storage::ReadOrder order(rosbag2_storage::ReadOrder::ReceivedTimestamp, true); + sort_expected(order); + reader.set_read_order(order); + reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); + auto metadata = reader.get_metadata(); + // Seek to end before reading reverse messages + auto end_timestamp = (metadata.starting_time + metadata.duration).time_since_epoch().count(); + reader.close(); + + for (bool do_reset : {false, true}) { + reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); + reader.seek(end_timestamp); + check_against_sorted(do_reset); + reader.close(); + } +} + +TEST_F(ReadOrderTest, file_order) { + reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); + EXPECT_THROW( + reader.set_read_order(rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::File, false)), + std::runtime_error); +} + +TEST_F(ReadOrderTest, reverse_file_order) { + reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); + EXPECT_THROW( + reader.set_read_order(rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::File, true)), + std::runtime_error); +} + +TEST_F(ReadOrderTest, published_timestamp_order) { + reader.open(storage_options, rosbag2_cpp::ConverterOptions{}); + EXPECT_THROW( + reader.set_read_order( + rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::PublishedTimestamp, false)), + std::runtime_error); +} diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 8f94c8cac6..9c247d9332 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -14,7 +14,6 @@ #include -#include #include #include #include @@ -32,6 +31,7 @@ #include "rosbag2_test_common/temporary_directory_fixture.hpp" +#include "fake_data.hpp" #include "mock_converter.hpp" #include "mock_converter_factory.hpp" #include "mock_metadata_io.hpp" @@ -568,63 +568,22 @@ TEST_F(SequentialWriterTest, split_event_calls_callback) } -class ManualSplitWriter : public rosbag2_cpp::writers::SequentialWriter -{ -public: - // makes the method public for manual splitting - void split() - { - split_bagfile(); - } -}; - -void write_sample_split_bag( - const std::string & uri, - const std::vector> & message_timestamps_by_file) -{ - std::string msg_content = "Hello"; - auto msg_length = msg_content.length(); - std::shared_ptr fake_data = rosbag2_storage::make_serialized_message( - msg_content.c_str(), msg_length); - std::string topic_name = "testtopic"; - - rosbag2_storage::StorageOptions storage_options; - storage_options.uri = uri; - storage_options.storage_id = "sqlite3"; - - ManualSplitWriter writer; - writer.open(storage_options, rosbag2_cpp::ConverterOptions{}); - writer.create_topic( - { - topic_name, - "test_msgs/ByteMultiArray", - "cdr", - "" - }); - for (const auto & file_messages : message_timestamps_by_file) { - for (const auto time_stamp : file_messages) { - auto msg = std::make_shared(); - msg->serialized_data = fake_data; - msg->time_stamp = time_stamp; - msg->topic_name = topic_name; - writer.write(msg); - } - writer.split(); - } - writer.close(); -} - - TEST_F(TemporaryDirectoryFixture, split_bag_metadata_has_full_duration) { - const std::vector> message_timestamps_by_file { - {100, 300, 200}, - {500, 400, 600} + const std::vector> fake_messages { + {100, 1}, + {300, 2}, + {200, 3}, + {500, 4}, + {400, 5}, + {600, 6} }; - std::string uri = (rcpputils::fs::path(temporary_dir_path_) / "split_duration_bag").string(); - write_sample_split_bag(uri, message_timestamps_by_file); + rosbag2_storage::StorageOptions storage_options; + storage_options.uri = (rcpputils::fs::path(temporary_dir_path_) / "split_duration_bag").string(); + storage_options.storage_id = "sqlite3"; + write_sample_split_bag(storage_options, fake_messages, 3); rosbag2_storage::MetadataIo metadata_io; - auto metadata = metadata_io.read_metadata(uri); + auto metadata = metadata_io.read_metadata(storage_options.uri); ASSERT_EQ( metadata.starting_time, std::chrono::high_resolution_clock::time_point(std::chrono::nanoseconds(100))); diff --git a/rosbag2_py/rosbag2_py/__init__.py b/rosbag2_py/rosbag2_py/__init__.py index 193944fef8..1dbc3bbf92 100644 --- a/rosbag2_py/rosbag2_py/__init__.py +++ b/rosbag2_py/rosbag2_py/__init__.py @@ -26,6 +26,8 @@ from rosbag2_py._storage import ( ConverterOptions, FileInformation, + ReadOrder, + ReadOrderSortBy, StorageFilter, StorageOptions, TopicMetadata, @@ -61,6 +63,8 @@ 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', + 'ReadOrder', + 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', diff --git a/rosbag2_py/src/rosbag2_py/_reader.cpp b/rosbag2_py/src/rosbag2_py/_reader.cpp index a70a43f45c..49cd2674a4 100644 --- a/rosbag2_py/src/rosbag2_py/_reader.cpp +++ b/rosbag2_py/src/rosbag2_py/_reader.cpp @@ -35,66 +35,25 @@ namespace rosbag2_py { template -class Reader +class Reader : public rosbag2_cpp::Reader { public: Reader() - : reader_(std::make_unique(std::make_unique())) + : rosbag2_cpp::Reader(std::make_unique()) { } - void open( - rosbag2_storage::StorageOptions & storage_options, - rosbag2_cpp::ConverterOptions & converter_options = rosbag2_cpp::ConverterOptions()) - { - reader_->open(storage_options, converter_options); - } - - bool has_next() - { - return reader_->has_next(); - } - /// Return a tuple containing the topic name, the serialized ROS message, and /// the timestamp. pybind11::tuple read_next() { - const auto next = reader_->read_next(); + const auto next = rosbag2_cpp::Reader::read_next(); rcutils_uint8_array_t rcutils_data = *next->serialized_data.get(); std::string serialized_data(rcutils_data.buffer, rcutils_data.buffer + rcutils_data.buffer_length); return pybind11::make_tuple( next->topic_name, pybind11::bytes(serialized_data), next->time_stamp); } - - /// Return a mapping from topic name to topic type. - std::vector get_all_topics_and_types() - { - return reader_->get_all_topics_and_types(); - } - - const rosbag2_storage::BagMetadata & get_metadata() const - { - return reader_->get_metadata(); - } - - void set_filter(const rosbag2_storage::StorageFilter & storage_filter) - { - return reader_->set_filter(storage_filter); - } - - void reset_filter() - { - reader_->reset_filter(); - } - - void seek(const rcutils_time_point_value_t & timestamp) - { - reader_->seek(timestamp); - } - -protected: - std::unique_ptr reader_; }; std::unordered_set get_registered_readers() @@ -121,7 +80,13 @@ PYBIND11_MODULE(_reader, m) { pybind11::class_(m, "SequentialReader") .def(pybind11::init()) - .def("open", &PyReader::open) + .def("open_uri", pybind11::overload_cast(&PyReader::open)) + .def( + "open", + pybind11::overload_cast< + const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions & + >(&PyReader::open)) + .def("set_read_order", &PyReader::set_read_order) .def("read_next", &PyReader::read_next) .def("has_next", &PyReader::has_next) .def("get_metadata", &PyReader::get_metadata) @@ -132,7 +97,13 @@ PYBIND11_MODULE(_reader, m) { pybind11::class_(m, "SequentialCompressionReader") .def(pybind11::init()) - .def("open", &PyCompressionReader::open) + .def("open_uri", pybind11::overload_cast(&PyCompressionReader::open)) + .def( + "open", + pybind11::overload_cast< + const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions & + >(&PyCompressionReader::open)) + .def("set_read_order", &PyCompressionReader::set_read_order) .def("read_next", &PyCompressionReader::read_next) .def("has_next", &PyCompressionReader::has_next) .def("get_metadata", &PyCompressionReader::get_metadata) diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index d9ef48282b..2baf8e60da 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -18,6 +18,7 @@ #include "rosbag2_cpp/converter_options.hpp" #include "rosbag2_storage/bag_metadata.hpp" #include "rosbag2_storage/storage_filter.hpp" +#include "rosbag2_storage/storage_interfaces/base_read_interface.hpp" #include "rosbag2_storage/storage_options.hpp" #include "rosbag2_storage/topic_metadata.hpp" @@ -270,4 +271,17 @@ PYBIND11_MODULE(_storage, m) { "__repr__", [](const rosbag2_storage::BagMetadata & metadata) { return format_bag_meta_data(metadata); }); + + pybind11::enum_(m, "ReadOrderSortBy") + .value("ReceivedTimestamp", rosbag2_storage::ReadOrder::ReceivedTimestamp) + .value("PublishedTimestamp", rosbag2_storage::ReadOrder::PublishedTimestamp) + .value("File", rosbag2_storage::ReadOrder::File); + + pybind11::class_(m, "ReadOrder") + .def( + pybind11::init(), + pybind11::arg("sort_by") = rosbag2_storage::ReadOrder{}.sort_by, + pybind11::arg("reverse") = rosbag2_storage::ReadOrder{}.reverse) + .def_readwrite("sort_by", &rosbag2_storage::ReadOrder::sort_by) + .def_readwrite("reverse", &rosbag2_storage::ReadOrder::reverse); } diff --git a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp index 3cb780dcb2..ac3a1206df 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp @@ -25,6 +25,31 @@ namespace rosbag2_storage { + +struct ReadOrder +{ + enum SortBy + { + ReceivedTimestamp, + PublishedTimestamp, // NOTE: not implemented in ROS 2 core + File + }; + + // Sorting criterion for reading out messages, ascending by default + SortBy sort_by = ReceivedTimestamp; + // If true, changes sort order to descending + bool reverse = false; + + ReadOrder(SortBy sort_by, bool reverse) + : sort_by(sort_by), reverse(reverse) {} + ReadOrder() {} +}; + +inline bool operator==(const ReadOrder & a, const ReadOrder & b) +{ + return a.sort_by == b.sort_by && a.reverse == b.reverse; +} + namespace storage_interfaces { @@ -33,6 +58,13 @@ class ROSBAG2_STORAGE_PUBLIC BaseReadInterface public: virtual ~BaseReadInterface() = default; + /// @brief Set the order to iterate messages in the storage. + /// This affects the outcome of has_next and read_next. + /// Note that when setting to reverse order, this will not change the read head, so user + /// must first seek() to the end in order to read messages from the end. + /// @param read_order The order in which to return messages. + virtual void set_read_order(const ReadOrder & read_order) = 0; + virtual bool has_next() = 0; virtual std::shared_ptr read_next() = 0; diff --git a/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp b/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp index 8d838b3749..3476d0328d 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp @@ -46,6 +46,11 @@ void TestPlugin::open( std::cout << "config file uri: " << storage_options.storage_config_uri << ".\n"; } +void TestPlugin::set_read_order(const rosbag2_storage::ReadOrder & order) +{ + std::cout << "Set read order " << order.sort_by << " " << order.reverse << std::endl; +} + bool TestPlugin::has_next() { return true; diff --git a/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp b/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp index bd547876d1..9a9244cae4 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp +++ b/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp @@ -37,6 +37,8 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac void remove_topic(const rosbag2_storage::TopicMetadata & topic) override; + void set_read_order(const rosbag2_storage::ReadOrder &) override; + bool has_next() override; std::shared_ptr read_next() override; diff --git a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp index 9d7eb8fad1..43992ff22d 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp @@ -43,6 +43,11 @@ void TestReadOnlyPlugin::open( std::cout << "config file uri: " << storage_options.storage_config_uri << ".\n"; } +void TestReadOnlyPlugin::set_read_order(const rosbag2_storage::ReadOrder & order) +{ + std::cout << "Set read order " << order.sort_by << " " << order.reverse << std::endl; +} + bool TestReadOnlyPlugin::has_next() { return true; diff --git a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp index 3a3dc34f8a..27e4308846 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp +++ b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp @@ -31,6 +31,8 @@ class TestReadOnlyPlugin : public rosbag2_storage::storage_interfaces::ReadOnlyI const rosbag2_storage::StorageOptions & storage_options, rosbag2_storage::storage_interfaces::IOFlag flag) override; + void set_read_order(const rosbag2_storage::ReadOrder &) override; + bool has_next() override; std::shared_ptr read_next() override; diff --git a/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp b/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp index f7cab1ae2f..044fea1da1 100644 --- a/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp +++ b/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp @@ -64,6 +64,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage const std::vector> & messages) override; + void set_read_order(const rosbag2_storage::ReadOrder &) override; + bool has_next() override; std::shared_ptr read_next() override; @@ -103,6 +105,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage void commit_transaction(); void write_locked(std::shared_ptr message) RCPPUTILS_TSA_REQUIRES(database_write_mutex_); + int get_last_rowid(); using ReadQueryResult = SqliteStatementWrapper::QueryResult< std::shared_ptr, rcutils_time_point_value_t, std::string, int>; @@ -120,6 +123,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage rcutils_time_point_value_t seek_time_ = 0; int seek_row_id_ = 0; + rosbag2_storage::ReadOrder read_order_{}; rosbag2_storage::StorageFilter storage_filter_ {}; // This mutex is necessary to protect: diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp index 1bb48add85..807d9583b1 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp @@ -289,6 +289,19 @@ void SqliteStorage::write( commit_transaction(); } +void SqliteStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order) +{ + if (read_order.sort_by == rosbag2_storage::ReadOrder::PublishedTimestamp) { + throw std::runtime_error("Not Implemented - PublishedTimestamp read order."); + } + if (read_order.sort_by == rosbag2_storage::ReadOrder::File) { + throw std::runtime_error("Not Implemented - File read order"); + } + + read_order_ = read_order; + read_statement_ = nullptr; +} + bool SqliteStorage::has_next() { if (!read_statement_) { @@ -312,7 +325,7 @@ std::shared_ptr SqliteStorage::read_next( // set start time to current time // and set seek_row_id to the new row id up seek_time_ = bag_message->time_stamp; - seek_row_id_ = std::get<3>(*current_message_row_) + 1; + seek_row_id_ = std::get<3>(*current_message_row_) + (read_order_.reverse ? -1 : 1); ++current_message_row_; return bag_message; @@ -391,6 +404,7 @@ void SqliteStorage::prepare_for_reading() { std::string statement_str = "SELECT data, timestamp, topics.name, messages.id " "FROM messages JOIN topics ON messages.topic_id = topics.id WHERE "; + std::vector where_conditions; // add topic filter if (!storage_filter_.topics.empty()) { @@ -402,29 +416,48 @@ void SqliteStorage::prepare_for_reading() topic_list += ","; } } - statement_str += "(topics.name IN (" + topic_list + ")) AND "; + where_conditions.push_back("(topics.name IN (" + topic_list + "))"); } // add topic filter based on regular expression if (!storage_filter_.topics_regex.empty()) { // Construct string for selected topics - statement_str += "(topics.name REGEXP '" + storage_filter_.topics_regex + "')"; - statement_str += " AND "; + where_conditions.push_back("(topics.name REGEXP '" + storage_filter_.topics_regex + "')"); } // exclude topics based on regular expressions if (!storage_filter_.topics_regex_to_exclude.empty()) { // Construct string for selected topics - statement_str += " (topics.name NOT IN "; - statement_str += " (SELECT topics.name FROM topics WHERE topics.name REGEXP '"; - statement_str += storage_filter_.topics_regex_to_exclude + "')"; - statement_str += " ) AND "; + where_conditions.push_back( + "(topics.name NOT IN " + "(SELECT topics.name FROM topics WHERE topics.name REGEXP '" + + storage_filter_.topics_regex_to_exclude + "'))"); + } + + const std::string direction_op = read_order_.reverse ? "<" : ">"; + const std::string order_direction = read_order_.reverse ? "DESC" : "ASC"; + + // add seek head filter + // When doing timestamp ordering, we need a secondary ordering on message_id + // Timestamp is not required to be unique, but message_id is, so for messages with the same + // timestamp we order by the id to have a consistent and deterministic order. + where_conditions.push_back( + "(((timestamp = " + std::to_string(seek_time_) + ") " + "AND (messages.id " + direction_op + "= " + std::to_string(seek_row_id_) + ")) " + "OR (timestamp " + direction_op + " " + std::to_string(seek_time_) + ")) "); + + for ( + std::vector::const_iterator it = where_conditions.begin(); + it != where_conditions.end(); ++it) + { + statement_str += *it; + if (it != where_conditions.end() - 1) { + statement_str += " AND "; + } } - // add start time filter - statement_str += "(((timestamp = " + std::to_string(seek_time_) + ") " - "AND (messages.id >= " + std::to_string(seek_row_id_) + ")) " - "OR (timestamp > " + std::to_string(seek_time_) + ")) "; // add order by time then id - statement_str += "ORDER BY messages.timestamp, messages.id;"; + statement_str += "ORDER BY messages.timestamp " + order_direction; + statement_str += ", messages.id " + order_direction; + statement_str += ";"; read_statement_ = database_->prepare_statement(statement_str); message_result_ = read_statement_->execute_query< @@ -547,9 +580,9 @@ void SqliteStorage::reset_filter() void SqliteStorage::seek(const rcutils_time_point_value_t & timestamp) { - // reset row id to 0 and set start time to input + // reset row id and set start time to input // keep topic filter and reset read statement for re-read - seek_row_id_ = 0; + seek_row_id_ = read_order_.reverse ? get_last_rowid() : 0; seek_time_ = timestamp; read_statement_ = nullptr; } @@ -567,6 +600,13 @@ SqliteWrapper & SqliteStorage::get_sqlite_database_wrapper() return *database_; } +int SqliteStorage::get_last_rowid() +{ + auto statement = database_->prepare_statement("SELECT max(rowid) from messages;"); + auto query_results = statement->execute_query(); + return std::get<0>(*query_results.begin()); +} + } // namespace rosbag2_storage_plugins #include "pluginlib/class_list_macros.hpp" // NOLINT diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 4a352401b0..a1001c13be 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -36,6 +36,9 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn void close() override {} + void set_read_order(const rosbag2_storage::ReadOrder &) override + {} + bool has_next() override { if (filter_.topics.empty()) {