Skip to content

Commit

Permalink
Fix AND conditions across all tests, cleanup test suite
Browse files Browse the repository at this point in the history
Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp committed Sep 23, 2022
1 parent b89d2b1 commit 9b4cdcc
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 46 deletions.
74 changes: 47 additions & 27 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/ros_helper.hpp"
#include "rosbag2_storage/topic_metadata.hpp"

#include "rosbag2_test_common/temporary_directory_fixture.hpp"
Expand Down Expand Up @@ -241,7 +242,8 @@ class OrderTestWriter : public rosbag2_cpp::writers::SequentialWriter
{
public:
// makes the method public for manual splitting
void split() {
void split()
{
split_bagfile();
}
};
Expand All @@ -252,20 +254,33 @@ class ReadOrderTest : public TemporaryDirectoryFixture
public:
ReadOrderTest()
{
fake_data = std::make_shared<rcutils_uint8_array_t>();
std::string msg_content = "Hello";
auto msg_length = msg_content.length();
fake_data = rosbag2_storage::make_serialized_message(
msg_content.c_str(), msg_length);

storage_options.uri = (std::filesystem::path(temporary_dir_path_) / "ordertest").string();
storage_options.storage_id = "sqlite3";

writer.open(storage_options, rosbag2_cpp::ConverterOptions{});
writer.create_topic(
{
topic_name,
"test_msgs/ByteMultiArray",
"cdr",
""
});

for (const auto & file_messages : message_timestamps_by_file) {
for (const auto time_stamp : file_messages) {
message_timestamps.push_back(time_stamp);
auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
msg->serialized_data = fake_data;
msg->time_stamp = time_stamp;
msg->topic_name = topic_name;
writer.write(msg);
}
writer.split();
// writer.split();
}
writer.close();
}
Expand All @@ -274,6 +289,7 @@ class ReadOrderTest : public TemporaryDirectoryFixture
{100, 300, 200},
{500, 400, 600}
};
std::vector<rcutils_time_point_value_t> message_timestamps;
const std::string topic_name = "testtopic";
OrderTestWriter writer;
rosbag2_cpp::readers::SequentialReader reader;
Expand All @@ -283,18 +299,12 @@ class ReadOrderTest : public TemporaryDirectoryFixture

TEST_F(ReadOrderTest, default_read_order) {
reader.open(storage_options, rosbag2_cpp::ConverterOptions{});
auto next = reader.read_next();
ASSERT_EQ(next->time_stamp, 100);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 200);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 300);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 400);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 500);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 600);
std::sort(message_timestamps.begin(), message_timestamps.end());
for (const auto time_stamp : message_timestamps) {
ASSERT_TRUE(reader.has_next());
auto next = reader.read_next();
ASSERT_EQ(next->time_stamp, time_stamp);
}
ASSERT_FALSE(reader.has_next());
}

Expand All @@ -307,17 +317,27 @@ TEST_F(ReadOrderTest, reverse_received_timestamp_order) {
auto metadata = reader.get_metadata();
reader.seek((metadata.starting_time + metadata.duration).time_since_epoch().count());

auto next = reader.read_next();
ASSERT_EQ(next->time_stamp, 600);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 500);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 400);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 300);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 200);
next = reader.read_next();
ASSERT_EQ(next->time_stamp, 100);
std::sort(
message_timestamps.begin(), message_timestamps.end(), [](auto a, auto b) {return a > b;});
for (const auto time_stamp : message_timestamps) {
ASSERT_TRUE(reader.has_next());
auto next = reader.read_next();
ASSERT_EQ(next->time_stamp, time_stamp);
}
ASSERT_FALSE(reader.has_next());
}

TEST_F(ReadOrderTest, file_order) {
rosbag2_storage::ReadOrder order;
order.sort_by = rosbag2_storage::ReadOrder::File;
reader.set_read_order(order);
reader.open(storage_options, rosbag2_cpp::ConverterOptions{});

// Don't sort timestamps, should be returned in write order
for (const auto time_stamp : message_timestamps) {
ASSERT_TRUE(reader.has_next());
auto next = reader.read_next();
ASSERT_EQ(next->time_stamp, time_stamp);
}
ASSERT_FALSE(reader.has_next());
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
void write_locked(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
RCPPUTILS_TSA_REQUIRES(database_write_mutex_);

int get_last_rowid();

using ReadQueryResult = SqliteStatementWrapper::QueryResult<
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string, int>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,15 @@ void SqliteStorage::set_read_order(rosbag2_storage::ReadOrder read_order)
if (read_order == read_order_) {
return;
}
read_order_ = read_order;
read_statement_ = nullptr;

if (read_order_.sort_by == rosbag2_storage::ReadOrder::PublishedTimestamp) {
throw std::runtime_error("Not Implemented - PublishedTimestamp read order.");
}
read_order_ = read_order;
read_statement_ = nullptr;
if (read_order.reverse && seek_row_id_ == 0) {
seek_row_id_ = get_last_rowid();
}
}

bool SqliteStorage::has_next()
Expand Down Expand Up @@ -403,6 +407,7 @@ void SqliteStorage::prepare_for_reading()
{
std::string statement_str = "SELECT data, timestamp, topics.name, messages.id "
"FROM messages JOIN topics ON messages.topic_id = topics.id WHERE ";
std::vector<std::string> where_conditions;

// add topic filter
if (!storage_filter_.topics.empty()) {
Expand All @@ -414,38 +419,50 @@ void SqliteStorage::prepare_for_reading()
topic_list += ",";
}
}
statement_str += "(topics.name IN (" + topic_list + ")) AND ";
where_conditions.push_back("(topics.name IN (" + topic_list + "))");
}
// add topic filter based on regular expression
if (!storage_filter_.topics_regex.empty()) {
// Construct string for selected topics
statement_str += "(topics.name REGEXP '" + storage_filter_.topics_regex + "')";
statement_str += " AND ";
where_conditions.push_back("(topics.name REGEXP '" + storage_filter_.topics_regex + "')");
}
// exclude topics based on regular expressions
if (!storage_filter_.topics_regex_to_exclude.empty()) {
// Construct string for selected topics
statement_str += " (topics.name NOT IN ";
statement_str += " (SELECT topics.name FROM topics WHERE topics.name REGEXP '";
statement_str += storage_filter_.topics_regex_to_exclude + "')";
statement_str += " ) AND ";
where_conditions.push_back(
"(topics.name NOT IN "
"(SELECT topics.name FROM topics WHERE topics.name REGEXP '" +
storage_filter_.topics_regex_to_exclude + "'))");
}

const std::string direction_op = read_order_.reverse ? "<" : ">";
const std::string order_direction = read_order_.reverse ? "DESC" : "ASC";

// add seek head filter
std::string message_id_filter = "(messages.id >= " + std::to_string(seek_row_id_) + ")";
std::string timestamp_equal_filter = "(timestamp = " + std::to_string(seek_time_) + ")";
std::string timestamp_greater_filter = "(timestamp > " + std::to_string(seek_time_) + ")";
if (read_order_.sort_by == rosbag2_storage::ReadOrder::File) {
statement_str += message_id_filter;
where_conditions.push_back(
"(messages.id " + direction_op + "= " + std::to_string(seek_row_id_) + ") ");
} else {
statement_str += "((" + message_id_filter + " AND " + timestamp_equal_filter + ") "
"OR " + timestamp_greater_filter + ") ";
where_conditions.push_back(
"(((timestamp = " + std::to_string(seek_time_) + ") "
"AND (messages.id " + direction_op + "= " + std::to_string(seek_row_id_) + ")) "
"OR (timestamp " + direction_op + " " + std::to_string(seek_time_) + ")) ");
}

for (
std::vector<std::string>::const_iterator it = where_conditions.begin();
it != where_conditions.end(); ++it)
{
statement_str += *it;
if (it != where_conditions.end() - 1) {
statement_str += " AND ";
}
}

// add order by time then id
std::string order_direction = read_order_.reverse ? "DESC" : "ASC";
switch (read_order_.sort_by) {
case rosbag2_storage::ReadOrder::ReceivedTimestamp:
statement_str += "ORDER BY messages.timestamp " + order_direction +
statement_str += "ORDER BY messages.timestamp " + order_direction;
statement_str += ", messages.id " + order_direction;
break;
case rosbag2_storage::ReadOrder::File:
Expand Down Expand Up @@ -578,9 +595,9 @@ void SqliteStorage::reset_filter()

void SqliteStorage::seek(const rcutils_time_point_value_t & timestamp)
{
// reset row id to 0 and set start time to input
// reset row id and set start time to input
// keep topic filter and reset read statement for re-read
seek_row_id_ = 0;
seek_row_id_ = read_order_.reverse ? get_last_rowid() : 0;
seek_time_ = timestamp;
read_statement_ = nullptr;
}
Expand All @@ -598,6 +615,13 @@ SqliteWrapper & SqliteStorage::get_sqlite_database_wrapper()
return *database_;
}

int SqliteStorage::get_last_rowid()
{
auto statement = database_->prepare_statement("SELECT max(rowid) from messages;");
auto query_results = statement->execute_query<int>();
return std::get<0>(*query_results.begin());
}

} // namespace rosbag2_storage_plugins

#include "pluginlib/class_list_macros.hpp" // NOLINT
Expand Down

0 comments on commit 9b4cdcc

Please sign in to comment.