Skip to content

Commit

Permalink
Merge pull request #1231 from drmingdrmer/no-sync
Browse files Browse the repository at this point in the history
[store] fix: do not fsync sled data in unittets. File::sync_all() takes 10 ~ 30 ms, at worst 500 ms on a Mac
  • Loading branch information
databend-bot authored Jul 30, 2021
2 parents 5e8b747 + f5bcfc9 commit 6adf22e
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 61 deletions.
15 changes: 15 additions & 0 deletions fusestore/store/src/configs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ pub struct Config {
)]
pub meta_dir: String,

#[structopt(
long,
env = "FUSE_STORE_META_NO_SYNC",
help = concat!("Whether to fsync meta to disk for every meta write(raft log, state machine etc).",
" No-sync brings risks of data loss during a crash.",
" You should only use this in a testing environment, unless YOU KNOW WHAT YOU ARE DOING."
),
)]
pub meta_no_sync: bool,

// raft config
#[structopt(
long,
Expand Down Expand Up @@ -120,4 +130,9 @@ impl Config {
pub fn meta_api_addr(&self) -> String {
format!("{}:{}", self.meta_api_host, self.meta_api_port)
}

/// Returns true to fsync after a write operation to meta.
pub fn meta_sync(&self) -> bool {
!self.meta_no_sync
}
}
8 changes: 6 additions & 2 deletions fusestore/store/src/meta_service/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::ops::RangeBounds;
use async_raft::raft::Entry;
use common_tracing::tracing;

use crate::configs;
use crate::meta_service::LogEntry;
use crate::meta_service::LogIndex;
use crate::meta_service::SledSerde;
Expand Down Expand Up @@ -41,9 +42,12 @@ impl SledValueToKey<LogIndex> for Entry<LogEntry> {

impl RaftLog {
/// Open RaftLog
pub async fn open(db: &sled::Db) -> common_exception::Result<RaftLog> {
pub async fn open(
db: &sled::Db,
config: &configs::Config,
) -> common_exception::Result<RaftLog> {
let rl = RaftLog {
inner: SledTree::open(db, TREE_RAFT_LOG).await?,
inner: SledTree::open(db, TREE_RAFT_LOG, config.meta_sync()).await?,
};
Ok(rl)
}
Expand Down
12 changes: 6 additions & 6 deletions fusestore/store/src/meta_service/raft_log_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::tests::service::new_sled_test_context;
async fn test_raft_log_open() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
RaftLog::open(db).await?;
RaftLog::open(db, &tc.config).await?;

Ok(())
}
Expand All @@ -26,7 +26,7 @@ async fn test_raft_log_open() -> anyhow::Result<()> {
async fn test_raft_log_append_and_range_get() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = RaftLog::open(db).await?;
let rl = RaftLog::open(db, &tc.config).await?;

let logs: Vec<Entry<LogEntry>> = vec![
Entry {
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn test_raft_log_append_and_range_get() -> anyhow::Result<()> {
async fn test_raft_log_insert() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = RaftLog::open(db).await?;
let rl = RaftLog::open(db, &tc.config).await?;

assert_eq!(None, rl.get(&5)?);

Expand Down Expand Up @@ -134,7 +134,7 @@ async fn test_raft_log_insert() -> anyhow::Result<()> {
async fn test_raft_log_get() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = RaftLog::open(db).await?;
let rl = RaftLog::open(db, &tc.config).await?;

assert_eq!(None, rl.get(&5)?);

Expand Down Expand Up @@ -171,7 +171,7 @@ async fn test_raft_log_get() -> anyhow::Result<()> {
async fn test_raft_log_last() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = RaftLog::open(db).await?;
let rl = RaftLog::open(db, &tc.config).await?;

assert_eq!(None, rl.last()?);

Expand Down Expand Up @@ -203,7 +203,7 @@ async fn test_raft_log_last() -> anyhow::Result<()> {
async fn test_raft_log_range_delete() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = RaftLog::open(db).await?;
let rl = RaftLog::open(db, &tc.config).await?;

let logs: Vec<Entry<LogEntry>> = vec![
Entry {
Expand Down
4 changes: 2 additions & 2 deletions fusestore/store/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl MetaStore {
// TODO(xp): merge the duplicated snippets in new() and open(), when I got time :DDD

let raft_state = RaftState::create(&db, &id).await?;
let log = RaftLog::open(&db).await?;
let log = RaftLog::open(&db, config).await?;

let sm = RwLock::new(StateMachine::default());
let current_snapshot = RwLock::new(None);
Expand All @@ -151,7 +151,7 @@ impl MetaStore {
})?;

let raft_state = RaftState::open(&db)?;
let log = RaftLog::open(&db).await?;
let log = RaftLog::open(&db, config).await?;

let sm = RwLock::new(StateMachine::default());
let current_snapshot = RwLock::new(None);
Expand Down
35 changes: 27 additions & 8 deletions fusestore/store/src/meta_service/sled_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@ pub trait SledValueToKey<K> {
/// The value type `V` can be any serialize impl, i.e. for most cases, to impl trait `SledSerde`.
pub struct SledTree<K: SledOrderedSerde + Display + Debug, V: SledSerde> {
pub name: String,

/// Whether to fsync after an write operation.
/// With sync==false, it WONT fsync even when user tell it to sync.
/// This is only used for testing when fsync is quite slow.
/// E.g. File::sync_all takes 10 ~ 30 ms on a Mac.
/// See: https://github.com/drmingdrmer/sledtest/blob/500929ab0b89afe547143a38fde6fe85d88f1f80/src/ben_sync.rs
sync: bool,

pub(crate) tree: sled::Tree,

phantom_k: PhantomData<K>,
phantom_v: PhantomData<V>,
}
Expand All @@ -35,6 +44,7 @@ impl<K: SledOrderedSerde + Display + Debug, V: SledSerde> SledTree<K, V> {
pub async fn open<N: AsRef<[u8]> + Display>(
db: &sled::Db,
tree_name: N,
sync: bool,
) -> common_exception::Result<Self> {
let t = db
.open_tree(&tree_name)
Expand All @@ -44,6 +54,7 @@ impl<K: SledOrderedSerde + Display + Debug, V: SledSerde> SledTree<K, V> {

let rl = SledTree {
name: format!("{}", tree_name),
sync,
tree: t,
phantom_k: PhantomData,
phantom_v: PhantomData,
Expand Down Expand Up @@ -125,7 +136,10 @@ impl<K: SledOrderedSerde + Display + Debug, V: SledSerde> SledTree<K, V> {
format!("batch delete: {}", range_mes,)
})?;

if flush {
if flush && self.sync {
let span = tracing::span!(tracing::Level::DEBUG, "flush-range-delete");
let _ent = span.enter();

self.tree
.flush_async()
.await
Expand Down Expand Up @@ -196,10 +210,15 @@ impl<K: SledOrderedSerde + Display + Debug, V: SledSerde> SledTree<K, V> {
.apply_batch(batch)
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "batch append")?;

self.tree
.flush_async()
.await
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "flush append")?;
if self.sync {
let span = tracing::span!(tracing::Level::DEBUG, "flush-append");
let _ent = span.enter();

self.tree
.flush_async()
.await
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "flush append")?;
}

Ok(())
}
Expand All @@ -223,8 +242,8 @@ impl<K: SledOrderedSerde + Display + Debug, V: SledSerde> SledTree<K, V> {
.apply_batch(batch)
.map_err_to_code(ErrorCode::MetaStoreDamaged, || "batch append_values")?;

{
let span = tracing::span!(tracing::Level::DEBUG, "flush-append");
if self.sync {
let span = tracing::span!(tracing::Level::DEBUG, "flush-append-values");
let _ent = span.enter();

self.tree
Expand Down Expand Up @@ -255,7 +274,7 @@ impl<K: SledOrderedSerde + Display + Debug, V: SledSerde> SledTree<K, V> {
Some(x) => Some(V::de(x)?),
};

{
if self.sync {
let span = tracing::span!(tracing::Level::DEBUG, "flush-insert");
let _ent = span.enter();

Expand Down
18 changes: 9 additions & 9 deletions fusestore/store/src/meta_service/sled_tree_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::tests::service::new_sled_test_context;
async fn test_sled_tree_open() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
SledTree::<LogIndex, Entry<LogEntry>>::open(db, "foo").await?;
SledTree::<LogIndex, Entry<LogEntry>>::open(db, "foo", true).await?;

Ok(())
}
Expand All @@ -27,7 +27,7 @@ async fn test_sled_tree_open() -> anyhow::Result<()> {
async fn test_sled_tree_append() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

let logs: Vec<(LogIndex, Entry<LogEntry>)> = vec![
(8, Entry {
Expand Down Expand Up @@ -83,7 +83,7 @@ async fn test_sled_tree_append() -> anyhow::Result<()> {
async fn test_sled_tree_append_values_and_range_get() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

let logs: Vec<Entry<LogEntry>> = vec![
Entry {
Expand Down Expand Up @@ -156,7 +156,7 @@ async fn test_sled_tree_append_values_and_range_get() -> anyhow::Result<()> {
async fn test_sled_tree_range_keys() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

let logs: Vec<Entry<LogEntry>> = vec![
Entry {
Expand Down Expand Up @@ -206,7 +206,7 @@ async fn test_sled_tree_range_keys() -> anyhow::Result<()> {
async fn test_sled_tree_insert() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

assert_eq!(None, rl.get(&5)?);

Expand Down Expand Up @@ -264,7 +264,7 @@ async fn test_sled_tree_insert() -> anyhow::Result<()> {
async fn test_sled_tree_contains_key() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

assert_eq!(None, rl.get(&5)?);

Expand Down Expand Up @@ -301,7 +301,7 @@ async fn test_sled_tree_contains_key() -> anyhow::Result<()> {
async fn test_sled_tree_get() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

assert_eq!(None, rl.get(&5)?);

Expand Down Expand Up @@ -338,7 +338,7 @@ async fn test_sled_tree_get() -> anyhow::Result<()> {
async fn test_sled_tree_last() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

assert_eq!(None, rl.last()?);

Expand Down Expand Up @@ -370,7 +370,7 @@ async fn test_sled_tree_last() -> anyhow::Result<()> {
async fn test_sled_tree_range_delete() -> anyhow::Result<()> {
let tc = new_sled_test_context();
let db = &tc.db;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log").await?;
let rl = SledTree::<LogIndex, Entry<LogEntry>>::open(db, "log", true).await?;

let logs: Vec<Entry<LogEntry>> = vec![
Entry {
Expand Down
Loading

0 comments on commit 6adf22e

Please sign in to comment.