Skip to content

Commit

Permalink
Reverse read order API and sqlite storage implementation (#1083)
Browse files Browse the repository at this point in the history
* Reverse read order API and sqlite storage implementation for reverse timestamp order

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp authored Oct 6, 2022
1 parent c92010b commit 16cb47f
Show file tree
Hide file tree
Showing 23 changed files with 443 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
MOCK_METHOD0(has_next, bool());
MOCK_METHOD0(read_next, std::shared_ptr<rosbag2_storage::SerializedBagMessage>());
MOCK_METHOD1(write, void(std::shared_ptr<const rosbag2_storage::SerializedBagMessage>));
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ if(BUILD_TESTING)
endif()

ament_add_gmock(test_sequential_reader
test/rosbag2_cpp/test_sequential_reader.cpp)
test/rosbag2_cpp/test_sequential_reader.cpp test/rosbag2_cpp/fake_data.cpp)
if(TARGET test_sequential_reader)
ament_target_dependencies(test_sequential_reader rosbag2_storage rosbag2_test_common test_msgs)
target_link_libraries(test_sequential_reader ${PROJECT_NAME})
Expand Down Expand Up @@ -221,7 +221,7 @@ if(BUILD_TESTING)
endif()

ament_add_gmock(test_sequential_writer
test/rosbag2_cpp/test_sequential_writer.cpp)
test/rosbag2_cpp/test_sequential_writer.cpp test/rosbag2_cpp/fake_data.cpp)
if(TARGET test_sequential_writer)
ament_target_dependencies(test_sequential_writer rosbag2_storage rosbag2_test_common test_msgs)
target_link_libraries(test_sequential_writer ${PROJECT_NAME})
Expand Down
12 changes: 11 additions & 1 deletion rosbag2_cpp/include/rosbag2_cpp/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BaseReaderInterface;
/**
* The Reader allows opening and reading messages of a bag.
*/
class ROSBAG2_CPP_PUBLIC Reader final
class ROSBAG2_CPP_PUBLIC Reader
{
public:
explicit Reader(
Expand Down Expand Up @@ -99,6 +99,16 @@ class ROSBAG2_CPP_PUBLIC Reader final
*/
void close();

/**
* Set the read order for continued iteration of messages, without changing the current
* read head timestamp.
*
* \param read_order Sorting criterion and direction to read messages in
* \note Calling set_read_order(order) concurrently with has_next(), seek(t), has_next_file()
* or load_next_file() will cause undefined behavior
*/
void set_read_order(const rosbag2_storage::ReadOrder & read_order);

/**
* Ask whether the underlying bagfile contains at least one more message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/storage_interfaces/base_read_interface.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_storage/topic_metadata.hpp"

Expand All @@ -45,6 +46,8 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface

virtual void close() = 0;

virtual void set_read_order(const rosbag2_storage::ReadOrder &) = 0;

virtual bool has_next() = 0;

virtual std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ class ROSBAG2_CPP_PUBLIC SequentialReader

void close() override;

/**
* \note Calling set_read_order(order) concurrently with has_next(), seek(t), has_next_file()
* or load_next_file() will cause undefined behavior
*/
void set_read_order(const rosbag2_storage::ReadOrder & order) override;

bool has_next() override;

std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;
Expand Down Expand Up @@ -141,7 +147,6 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
const std::vector<rosbag2_storage::TopicInformation> & topics);

/**
* Checks if the serialization format of the converter factory is the same as that of the storage
* factory.
* If not, changes the serialization format of the converter factory to use the serialization
Expand Down Expand Up @@ -187,6 +192,7 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};

bag_events::EventCallbackManager callback_manager_;
rosbag2_storage::ReadOrder read_order_{};
};

} // namespace readers
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ void Reader::close()
reader_impl_->close();
}

void Reader::set_read_order(const rosbag2_storage::ReadOrder & order)
{
return reader_impl_->set_read_order(order);
}

bool Reader::has_next()
{
return reader_impl_->has_next();
Expand Down
28 changes: 25 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ void SequentialReader::open(
if (!storage_) {
throw std::runtime_error{"No storage could be initialized from the inputs."};
}
storage_->set_read_order(read_order_);
metadata_ = storage_->get_metadata();
if (metadata_.relative_file_paths.empty()) {
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
Expand All @@ -128,6 +129,14 @@ void SequentialReader::open(
topics[0].topic_metadata.serialization_format);
}

void SequentialReader::set_read_order(const rosbag2_storage::ReadOrder & order)
{
if (storage_) {
storage_->set_read_order(order);
}
read_order_ = order;
}

bool SequentialReader::has_next()
{
if (storage_) {
Expand Down Expand Up @@ -194,7 +203,11 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp)
seek_time_ = timestamp;
if (storage_) {
// reset to the first file
current_file_iterator_ = file_paths_.begin();
if (read_order_.reverse) {
current_file_iterator_ = std::prev(file_paths_.end());
} else {
current_file_iterator_ = file_paths_.begin();
}
load_current_file();
return;
}
Expand All @@ -204,7 +217,11 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp)

bool SequentialReader::has_next_file() const
{
return current_file_iterator_ + 1 != file_paths_.end();
if (read_order_.reverse) {
return current_file_iterator_ != file_paths_.begin();
} else {
return (current_file_iterator_ + 1) != file_paths_.end();
}
}

void SequentialReader::load_current_file()
Expand All @@ -222,6 +239,7 @@ void SequentialReader::load_current_file()
throw std::runtime_error{"No storage could be initialized. Abort"};
}
// set filters
storage_->set_read_order(read_order_);
storage_->seek(seek_time_);
set_filter(topics_filter_);
}
Expand All @@ -231,7 +249,11 @@ void SequentialReader::load_next_file()
assert(current_file_iterator_ != file_paths_.end());
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = get_current_file();
current_file_iterator_++;
if (read_order_.reverse) {
current_file_iterator_--;
} else {
current_file_iterator_++;
}
info->opened_file = get_current_file();
load_current_file();
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
Expand Down
56 changes: 56 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022, Foxglove Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "fake_data.hpp"

#include "rosbag2_storage/ros_helper.hpp"

void write_sample_split_bag(
const rosbag2_storage::StorageOptions & storage_options,
const std::vector<std::pair<rcutils_time_point_value_t, uint32_t>> & fake_messages,
size_t split_every)
{
std::string topic_name = "testtopic";

ManualSplitSequentialWriter writer{};
writer.open(storage_options, rosbag2_cpp::ConverterOptions{});
writer.create_topic(
{
topic_name,
"test_msgs/ByteMultiArray",
"cdr",
""
});
for (size_t i = 0; i < fake_messages.size(); i++) {
if (i > 0 && (i % split_every == 0)) {
writer.split_bagfile();
}

const auto message = fake_messages[i];
rcutils_time_point_value_t time_stamp = message.first;
uint32_t value = message.second;

auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value));
msg->time_stamp = time_stamp;
msg->topic_name = topic_name;
writer.write(msg);
}
writer.close();
}
35 changes: 35 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/fake_data.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2022, Foxglove Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__FAKE_DATA_HPP_
#define ROSBAG2_CPP__FAKE_DATA_HPP_

#include <utility>
#include <vector>

#include "rosbag2_cpp/writers/sequential_writer.hpp"

class ManualSplitSequentialWriter : public rosbag2_cpp::writers::SequentialWriter
{
public:
using rosbag2_cpp::writers::SequentialWriter::split_bagfile;
};

// Write vector of <timestamp, uint32_data_value> pairs to bag files, splitting every N messages
void write_sample_split_bag(
const rosbag2_storage::StorageOptions & storage_options,
const std::vector<std::pair<rcutils_time_point_value_t, uint32_t>> & fake_messages,
size_t split_every);

#endif // ROSBAG2_CPP__FAKE_DATA_HPP_
1 change: 1 addition & 0 deletions rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
MOCK_METHOD0(has_next, bool());
MOCK_METHOD0(has_next_file, bool());
MOCK_METHOD0(read_next, std::shared_ptr<rosbag2_storage::SerializedBagMessage>());
Expand Down
Loading

0 comments on commit 16cb47f

Please sign in to comment.