From e4b52e62ba572987b08a88a98662b582f57afb07 Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Tue, 28 Nov 2023 15:28:50 +0100 Subject: [PATCH] recursive_mutex and call it a day --- .../include/data_tamer/sinks/mcap_sink.hpp | 23 +++++-- data_tamer/src/sinks/mcap_sink.cpp | 65 +++++++++---------- 2 files changed, 47 insertions(+), 41 deletions(-) diff --git a/data_tamer/include/data_tamer/sinks/mcap_sink.hpp b/data_tamer/include/data_tamer/sinks/mcap_sink.hpp index 28bbbdb..77f911c 100644 --- a/data_tamer/include/data_tamer/sinks/mcap_sink.hpp +++ b/data_tamer/include/data_tamer/sinks/mcap_sink.hpp @@ -1,7 +1,6 @@ #pragma once #include "data_tamer/data_sink.hpp" -#include "data_tamer/details/mutex.hpp" #include #include @@ -27,11 +26,12 @@ class MCAPSink : public DataSinkBase { public: /** - * @brief MCAPSink + * @brief MCAPSink. + * IMPORTANT: if you want the recorder to be more robust to crash/segfault, + * set `do_compression` to false * * @param filepath path of the file to be saved. Should have extension ".mcap" - * @param do_compression if true, compress the data on the fly. Note that, in case of a crash/segfault, - * some of the data may (will!) be lost; it is therefore more conservative to leave this to false. + * @param do_compression if true, compress the data on the fly. */ explicit MCAPSink(std::string const& filepath, bool do_compression = false); @@ -45,9 +45,19 @@ class MCAPSink : public DataSinkBase /// and overwritten. Default value is 600 seconds (10 minutes) void setMaxTimeBeforeReset(std::chrono::seconds reset_time); - /// Stop recording (can't be restarted) and save the file + /// Stop recording and save the file void stopRecording(); + /** + * @brief restartRecording saves the current file (unless we did it already, + * calling stopRecording) and start recording into a new one. + * Note that all the registered channels and their schemas will be copied into the new file. + * + * @param filepath file path of the new file (should be ".mcap" extension) + * @param do_compression if true, compress the data on the fly. + */ + void restartRecording(std::string const& filepath, bool do_compression = false); + private: std::string filepath_; bool compression_ = false; @@ -60,8 +70,7 @@ class MCAPSink : public DataSinkBase std::chrono::system_clock::time_point start_time_; bool forced_stop_recording_ = false; - Mutex schema_mutex_; - std::recursive_mutex writer_mutex_; + std::recursive_mutex mutex_; void openFile(std::string const& filepath); }; diff --git a/data_tamer/src/sinks/mcap_sink.cpp b/data_tamer/src/sinks/mcap_sink.cpp index f60c8f4..2d4781d 100644 --- a/data_tamer/src/sinks/mcap_sink.cpp +++ b/data_tamer/src/sinks/mcap_sink.cpp @@ -40,7 +40,7 @@ MCAPSink::MCAPSink(const std::string& filepath, bool do_compression) : filepath_ void DataTamer::MCAPSink::openFile(std::string const& filepath) { - std::scoped_lock wlk(writer_mutex_); + std::scoped_lock lk(mutex_); writer_ = std::make_unique(); mcap::McapWriterOptions options(kDataTamer); options.compression = compression_ ? mcap::Compression::Zstd : mcap::Compression::None; @@ -57,16 +57,12 @@ void DataTamer::MCAPSink::openFile(std::string const& filepath) MCAPSink::~MCAPSink() { stopThread(); - std::scoped_lock wlk(writer_mutex_); - if(forced_stop_recording_) - { - writer_->close(); - } + std::scoped_lock lk(mutex_); } void MCAPSink::addChannel(std::string const& channel_name, Schema const& schema) { - std::scoped_lock lk(schema_mutex_); + std::scoped_lock lk(mutex_); schemas_[channel_name] = schema; auto it = hash_to_channel_id_.find(schema.hash); if (it != hash_to_channel_id_.end()) @@ -79,22 +75,20 @@ void MCAPSink::addChannel(std::string const& channel_name, Schema const& schema) std::string schema_str = ss.str(); auto const schema_name = channel_name + "::" + std::to_string(schema.hash); - { - std::scoped_lock wlk(writer_mutex_); - // Register a Schema - mcap::Schema mcap_schema(schema_name, kDataTamer, schema_str); - writer_->addSchema(mcap_schema); - - // Register a Channel - mcap::Channel publisher(channel_name, kDataTamer, mcap_schema.id); - writer_->addChannel(publisher); - hash_to_channel_id_[schema.hash] = publisher.id; - } + + // Register a Schema + mcap::Schema mcap_schema(schema_name, kDataTamer, schema_str); + writer_->addSchema(mcap_schema); + + // Register a Channel + mcap::Channel publisher(channel_name, kDataTamer, mcap_schema.id); + writer_->addChannel(publisher); + hash_to_channel_id_[schema.hash] = publisher.id; } bool MCAPSink::storeSnapshot(const Snapshot& snapshot) { - std::scoped_lock wlk(writer_mutex_); + std::scoped_lock lk(mutex_); if(forced_stop_recording_) { return false; @@ -111,10 +105,7 @@ bool MCAPSink::storeSnapshot(const Snapshot& snapshot) // Write our message mcap::Message msg; - { - std::scoped_lock lk(schema_mutex_); - msg.channelId = hash_to_channel_id_.at(snapshot.schema_hash); - } + msg.channelId = hash_to_channel_id_.at(snapshot.schema_hash); msg.sequence = 1; // Optional // Timestamp requires nanosecond msg.logTime = mcap::Timestamp(snapshot.timestamp.count()); @@ -128,16 +119,7 @@ bool MCAPSink::storeSnapshot(const Snapshot& snapshot) auto const now = std::chrono::system_clock::now(); if (now - start_time_ > reset_time_) { - // close current file - writer_->close(); - // reopen the same path (overwrite) - openFile(filepath_); - - // rebuild the channels - for (auto const& [name, schema] : schemas_) - { - addChannel(name, schema); - } + restartRecording(filepath_, compression_); } return true; } @@ -149,9 +131,24 @@ void MCAPSink::setMaxTimeBeforeReset(std::chrono::seconds reset_time) void MCAPSink::stopRecording() { - std::scoped_lock wlk(writer_mutex_); + std::scoped_lock lk(mutex_); forced_stop_recording_ = true; writer_->close(); + writer_.reset(); +} + +void MCAPSink::restartRecording(const std::string &filepath, bool do_compression) +{ + std::scoped_lock lk(mutex_); + filepath_ = filepath; + compression_ = do_compression; + openFile(filepath_); + + // rebuild the channels + for (auto const& [name, schema] : schemas_) + { + addChannel(name, schema); + } } } // namespace DataTamer