Skip to content

Commit

Permalink
Sqlite storage double buffering (#546)
Browse files Browse the repository at this point in the history
* Double buffers

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Circular queue and FLUSH option as define

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Minor naming and lexical fixes.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Removed FLUSH_BUFFERS define check.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Sqlite3 storage logging fixes.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Sqlite3 storage circular buffer with pre allocated memory.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Sqlite3 storage buffers moved to shared_ptrs.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Uncrustify

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Moved double buffers to writer

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Buffer layer reset in seq compression writer in rosbag2 cpp

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Buffer layer for rosbag2 writer refactor

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Changed buffers in BufferLayer to std vectors.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* BufferLayer uncrustify

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Removed non-applicable test for writer cache.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* BufferLayer review fixes

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Rosbag metadata msgs count fixed for BufferLayer

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Condition variable for buffer layer sync.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Fixed buffer locks

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Buffers in BufferLayer refactored, moved into new class

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Buffer layer split bags fixed.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Storage options include fix in buffer layer header.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Mutex around swapping buffers in buffer layer.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Fixed cache 0 bug in buffer layer.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Minor buffer layer refactor.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Counting messages in writer refactored.

Signed-off-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>

* Changed default cache size to 100Mb and updated parameter description

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* Applied review remarks:
- significant refactoring: separation of cache classes
- applied suggested improvements
- some renaming
- reduce code duplication that would otherwise increase with cache refactor, between compression and plain writers

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* Applied review comments
- cache consumer now takes a callback and is independent of storage
- namespace changes, renaming, cleaning
- counting and logging messages by topic

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* linter

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* Changes after review: fixing flushing, topic counts, and more

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* Fix for splitting - flushing state now correctly turns off

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* cache classes documentation

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* simplified signature

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* a couple of tests for cache

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* address review: explicit constructor and doxygen styling

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* Windows warnings fix

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* fixed type mismatch warning on Windows

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

* added minor comment

Signed-off-by: Adam Dabrowski <adam.dabrowski@robotec.ai>

Co-authored-by: Piotr Jaroszek <piotr.jaroszek@robotec.ai>
  • Loading branch information
adamdbrw and pijaro authored Nov 25, 2020
1 parent 0c1ea6a commit c38630b
Show file tree
Hide file tree
Showing 16 changed files with 948 additions and 118 deletions.
12 changes: 7 additions & 5 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'the bag will split at whichever threshold is reached first.'
)
parser.add_argument(
'--max-cache-size', type=int, default=1024*1024,
help='maximum size (in bytes) of messages to hold in cache before writing to disk. '
'Default is 1 mebibyte, everytime the cache size equals or exceeds 1MB, '
'it will be written to disk. If the value specified is 0, then every message is'
'directly written to disk.'
'--max-cache-size', type=int, default=100*1024*1024,
help='maximum size (in bytes) of messages to hold in each buffer of cache.'
'Default is 100 mebibytes. The cache is handled through double buffering, '
'which means that in pessimistic case up to twice the parameter value of memory'
'is needed. A rule of thumb is to cache an order of magitude corresponding to'
'about one second of total recorded data volume.'
'If the value specified is 0, then every message is directly written to disk.'
)
parser.add_argument(
'--compression-mode', type=str, default='none',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,6 @@
namespace rosbag2_compression
{

namespace
{
std::string format_storage_uri(const std::string & base_folder, uint64_t storage_count)
{
// Right now `base_folder_` is always just the folder name for where to install the bagfile.
// The name of the folder needs to be queried in case
// SequentialWriter is opened with a relative path.
std::stringstream storage_file_name;
storage_file_name << rcpputils::fs::path(base_folder).filename().string() << "_" << storage_count;

return (rcpputils::fs::path(base_folder) / storage_file_name.str()).string();
}
} // namespace

SequentialCompressionWriter::SequentialCompressionWriter(
const rosbag2_compression::CompressionOptions & compression_options)
: SequentialWriter(),
Expand Down Expand Up @@ -123,6 +109,10 @@ void SequentialCompressionWriter::reset()
metadata_io_->write_metadata(base_folder_, metadata_);
}

if (use_cache_) {
cache_consumer_.reset();
message_cache_.reset();
}
storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
storage_factory_.reset();
}
Expand Down Expand Up @@ -155,11 +145,7 @@ void SequentialCompressionWriter::compress_last_file()

void SequentialCompressionWriter::split_bagfile()
{
storage_options_.uri = format_storage_uri(
base_folder_,
metadata_.relative_file_paths.size());

storage_ = storage_factory_->open_read_write(storage_options_);
switch_to_next_storage();

if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
compress_last_file();
Expand All @@ -169,18 +155,9 @@ void SequentialCompressionWriter::split_bagfile()
// Add a check to make sure reset() does not compress the file again if we couldn't load the
// storage plugin.
should_compress_last_file_ = false;

std::stringstream errmsg;
errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!";
throw std::runtime_error{errmsg.str()};
}

metadata_.relative_file_paths.push_back(storage_->get_relative_file_path());

// Re-register all topics since we rolled-over to a new bagfile.
for (const auto & topic : topics_names_to_info_) {
storage_->create_topic(topic.second.topic_metadata);
}
}

void SequentialCompressionWriter::compress_message(
Expand Down
9 changes: 9 additions & 0 deletions rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ find_package(rosidl_typesupport_cpp REQUIRED)
find_package(rosidl_typesupport_introspection_cpp REQUIRED)

add_library(${PROJECT_NAME} SHARED
src/rosbag2_cpp/cache/cache_consumer.cpp
src/rosbag2_cpp/cache/message_cache_buffer.cpp
src/rosbag2_cpp/cache/message_cache.cpp
src/rosbag2_cpp/converter.cpp
src/rosbag2_cpp/info.cpp
src/rosbag2_cpp/reader.cpp
Expand Down Expand Up @@ -156,6 +159,12 @@ if(BUILD_TESTING)
target_link_libraries(test_storage_without_metadata_file ${PROJECT_NAME})
endif()

ament_add_gmock(test_message_cache
test/rosbag2_cpp/test_message_cache.cpp)
if(TARGET test_message_cache)
target_link_libraries(test_message_cache ${PROJECT_NAME})
endif()

# If compiling with gcc, run this test with sanitizers enabled
ament_add_gmock(test_ros2_message
test/rosbag2_cpp/types/test_ros2_message.cpp
Expand Down
99 changes: 99 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2020, Robotec.ai sp. z o.o.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__CACHE__CACHE_CONSUMER_HPP_
#define ROSBAG2_CPP__CACHE__CACHE_CONSUMER_HPP_

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "rosbag2_cpp/cache/message_cache.hpp"

// This is necessary because of using stl types here. It is completely safe, because
// a) the member is not accessible from the outside
// b) there are no inline functions.
#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable:4251)
#endif

namespace rosbag2_cpp
{
namespace cache
{
/**
* This class is responsible for consuming the cache using provided fuction.
* It can work with any callback conforming to the consume_callback_function_t
* signature, e.g. a storage write function. Consuming and thus the callback are
* called in a separate thread.
*
* Since the consuming callback likely involves disk operations, the main motivation
* for design is to make sure that consumer is busy anytime there is any work.
* This is realized through conditional variable and greedy buffer switching.
*
* The consumer uses MessageCache and waits for consumer buffer to be ready. This
* will happen as soon as that there are any messages put into producer buffer -
* a switch of buffer pointers will result in these messages being available for
* consumption. The consumer then proceeds to process the entire buffer in one go.
* While this is ongoing, the producer buffer is being filled with new messages.
*
* For SQLite implementation of storage, consumer callback will write consumer buffer
* in each loop iteration as a separate transaction. This results in a balancing
* mechanism for high-performance cases, where transaction size can be increased
* dynamically as previous, smaller transactions introduce delays in loop iteration.
*/
class ROSBAG2_CPP_PUBLIC CacheConsumer
{
public:
using consume_callback_function_t = std::function<void (const
std::vector<MessageCacheBuffer::buffer_element_t> &)>;

CacheConsumer(
std::shared_ptr<MessageCache> message_cache,
consume_callback_function_t consume_callback);

~CacheConsumer();

/// shut down the consumer thread
void close();

/// Set new consume callback, restart thread if necessary
void change_consume_callback(consume_callback_function_t callback);

private:
std::shared_ptr<MessageCache> message_cache_;
consume_callback_function_t consume_callback_;

/// Write buffer data to a storage
void exec_consuming();

/// Consumer thread shutdown sync
std::atomic_bool is_stop_issued_ {false};
std::mutex consumer_mutex_;

std::thread consumer_thread_;
};

} // namespace cache
} // namespace rosbag2_cpp

#ifdef _WIN32
# pragma warning(pop)
#endif

#endif // ROSBAG2_CPP__CACHE__CACHE_CONSUMER_HPP_
123 changes: 123 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2020, Robotec.ai sp. z o.o.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__CACHE__MESSAGE_CACHE_HPP_
#define ROSBAG2_CPP__CACHE__MESSAGE_CACHE_HPP_

#include <atomic>
#include <condition_variable>
#include <unordered_map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rosbag2_cpp/cache/message_cache_buffer.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
// a) the member is not accessible from the outside
// b) there are no inline functions.
#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable:4251)
#endif

namespace rosbag2_cpp
{
namespace cache
{
/**
* This class is responsible for implementing message cache, using two cache
* buffers and providing synchronization API for producer-consumer pattern.
*
* Double buffering is a part of producer-consumer pattern and optimizes for
* the consumer performance (which can be a bottleneck, e.g. disk writes).
*
* Two instances of MessageCacheBuffer are used, one for producer and one for
* the consumer. Buffers are switched through wait_for_buffer function, which
* involves synchronization and a simple pointer switch.
*
* The cache can enter a flushing state, intended as a finalization state,
* where all the remaining data is going to be processed: no new messages are
* accepted and buffer switching can be done unconditionally on consumer demand.
*
* The cache holds infomation about dropped messages (per topic). These are
* messages that were pushed to the cache when it was full. Such situation signals
* performance issues, most likely with the CacheConsumer consumer callback.
*/
class ROSBAG2_CPP_PUBLIC MessageCache
{
public:
explicit MessageCache(uint64_t max_buffer_size);

~MessageCache();

/// Puts msg into primary buffer. With full cache, msg is ignored and counted as lost
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg);

/// Summarize dropped/remaining messages
void log_dropped();

/// Producer API: notify consumer to wake-up (primary buffer has data)
void notify_buffer_consumer();

/// Set the cache to consume-only mode for final buffer flush before closing
void finalize();

/// Notify that flushing is complete
void notify_flushing_done();

/**
* Consumer API: wait until primary buffer is ready and swap it with consumer buffer.
* The caller thread (consumer thread) will sleep on a conditional variable
* until it can be awaken, which is to happen when:
* a) data was inserted into the producer buffer, consuming can continue after a swap
* b) we are flushing the data (in case we missed the last notification when consuming)
**/
void wait_for_buffer();

/// Consumer API: get current buffer to consume
std::shared_ptr<MessageCacheBuffer> consumer_buffer();

/// Exposes counts of messages dropped per topic
std::unordered_map<std::string, uint32_t> messages_dropped() const;

private:
/// Double buffers
std::shared_ptr<MessageCacheBuffer> primary_buffer_;
std::shared_ptr<MessageCacheBuffer> secondary_buffer_;

/// Dropped messages per topic. Used for printing in alphabetic order
std::unordered_map<std::string, uint32_t> messages_dropped_per_topic_;

/// Double buffers sync (following cpp core guidelines for condition variables)
bool primary_buffer_can_be_swapped_ {false};
std::condition_variable cache_condition_var_;
std::mutex cache_mutex_;

/// Cache is no longer accepting messages and is in the process of flushing
std::atomic_bool flushing_ {false};
};

} // namespace cache
} // namespace rosbag2_cpp

#ifdef _WIN32
# pragma warning(pop)
#endif

#endif // ROSBAG2_CPP__CACHE__MESSAGE_CACHE_HPP_
Loading

0 comments on commit c38630b

Please sign in to comment.