diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index b9862efc21..b1223a7d6d 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -67,11 +67,13 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'the bag will split at whichever threshold is reached first.' ) parser.add_argument( - '--max-cache-size', type=int, default=1024*1024, - help='maximum size (in bytes) of messages to hold in cache before writing to disk. ' - 'Default is 1 mebibyte, everytime the cache size equals or exceeds 1MB, ' - 'it will be written to disk. If the value specified is 0, then every message is' - 'directly written to disk.' + '--max-cache-size', type=int, default=100*1024*1024, + help='maximum size (in bytes) of messages to hold in each buffer of cache.' + 'Default is 100 mebibytes. The cache is handled through double buffering, ' + 'which means that in pessimistic case up to twice the parameter value of memory' + 'is needed. A rule of thumb is to cache an order of magitude corresponding to' + 'about one second of total recorded data volume.' + 'If the value specified is 0, then every message is directly written to disk.' ) parser.add_argument( '--compression-mode', type=str, default='none', diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 8c99a403ff..5c8d4cdc35 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -37,20 +37,6 @@ namespace rosbag2_compression { -namespace -{ -std::string format_storage_uri(const std::string & base_folder, uint64_t storage_count) -{ - // Right now `base_folder_` is always just the folder name for where to install the bagfile. - // The name of the folder needs to be queried in case - // SequentialWriter is opened with a relative path. - std::stringstream storage_file_name; - storage_file_name << rcpputils::fs::path(base_folder).filename().string() << "_" << storage_count; - - return (rcpputils::fs::path(base_folder) / storage_file_name.str()).string(); -} -} // namespace - SequentialCompressionWriter::SequentialCompressionWriter( const rosbag2_compression::CompressionOptions & compression_options) : SequentialWriter(), @@ -123,6 +109,10 @@ void SequentialCompressionWriter::reset() metadata_io_->write_metadata(base_folder_, metadata_); } + if (use_cache_) { + cache_consumer_.reset(); + message_cache_.reset(); + } storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory storage_factory_.reset(); } @@ -155,11 +145,7 @@ void SequentialCompressionWriter::compress_last_file() void SequentialCompressionWriter::split_bagfile() { - storage_options_.uri = format_storage_uri( - base_folder_, - metadata_.relative_file_paths.size()); - - storage_ = storage_factory_->open_read_write(storage_options_); + switch_to_next_storage(); if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) { compress_last_file(); @@ -169,18 +155,9 @@ void SequentialCompressionWriter::split_bagfile() // Add a check to make sure reset() does not compress the file again if we couldn't load the // storage plugin. should_compress_last_file_ = false; - - std::stringstream errmsg; - errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!"; - throw std::runtime_error{errmsg.str()}; } metadata_.relative_file_paths.push_back(storage_->get_relative_file_path()); - - // Re-register all topics since we rolled-over to a new bagfile. - for (const auto & topic : topics_names_to_info_) { - storage_->create_topic(topic.second.topic_metadata); - } } void SequentialCompressionWriter::compress_message( diff --git a/rosbag2_cpp/CMakeLists.txt b/rosbag2_cpp/CMakeLists.txt index cd378c4e2c..1d78d5831f 100644 --- a/rosbag2_cpp/CMakeLists.txt +++ b/rosbag2_cpp/CMakeLists.txt @@ -45,6 +45,9 @@ find_package(rosidl_typesupport_cpp REQUIRED) find_package(rosidl_typesupport_introspection_cpp REQUIRED) add_library(${PROJECT_NAME} SHARED + src/rosbag2_cpp/cache/cache_consumer.cpp + src/rosbag2_cpp/cache/message_cache_buffer.cpp + src/rosbag2_cpp/cache/message_cache.cpp src/rosbag2_cpp/converter.cpp src/rosbag2_cpp/info.cpp src/rosbag2_cpp/reader.cpp @@ -156,6 +159,12 @@ if(BUILD_TESTING) target_link_libraries(test_storage_without_metadata_file ${PROJECT_NAME}) endif() + ament_add_gmock(test_message_cache + test/rosbag2_cpp/test_message_cache.cpp) + if(TARGET test_message_cache) + target_link_libraries(test_message_cache ${PROJECT_NAME}) + endif() + # If compiling with gcc, run this test with sanitizers enabled ament_add_gmock(test_ros2_message test/rosbag2_cpp/types/test_ros2_message.cpp diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp new file mode 100644 index 0000000000..7a61485c15 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp @@ -0,0 +1,99 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CACHE__CACHE_CONSUMER_HPP_ +#define ROSBAG2_CPP__CACHE__CACHE_CONSUMER_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rosbag2_cpp/cache/message_cache.hpp" + +// This is necessary because of using stl types here. It is completely safe, because +// a) the member is not accessible from the outside +// b) there are no inline functions. +#ifdef _WIN32 +# pragma warning(push) +# pragma warning(disable:4251) +#endif + +namespace rosbag2_cpp +{ +namespace cache +{ +/** +* This class is responsible for consuming the cache using provided fuction. +* It can work with any callback conforming to the consume_callback_function_t +* signature, e.g. a storage write function. Consuming and thus the callback are +* called in a separate thread. +* +* Since the consuming callback likely involves disk operations, the main motivation +* for design is to make sure that consumer is busy anytime there is any work. +* This is realized through conditional variable and greedy buffer switching. +* +* The consumer uses MessageCache and waits for consumer buffer to be ready. This +* will happen as soon as that there are any messages put into producer buffer - +* a switch of buffer pointers will result in these messages being available for +* consumption. The consumer then proceeds to process the entire buffer in one go. +* While this is ongoing, the producer buffer is being filled with new messages. +* +* For SQLite implementation of storage, consumer callback will write consumer buffer +* in each loop iteration as a separate transaction. This results in a balancing +* mechanism for high-performance cases, where transaction size can be increased +* dynamically as previous, smaller transactions introduce delays in loop iteration. +*/ +class ROSBAG2_CPP_PUBLIC CacheConsumer +{ +public: + using consume_callback_function_t = std::function &)>; + + CacheConsumer( + std::shared_ptr message_cache, + consume_callback_function_t consume_callback); + + ~CacheConsumer(); + + /// shut down the consumer thread + void close(); + + /// Set new consume callback, restart thread if necessary + void change_consume_callback(consume_callback_function_t callback); + +private: + std::shared_ptr message_cache_; + consume_callback_function_t consume_callback_; + + /// Write buffer data to a storage + void exec_consuming(); + + /// Consumer thread shutdown sync + std::atomic_bool is_stop_issued_ {false}; + std::mutex consumer_mutex_; + + std::thread consumer_thread_; +}; + +} // namespace cache +} // namespace rosbag2_cpp + +#ifdef _WIN32 +# pragma warning(pop) +#endif + +#endif // ROSBAG2_CPP__CACHE__CACHE_CONSUMER_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp new file mode 100644 index 0000000000..a24e15348a --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp @@ -0,0 +1,123 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CACHE__MESSAGE_CACHE_HPP_ +#define ROSBAG2_CPP__CACHE__MESSAGE_CACHE_HPP_ + +#include +#include +#include +#include +#include +#include +#include + +#include "rosbag2_cpp/cache/message_cache_buffer.hpp" +#include "rosbag2_cpp/visibility_control.hpp" + +#include "rosbag2_storage/serialized_bag_message.hpp" + +// This is necessary because of using stl types here. It is completely safe, because +// a) the member is not accessible from the outside +// b) there are no inline functions. +#ifdef _WIN32 +# pragma warning(push) +# pragma warning(disable:4251) +#endif + +namespace rosbag2_cpp +{ +namespace cache +{ +/** +* This class is responsible for implementing message cache, using two cache +* buffers and providing synchronization API for producer-consumer pattern. +* +* Double buffering is a part of producer-consumer pattern and optimizes for +* the consumer performance (which can be a bottleneck, e.g. disk writes). +* +* Two instances of MessageCacheBuffer are used, one for producer and one for +* the consumer. Buffers are switched through wait_for_buffer function, which +* involves synchronization and a simple pointer switch. +* +* The cache can enter a flushing state, intended as a finalization state, +* where all the remaining data is going to be processed: no new messages are +* accepted and buffer switching can be done unconditionally on consumer demand. +* +* The cache holds infomation about dropped messages (per topic). These are +* messages that were pushed to the cache when it was full. Such situation signals +* performance issues, most likely with the CacheConsumer consumer callback. +*/ +class ROSBAG2_CPP_PUBLIC MessageCache +{ +public: + explicit MessageCache(uint64_t max_buffer_size); + + ~MessageCache(); + + /// Puts msg into primary buffer. With full cache, msg is ignored and counted as lost + void push(std::shared_ptr msg); + + /// Summarize dropped/remaining messages + void log_dropped(); + + /// Producer API: notify consumer to wake-up (primary buffer has data) + void notify_buffer_consumer(); + + /// Set the cache to consume-only mode for final buffer flush before closing + void finalize(); + + /// Notify that flushing is complete + void notify_flushing_done(); + + /** + * Consumer API: wait until primary buffer is ready and swap it with consumer buffer. + * The caller thread (consumer thread) will sleep on a conditional variable + * until it can be awaken, which is to happen when: + * a) data was inserted into the producer buffer, consuming can continue after a swap + * b) we are flushing the data (in case we missed the last notification when consuming) + **/ + void wait_for_buffer(); + + /// Consumer API: get current buffer to consume + std::shared_ptr consumer_buffer(); + + /// Exposes counts of messages dropped per topic + std::unordered_map messages_dropped() const; + +private: + /// Double buffers + std::shared_ptr primary_buffer_; + std::shared_ptr secondary_buffer_; + + /// Dropped messages per topic. Used for printing in alphabetic order + std::unordered_map messages_dropped_per_topic_; + + /// Double buffers sync (following cpp core guidelines for condition variables) + bool primary_buffer_can_be_swapped_ {false}; + std::condition_variable cache_condition_var_; + std::mutex cache_mutex_; + + /// Cache is no longer accepting messages and is in the process of flushing + std::atomic_bool flushing_ {false}; +}; + +} // namespace cache +} // namespace rosbag2_cpp + +#ifdef _WIN32 +# pragma warning(pop) +#endif + +#endif // ROSBAG2_CPP__CACHE__MESSAGE_CACHE_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp new file mode 100644 index 0000000000..2fab5ed4a5 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp @@ -0,0 +1,86 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CACHE__MESSAGE_CACHE_BUFFER_HPP_ +#define ROSBAG2_CPP__CACHE__MESSAGE_CACHE_BUFFER_HPP_ + +#include +#include +#include + +#include "rosbag2_cpp/visibility_control.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" + +// This is necessary because of using stl types here. It is completely safe, because +// a) the member is not accessible from the outside +// b) there are no inline functions. +#ifdef _WIN32 +# pragma warning(push) +# pragma warning(disable:4251) +#endif + +namespace rosbag2_cpp +{ +namespace cache +{ +/** +* This class implements a single buffer for message cache. The buffer is byte size +* limited and won't accept any messages when current buffer byte size is already +* over the limit set by max_cache_size. This means that buffer can at times use +* more memory than max_cache_size, but never by more than a single message. When +* the buffer is full, the next incoming message is dropped. +* +* Note that it could be reused as a template with any class that has +* ->byte_size() - like interface +*/ +class ROSBAG2_CPP_PUBLIC MessageCacheBuffer +{ +public: + explicit MessageCacheBuffer(const uint64_t max_cache_size); + + using buffer_element_t = std::shared_ptr; + + /** + * If buffer size got some space left, we push message regardless of its size, but if + * this results in exceeding buffer size, we mark buffer to drop all new incoming messages. + * This flag is cleared when buffers are swapped. + */ + bool push(buffer_element_t msg); + + /// Clear buffer + void clear(); + + /// Get number of elements in the buffer + size_t size(); + + /// Get buffer data + const std::vector & data(); + +private: + std::vector buffer_; + uint64_t buffer_bytes_size_ {0u}; + const uint64_t max_bytes_size_; + + /// set when buffer is full and should drop messages instead of inserting them + std::atomic_bool drop_messages_ {false}; +}; + +} // namespace cache +} // namespace rosbag2_cpp + +#ifdef _WIN32 +# pragma warning(pop) +#endif + +#endif // ROSBAG2_CPP__CACHE__MESSAGE_CACHE_BUFFER_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 0938675eaa..74766e72d4 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -16,10 +16,13 @@ #define ROSBAG2_CPP__WRITERS__SEQUENTIAL_WRITER_HPP_ #include +#include #include #include #include +#include "rosbag2_cpp/cache/cache_consumer.hpp" +#include "rosbag2_cpp/cache/message_cache.hpp" #include "rosbag2_cpp/converter.hpp" #include "rosbag2_cpp/serialization_format_converter_factory.hpp" #include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp" @@ -111,18 +114,24 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter std::unique_ptr metadata_io_; std::unique_ptr converter_; + bool use_cache_; + std::shared_ptr message_cache_; + std::unique_ptr cache_consumer_; + + void switch_to_next_storage(); + + std::string format_storage_uri( + const std::string & base_folder, uint64_t storage_count); + rosbag2_storage::StorageOptions storage_options_; // Used in bagfile splitting; // specifies the best-effort maximum duration of a bagfile in seconds. std::chrono::seconds max_bagfile_duration; - // Intermediate cache to write multiple messages into the storage. - uint64_t current_cache_size_; - std::vector> cache_; - - // Used to track topic -> message count + // Used to track topic -> message count. If cache is present, it is updated by CacheConsumer std::unordered_map topics_names_to_info_; + std::mutex topics_info_mutex_; rosbag2_storage::BagMetadata metadata_; @@ -138,9 +147,6 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter // Record TopicInformation into metadata void finalize_metadata(); - // Flush data into storage, and reset cache - void reset_cache(); - // Helper method used by write to get the message in a format that is ready to be written. // Common use cases include converting the message using the converter or // performing other operations like compression on it diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp new file mode 100644 index 0000000000..63dfd14420 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp @@ -0,0 +1,87 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rosbag2_cpp/cache/cache_consumer.hpp" +#include "rosbag2_cpp/logging.hpp" + +namespace rosbag2_cpp +{ +namespace cache +{ + +CacheConsumer::CacheConsumer( + std::shared_ptr message_cache, + consume_callback_function_t consume_callback) +: message_cache_(message_cache), + consume_callback_(consume_callback) +{ + consumer_thread_ = std::thread(&CacheConsumer::exec_consuming, this); +} + +CacheConsumer::~CacheConsumer() +{ + close(); +} + +void CacheConsumer::close() +{ + message_cache_->finalize(); + is_stop_issued_ = true; + + ROSBAG2_CPP_LOG_INFO_STREAM( + "Writing remaining messages from cache to the bag. It may take a while"); + + if (consumer_thread_.joinable()) { + consumer_thread_.join(); + } + message_cache_->notify_flushing_done(); +} + +void CacheConsumer::change_consume_callback( + CacheConsumer::consume_callback_function_t consume_callback) +{ + consume_callback_ = consume_callback; + if (!consumer_thread_.joinable()) { + is_stop_issued_ = false; + consumer_thread_ = std::thread(&CacheConsumer::exec_consuming, this); + } +} + +void CacheConsumer::exec_consuming() +{ + bool exit_flag = false; + bool flushing = false; + while (!exit_flag) { + // Invariant at loop start: consumer buffer is empty + + // swap producer buffer with consumer buffer + message_cache_->wait_for_buffer(); + + // make sure to use consistent callback for each iteration + auto callback_for_this_loop = consume_callback_; + + // consume all the data from consumer buffer + auto consumer_buffer = message_cache_->consumer_buffer(); + callback_for_this_loop(consumer_buffer->data()); + consumer_buffer->clear(); + + if (flushing) {exit_flag = true;} // this was the final run + if (is_stop_issued_) {flushing = true;} // run one final time to flush + } +} + +} // namespace cache +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp new file mode 100644 index 0000000000..44857644b5 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp @@ -0,0 +1,139 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include "rosbag2_cpp/cache/message_cache.hpp" +#include "rosbag2_cpp/logging.hpp" + +namespace rosbag2_cpp +{ +namespace cache +{ + +MessageCache::MessageCache(uint64_t max_buffer_size) +{ + primary_buffer_ = std::make_shared(max_buffer_size); + secondary_buffer_ = std::make_shared(max_buffer_size); +} + +void MessageCache::push(std::shared_ptr msg) +{ + // While pushing, we keep track of inserted and dropped messages as well + bool pushed = false; + { + std::lock_guard cache_lock(cache_mutex_); + if (!flushing_) { + pushed = primary_buffer_->push(msg); + } + } + if (!pushed) { + messages_dropped_per_topic_[msg->topic_name]++; + } + + notify_buffer_consumer(); +} + +void MessageCache::finalize() +{ + { + std::lock_guard cache_lock(cache_mutex_); + flushing_ = true; + } + cache_condition_var_.notify_one(); +} + +void MessageCache::notify_flushing_done() +{ + flushing_ = false; +} + +void MessageCache::notify_buffer_consumer() +{ + { + std::lock_guard cache_lock(cache_mutex_); + primary_buffer_can_be_swapped_ = true; + } + cache_condition_var_.notify_one(); +} + +void MessageCache::wait_for_buffer() +{ + std::unique_lock lock(cache_mutex_); + if (!flushing_) { + // Required condition check to protect against spurious wakeups + cache_condition_var_.wait( + lock, [this] { + return primary_buffer_can_be_swapped_ || flushing_; + }); + primary_buffer_can_be_swapped_ = false; + } + std::swap(primary_buffer_, secondary_buffer_); +} + +std::shared_ptr MessageCache::consumer_buffer() +{ + return secondary_buffer_; +} + +std::unordered_map MessageCache::messages_dropped() const +{ + return messages_dropped_per_topic_; +} + +void MessageCache::log_dropped() +{ + uint64_t total_lost = 0; + std::string log_text("Cache buffers lost messages per topic: "); + + // worse performance than sorting key vector (neglible), but cleaner + std::map messages_dropped_per_topic_sorted( + messages_dropped_per_topic_.begin(), messages_dropped_per_topic_.end()); + + std::for_each( + messages_dropped_per_topic_.begin(), + messages_dropped_per_topic_.end(), + [&total_lost, &log_text](const auto & e) { + uint32_t lost = e.second; + if (lost > 0) { + log_text += "\n\t" + e.first + ": " + std::to_string(lost); + total_lost += lost; + } + }); + + if (total_lost > 0) { + log_text += "\nTotal lost: " + std::to_string(total_lost); + ROSBAG2_CPP_LOG_WARN_STREAM(log_text); + } + + size_t remaining = primary_buffer_->size() + secondary_buffer_->size(); + if (remaining > 0) { + ROSBAG2_CPP_LOG_WARN_STREAM( + "Cache buffers were unflushed with " << remaining << " remaining messages" + ); + } +} + +MessageCache::~MessageCache() +{ + log_dropped(); +} + +} // namespace cache +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_buffer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_buffer.cpp new file mode 100644 index 0000000000..ace6d7fb62 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_buffer.cpp @@ -0,0 +1,63 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rosbag2_cpp/cache/message_cache_buffer.hpp" + +namespace rosbag2_cpp +{ +namespace cache +{ + +MessageCacheBuffer::MessageCacheBuffer(const uint64_t max_cache_size) +: max_bytes_size_(max_cache_size) +{ +} + +bool MessageCacheBuffer::push(buffer_element_t msg) +{ + bool pushed = false; + if (!drop_messages_) { + buffer_bytes_size_ += msg->serialized_data->buffer_length; + buffer_.push_back(msg); + pushed = true; + } + + if (buffer_bytes_size_ >= max_bytes_size_) { + drop_messages_ = true; + } + return pushed; +} + +void MessageCacheBuffer::clear() +{ + buffer_.clear(); + buffer_bytes_size_ = 0u; + drop_messages_ = false; +} + +size_t MessageCacheBuffer::size() +{ + return buffer_.size(); +} + +const std::vector & MessageCacheBuffer::data() +{ + return buffer_; +} + +} // namespace cache +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 583e7e42e7..34b0e35192 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -19,7 +19,9 @@ #include #include #include +#include #include +#include #include "rcpputils/filesystem_helper.hpp" @@ -34,22 +36,28 @@ namespace writers namespace { -std::string format_storage_uri(const std::string & base_folder, uint64_t storage_count) -{ - // Right now `base_folder_` is always just the folder name for where to install the bagfile. - // The name of the folder needs to be queried in case - // SequentialWriter is opened with a relative path. - std::stringstream storage_file_name; - storage_file_name << rcpputils::fs::path(base_folder).filename().string() << "_" << storage_count; - - return (rcpputils::fs::path(base_folder) / storage_file_name.str()).string(); -} - std::string strip_parent_path(const std::string & relative_path) { return rcpputils::fs::path(relative_path).filename().string(); } +rosbag2_cpp::cache::CacheConsumer::consume_callback_function_t make_callback( + std::shared_ptr storage_interface, + std::unordered_map & topics_info_map, + std::mutex & topics_mutex) +{ + return [callback_interface = storage_interface, &topics_info_map, &topics_mutex]( + const std::vector> & msgs) { + callback_interface->write(msgs); + for (const auto & msg : msgs) { + // count messages as successfully written + std::lock_guard lock(topics_mutex); + if (topics_info_map.find(msg->topic_name) != topics_info_map.end()) { + topics_info_map[msg->topic_name].message_count++; + } + } + }; +} } // namespace SequentialWriter::SequentialWriter( @@ -86,9 +94,6 @@ void SequentialWriter::open( base_folder_ = storage_options.uri; storage_options_ = storage_options; - cache_.reserve(storage_options.max_cache_size); - current_cache_size_ = 0u; - if (converter_options.output_serialization_format != converter_options.input_serialization_format) { @@ -126,17 +131,33 @@ void SequentialWriter::open( throw std::runtime_error{error.str()}; } + use_cache_ = storage_options.max_cache_size > 0u; + if (use_cache_) { + message_cache_ = std::make_shared( + storage_options.max_cache_size); + cache_consumer_ = std::make_unique( + message_cache_, + make_callback( + storage_, + topics_names_to_info_, + topics_info_mutex_)); + } init_metadata(); } void SequentialWriter::reset() { + if (use_cache_) { + // destructor will flush message cache + cache_consumer_.reset(); + message_cache_.reset(); + } + if (!base_folder_.empty()) { finalize_metadata(); metadata_io_->write_metadata(base_folder_, metadata_); } - reset_cache(); storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory storage_factory_.reset(); } @@ -157,10 +178,15 @@ void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic rosbag2_storage::TopicInformation info{}; info.topic_metadata = topic_with_type; - const auto insert_res = topics_names_to_info_.insert( - std::make_pair(topic_with_type.name, info)); + bool insert_succeded = false; + { + std::lock_guard lock(topics_info_mutex_); + const auto insert_res = topics_names_to_info_.insert( + std::make_pair(topic_with_type.name, info)); + insert_succeded = insert_res.second; + } - if (!insert_res.second) { + if (!insert_succeded) { std::stringstream errmsg; errmsg << "Failed to insert topic \"" << topic_with_type.name << "\"!"; @@ -177,7 +203,13 @@ void SequentialWriter::remove_topic(const rosbag2_storage::TopicMetadata & topic throw std::runtime_error("Bag is not open. Call open() before removing."); } - if (topics_names_to_info_.erase(topic_with_type.name) > 0) { + bool erased = false; + { + std::lock_guard lock(topics_info_mutex_); + erased = topics_names_to_info_.erase(topic_with_type.name) > 0; + } + + if (erased) { storage_->remove_topic(topic_with_type); } else { std::stringstream errmsg; @@ -188,8 +220,26 @@ void SequentialWriter::remove_topic(const rosbag2_storage::TopicMetadata & topic } } -void SequentialWriter::split_bagfile() +std::string SequentialWriter::format_storage_uri( + const std::string & base_folder, uint64_t storage_count) +{ + // Right now `base_folder_` is always just the folder name for where to install the bagfile. + // The name of the folder needs to be queried in case + // SequentialWriter is opened with a relative path. + std::stringstream storage_file_name; + storage_file_name << rcpputils::fs::path(base_folder).filename().string() << "_" << storage_count; + + return (rcpputils::fs::path(base_folder) / storage_file_name.str()).string(); +} + +void SequentialWriter::switch_to_next_storage() { + // consumer remaining message cache + if (use_cache_) { + cache_consumer_->close(); + message_cache_->log_dropped(); + } + storage_options_.uri = format_storage_uri( base_folder_, metadata_.relative_file_paths.size()); @@ -202,12 +252,26 @@ void SequentialWriter::split_bagfile() throw std::runtime_error(errmsg.str()); } - metadata_.relative_file_paths.push_back(strip_parent_path(storage_->get_relative_file_path())); - // Re-register all topics since we rolled-over to a new bagfile. for (const auto & topic : topics_names_to_info_) { storage_->create_topic(topic.second.topic_metadata); } + + // Set new storage in buffer layer and restart consumer thread + if (use_cache_) { + cache_consumer_->change_consume_callback( + make_callback( + storage_, + topics_names_to_info_, + topics_info_mutex_)); + } +} + +void SequentialWriter::split_bagfile() +{ + switch_to_next_storage(); + + metadata_.relative_file_paths.push_back(strip_parent_path(storage_->get_relative_file_path())); } void SequentialWriter::write(std::shared_ptr message) @@ -216,9 +280,10 @@ void SequentialWriter::write(std::shared_ptrtopic_name).message_count; + topic_information = &topics_names_to_info_.at(message->topic_name); } catch (const std::out_of_range & /* oor */) { std::stringstream errmsg; errmsg << "Failed to write on topic '" << message->topic_name << @@ -241,15 +306,14 @@ void SequentialWriter::write(std::shared_ptrwrite(converted_msg); + ++topic_information->message_count; } else { - cache_.push_back(converted_msg); - current_cache_size_ += converted_msg->serialized_data->buffer_length; - if (current_cache_size_ >= storage_options_.max_cache_size) { - reset_cache(); - } + // Otherwise, use cache buffer + message_cache_->push(converted_msg); } } @@ -308,16 +372,5 @@ void SequentialWriter::finalize_metadata() } } -void SequentialWriter::reset_cache() -{ - // if cache data exists, it must flush the data into the storage - if (!cache_.empty()) { - storage_->write(cache_); - // reset cache - cache_.clear(); - cache_.reserve(storage_options_.max_cache_size); - current_cache_size_ = 0u; - } -} } // namespace writers } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/test/rosbag2_cpp/mock_cache_consumer.hpp b/rosbag2_cpp/test/rosbag2_cpp/mock_cache_consumer.hpp new file mode 100644 index 0000000000..2895aebb48 --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/mock_cache_consumer.hpp @@ -0,0 +1,33 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__MOCK_CACHE_CONSUMER_HPP_ +#define ROSBAG2_CPP__MOCK_CACHE_CONSUMER_HPP_ + +#include +#include + +#include "mock_message_cache.hpp" +#include "rosbag2_cpp/cache/cache_consumer.hpp" + +class MockCacheConsumer : public rosbag2_cpp::cache::CacheConsumer +{ +public: + MockCacheConsumer( + std::shared_ptr message_cache, + rosbag2_cpp::cache::CacheConsumer::consume_callback_function_t consume_callback) + : rosbag2_cpp::cache::CacheConsumer(message_cache, consume_callback) {} +}; + +#endif // ROSBAG2_CPP__MOCK_CACHE_CONSUMER_HPP_ diff --git a/rosbag2_cpp/test/rosbag2_cpp/mock_message_cache.hpp b/rosbag2_cpp/test/rosbag2_cpp/mock_message_cache.hpp new file mode 100644 index 0000000000..e32c708e6d --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/mock_message_cache.hpp @@ -0,0 +1,36 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__MOCK_MESSAGE_CACHE_HPP_ +#define ROSBAG2_CPP__MOCK_MESSAGE_CACHE_HPP_ + +#include + +#include +#include +#include + +#include "rosbag2_cpp/cache/message_cache.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" + +class MockMessageCache : public rosbag2_cpp::cache::MessageCache +{ +public: + explicit MockMessageCache(uint64_t max_buffer_size) + : rosbag2_cpp::cache::MessageCache(max_buffer_size) {} + + MOCK_METHOD0(log_dropped, void()); +}; + +#endif // ROSBAG2_CPP__MOCK_MESSAGE_CACHE_HPP_ diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_message_cache.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_message_cache.cpp new file mode 100644 index 0000000000..4bb3a044e7 --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/test_message_cache.cpp @@ -0,0 +1,152 @@ +// Copyright 2020, Robotec.ai sp. z o.o. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rosbag2_storage/ros_helper.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" + +#include "mock_cache_consumer.hpp" +#include "mock_message_cache.hpp" + +using namespace testing; // NOLINT + +namespace +{ +std::shared_ptr make_test_msg() +{ + static uint32_t counter = 0; + std::string msg_content = "Hello" + std::to_string(counter++); + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + return message; +} + +uint32_t sum_up(const std::unordered_map & map) +{ + return std::accumulate( + std::begin(map), + std::end(map), + 0, + [](const uint32_t previous, const auto & element) { + return previous + element.second; + } + ); +} +} // namespace + +class MessageCacheTest : public Test +{ +public: + MessageCacheTest() {} + + virtual ~MessageCacheTest() = default; + + const uint64_t cache_size_ {1 * 1000}; // ~1 Kb cache +}; + +TEST_F(MessageCacheTest, message_cache_writes_full_producer_buffer) { + const uint32_t message_count = 100; + uint64_t size_bytes_so_far = 0; + size_t should_be_dropped_count = 0; + size_t consumed_message_count {0}; + + auto mock_message_cache = std::make_shared>( + cache_size_); + + for (uint32_t i = 0; i < message_count; ++i) { + auto msg = make_test_msg(); + size_t serialized_data_size = msg->serialized_data->buffer_length; + mock_message_cache->push(msg); + if (cache_size_ < size_bytes_so_far) { + should_be_dropped_count++; + } + size_bytes_so_far += serialized_data_size; + } + + auto total_actually_dropped = sum_up(mock_message_cache->messages_dropped()); + EXPECT_EQ(should_be_dropped_count, total_actually_dropped); + + auto cb = [&consumed_message_count]( + const std::vector> & msgs) { + consumed_message_count += msgs.size(); + }; + + auto mock_cache_consumer = std::make_unique>( + mock_message_cache, + cb); + + using namespace std::chrono_literals; + std::this_thread::sleep_for(20ms); + + mock_cache_consumer->close(); + EXPECT_EQ(consumed_message_count, message_count - should_be_dropped_count); +} + +TEST_F(MessageCacheTest, message_cache_changing_callback) { + const uint32_t message_count = 50; + + size_t callback1_counter = 0; + size_t callback2_counter = 0; + + auto mock_message_cache = std::make_shared>( + cache_size_); + + auto mock_cache_consumer = std::make_unique>( + mock_message_cache, + [&callback1_counter]( + const std::vector> & msgs) { + callback1_counter += msgs.size(); + }); + + rosbag2_cpp::cache::CacheConsumer::consume_callback_function_t cb2 = + [&callback2_counter]( + const std::vector> & msgs) { + callback2_counter += msgs.size(); + }; + + uint32_t counter = 0; + for (uint32_t i = 0; i < message_count; ++i) { + if (counter >= message_count / 2) { + mock_cache_consumer->change_consume_callback(cb2); + } + auto msg = make_test_msg(); + mock_message_cache->push(msg); + counter++; + } + + auto total_actually_dropped = sum_up(mock_message_cache->messages_dropped()); + EXPECT_EQ(total_actually_dropped, 0u); + + mock_cache_consumer->close(); + + using namespace std::chrono_literals; + std::this_thread::sleep_for(20ms); + + size_t sum_consumed = callback1_counter + callback2_counter; + + EXPECT_EQ(sum_consumed, message_count); +} diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index e0a80cbd09..0ff5bfbd72 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -265,41 +265,6 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf } } -TEST_F(SequentialWriterTest, only_write_after_cache_is_full) { - const uint64_t counter = 1000; - const uint64_t max_cache_size = 100; - std::string msg_content = "Hello"; - const auto msg_length = msg_content.length(); - EXPECT_CALL( - *storage_, - write(An> &>())). - Times(static_cast(counter * msg_length / max_cache_size)); - EXPECT_CALL( - *storage_, - write(An>())).Times(0); - - auto sequential_writer = std::make_unique( - std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); - writer_ = std::make_unique(std::move(sequential_writer)); - - std::string rmw_format = "rmw_format"; - - auto message = std::make_shared(); - message->topic_name = "test_topic"; - message->serialized_data = rosbag2_storage::make_serialized_message( - msg_content.c_str(), msg_length); - - storage_options_.max_bagfile_size = 0; - storage_options_.max_cache_size = max_cache_size; - - writer_->open(storage_options_, {rmw_format, rmw_format}); - writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""}); - - for (auto i = 0u; i < counter; ++i) { - writer_->write(message); - } -} - TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) { const size_t counter = 1000; const uint64_t max_cache_size = 0; diff --git a/rosbag2_performance/rosbag2_performance_writer_benchmarking/include/rosbag2_performance_writer_benchmarking/writer_benchmark.hpp b/rosbag2_performance/rosbag2_performance_writer_benchmarking/include/rosbag2_performance_writer_benchmarking/writer_benchmark.hpp index 5418323557..73ae08ca5e 100644 --- a/rosbag2_performance/rosbag2_performance_writer_benchmarking/include/rosbag2_performance_writer_benchmarking/writer_benchmark.hpp +++ b/rosbag2_performance/rosbag2_performance_writer_benchmarking/include/rosbag2_performance_writer_benchmarking/writer_benchmark.hpp @@ -43,10 +43,10 @@ class WriterBenchmark : public rclcpp::Node unsigned int instances_; std::string db_folder_; std::string results_file_; - std::shared_ptr writer_; std::vector producer_threads_; std::vector> producers_; std::vector> queues_; + std::shared_ptr writer_; }; #endif // ROSBAG2_PERFORMANCE_WRITER_BENCHMARKING__WRITER_BENCHMARK_HPP_