diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index f4df5f5346..716967a12b 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -21,11 +21,11 @@ #ifdef ROSBAG2_STORAGE_MCAP_HAS_YAML_HPP #include "rosbag2_storage/yaml.hpp" #else -// COMPATIBILITY(foxy, galactic) - this block is available in rosbag2_storage/yaml.hpp in H + // COMPATIBILITY(foxy, galactic) - this block is available in rosbag2_storage/yaml.hpp in H #ifdef _WIN32 -// This is necessary because of a bug in yaml-cpp's cmake + // This is necessary because of a bug in yaml-cpp's cmake #define YAML_CPP_DLL -// This is necessary because yaml-cpp does not always use dllimport/dllexport consistently + // This is necessary because yaml-cpp does not always use dllimport/dllexport consistently #pragma warning(push) #pragma warning(disable : 4251) #pragma warning(disable : 4275) @@ -58,33 +58,33 @@ #pragma warning(disable : 4251) #endif -#define DECLARE_YAML_VALUE_MAP(KEY_TYPE, VALUE_TYPE, ...) \ - template<> \ - struct convert \ - { \ - static Node encode(const KEY_TYPE & e) \ - { \ +#define DECLARE_YAML_VALUE_MAP(KEY_TYPE, VALUE_TYPE, ...) \ + template <> \ + struct convert \ + { \ + static Node encode(const KEY_TYPE & e) \ + { \ static const std::pair mapping[] = __VA_ARGS__; \ - for (const auto & m : mapping) { \ - if (m.first == e) { \ - return Node(m.second); \ - } \ - } \ - return Node(""); \ - } \ - \ - static bool decode(const Node & node, KEY_TYPE & e) \ - { \ + for (const auto & m : mapping) { \ + if (m.first == e) { \ + return Node(m.second); \ + } \ + } \ + return Node(""); \ + } \ + \ + static bool decode(const Node & node, KEY_TYPE & e) \ + { \ static const std::pair mapping[] = __VA_ARGS__; \ - const auto val = node.as(); \ - for (const auto & m : mapping) { \ - if (m.second == val) { \ - e = m.first; \ - return true; \ - } \ - } \ - return false; \ - } \ + const auto val = node.as(); \ + for (const auto & m : mapping) { \ + if (m.second == val) { \ + e = m.first; \ + return true; \ + } \ + } \ + return false; \ + } \ } namespace @@ -93,7 +93,7 @@ namespace struct McapWriterOptions : mcap::McapWriterOptions { McapWriterOptions() - : mcap::McapWriterOptions("ros2") + : mcap::McapWriterOptions("ros2") { } }; @@ -102,7 +102,7 @@ struct McapWriterOptions : mcap::McapWriterOptions namespace YAML { #ifndef ROSBAG2_STORAGE_MCAP_HAS_YAML_HPP -template +template void optional_assign(const Node & node, std::string field, T & assign_to) { if (node[field]) { @@ -111,21 +111,19 @@ void optional_assign(const Node & node, std::string field, T & assign_to) } #endif -DECLARE_YAML_VALUE_MAP( - mcap::Compression, std::string, - {{mcap::Compression::None, "None"}, - {mcap::Compression::Lz4, "Lz4"}, - {mcap::Compression::Zstd, "Zstd"}}); - -DECLARE_YAML_VALUE_MAP( - mcap::CompressionLevel, std::string, - {{mcap::CompressionLevel::Fastest, "Fastest"}, - {mcap::CompressionLevel::Fast, "Fast"}, - {mcap::CompressionLevel::Default, "Default"}, - {mcap::CompressionLevel::Slow, "Slow"}, - {mcap::CompressionLevel::Slowest, "Slowest"}}); - -template<> +DECLARE_YAML_VALUE_MAP(mcap::Compression, std::string, + {{mcap::Compression::None, "None"}, + {mcap::Compression::Lz4, "Lz4"}, + {mcap::Compression::Zstd, "Zstd"}}); + +DECLARE_YAML_VALUE_MAP(mcap::CompressionLevel, std::string, + {{mcap::CompressionLevel::Fastest, "Fastest"}, + {mcap::CompressionLevel::Fast, "Fast"}, + {mcap::CompressionLevel::Default, "Default"}, + {mcap::CompressionLevel::Slow, "Slow"}, + {mcap::CompressionLevel::Slowest, "Slowest"}}); + +template <> struct convert { // NOTE: when updating this struct, also update documentation in README.md @@ -171,7 +169,7 @@ static void OnProblem(const mcap::Status & status) * A storage implementation for the MCAP file format. */ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage - : public rosbag2_storage::storage_interfaces::ReadWriteInterface + : public rosbag2_storage::storage_interfaces::ReadWriteInterface { public: MCAPStorage(); @@ -179,18 +177,15 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage /** BaseIOInterface **/ #ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS - void open( - const rosbag2_storage::StorageOptions & storage_options, - rosbag2_storage::storage_interfaces::IOFlag io_flag = - rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override; - void open( - const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag = - rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE); + void open(const rosbag2_storage::StorageOptions & storage_options, + rosbag2_storage::storage_interfaces::IOFlag io_flag = + rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override; + void open(const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag = + rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE); #else - void open( - const std::string & uri, - rosbag2_storage::storage_interfaces::IOFlag io_flag = - rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override; + void open(const std::string & uri, + rosbag2_storage::storage_interfaces::IOFlag io_flag = + rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override; #endif /** BaseInfoInterface **/ @@ -225,19 +220,17 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage void write(std::shared_ptr msg) override; void write( const std::vector> & msg) override; - void create_topic( - const rosbag2_storage::TopicMetadata & topic, - const rosbag2_storage::MessageDefinition & message_definition) override; + void create_topic(const rosbag2_storage::TopicMetadata & topic, + const rosbag2_storage::MessageDefinition & message_definition) override; void remove_topic(const rosbag2_storage::TopicMetadata & topic) override; #ifdef ROSBAG2_STORAGE_MCAP_HAS_UPDATE_METADATA void update_metadata(const rosbag2_storage::BagMetadata &) override; #endif private: - void open_impl( - const std::string & uri, const std::string & preset_profile, - rosbag2_storage::storage_interfaces::IOFlag io_flag, - const std::string & storage_config_uri); + void open_impl(const std::string & uri, const std::string & preset_profile, + rosbag2_storage::storage_interfaces::IOFlag io_flag, + const std::string & storage_config_uri); void reset_iterator(); bool read_and_enqueue_message(); @@ -292,13 +285,11 @@ MCAPStorage::~MCAPStorage() /** BaseIOInterface **/ #ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS -void MCAPStorage::open( - const rosbag2_storage::StorageOptions & storage_options, - rosbag2_storage::storage_interfaces::IOFlag io_flag) +void MCAPStorage::open(const rosbag2_storage::StorageOptions & storage_options, + rosbag2_storage::storage_interfaces::IOFlag io_flag) { - open_impl( - storage_options.uri, storage_options.storage_preset_profile, io_flag, - storage_options.storage_config_uri); + open_impl(storage_options.uri, storage_options.storage_preset_profile, io_flag, + storage_options.storage_config_uri); } #endif @@ -322,59 +313,58 @@ static void SetOptionsForPreset(const std::string & preset_profile, McapWriterOp options.chunkSize = 4 * 1024 * 1024; } else if (preset_profile != "none") { throw std::runtime_error( - "unknown MCAP storage preset profile " - "(valid options are 'none', 'fastwrite', 'zstd_fast', 'zstd_small'): " + - preset_profile); + "unknown MCAP storage preset profile " + "(valid options are 'none', 'fastwrite', 'zstd_fast', 'zstd_small'): " + + preset_profile); } } -void MCAPStorage::open_impl( - const std::string & uri, const std::string & preset_profile, - rosbag2_storage::storage_interfaces::IOFlag io_flag, - const std::string & storage_config_uri) +void MCAPStorage::open_impl(const std::string & uri, const std::string & preset_profile, + rosbag2_storage::storage_interfaces::IOFlag io_flag, + const std::string & storage_config_uri) { switch (io_flag) { case rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY: { - relative_path_ = uri; - input_ = std::make_unique(relative_path_, std::ios::binary); - data_source_ = std::make_unique(*input_); - mcap_reader_ = std::make_unique(); - auto status = mcap_reader_->open(*data_source_); - if (!status.ok()) { - throw std::runtime_error(status.message); - } - last_read_time_point_ = 0; - reset_iterator(); - break; + relative_path_ = uri; + input_ = std::make_unique(relative_path_, std::ios::binary); + data_source_ = std::make_unique(*input_); + mcap_reader_ = std::make_unique(); + auto status = mcap_reader_->open(*data_source_); + if (!status.ok()) { + throw std::runtime_error(status.message); } + last_read_time_point_ = 0; + reset_iterator(); + break; + } case rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE: case rosbag2_storage::storage_interfaces::IOFlag::APPEND: { - // APPEND does not seem to be used; treat it the same as READ_WRITE - io_flag = rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE; - relative_path_ = uri + FILE_EXTENSION; - - mcap_writer_ = std::make_unique(); - McapWriterOptions options; - // Set defaults for the rosbag2 storage plugin specifically. - options.noChunkCRC = true; - options.compression = mcap::Compression::None; - // Set options from preset profile first - if (!preset_profile.empty()) { - SetOptionsForPreset(preset_profile, options); - } - // If both preset profile and storage config are specified, - // options from the storage config are overlaid on the options from the preset profile. - if (!storage_config_uri.empty()) { - YAML::Node yaml_node = YAML::LoadFile(storage_config_uri); - YAML::convert::decode(yaml_node, options); - } + // APPEND does not seem to be used; treat it the same as READ_WRITE + io_flag = rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE; + relative_path_ = uri + FILE_EXTENSION; + + mcap_writer_ = std::make_unique(); + McapWriterOptions options; + // Set defaults for the rosbag2 storage plugin specifically. + options.noChunkCRC = true; + options.compression = mcap::Compression::None; + // Set options from preset profile first + if (!preset_profile.empty()) { + SetOptionsForPreset(preset_profile, options); + } + // If both preset profile and storage config are specified, + // options from the storage config are overlaid on the options from the preset profile. + if (!storage_config_uri.empty()) { + YAML::Node yaml_node = YAML::LoadFile(storage_config_uri); + YAML::convert::decode(yaml_node, options); + } - auto status = mcap_writer_->open(relative_path_, options); - if (!status.ok()) { - throw std::runtime_error(status.message); - } - break; + auto status = mcap_writer_->open(relative_path_, options); + if (!status.ok()) { + throw std::runtime_error(status.message); } + break; + } } opened_as_ = io_flag; metadata_.relative_file_paths = {get_relative_file_path()}; @@ -506,9 +496,8 @@ bool MCAPStorage::read_and_enqueue_message() last_enqueued_message_offset_ = messageView.messageOffset; msg->time_stamp = rcutils_time_point_value_t(messageView.message.logTime); msg->topic_name = messageView.channel->topic; - msg->serialized_data = rosbag2_storage::make_serialized_message( - messageView.message.data, - messageView.message.dataSize); + msg->serialized_data = rosbag2_storage::make_serialized_message(messageView.message.data, + messageView.message.dataSize); // enqueue this message to be used next_ = msg; @@ -534,22 +523,22 @@ void MCAPStorage::reset_iterator() options.readOrder = read_order_; if (!storage_filter_.topics.empty()) { options.topicFilter = [this](std::string_view topic) { - for (const auto & match_topic : storage_filter_.topics) { - if (match_topic == topic) { - return true; - } + for (const auto & match_topic : storage_filter_.topics) { + if (match_topic == topic) { + return true; } - return false; - }; + } + return false; + }; } #ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_FILTER_TOPIC_REGEX if (!storage_filter_.topics_regex.empty()) { options.topicFilter = [this](std::string_view topic) { - std::smatch m; - std::string topic_string(topic); - std::regex re(storage_filter_.topics_regex); - return std::regex_match(topic_string, m, re); - }; + std::smatch m; + std::string topic_string(topic); + std::regex re(storage_filter_.topics_regex); + return std::regex_match(topic_string, m, re); + }; } #endif linear_view_ = @@ -580,9 +569,9 @@ bool MCAPStorage::enqueued_message_is_already_read() return false; } if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) { - return *last_enqueued_message_offset_ >= *last_read_message_offset_; + return (*last_enqueued_message_offset_ >= *last_read_message_offset_); } - return *last_enqueued_message_offset_ <= *last_read_message_offset_; + return (*last_enqueued_message_offset_ <= *last_read_message_offset_); } void MCAPStorage::ensure_summary_read() @@ -753,8 +742,8 @@ void MCAPStorage::write(std::shared_ptrwrite(mcap_msg); if (!status.ok()) { throw std::runtime_error{std::string{"Failed to write "} + - std::to_string(msg->serialized_data->buffer_length) + - " byte message to MCAP file: " + status.message}; + std::to_string(msg->serialized_data->buffer_length) + + " byte message to MCAP file: " + status.message}; } /// Update metadata @@ -775,9 +764,8 @@ void MCAPStorage::write( } } -void MCAPStorage::create_topic( - const rosbag2_storage::TopicMetadata & topic, - const rosbag2_storage::MessageDefinition & message_definition) +void MCAPStorage::create_topic(const rosbag2_storage::TopicMetadata & topic, + const rosbag2_storage::MessageDefinition & message_definition) { auto topic_info = rosbag2_storage::TopicInformation{topic, 0}; const auto topic_it = topics_.find(topic.name); @@ -797,9 +785,8 @@ void MCAPStorage::create_topic( schema.name = datatype; schema.encoding = message_definition.encoding; const auto & full_text = message_definition.encoded_message_definition; - schema.data.assign( - reinterpret_cast(full_text.data()), - reinterpret_cast(full_text.data() + full_text.size())); + schema.data.assign(reinterpret_cast(full_text.data()), + reinterpret_cast(full_text.data() + full_text.size())); // TODO(jrms): save message_definition.type_hash in mcap schema mcap_writer_->addSchema(schema); schema_ids_.emplace(datatype, schema.id); @@ -815,9 +802,8 @@ void MCAPStorage::create_topic( channel.topic = topic.name; channel.messageEncoding = topic_info.topic_metadata.serialization_format; channel.schemaId = schema_id; - channel.metadata.emplace( - "offered_qos_profiles", - topic_info.topic_metadata.offered_qos_profiles); + channel.metadata.emplace("offered_qos_profiles", + topic_info.topic_metadata.offered_qos_profiles); channel.metadata.emplace("topic_type_hash", topic_info.topic_metadata.type_description_hash); mcap_writer_->addChannel(channel); channel_ids_.emplace(topic.name, channel.id); @@ -839,8 +825,8 @@ void MCAPStorage::update_metadata(const rosbag2_storage::BagMetadata & bag_metad { if (bag_metadata.compression_mode == "message") { throw std::runtime_error( - "MCAP storage plugin does not support message compression, " - "consider using chunk compression by setting `compression: 'Zstd'` in storage config"); + "MCAP storage plugin does not support message compression, " + "consider using chunk compression by setting `compression: 'Zstd'` in storage config"); } mcap::Metadata metadata; @@ -856,9 +842,8 @@ void MCAPStorage::update_metadata(const rosbag2_storage::BagMetadata & bag_metad } // namespace rosbag2_storage_plugins #include "pluginlib/class_list_macros.hpp" // NOLINT -PLUGINLIB_EXPORT_CLASS( - rosbag2_storage_plugins::MCAPStorage, - rosbag2_storage::storage_interfaces::ReadWriteInterface) +PLUGINLIB_EXPORT_CLASS(rosbag2_storage_plugins::MCAPStorage, + rosbag2_storage::storage_interfaces::ReadWriteInterface) #ifdef _WIN32 #pragma warning(pop) diff --git a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp index 58cd8c43dd..f94520bf05 100644 --- a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp +++ b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp @@ -49,7 +49,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) const std::string message_data = "Test Message 1"; const std::string storage_id = "mcap"; const rosbag2_storage::MessageDefinition definition = {"std_msgs/msg/String", "ros2msg", - "string data", ""}; + "string data", ""}; // COMPATIBILITY(foxy) // using verbose APIs for Foxy compatibility which did not yet provide plain-message API rclcpp::Serialization serialization; @@ -103,23 +103,19 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) #endif auto topics_and_types = reader->get_all_topics_and_types(); - EXPECT_THAT( - topics_and_types, - ElementsAreArray( - {rosbag2_storage::TopicMetadata{ - topic_name, "std_msgs/msg/String", "cdr", "qos_profile1", "type_hash1"}})); + EXPECT_THAT(topics_and_types, + ElementsAreArray({rosbag2_storage::TopicMetadata{ + topic_name, "std_msgs/msg/String", "cdr", "qos_profile1", "type_hash1"}})); const auto metadata = reader->get_metadata(); EXPECT_THAT(metadata.storage_identifier, Eq("mcap")); EXPECT_THAT(metadata.relative_file_paths, ElementsAreArray({expected_bag.string()})); - EXPECT_THAT( - metadata.topics_with_message_count, - ElementsAreArray( - {rosbag2_storage::TopicInformation{ - rosbag2_storage::TopicMetadata{topic_name, "std_msgs/msg/String", "cdr", - "qos_profile1", "type_hash1"}, - 1u}})); + EXPECT_THAT(metadata.topics_with_message_count, + ElementsAreArray({rosbag2_storage::TopicInformation{ + rosbag2_storage::TopicMetadata{topic_name, "std_msgs/msg/String", "cdr", + "qos_profile1", "type_hash1"}, + 1u}})); EXPECT_THAT(metadata.message_count, Eq(1u)); const auto current_distro = rcpputils::get_env_var("ROS_DISTRO"); @@ -151,7 +147,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml) const std::string storage_id = "mcap"; const std::string config_path = _TEST_RESOURCES_DIR_PATH; const rosbag2_storage::MessageDefinition definition = {"std_msgs/msg/String", "ros2msg", - "string data", ""}; + "string data", ""}; rclcpp::Serialization serialization; {