diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index d646295d868a5..2464f0c91f876 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -191,11 +191,22 @@ impl<'a> Applier<'a> { Change::new(prev, result).into() } + // TODO(1): when get an applier, pass in a now_ms to ensure all expired are cleaned. + /// Update or insert a kv entry. + /// + /// If the input entry has expired, it performs a delete operation. #[minitrace::trace] - async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option, Option) { + pub(crate) async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option, Option) { debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv"); - let (prev, result) = self.sm.upsert_kv(upsert_kv.clone()).await; + let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await; + + self.sm + .update_expire_index(&upsert_kv.key, &prev, &result) + .await; + + let prev = Into::>::into(prev); + let result = Into::>::into(result); debug!( "applied UpsertKV: {:?}; prev: {:?}; result: {:?}", @@ -209,7 +220,6 @@ impl<'a> Applier<'a> { } #[minitrace::trace] - async fn apply_txn(&mut self, req: &TxnRequest) -> AppliedState { debug!(txn = as_display!(req); "apply txn cmd"); @@ -445,9 +455,7 @@ impl<'a> Applier<'a> { assert_eq!(expire_key.seq, seq_v.seq); info!("clean expired: {}, {}", key, expire_key); - self.sm.upsert_kv(UpsertKV::delete(key.clone())).await; - // dbg!("clean_expired", &key, &curr); - self.push_change(key, curr, None); + self.upsert_kv(&UpsertKV::delete(key.clone())).await; } else { unreachable!( "trying to remove un-cleanable: {}, {}, kv-entry: {:?}", diff --git a/src/meta/raft-store/src/lib.rs b/src/meta/raft-store/src/lib.rs index 3fc733afd1e80..f26f60b358a16 100644 --- a/src/meta/raft-store/src/lib.rs +++ b/src/meta/raft-store/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![allow(clippy::uninlined_format_args)] +#![feature(impl_trait_in_assoc_type)] // #![feature(type_alias_impl_trait)] // #![allow(incomplete_features)] diff --git a/src/meta/raft-store/src/sm_v002/importer.rs b/src/meta/raft-store/src/sm_v002/importer.rs index ce10f5ebced6b..ee90b4fe7f313 100644 --- a/src/meta/raft-store/src/sm_v002/importer.rs +++ b/src/meta/raft-store/src/sm_v002/importer.rs @@ -20,7 +20,7 @@ use common_meta_types::LogId; use common_meta_types::StoredMembership; use crate::key_spaces::RaftStoreEntry; -use crate::sm_v002::leveled_store::level_data::LevelData; +use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use crate::sm_v002::marked::Marked; use crate::state_machine::ExpireKey; @@ -29,7 +29,7 @@ use crate::state_machine::StateMachineMetaKey; /// A container of temp data that are imported to a LevelData. #[derive(Debug, Default)] pub struct Importer { - level_data: LevelData, + level_data: Level, kv: BTreeMap, expire: BTreeMap>, @@ -109,7 +109,7 @@ impl Importer { Ok(()) } - pub fn commit(mut self) -> LevelData { + pub fn commit(mut self) -> Level { let d = &mut self.level_data; d.replace_kv(self.kv); diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/level_data.rs b/src/meta/raft-store/src/sm_v002/leveled_store/level.rs similarity index 73% rename from src/meta/raft-store/src/sm_v002/leveled_store/level_data.rs rename to src/meta/raft-store/src/sm_v002/leveled_store/level.rs index 75fb87e42e1a8..7142d3dbc39a2 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/level_data.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/level.rs @@ -22,16 +22,24 @@ use futures_util::StreamExt; use crate::sm_v002::leveled_store::map_api::MapApi; use crate::sm_v002::leveled_store::map_api::MapApiRO; +use crate::sm_v002::leveled_store::map_api::MapKey; use crate::sm_v002::leveled_store::sys_data::SysData; use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use crate::sm_v002::marked::Marked; use crate::state_machine::ExpireKey; +impl MapKey for String { + type V = Vec; +} +impl MapKey for ExpireKey { + type V = String; +} + /// A single level of state machine data. /// /// State machine data is composed of multiple levels. #[derive(Debug, Default)] -pub struct LevelData { +pub struct Level { /// System data(non-user data). sys_data: SysData, @@ -42,7 +50,7 @@ pub struct LevelData { expire: BTreeMap>, } -impl LevelData { +impl Level { /// Create a new level that is based on this level. pub(crate) fn new_level(&self) -> Self { Self { @@ -74,10 +82,8 @@ impl LevelData { } #[async_trait::async_trait] -impl MapApiRO for LevelData { - type V = Vec; - - async fn get(&self, key: &Q) -> Marked +impl MapApiRO for Level { + async fn get(&self, key: &Q) -> Marked<::V> where String: Borrow, Q: Ord + Send + Sync + ?Sized, @@ -85,24 +91,27 @@ impl MapApiRO for LevelData { self.kv.get(key).cloned().unwrap_or(Marked::empty()) } - async fn range<'a, T, R>(&'a self, range: R) -> BoxStream<'a, (String, Marked)> + async fn range<'f, Q, R>( + &'f self, + range: R, + ) -> BoxStream<'f, (String, Marked<::V>)> where - String: 'a, - String: Borrow, - T: Ord + ?Sized, - R: RangeBounds + Send, + String: Borrow, + Q: Ord + Send + Sync + ?Sized, + R: RangeBounds + Clone + Send + Sync, { - futures::stream::iter(self.kv.range(range).map(|(k, v)| (k.clone(), v.clone()))).boxed() + let it = self.kv.range(range).map(|(k, v)| (k.clone(), v.clone())); + futures::stream::iter(it).boxed() } } #[async_trait::async_trait] -impl MapApi for LevelData { +impl MapApi for Level { async fn set( &mut self, key: String, - value: Option<(Self::V, Option)>, - ) -> (Marked, Marked) { + value: Option<(::V, Option)>, + ) -> (Marked<::V>, Marked<::V>) { // The chance it is the bottom level is very low in a loaded system. // Thus we always tombstone the key if it is None. @@ -117,17 +126,15 @@ impl MapApi for LevelData { Marked::new_tomb_stone(seq) }; - let prev = MapApiRO::::get(self, key.as_str()).await; + let prev = MapApiRO::::get(&*self, key.as_str()).await; self.kv.insert(key, marked.clone()); (prev, marked) } } #[async_trait::async_trait] -impl MapApiRO for LevelData { - type V = String; - - async fn get(&self, key: &Q) -> Marked +impl MapApiRO for Level { + async fn get(&self, key: &Q) -> Marked<::V> where ExpireKey: Borrow, Q: Ord + Send + Sync + ?Sized, @@ -135,15 +142,14 @@ impl MapApiRO for LevelData { self.expire.get(key).cloned().unwrap_or(Marked::empty()) } - async fn range<'a, T: ?Sized, R>( - &'a self, + async fn range<'f, Q, R>( + &'f self, range: R, - ) -> BoxStream<'a, (ExpireKey, Marked)> + ) -> BoxStream<'f, (ExpireKey, Marked<::V>)> where - ExpireKey: 'a, - ExpireKey: Borrow, - T: Ord, - R: RangeBounds + Send, + ExpireKey: Borrow, + Q: Ord + Send + Sync + ?Sized, + R: RangeBounds + Clone + Send + Sync, { let it = self .expire @@ -155,12 +161,15 @@ impl MapApiRO for LevelData { } #[async_trait::async_trait] -impl MapApi for LevelData { +impl MapApi for Level { async fn set( &mut self, key: ExpireKey, - value: Option<(Self::V, Option)>, - ) -> (Marked, Marked) { + value: Option<(::V, Option)>, + ) -> ( + Marked<::V>, + Marked<::V>, + ) { // dbg!("set expire", &key, &value); let seq = self.curr_seq(); @@ -171,13 +180,13 @@ impl MapApi for LevelData { Marked::TombStone { internal_seq: seq } }; - let prev = MapApiRO::::get(self, &key).await; + let prev = MapApiRO::::get(&*self, &key).await; self.expire.insert(key, marked.clone()); (prev, marked) } } -impl AsRef for LevelData { +impl AsRef for Level { fn as_ref(&self) -> &SysData { &self.sys_data } diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs index 293f29dc1bbfa..3d6a21dc22821 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs @@ -19,14 +19,16 @@ use std::sync::Arc; use common_meta_types::KVMeta; use futures_util::stream::BoxStream; -use stream_more::KMerge; -use stream_more::StreamMore; -use crate::sm_v002::leveled_store::level_data::LevelData; +use crate::sm_v002::leveled_store::level::Level; +use crate::sm_v002::leveled_store::map_api::compacted_get; +use crate::sm_v002::leveled_store::map_api::compacted_range; use crate::sm_v002::leveled_store::map_api::MapApi; use crate::sm_v002::leveled_store::map_api::MapApiRO; +use crate::sm_v002::leveled_store::map_api::MapKey; +use crate::sm_v002::leveled_store::ref_::Ref; +use crate::sm_v002::leveled_store::ref_mut::RefMut; use crate::sm_v002::leveled_store::static_leveled_map::StaticLeveledMap; -use crate::sm_v002::leveled_store::util; use crate::sm_v002::marked::Marked; /// State machine data organized in multiple levels. @@ -38,7 +40,7 @@ use crate::sm_v002::marked::Marked; #[derive(Debug, Default)] pub struct LeveledMap { /// The top level is the newest and writable. - writable: LevelData, + writable: Level, /// The immutable levels, from the oldest to the newest. /// levels[0] is the bottom and oldest level. @@ -46,7 +48,7 @@ pub struct LeveledMap { } impl LeveledMap { - pub(crate) fn new(writable: LevelData) -> Self { + pub(crate) fn new(writable: Level) -> Self { Self { writable, frozen: Default::default(), @@ -54,7 +56,7 @@ impl LeveledMap { } /// Return an iterator of all levels in reverse order. - pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator { + pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator { [&self.writable] .into_iter() .chain(self.frozen.iter_levels()) @@ -71,12 +73,12 @@ impl LeveledMap { } /// Return an immutable reference to the top level i.e., the writable level. - pub fn writable_ref(&self) -> &LevelData { + pub fn writable_ref(&self) -> &Level { &self.writable } /// Return a mutable reference to the top level i.e., the writable level. - pub fn writable_mut(&mut self) -> &mut LevelData { + pub fn writable_mut(&mut self) -> &mut Level { &mut self.writable } @@ -89,76 +91,59 @@ impl LeveledMap { pub(crate) fn replace_frozen_levels(&mut self, b: StaticLeveledMap) { self.frozen = b; } + + pub(crate) fn leveled_ref_mut(&mut self) -> RefMut { + RefMut::new(&mut self.writable, &self.frozen) + } + + pub(crate) fn leveled_ref(&self) -> Ref { + Ref::new(Some(&self.writable), &self.frozen) + } } #[async_trait::async_trait] impl MapApiRO for LeveledMap where - K: Ord + fmt::Debug + Send + Sync + Unpin + 'static, - LevelData: MapApiRO, + K: MapKey + fmt::Debug, + Level: MapApiRO, { - type V = >::V; - - async fn get(&self, key: &Q) -> Marked + async fn get(&self, key: &Q) -> Marked where K: Borrow, Q: Ord + Send + Sync + ?Sized, { - for level_data in self.iter_levels() { - let got = level_data.get(key).await; - if !got.is_not_found() { - return got; - } - } - return Marked::empty(); + let levels = self.iter_levels(); + compacted_get(key, levels).await } - async fn range<'a, T: ?Sized, R>(&'a self, range: R) -> BoxStream<'a, (K, Marked)> + async fn range<'f, Q, R>(&'f self, range: R) -> BoxStream<'f, (K, Marked)> where - K: 'a, - K: Borrow + Clone, - Self::V: Unpin, - T: Ord, - R: RangeBounds + Clone + Send + Sync, + K: Borrow, + Q: Ord + Send + Sync + ?Sized, + R: RangeBounds + Clone + Send + Sync, { - let mut km = KMerge::by(util::by_key_seq); - - for api in self.iter_levels() { - let a = api.range(range.clone()).await; - km = km.merge(a); - } - - // Merge entries with the same key, keep the one with larger internal-seq - let m = km.coalesce(util::choose_greater); - - Box::pin(m) + let levels = self.iter_levels(); + compacted_range(range, levels).await } } #[async_trait::async_trait] impl MapApi for LeveledMap where - K: Ord + fmt::Debug + Send + Sync + Unpin + 'static, - LevelData: MapApi, + K: MapKey, + Level: MapApi, { async fn set( &mut self, key: K, - value: Option<(Self::V, Option)>, - ) -> (Marked, Marked) + value: Option<(K::V, Option)>, + ) -> (Marked, Marked) where K: Ord, { - // Get from this level or the base level. - let prev = self.get(&key).await.clone(); - - // No such entry at all, no need to create a tombstone for delete - if prev.is_not_found() && value.is_none() { - return (prev, Marked::new_tomb_stone(0)); - } + let mut l = self.leveled_ref_mut(); + MapApi::set(&mut l, key, value).await - // The data is a single level map and the returned `_prev` is only from that level. - let (_prev, inserted) = self.writable_mut().set(key, value).await; - (prev, inserted) + // (&mut l).set(key, value).await } } diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs index 2f615451ff305..13ed09d2bee98 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs @@ -17,6 +17,7 @@ use futures_util::StreamExt; use crate::sm_v002::leveled_store::leveled_map::LeveledMap; use crate::sm_v002::leveled_store::map_api::MapApi; +use crate::sm_v002::leveled_store::map_api::MapApiExt; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::marked::Marked; @@ -204,9 +205,11 @@ async fn test_two_levels() -> anyhow::Result<()> { /// Create multi levels store: /// +/// ```text /// l2 | c(D) d /// l1 | b(D) c e /// l0 | a b c d +/// ``` async fn build_3_levels() -> LeveledMap { let mut l = LeveledMap::default(); // internal_seq: 0 @@ -356,8 +359,10 @@ async fn test_three_levels_delete() -> anyhow::Result<()> { Ok(()) } +/// ```text /// | b(m) c /// | a(m) b c(m) +/// ``` async fn build_2_level_with_meta() -> LeveledMap { let mut l = LeveledMap::default(); @@ -401,7 +406,7 @@ async fn test_two_level_update_value() -> anyhow::Result<()> { { let mut l = build_2_level_with_meta().await; - let (prev, result) = MapApi::::upsert_value(&mut l, s("a"), b("a1")).await; + let (prev, result) = MapApiExt::upsert_value(&mut l, s("a"), b("a1")).await; assert_eq!( prev, Marked::new_normal(1, b("a0"), Some(KVMeta { expire_at: Some(1) })) @@ -422,7 +427,7 @@ async fn test_two_level_update_value() -> anyhow::Result<()> { { let mut l = build_2_level_with_meta().await; - let (prev, result) = MapApi::::upsert_value(&mut l, s("b"), b("x1")).await; + let (prev, result) = MapApiExt::upsert_value(&mut l, s("b"), b("x1")).await; assert_eq!( prev, Marked::new_normal( @@ -461,7 +466,7 @@ async fn test_two_level_update_value() -> anyhow::Result<()> { { let mut l = build_2_level_with_meta().await; - let (prev, result) = MapApi::::upsert_value(&mut l, s("d"), b("d1")).await; + let (prev, result) = MapApiExt::upsert_value(&mut l, s("d"), b("d1")).await; assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_normal(6, b("d1"), None)); @@ -479,8 +484,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { let mut l = build_2_level_with_meta().await; let (prev, result) = - MapApi::::update_meta(&mut l, s("a"), Some(KVMeta { expire_at: Some(2) })) - .await; + MapApiExt::update_meta(&mut l, s("a"), Some(KVMeta { expire_at: Some(2) })).await; assert_eq!( prev, Marked::new_normal(1, b("a0"), Some(KVMeta { expire_at: Some(1) })) @@ -501,7 +505,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { { let mut l = build_2_level_with_meta().await; - let (prev, result) = MapApi::::update_meta(&mut l, s("b"), None).await; + let (prev, result) = MapApiExt::update_meta(&mut l, s("b"), None).await; assert_eq!( prev, Marked::new_normal( @@ -522,7 +526,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { { let mut l = build_2_level_with_meta().await; - let (prev, result) = MapApi::::update_meta( + let (prev, result) = MapApiExt::update_meta( &mut l, s("c"), Some(KVMeta { @@ -560,8 +564,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { let mut l = build_2_level_with_meta().await; let (prev, result) = - MapApi::::update_meta(&mut l, s("d"), Some(KVMeta { expire_at: Some(2) })) - .await; + MapApiExt::update_meta(&mut l, s("d"), Some(KVMeta { expire_at: Some(2) })).await; assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_tomb_stone(0)); diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs b/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs index c877bd60380d2..e3e1691ffd7dd 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs @@ -13,22 +13,59 @@ // limitations under the License. use std::borrow::Borrow; +use std::fmt; use std::ops::RangeBounds; use common_meta_types::KVMeta; use futures_util::stream::BoxStream; +use stream_more::KMerge; +use stream_more::StreamMore; +use crate::sm_v002::leveled_store::level::Level; +use crate::sm_v002::leveled_store::util; use crate::sm_v002::marked::Marked; +/// MapKey defines the behavior of a key in a map. +/// +/// It is `Clone` to let MapApi clone a range of key. +/// It is `Unpin` to let MapApi extract a key from pinned data, such as a stream. +/// And it only accepts `'static` value for simplicity. +pub(in crate::sm_v002) trait MapKey: + Clone + Ord + fmt::Debug + Send + Sync + Unpin + 'static +{ + type V: MapValue; +} + +/// MapValue defines the behavior of a value in a map. +/// +/// It is `Clone` to let MapApi return an owned value. +/// It is `Unpin` to let MapApi extract a value from pinned data, such as a stream. +/// And it only accepts `'static` value for simplicity. +pub(in crate::sm_v002) trait MapValue: + Clone + Send + Sync + Unpin + 'static +{ +} + +// Auto implement MapValue for all types that satisfy the constraints. +impl MapValue for V where V: Clone + Send + Sync + Unpin + 'static {} + /// Provide a readonly key-value map API set, used to access state machine data. +/// +/// `MapApiRO` and `MapApi` both have two lifetime parameters, `'me` and `'d`, +/// to describe the lifetime of the MapApi object and the lifetime of the data. +/// +/// When an implementation owns the data it operates on, `'me` must outlive `'d`. +/// Otherwise, i.e., the implementation just keeps a reference to the data, +/// `'me` could be shorter than `'d`. +/// +/// There is no lifetime constraint on the trait, +/// and it's the implementation's duty to specify a valid lifetime constraint. #[async_trait::async_trait] pub(in crate::sm_v002) trait MapApiRO: Send + Sync -where K: Ord + Send + Sync + 'static +where K: MapKey { - type V: Clone + Send + Sync + 'static; - /// Get an entry by key. - async fn get(&self, key: &Q) -> Marked + async fn get(&self, key: &Q) -> Marked where K: Borrow, Q: Ord + Send + Sync + ?Sized; @@ -36,35 +73,42 @@ where K: Ord + Send + Sync + 'static /// Iterate over a range of entries by keys. /// /// The returned iterator contains tombstone entries: [`Marked::TombStone`]. - async fn range<'a, T: ?Sized, R>(&'a self, range: R) -> BoxStream<'a, (K, Marked)> + async fn range<'f, Q, R>(&'f self, range: R) -> BoxStream<'f, (K, Marked)> where - K: Clone + Borrow + 'a, - Self::V: Unpin, - T: Ord, - R: RangeBounds + Send + Sync + Clone; + K: Borrow, + Q: Ord + Send + Sync + ?Sized, + R: RangeBounds + Send + Sync + Clone; } /// Provide a read-write key-value map API set, used to access state machine data. #[async_trait::async_trait] -pub(in crate::sm_v002) trait MapApi: MapApiRO + Send + Sync -where K: Ord + Send + Sync + 'static +pub(in crate::sm_v002) trait MapApi: MapApiRO +where K: MapKey { /// Set an entry and returns the old value and the new value. async fn set( &mut self, key: K, - value: Option<(Self::V, Option)>, - ) -> (Marked, Marked); + value: Option<(K::V, Option)>, + ) -> (Marked, Marked); +} + +pub(in crate::sm_v002) struct MapApiExt; +impl MapApiExt { /// Update only the meta associated to an entry and keeps the value unchanged. /// If the entry does not exist, nothing is done. - async fn update_meta( - &mut self, + pub(in crate::sm_v002) async fn update_meta( + s: &mut T, key: K, meta: Option, - ) -> (Marked, Marked) { + ) -> (Marked, Marked) + where + K: MapKey, + T: MapApi, + { // - let got = self.get(&key).await; + let got = s.get(&key).await; if got.is_tomb_stone() { return (got.clone(), got.clone()); } @@ -72,13 +116,22 @@ where K: Ord + Send + Sync + 'static // Safe unwrap(), got is Normal let (v, _) = got.unpack_ref().unwrap(); - self.set(key, Some((v.clone(), meta))).await + s.set(key, Some((v.clone(), meta))).await } /// Update only the value and keeps the meta unchanged. /// If the entry does not exist, create one. - async fn upsert_value(&mut self, key: K, value: Self::V) -> (Marked, Marked) { - let got = self.get(&key).await; + #[allow(dead_code)] + pub(in crate::sm_v002) async fn upsert_value( + s: &mut T, + key: K, + value: K::V, + ) -> (Marked, Marked) + where + K: MapKey, + T: MapApi, + { + let got = s.get(&key).await; let meta = if let Some((_, meta)) = got.unpack_ref() { meta @@ -86,6 +139,57 @@ where K: Ord + Send + Sync + 'static None }; - self.set(key, Some((value, meta.cloned()))).await + s.set(key, Some((value, meta.cloned()))).await } } + +/// Get a key from multi levels data. +/// +/// Returns the first non-tombstone entry. +pub(in crate::sm_v002) async fn compacted_get<'d, K, Q>( + key: &Q, + levels: impl Iterator, +) -> Marked +where + K: MapKey, + K: Borrow, + Q: Ord + Send + Sync + ?Sized, + Level: MapApiRO, +{ + for lvl in levels { + let got = lvl.get(key).await; + if !got.not_found() { + return got; + } + } + Marked::empty() +} + +/// Iterate over a range of entries by keys from multi levels. +/// +/// The returned iterator contains at most one entry for each key. +/// There could be tombstone entries: [`Marked::TombStone`] +pub(in crate::sm_v002) async fn compacted_range<'d, K, Q, R>( + range: R, + levels: impl Iterator, +) -> BoxStream<'d, (K, Marked)> +where + K: MapKey, + K: Borrow, + R: RangeBounds + Clone + Send + Sync, + Q: Ord + Send + Sync + ?Sized, + Level: MapApiRO, +{ + let mut kmerge = KMerge::by(util::by_key_seq); + + for lvl in levels { + let strm = lvl.range(range.clone()).await; + kmerge = kmerge.merge(strm); + } + + // Merge entries with the same key, keep the one with larger internal-seq + let m = kmerge.coalesce(util::choose_greater); + + let strm: BoxStream<'_, (K, Marked)> = Box::pin(m); + strm +} diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs b/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs index 923fbb0b0bf19..21f81cb3410ef 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod level_data; +pub mod level; pub mod leveled_map; pub mod map_api; +pub mod ref_; +pub mod ref_mut; pub mod static_leveled_map; pub mod sys_data; pub mod sys_data_api; diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/ref_.rs b/src/meta/raft-store/src/sm_v002/leveled_store/ref_.rs new file mode 100644 index 0000000000000..b2d3ed9862127 --- /dev/null +++ b/src/meta/raft-store/src/sm_v002/leveled_store/ref_.rs @@ -0,0 +1,77 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Borrow; +use std::fmt; +use std::ops::RangeBounds; + +use futures_util::stream::BoxStream; + +use crate::sm_v002::leveled_store::level::Level; +use crate::sm_v002::leveled_store::map_api::compacted_get; +use crate::sm_v002::leveled_store::map_api::compacted_range; +use crate::sm_v002::leveled_store::map_api::MapApiRO; +use crate::sm_v002::leveled_store::map_api::MapKey; +use crate::sm_v002::leveled_store::static_leveled_map::StaticLeveledMap; +use crate::sm_v002::marked::Marked; + +/// A readonly leveled map that does not not own the data. +#[derive(Debug)] +pub struct Ref<'d> { + /// The top level is the newest and writable. + writable: Option<&'d Level>, + + /// The immutable levels. + frozen: &'d StaticLeveledMap, +} + +impl<'d> Ref<'d> { + pub(in crate::sm_v002) fn new( + writable: Option<&'d Level>, + frozen: &'d StaticLeveledMap, + ) -> Ref<'d> { + Self { writable, frozen } + } + + /// Return an iterator of all levels in reverse order. + pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator + 'd { + self.writable.into_iter().chain(self.frozen.iter_levels()) + } +} + +#[async_trait::async_trait] +impl<'d, K> MapApiRO for Ref<'d> +where + K: MapKey + fmt::Debug, + Level: MapApiRO, +{ + async fn get(&self, key: &Q) -> Marked + where + K: Borrow, + Q: Ord + Send + Sync + ?Sized, + { + let levels = self.iter_levels(); + compacted_get(key, levels).await + } + + async fn range<'f, Q, R>(&'f self, range: R) -> BoxStream<'f, (K, Marked)> + where + K: Borrow, + R: RangeBounds + Clone + Send + Sync, + Q: Ord + Send + Sync + ?Sized, + { + let levels = self.iter_levels(); + compacted_range(range, levels).await + } +} diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs b/src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs new file mode 100644 index 0000000000000..d12a9d94f0d6b --- /dev/null +++ b/src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs @@ -0,0 +1,113 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Borrow; +use std::ops::RangeBounds; + +use common_meta_types::KVMeta; +use futures_util::stream::BoxStream; + +use crate::sm_v002::leveled_store::level::Level; +use crate::sm_v002::leveled_store::map_api::compacted_get; +use crate::sm_v002::leveled_store::map_api::compacted_range; +use crate::sm_v002::leveled_store::map_api::MapApi; +use crate::sm_v002::leveled_store::map_api::MapApiRO; +use crate::sm_v002::leveled_store::map_api::MapKey; +use crate::sm_v002::leveled_store::ref_::Ref; +use crate::sm_v002::leveled_store::static_leveled_map::StaticLeveledMap; +use crate::sm_v002::marked::Marked; + +/// A writable leveled map that does not not own the data. +#[derive(Debug)] +pub struct RefMut<'d> { + /// The top level is the newest and writable. + writable: &'d mut Level, + + /// The immutable levels. + frozen: &'d StaticLeveledMap, +} + +impl<'d> RefMut<'d> { + pub(in crate::sm_v002) fn new(writable: &'d mut Level, frozen: &'d StaticLeveledMap) -> Self { + Self { writable, frozen } + } + + #[allow(dead_code)] + pub(in crate::sm_v002) fn to_leveled_ref(&self) -> Ref { + Ref::new(Some(&*self.writable), self.frozen) + } + + /// Return an iterator of all levels in new-to-old order. + pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator + '_ { + [&*self.writable] + .into_iter() + .chain(self.frozen.iter_levels()) + } +} + +// Because `LeveledRefMut` has a mut ref of lifetime 'd, +// `self` must outlive 'd otherwise there will be two mut ref. +#[async_trait::async_trait] +impl<'d, K> MapApiRO for RefMut<'d> +where + K: MapKey, + Level: MapApiRO, +{ + async fn get(&self, key: &Q) -> Marked + where + K: Borrow, + Q: Ord + Send + Sync + ?Sized, + { + let levels = self.iter_levels(); + compacted_get(key, levels).await + } + + async fn range<'f, Q, R>(&'f self, range: R) -> BoxStream<'f, (K, Marked)> + where + K: Borrow, + Q: Ord + Send + Sync + ?Sized, + R: RangeBounds + Clone + Send + Sync, + { + let levels = self.iter_levels(); + compacted_range(range, levels).await + } +} + +#[async_trait::async_trait] +impl<'d, K> MapApi for RefMut<'d> +where + K: MapKey, + Level: MapApi, +{ + async fn set( + &mut self, + key: K, + value: Option<(K::V, Option)>, + ) -> (Marked, Marked) + where + K: Ord, + { + // Get from this level or the base level. + let prev = self.get(&key).await.clone(); + + // No such entry at all, no need to create a tombstone for delete + if prev.not_found() && value.is_none() { + return (prev, Marked::new_tomb_stone(0)); + } + + // The data is a single level map and the returned `_prev` is only from that level. + let (_prev, inserted) = self.writable.set(key, value).await; + (prev, inserted) + } +} diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/static_leveled_map.rs b/src/meta/raft-store/src/sm_v002/leveled_store/static_leveled_map.rs index 785ddd25d93ac..6e6ae6e3637cf 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/static_leveled_map.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/static_leveled_map.rs @@ -13,90 +13,78 @@ // limitations under the License. use std::borrow::Borrow; -use std::fmt; use std::ops::RangeBounds; use std::sync::Arc; use futures_util::stream::BoxStream; -use stream_more::KMerge; -use stream_more::StreamMore; -use crate::sm_v002::leveled_store::level_data::LevelData; +use crate::sm_v002::leveled_store::level::Level; +use crate::sm_v002::leveled_store::map_api::compacted_get; +use crate::sm_v002::leveled_store::map_api::compacted_range; use crate::sm_v002::leveled_store::map_api::MapApiRO; -use crate::sm_v002::leveled_store::util; +use crate::sm_v002::leveled_store::map_api::MapKey; +use crate::sm_v002::leveled_store::ref_::Ref; use crate::sm_v002::marked::Marked; +/// A readonly leveled map that owns the data. #[derive(Debug, Default, Clone)] pub struct StaticLeveledMap { /// From oldest to newest, i.e., levels[0] is the oldest - levels: Vec>, + levels: Vec>, } impl StaticLeveledMap { - pub(in crate::sm_v002) fn new(levels: impl IntoIterator>) -> Self { + pub(in crate::sm_v002) fn new(levels: impl IntoIterator>) -> Self { Self { levels: levels.into_iter().collect(), } } /// Return an iterator of all levels from newest to oldest. - pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator { + pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator { self.levels.iter().map(|x| x.as_ref()).rev() } - pub(in crate::sm_v002) fn newest(&self) -> Option<&Arc> { + pub(in crate::sm_v002) fn newest(&self) -> Option<&Arc> { self.levels.last() } - pub(in crate::sm_v002) fn push(&mut self, level: Arc) { + pub(in crate::sm_v002) fn push(&mut self, level: Arc) { self.levels.push(level); } pub(in crate::sm_v002) fn len(&self) -> usize { self.levels.len() } + + #[allow(dead_code)] + pub(in crate::sm_v002) fn to_ref(&self) -> Ref<'_> { + Ref::new(None, self) + } } #[async_trait::async_trait] impl MapApiRO for StaticLeveledMap where - K: Ord + fmt::Debug + Send + Sync + Unpin + 'static, - LevelData: MapApiRO, + K: MapKey, + Level: MapApiRO, { - type V = >::V; - - async fn get(&self, key: &Q) -> Marked + async fn get(&self, key: &Q) -> Marked where K: Borrow, Q: Ord + Send + Sync + ?Sized, { - for level_data in self.iter_levels() { - let got = level_data.get(key).await; - if !got.is_not_found() { - return got; - } - } - return Marked::empty(); + let levels = self.iter_levels(); + compacted_get(key, levels).await } - async fn range<'a, T: ?Sized, R>(&'a self, range: R) -> BoxStream<'a, (K, Marked)> + async fn range<'f, Q, R>(&'f self, range: R) -> BoxStream<'f, (K, Marked)> where - K: 'a, - K: Borrow + Clone, - Self::V: Unpin, - T: Ord, - R: RangeBounds + Clone + Send + Sync, + K: Borrow, + Q: Ord + Send + Sync + ?Sized, + R: RangeBounds + Clone + Send + Sync, { - let mut km = KMerge::by(util::by_key_seq::); - - for api in self.iter_levels() { - let a = api.range(range.clone()).await; - km = km.merge(a); - } - - // keep one of the entries with the same key, which has larger internal-seq - let m = km.coalesce(util::choose_greater); - - Box::pin(m) + let levels = self.iter_levels(); + compacted_range(range, levels).await } } diff --git a/src/meta/raft-store/src/sm_v002/marked/mod.rs b/src/meta/raft-store/src/sm_v002/marked/mod.rs index 160549e0b0377..a165ad2aaec1a 100644 --- a/src/meta/raft-store/src/sm_v002/marked/mod.rs +++ b/src/meta/raft-store/src/sm_v002/marked/mod.rs @@ -31,7 +31,7 @@ use crate::state_machine::ExpireValue; /// A deleted tombstone also have `internal_seq`, while for an application, deleted entry has seq=0. /// A normal entry(non-deleted) has a positive `seq` that is same as the corresponding `internal_seq`. #[derive(Debug, Clone, PartialEq, Eq)] -pub(in crate::sm_v002) enum Marked> { +pub(crate) enum Marked> { TombStone { internal_seq: u64, }, @@ -156,8 +156,8 @@ impl Marked { } } - /// Not a normal entry or a tombstone. - pub fn is_not_found(&self) -> bool { + /// Return if the entry is neither a normal entry nor a tombstone. + pub fn not_found(&self) -> bool { matches!(self, Marked::TombStone { internal_seq: 0, .. diff --git a/src/meta/raft-store/src/sm_v002/sm_v002.rs b/src/meta/raft-store/src/sm_v002/sm_v002.rs index bff4f9de53bf8..2e3edfeb30c72 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002.rs @@ -50,9 +50,10 @@ use tokio::sync::RwLock; use crate::applier::Applier; use crate::key_spaces::RaftStoreEntry; -use crate::sm_v002::leveled_store::level_data::LevelData; +use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::leveled_map::LeveledMap; use crate::sm_v002::leveled_store::map_api::MapApi; +use crate::sm_v002::leveled_store::map_api::MapApiExt; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use crate::sm_v002::marked::Marked; @@ -203,7 +204,7 @@ impl SMV002 { Ok(()) } - pub fn import(data: impl Iterator) -> Result { + pub fn import(data: impl Iterator) -> Result { let mut importer = Self::new_importer(); for ent in data { @@ -226,6 +227,11 @@ impl SMV002 { &self.blocking_config } + #[allow(dead_code)] + pub(crate) fn new_applier(&mut self) -> Applier { + Applier::new(self) + } + pub async fn apply_entries<'a>( &mut self, entries: impl IntoIterator, @@ -245,26 +251,10 @@ impl SMV002 { /// /// It does not check expiration of the returned entry. pub async fn get_kv(&self, key: &str) -> Option { - let got = MapApiRO::::get(&self.levels, key).await; + let got = MapApiRO::::get(&self.levels.leveled_ref(), key).await; Into::>::into(got) } - // TODO(1): when get an applier, pass in a now_ms to ensure all expired are cleaned. - /// Update or insert a kv entry. - /// - /// If the input entry has expired, it performs a delete operation. - pub(crate) async fn upsert_kv(&mut self, upsert_kv: UpsertKV) -> (Option, Option) { - let (prev, result) = self.upsert_kv_primary_index(&upsert_kv).await; - - self.update_expire_index(&upsert_kv.key, &prev, &result) - .await; - - let prev = Into::>::into(prev); - let result = Into::>::into(result); - - (prev, result) - } - /// List kv entries by prefix. /// /// If a value is expired, it is not returned. @@ -396,7 +386,7 @@ impl SMV002 { } /// It returns 2 entries: the previous one and the new one after upsert. - async fn upsert_kv_primary_index( + pub(crate) async fn upsert_kv_primary_index( &mut self, upsert_kv: &UpsertKV, ) -> (Marked>, Marked>) { @@ -419,9 +409,12 @@ impl SMV002 { } Operation::Delete => self.levels.set(upsert_kv.key.clone(), None).await, Operation::AsIs => { - self.levels - .update_meta(upsert_kv.key.clone(), upsert_kv.value_meta.clone()) - .await + MapApiExt::update_meta( + &mut self.levels, + upsert_kv.key.clone(), + upsert_kv.value_meta.clone(), + ) + .await } }; @@ -447,7 +440,7 @@ impl SMV002 { /// Update the secondary index for speeding up expiration operation. /// /// Remove the expiration index for the removed record, and add a new one for the new record. - async fn update_expire_index( + pub(crate) async fn update_expire_index( &mut self, key: impl ToString, removed: &Marked>, diff --git a/src/meta/raft-store/src/sm_v002/sm_v002_test.rs b/src/meta/raft-store/src/sm_v002/sm_v002_test.rs index e0c9fb6a405d0..d2bae9d6e184b 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002_test.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002_test.rs @@ -27,19 +27,23 @@ use crate::state_machine::ExpireKey; async fn test_one_level_upsert_get_range() -> anyhow::Result<()> { let mut sm = SMV002::default(); - let (prev, result) = sm.upsert_kv(UpsertKV::update("a", b"a0")).await; + let mut a = sm.new_applier(); + let (prev, result) = a.upsert_kv(&UpsertKV::update("a", b"a0")).await; assert_eq!(prev, None); assert_eq!(result, Some(SeqV::new(1, b("a0")))); + let got = sm.get_kv("a").await; assert_eq!(got, Some(SeqV::new(1, b("a0")))); - let (prev, result) = sm.upsert_kv(UpsertKV::update("b", b"b0")).await; + let mut a = sm.new_applier(); + let (prev, result) = a.upsert_kv(&UpsertKV::update("b", b"b0")).await; assert_eq!(prev, None); assert_eq!(result, Some(SeqV::new(2, b("b0")))); let got = sm.get_kv("b").await; assert_eq!(got, Some(SeqV::new(2, b("b0")))); - let (prev, result) = sm.upsert_kv(UpsertKV::update("a", b"a00")).await; + let mut a = sm.new_applier(); + let (prev, result) = a.upsert_kv(&UpsertKV::update("a", b"a00")).await; assert_eq!(prev, Some(SeqV::new(1, b("a0")))); assert_eq!(result, Some(SeqV::new(3, b("a00")))); let got = sm.get_kv("a").await; @@ -71,18 +75,20 @@ async fn test_two_level_upsert_get_range() -> anyhow::Result<()> { // | a a/b c let mut sm = SMV002::default(); + let mut a = sm.new_applier(); // internal_seq = 0 - sm.upsert_kv(UpsertKV::update("a", b"a0")).await; - sm.upsert_kv(UpsertKV::update("a/b", b"b0")).await; - sm.upsert_kv(UpsertKV::update("c", b"c0")).await; + a.upsert_kv(&UpsertKV::update("a", b"a0")).await; + a.upsert_kv(&UpsertKV::update("a/b", b"b0")).await; + a.upsert_kv(&UpsertKV::update("c", b"c0")).await; sm.levels.freeze_writable(); + let mut a = sm.new_applier(); // internal_seq = 3 - sm.upsert_kv(UpsertKV::delete("a/b")).await; - sm.upsert_kv(UpsertKV::update("c", b"c1")).await; - sm.upsert_kv(UpsertKV::update("d", b"d1")).await; + a.upsert_kv(&UpsertKV::delete("a/b")).await; + a.upsert_kv(&UpsertKV::update("c", b"c1")).await; + a.upsert_kv(&UpsertKV::update("d", b"d1")).await; // get_kv_ref() @@ -148,17 +154,19 @@ async fn test_update_expire_index() -> anyhow::Result<()> { /// l0 | a₁ b₂ | 5,2₂ -> b 10,1₁ -> a async fn build_sm_with_expire() -> SMV002 { let mut sm = SMV002::default(); + let mut a = sm.new_applier(); - sm.upsert_kv(UpsertKV::update("a", b"a0").with_expire_sec(10)) + a.upsert_kv(&UpsertKV::update("a", b"a0").with_expire_sec(10)) .await; - sm.upsert_kv(UpsertKV::update("b", b"b0").with_expire_sec(5)) + a.upsert_kv(&UpsertKV::update("b", b"b0").with_expire_sec(5)) .await; sm.levels.freeze_writable(); + let mut a = sm.new_applier(); - sm.upsert_kv(UpsertKV::update("c", b"c0").with_expire_sec(20)) + a.upsert_kv(&UpsertKV::update("c", b"c0").with_expire_sec(20)) .await; - sm.upsert_kv(UpsertKV::update("a", b"a1").with_expire_sec(15)) + a.upsert_kv(&UpsertKV::update("a", b"a1").with_expire_sec(15)) .await; // let x: Vec<(&ExpireKey, &Marked)> = @@ -225,8 +233,10 @@ async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> { sm.update_expire_cursor(15_000); + let mut a = sm.new_applier(); + // Inserting an expired entry will delete it. - sm.upsert_kv(UpsertKV::update("a", b"a1").with_expire_sec(10)) + a.upsert_kv(&UpsertKV::update("a", b"a1").with_expire_sec(10)) .await; assert_eq!(sm.get_kv("a").await, None, "a is expired"); diff --git a/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs b/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs index ed3f32d8a6ede..2071fe57b3e93 100644 --- a/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs +++ b/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs @@ -299,16 +299,18 @@ async fn build_3_levels() -> LeveledMap { async fn build_sm_with_expire() -> SMV002 { let mut sm = SMV002::default(); - sm.upsert_kv(UpsertKV::update("a", b"a0").with_expire_sec(10)) + let mut a = sm.new_applier(); + a.upsert_kv(&UpsertKV::update("a", b"a0").with_expire_sec(10)) .await; - sm.upsert_kv(UpsertKV::update("b", b"b0").with_expire_sec(5)) + a.upsert_kv(&UpsertKV::update("b", b"b0").with_expire_sec(5)) .await; sm.levels.freeze_writable(); - sm.upsert_kv(UpsertKV::update("c", b"c0").with_expire_sec(20)) + let mut a = sm.new_applier(); + a.upsert_kv(&UpsertKV::update("c", b"c0").with_expire_sec(20)) .await; - sm.upsert_kv(UpsertKV::update("a", b"a1").with_expire_sec(15)) + a.upsert_kv(&UpsertKV::update("a", b"a1").with_expire_sec(15)) .await; sm