Skip to content

Commit

Permalink
recursive_mutex and call it a day
Browse files Browse the repository at this point in the history
  • Loading branch information
facontidavide committed Nov 28, 2023
1 parent 9d26161 commit e4b52e6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 41 deletions.
23 changes: 16 additions & 7 deletions data_tamer/include/data_tamer/sinks/mcap_sink.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "data_tamer/data_sink.hpp"
#include "data_tamer/details/mutex.hpp"

#include <atomic>
#include <condition_variable>
Expand All @@ -27,11 +26,12 @@ class MCAPSink : public DataSinkBase
{
public:
/**
* @brief MCAPSink
* @brief MCAPSink.
* IMPORTANT: if you want the recorder to be more robust to crash/segfault,
* set `do_compression` to false
*
* @param filepath path of the file to be saved. Should have extension ".mcap"
* @param do_compression if true, compress the data on the fly. Note that, in case of a crash/segfault,
* some of the data may (will!) be lost; it is therefore more conservative to leave this to false.
* @param do_compression if true, compress the data on the fly.
*/
explicit MCAPSink(std::string const& filepath, bool do_compression = false);

Expand All @@ -45,9 +45,19 @@ class MCAPSink : public DataSinkBase
/// and overwritten. Default value is 600 seconds (10 minutes)
void setMaxTimeBeforeReset(std::chrono::seconds reset_time);

/// Stop recording (can't be restarted) and save the file
/// Stop recording and save the file
void stopRecording();

/**
* @brief restartRecording saves the current file (unless we did it already,
* calling stopRecording) and start recording into a new one.
* Note that all the registered channels and their schemas will be copied into the new file.
*
* @param filepath file path of the new file (should be ".mcap" extension)
* @param do_compression if true, compress the data on the fly.
*/
void restartRecording(std::string const& filepath, bool do_compression = false);

private:
std::string filepath_;
bool compression_ = false;
Expand All @@ -60,8 +70,7 @@ class MCAPSink : public DataSinkBase
std::chrono::system_clock::time_point start_time_;

bool forced_stop_recording_ = false;
Mutex schema_mutex_;
std::recursive_mutex writer_mutex_;
std::recursive_mutex mutex_;

void openFile(std::string const& filepath);
};
Expand Down
65 changes: 31 additions & 34 deletions data_tamer/src/sinks/mcap_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ MCAPSink::MCAPSink(const std::string& filepath, bool do_compression) : filepath_

void DataTamer::MCAPSink::openFile(std::string const& filepath)
{
std::scoped_lock wlk(writer_mutex_);
std::scoped_lock lk(mutex_);
writer_ = std::make_unique<mcap::McapWriter>();
mcap::McapWriterOptions options(kDataTamer);
options.compression = compression_ ? mcap::Compression::Zstd : mcap::Compression::None;
Expand All @@ -57,16 +57,12 @@ void DataTamer::MCAPSink::openFile(std::string const& filepath)
MCAPSink::~MCAPSink()
{
stopThread();
std::scoped_lock wlk(writer_mutex_);
if(forced_stop_recording_)
{
writer_->close();
}
std::scoped_lock lk(mutex_);
}

void MCAPSink::addChannel(std::string const& channel_name, Schema const& schema)
{
std::scoped_lock lk(schema_mutex_);
std::scoped_lock lk(mutex_);
schemas_[channel_name] = schema;
auto it = hash_to_channel_id_.find(schema.hash);
if (it != hash_to_channel_id_.end())
Expand All @@ -79,22 +75,20 @@ void MCAPSink::addChannel(std::string const& channel_name, Schema const& schema)
std::string schema_str = ss.str();

auto const schema_name = channel_name + "::" + std::to_string(schema.hash);
{
std::scoped_lock wlk(writer_mutex_);
// Register a Schema
mcap::Schema mcap_schema(schema_name, kDataTamer, schema_str);
writer_->addSchema(mcap_schema);

// Register a Channel
mcap::Channel publisher(channel_name, kDataTamer, mcap_schema.id);
writer_->addChannel(publisher);
hash_to_channel_id_[schema.hash] = publisher.id;
}

// Register a Schema
mcap::Schema mcap_schema(schema_name, kDataTamer, schema_str);
writer_->addSchema(mcap_schema);

// Register a Channel
mcap::Channel publisher(channel_name, kDataTamer, mcap_schema.id);
writer_->addChannel(publisher);
hash_to_channel_id_[schema.hash] = publisher.id;
}

bool MCAPSink::storeSnapshot(const Snapshot& snapshot)
{
std::scoped_lock wlk(writer_mutex_);
std::scoped_lock lk(mutex_);
if(forced_stop_recording_)
{
return false;
Expand All @@ -111,10 +105,7 @@ bool MCAPSink::storeSnapshot(const Snapshot& snapshot)

// Write our message
mcap::Message msg;
{
std::scoped_lock lk(schema_mutex_);
msg.channelId = hash_to_channel_id_.at(snapshot.schema_hash);
}
msg.channelId = hash_to_channel_id_.at(snapshot.schema_hash);
msg.sequence = 1; // Optional
// Timestamp requires nanosecond
msg.logTime = mcap::Timestamp(snapshot.timestamp.count());
Expand All @@ -128,16 +119,7 @@ bool MCAPSink::storeSnapshot(const Snapshot& snapshot)
auto const now = std::chrono::system_clock::now();
if (now - start_time_ > reset_time_)
{
// close current file
writer_->close();
// reopen the same path (overwrite)
openFile(filepath_);

// rebuild the channels
for (auto const& [name, schema] : schemas_)
{
addChannel(name, schema);
}
restartRecording(filepath_, compression_);
}
return true;
}
Expand All @@ -149,9 +131,24 @@ void MCAPSink::setMaxTimeBeforeReset(std::chrono::seconds reset_time)

void MCAPSink::stopRecording()
{
std::scoped_lock wlk(writer_mutex_);
std::scoped_lock lk(mutex_);
forced_stop_recording_ = true;
writer_->close();
writer_.reset();
}

void MCAPSink::restartRecording(const std::string &filepath, bool do_compression)
{
std::scoped_lock lk(mutex_);
filepath_ = filepath;
compression_ = do_compression;
openFile(filepath_);

// rebuild the channels
for (auto const& [name, schema] : schemas_)
{
addChannel(name, schema);
}
}

} // namespace DataTamer

0 comments on commit e4b52e6

Please sign in to comment.