Skip to content

Commit

Permalink
chore: rename: SMV002::get_kv() to get_maybe_expired_kv() (databendla…
Browse files Browse the repository at this point in the history
…bs#13466)

* chore: collect sys data API into SysData, reduce state machine API set

* chore: rename: SMV002::get_kv() to get_maybe_expired_kv()

Because its return value may be an expired record and should be considered as None
  • Loading branch information
drmingdrmer authored and andylokandy committed Nov 27, 2023
1 parent bbe06e9 commit 6061e7e
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 97 deletions.
8 changes: 5 additions & 3 deletions src/binaries/metactl/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use common_meta_raft_store::ondisk::DataVersion;
use common_meta_raft_store::ondisk::OnDisk;
use common_meta_raft_store::ondisk::DATA_VERSION;
use common_meta_raft_store::ondisk::TREE_HEADER;
use common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
use common_meta_raft_store::sm_v002::SnapshotStoreV002;
use common_meta_raft_store::state::RaftState;
use common_meta_sled_store::get_sled_db;
Expand Down Expand Up @@ -369,7 +370,7 @@ async fn init_new_cluster(

let last_applied = {
let sm2 = sto.get_state_machine().await;
*sm2.last_applied_ref()
*sm2.sys_data_ref().last_applied_ref()
};

let last_log_id = std::cmp::max(last_applied, max_log_id);
Expand All @@ -382,12 +383,13 @@ async fn init_new_cluster(
{
let mut sm2 = sto.get_state_machine().await;

*sm2.nodes_mut() = nodes.clone();
*sm2.sys_data_mut().nodes_mut() = nodes.clone();

// It must set membership to state machine because
// the snapshot may contain more logs than the last_log_id.
// In which case, logs will be purged upon startup.
*sm2.last_membership_mut() = StoredMembership::new(last_applied, membership.clone());
*sm2.sys_data_mut().last_membership_mut() =
StoredMembership::new(last_applied, membership.clone());
}

// Build snapshot to persist state machine.
Expand Down
40 changes: 23 additions & 17 deletions src/meta/raft-store/src/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ impl<'a> Applier<'a> {

self.clean_expired_kvs(log_time_ms).await;

// TODO: it could persist the last_applied log id so that when starting up,
// it could re-apply the logs without waiting for the `committed` message from a leader.
*self.sm.last_applied_mut() = Some(*log_id);
*self.sm.sys_data_mut().last_applied_mut() = Some(*log_id);

let applied_state = match entry.payload {
EntryPayload::Blank => {
Expand All @@ -103,7 +101,8 @@ impl<'a> Applier<'a> {
EntryPayload::Membership(ref mem) => {
info!("apply: membership: {:?}", mem);

*self.sm.last_membership_mut() = StoredMembership::new(Some(*log_id), mem.clone());
*self.sm.sys_data_mut().last_membership_mut() =
StoredMembership::new(Some(*log_id), mem.clone());
AppliedState::None
}
};
Expand Down Expand Up @@ -149,16 +148,22 @@ impl<'a> Applier<'a> {
/// Insert a node only when it does not exist or `overriding` is true.
#[minitrace::trace]
fn apply_add_node(&mut self, node_id: &u64, node: &Node, overriding: bool) -> AppliedState {
let prev = self.sm.nodes_mut().get(node_id).cloned();
let prev = self.sm.sys_data_mut().nodes_mut().get(node_id).cloned();

if prev.is_none() {
self.sm.nodes_mut().insert(*node_id, node.clone());
self.sm
.sys_data_mut()
.nodes_mut()
.insert(*node_id, node.clone());
info!("applied AddNode(non-overriding): {}={:?}", node_id, node);
return (prev, Some(node.clone())).into();
}

if overriding {
self.sm.nodes_mut().insert(*node_id, node.clone());
self.sm
.sys_data_mut()
.nodes_mut()
.insert(*node_id, node.clone());
info!("applied AddNode(overriding): {}={:?}", node_id, node);
(prev, Some(node.clone())).into()
} else {
Expand All @@ -168,7 +173,7 @@ impl<'a> Applier<'a> {

#[minitrace::trace]
fn apply_remove_node(&mut self, node_id: &u64) -> AppliedState {
let prev = self.sm.nodes_mut().remove(node_id);
let prev = self.sm.sys_data_mut().nodes_mut().remove(node_id);
info!("applied RemoveNode: {}={:?}", node_id, prev);

(prev, None).into()
Expand Down Expand Up @@ -213,7 +218,6 @@ impl<'a> Applier<'a> {
upsert_kv, prev, result
);

// dbg!("push_change", &upsert_kv.key, prev.clone(), result.clone());
self.push_change(&upsert_kv.key, prev.clone(), result.clone());

(prev, result)
Expand Down Expand Up @@ -262,7 +266,11 @@ impl<'a> Applier<'a> {
debug!(cond = as_display!(cond); "txn_execute_one_condition");

let key = &cond.key;
let seqv = self.sm.get_kv(key).await;
// No expiration check:
// If the key expired, it should be treated as `None` value.
// sm.get_kv() does not check expiration.
// Expired keys are cleaned before applying a log, see: `clean_expired_kvs()`.
let seqv = self.sm.get_maybe_expired_kv(key).await;

debug!(
"txn_execute_one_condition: key: {} curr: seq:{} value:{:?}",
Expand Down Expand Up @@ -337,7 +345,7 @@ impl<'a> Applier<'a> {
}

async fn txn_execute_get(&self, get: &TxnGetRequest, resp: &mut TxnReply) {
let sv = self.sm.get_kv(&get.key).await;
let sv = self.sm.get_maybe_expired_kv(&get.key).await;
let value = sv.map(pb::SeqV::from);
let get_resp = TxnGetResponse {
key: get.key.clone(),
Expand Down Expand Up @@ -439,19 +447,17 @@ impl<'a> Applier<'a> {

{
let mut strm = std::pin::pin!(strm);
while let Some((expire_key, expire_value)) = strm.next().await {
// dbg!("check expired", &expire_key, &expire_value);
// dbg!(expire_key.is_expired(log_time_ms));

while let Some((expire_key, key)) = strm.next().await {
if !expire_key.is_expired(log_time_ms) {
break;
}
to_clean.push((expire_key.clone(), expire_value.clone()));

to_clean.push((expire_key, key));
}
}

for (expire_key, key) in to_clean {
let curr = self.sm.get_kv(&key).await;
let curr = self.sm.get_maybe_expired_kv(&key).await;
if let Some(seq_v) = &curr {
assert_eq!(expire_key.seq, seq_v.seq);
info!("clean expired: {}, {}", key, expire_key);
Expand Down
4 changes: 4 additions & 0 deletions src/meta/raft-store/src/sm_v002/leveled_store/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl Level {
}
}

pub(in crate::sm_v002) fn sys_data_ref(&self) -> &SysData {
&self.sys_data
}

pub(in crate::sm_v002) fn sys_data_mut(&mut self) -> &mut SysData {
&mut self.sys_data
}
Expand Down
6 changes: 3 additions & 3 deletions src/meta/raft-store/src/sm_v002/leveled_store/sys_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ impl SysData {
self.sequence
}

pub(crate) fn last_applied_mut(&mut self) -> &mut Option<LogId> {
pub fn last_applied_mut(&mut self) -> &mut Option<LogId> {
&mut self.last_applied
}

pub(crate) fn last_membership_mut(&mut self) -> &mut StoredMembership {
pub fn last_membership_mut(&mut self) -> &mut StoredMembership {
&mut self.last_membership
}

pub(crate) fn nodes_mut(&mut self) -> &mut BTreeMap<NodeId, Node> {
pub fn nodes_mut(&mut self) -> &mut BTreeMap<NodeId, Node> {
&mut self.nodes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_meta_types::NodeId;
use common_meta_types::StoredMembership;

/// APIs to access the non-user-data of the state machine(leveled map).
pub(in crate::sm_v002) trait SysDataApiRO {
pub trait SysDataApiRO {
fn curr_seq(&self) -> u64;

fn last_applied_ref(&self) -> &Option<LogId>;
Expand Down
63 changes: 20 additions & 43 deletions src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future;
Expand All @@ -29,15 +28,11 @@ use common_meta_stoerr::MetaBytesError;
use common_meta_types::protobuf::StreamItem;
use common_meta_types::AppliedState;
use common_meta_types::Entry;
use common_meta_types::LogId;
use common_meta_types::MatchSeqExt;
use common_meta_types::Node;
use common_meta_types::NodeId;
use common_meta_types::Operation;
use common_meta_types::SeqV;
use common_meta_types::SeqValue;
use common_meta_types::SnapshotData;
use common_meta_types::StoredMembership;
use common_meta_types::TxnReply;
use common_meta_types::TxnRequest;
use common_meta_types::UpsertKV;
Expand All @@ -59,6 +54,7 @@ use crate::sm_v002::leveled_store::map_api::AsMap;
use crate::sm_v002::leveled_store::map_api::MapApi;
use crate::sm_v002::leveled_store::map_api::MapApiExt;
use crate::sm_v002::leveled_store::map_api::MapApiRO;
use crate::sm_v002::leveled_store::sys_data::SysData;
use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
use crate::sm_v002::marked::Marked;
use crate::sm_v002::sm_v002;
Expand All @@ -82,7 +78,7 @@ impl<'a> kvapi::KVApi for SMV002KVApi<'a> {
}

async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let got = self.sm.get_kv(key).await;
let got = self.sm.get_maybe_expired_kv(key).await;

let local_now_ms = SeqV::<()>::now_ms();
let got = Self::non_expired(got, local_now_ms);
Expand All @@ -95,7 +91,7 @@ impl<'a> kvapi::KVApi for SMV002KVApi<'a> {
let mut values = Vec::with_capacity(keys.len());

for k in keys {
let got = self.sm.get_kv(k.as_str()).await;
let got = self.sm.get_maybe_expired_kv(k.as_str()).await;
let v = Self::non_expired(got, local_now_ms);
values.push(v);
}
Expand Down Expand Up @@ -188,11 +184,13 @@ impl SMV002 {
// The snapshot is empty but contains Nodes data that are manually added.
//
// See: `databend_metactl::snapshot`
if &new_last_applied <= sm.last_applied_ref() && sm.last_applied_ref().is_some() {
if &new_last_applied <= sm.sys_data_ref().last_applied_ref()
&& sm.sys_data_ref().last_applied_ref().is_some()
{
info!(
"no need to install: snapshot({:?}) <= sm({:?})",
new_last_applied,
sm.last_applied_ref()
sm.sys_data_ref().last_applied_ref()
);
return Ok(());
}
Expand Down Expand Up @@ -254,7 +252,7 @@ impl SMV002 {
/// Get a cloned value by key.
///
/// It does not check expiration of the returned entry.
pub async fn get_kv(&self, key: &str) -> Option<SeqV> {
pub async fn get_maybe_expired_kv(&self, key: &str) -> Option<SeqV> {
let got = self.levels.str_map().get(key).await;
Into::<Option<SeqV>>::into(got)
}
Expand Down Expand Up @@ -304,41 +302,18 @@ impl SMV002 {
.range(&self.expire_cursor..)
.await
// Return only non-deleted records
.filter_map(|(k, v)| async move {
//
v.unpack().map(|(v, _v_meta)| (k, v))
.filter_map(|(k, marked)| {
let expire_entry = marked.unpack().map(|(v, _v_meta)| (k, v));
future::ready(expire_entry)
})
}

pub fn curr_seq(&self) -> u64 {
self.levels.writable_ref().curr_seq()
}

pub fn last_applied_ref(&self) -> &Option<LogId> {
self.levels.writable_ref().last_applied_ref()
}

pub fn last_membership_ref(&self) -> &StoredMembership {
self.levels.writable_ref().last_membership_ref()
}

pub fn nodes_ref(&self) -> &BTreeMap<NodeId, Node> {
self.levels.writable_ref().nodes_ref()
pub fn sys_data_ref(&self) -> &SysData {
self.levels.writable_ref().sys_data_ref()
}

pub fn last_applied_mut(&mut self) -> &mut Option<LogId> {
self.levels.writable_mut().sys_data_mut().last_applied_mut()
}

pub fn last_membership_mut(&mut self) -> &mut StoredMembership {
self.levels
.writable_mut()
.sys_data_mut()
.last_membership_mut()
}

pub fn nodes_mut(&mut self) -> &mut BTreeMap<NodeId, Node> {
self.levels.writable_mut().sys_data_mut().nodes_mut()
pub fn sys_data_mut(&mut self) -> &mut SysData {
self.levels.writable_mut().sys_data_mut()
}

pub fn set_subscriber(&mut self, subscriber: Box<dyn StateMachineSubscriber>) {
Expand Down Expand Up @@ -376,14 +351,16 @@ impl SMV002 {
self.expire_cursor = ExpireKey::new(0, 0);
}

/// Keep the top(writable) level, replace the base level and all levels below it.
pub fn replace_base(&mut self, snapshot: &SnapshotViewV002) {
/// Keep the top(writable) level, replace all the frozen levels.
///
/// This is called after compacting some of the frozen levels.
pub fn replace_frozen(&mut self, snapshot: &SnapshotViewV002) {
assert!(
Arc::ptr_eq(
self.levels.frozen_ref().newest().unwrap(),
snapshot.original_ref().newest().unwrap()
),
"the base must not be changed"
"the frozen must not change"
);

self.levels.replace_frozen(snapshot.compacted());
Expand Down
Loading

0 comments on commit 6061e7e

Please sign in to comment.