Skip to content

Commit

Permalink
Merge pull request #1479 from drmingdrmer/upgrade-raft
Browse files Browse the repository at this point in the history
[store] feature: applies all logs to raft state machine.
  • Loading branch information
databend-bot authored Aug 16, 2021
2 parents a4e9ac2 + b309fdf commit b6ba0b3
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ common-tracing = {path = "../common/tracing"}

# Crates.io dependencies
anyhow = "1.0.42"
async-raft = { git = "https://github.com/datafuse-extras/async-raft", tag = "v0.6.2-alpha.7" }
async-raft = { git = "https://github.com/datafuse-extras/async-raft", tag = "v0.6.2-alpha.8" }
async-trait = "0.1"
byteorder = "1.1.0"
env_logger = "0.9"
Expand Down
2 changes: 2 additions & 0 deletions store/src/meta_service/applied_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub enum AppliedState {
prev: Option<SeqValue<KVValue>>,
result: Option<SeqValue<KVValue>>,
},

None,
}

impl AppDataResponse for AppliedState {}
Expand Down
14 changes: 5 additions & 9 deletions store/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,22 +317,18 @@ impl RaftStorage<LogEntry, AppliedState> for MetaStore {
#[tracing::instrument(level = "info", skip(self), fields(id=self.id))]
async fn apply_entry_to_state_machine(
&self,
index: &LogId,
data: &LogEntry,
entry: &Entry<LogEntry>,
) -> anyhow::Result<AppliedState> {
let mut sm = self.state_machine.write().await;
let resp = sm.apply(index, data).await?;
let resp = sm.apply(entry).await?;
Ok(resp)
}

#[tracing::instrument(level = "info", skip(self, entries), fields(id=self.id))]
async fn replicate_to_state_machine(
&self,
entries: &[(&LogId, &LogEntry)],
) -> anyhow::Result<()> {
async fn replicate_to_state_machine(&self, entries: &[&Entry<LogEntry>]) -> anyhow::Result<()> {
let mut sm = self.state_machine.write().await;
for (index, data) in entries {
sm.apply(*index, data).await?;
for entry in entries {
sm.apply(*entry).await?;
}
Ok(())
}
Expand Down
51 changes: 36 additions & 15 deletions store/src/meta_service/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use std::fs::remove_dir_all;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use async_raft::LogId;
use async_raft::raft::Entry;
use async_raft::raft::EntryPayload;
use common_exception::prelude::ErrorCode;
use common_exception::ToErrorCode;
use common_flights::storage_api_impl::AppendResult;
Expand All @@ -42,6 +43,7 @@ use crate::meta_service::placement::rand_n_from_m;
use crate::meta_service::raft_db::get_sled_db;
use crate::meta_service::sled_key_space;
use crate::meta_service::sled_key_space::StateMachineMeta;
use crate::meta_service::state_machine_meta::StateMachineMetaKey::LastMembership;
use crate::meta_service::AppliedState;
use crate::meta_service::AsKeySpace;
use crate::meta_service::Cmd;
Expand Down Expand Up @@ -334,31 +336,50 @@ impl StateMachine {
#[tracing::instrument(level = "trace", skip(self))]
pub async fn apply(
&mut self,
log_id: &LogId,
data: &LogEntry,
entry: &Entry<LogEntry>,
) -> common_exception::Result<AppliedState> {
// TODO(xp): all update need to be done in a tx.

let log_id = &entry.log_id;

let sm_meta = self.sm_meta();
sm_meta
.insert(&LastApplied, &StateMachineMetaValue::LogId(*log_id))
.await?;

if let Some(ref txid) = data.txid {
if let Some((serial, resp)) = self.client_last_resp.get(&txid.client) {
if serial == &txid.serial {
return Ok(resp.clone());
match entry.payload {
EntryPayload::Blank => {}
EntryPayload::Normal(ref norm) => {
let data = &norm.data;
if let Some(ref txid) = data.txid {
if let Some((serial, resp)) = self.client_last_resp.get(&txid.client) {
if serial == &txid.serial {
return Ok(resp.clone());
}
}
}
}
}

let resp = self.apply_non_dup(data).await?;
let resp = self.apply_non_dup(data).await?;

if let Some(ref txid) = data.txid {
self.client_last_resp
.insert(txid.client.clone(), (txid.serial, resp.clone()));
}
Ok(resp)
if let Some(ref txid) = data.txid {
self.client_last_resp
.insert(txid.client.clone(), (txid.serial, resp.clone()));
}
return Ok(resp);
}
EntryPayload::ConfigChange(ref mem) => {
sm_meta
.insert(
&LastMembership,
&StateMachineMetaValue::Membership(mem.membership.clone()),
)
.await?;
return Ok(AppliedState::None);
}
EntryPayload::SnapshotPointer(_) => {}
};

Ok(AppliedState::None)
}

/// Apply an op into state machine.
Expand Down
20 changes: 20 additions & 0 deletions store/src/meta_service/state_machine_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::fmt;

use async_raft::raft::MembershipConfig;
use async_raft::LogId;
use common_exception::ErrorCode;
use serde::Deserialize;
Expand All @@ -27,13 +28,18 @@ use crate::meta_service::SledSerde;
pub enum StateMachineMetaKey {
/// The last applied log id in the state machine.
LastApplied,

/// Whether the state machine is initialized.
Initialized,

/// The last membership config
LastMembership,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum StateMachineMetaValue {
LogId(LogId),
Bool(bool),
Membership(MembershipConfig),
}

impl fmt::Display for StateMachineMetaKey {
Expand All @@ -45,6 +51,9 @@ impl fmt::Display for StateMachineMetaKey {
StateMachineMetaKey::Initialized => {
write!(f, "initialized")
}
StateMachineMetaKey::LastMembership => {
write!(f, "last-membership")
}
}
}
}
Expand All @@ -54,6 +63,7 @@ impl SledOrderedSerde for StateMachineMetaKey {
let i = match self {
StateMachineMetaKey::LastApplied => 1,
StateMachineMetaKey::Initialized => 2,
StateMachineMetaKey::LastMembership => 3,
};

Ok(IVec::from(&[i]))
Expand All @@ -66,6 +76,8 @@ impl SledOrderedSerde for StateMachineMetaKey {
return Ok(StateMachineMetaKey::LastApplied);
} else if slice[0] == 2 {
return Ok(StateMachineMetaKey::Initialized);
} else if slice[0] == 3 {
return Ok(StateMachineMetaKey::LastMembership);
}

Err(ErrorCode::MetaStoreDamaged("invalid key IVec"))
Expand All @@ -91,3 +103,11 @@ impl From<StateMachineMetaValue> for bool {
}
}
}
impl From<StateMachineMetaValue> for MembershipConfig {
fn from(v: StateMachineMetaValue) -> Self {
match v {
StateMachineMetaValue::Membership(x) => x,
_ => panic!("expect Membership"),
}
}
}
48 changes: 33 additions & 15 deletions store/src/meta_service/state_machine_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use async_raft::raft::Entry;
use async_raft::raft::EntryNormal;
use async_raft::raft::EntryPayload;
use async_raft::LogId;
use common_metatypes::Database;
use common_metatypes::KVMeta;
Expand Down Expand Up @@ -189,9 +192,14 @@ async fn test_state_machine_apply_incr_seq() -> anyhow::Result<()> {

for (name, txid, k, want) in cases.iter() {
let resp = sm
.apply(&LogId { term: 0, index: 5 }, &LogEntry {
txid: txid.clone(),
cmd: Cmd::IncrSeq { key: k.to_string() },
.apply(&Entry {
log_id: LogId { term: 0, index: 5 },
payload: EntryPayload::Normal(EntryNormal {
data: LogEntry {
txid: txid.clone(),
cmd: Cmd::IncrSeq { key: k.to_string() },
},
}),
})
.await?;
assert_eq!(AppliedState::Seq { seq: *want }, resp, "{}", name);
Expand Down Expand Up @@ -541,12 +549,17 @@ async fn test_state_machine_apply_add_file() -> anyhow::Result<()> {

for (name, txid, k, v, want_prev, want_result) in cases.iter() {
let resp = sm
.apply(&LogId { term: 0, index: 5 }, &LogEntry {
txid: txid.clone(),
cmd: Cmd::AddFile {
key: k.to_string(),
value: v.to_string(),
},
.apply(&Entry {
log_id: LogId { term: 0, index: 5 },
payload: EntryPayload::Normal(EntryNormal {
data: LogEntry {
txid: txid.clone(),
cmd: Cmd::AddFile {
key: k.to_string(),
value: v.to_string(),
},
},
}),
})
.await?;
assert_eq!(
Expand Down Expand Up @@ -574,12 +587,17 @@ async fn test_state_machine_apply_set_file() -> anyhow::Result<()> {

for (name, txid, k, v, want_prev, want_result) in cases.iter() {
let resp = sm
.apply(&LogId { term: 0, index: 5 }, &LogEntry {
txid: txid.clone(),
cmd: Cmd::SetFile {
key: k.to_string(),
value: v.to_string(),
},
.apply(&Entry {
log_id: LogId { term: 0, index: 5 },
payload: EntryPayload::Normal(EntryNormal {
data: LogEntry {
txid: txid.clone(),
cmd: Cmd::SetFile {
key: k.to_string(),
value: v.to_string(),
},
},
}),
})
.await?;
assert_eq!(
Expand Down

0 comments on commit b6ba0b3

Please sign in to comment.