Skip to content

Commit

Permalink
refactor: V004 exports more compacted data format
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Nov 9, 2024
1 parent 23ab5c7 commit 7be3b41
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 272 deletions.
10 changes: 10 additions & 0 deletions src/meta/process/src/kv_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
Ok(Some(x))
}

RaftStoreEntry::LogEntry(entry) => {
let x = RaftStoreEntry::LogEntry(unwrap_or_return!(self.proc_raft_entry(entry)?));
Ok(Some(x))
}

RaftStoreEntry::GenericKV { key, value } => {
let data = (self.process_pb)(&key, value.data)?;

Expand All @@ -104,6 +109,11 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
RaftStoreEntry::Sequences { .. } => Ok(None),
RaftStoreEntry::ClientLastResps { .. } => Ok(None),
RaftStoreEntry::LogMeta { .. } => Ok(None),

RaftStoreEntry::NodeId(_) => Ok(None),
RaftStoreEntry::Vote(_) => Ok(None),
RaftStoreEntry::Committed(_) => Ok(None),
RaftStoreEntry::Purged(_) => Ok(None),
}
}

Expand Down
85 changes: 50 additions & 35 deletions src/meta/raft-store/src/key_spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,43 @@ pub enum RaftStoreEntry {
Sequences { key: <Sequences as SledKeySpace>::K, value: <Sequences as SledKeySpace>::V, },
ClientLastResps { key: <ClientLastResps as SledKeySpace>::K, value: <ClientLastResps as SledKeySpace>::V, },
LogMeta { key: <LogMeta as SledKeySpace>::K, value: <LogMeta as SledKeySpace>::V, },

// V004 log:
LogEntry(Entry),
NodeId(Option<NodeId>),
Vote(Option<Vote>),
Committed(Option<LogId>),
Purged(Option<LogId>),
}

impl RaftStoreEntry {
/// Upgrade V003 to V004
pub fn upgrade(self) -> Self {
match self.clone() {
Self::Logs { key: _, value } => Self::LogEntry(value),
Self::RaftStateKV { key: _, value } => match value {
RaftStateValue::NodeId(node_id) => Self::NodeId(Some(node_id)),
RaftStateValue::HardState(vote) => Self::Vote(Some(vote)),
RaftStateValue::Committed(committed) => Self::Committed(committed),
RaftStateValue::StateMachineId(_) => self,
},
Self::LogMeta { key: _, value } => Self::Purged(Some(value.log_id())),

RaftStoreEntry::DataHeader { .. }
| RaftStoreEntry::Nodes { .. }
| RaftStoreEntry::StateMachineMeta { .. }
| RaftStoreEntry::Expire { .. }
| RaftStoreEntry::GenericKV { .. }
| RaftStoreEntry::Sequences { .. }
| RaftStoreEntry::ClientLastResps { .. }
| RaftStoreEntry::LogEntry(_)
| RaftStoreEntry::NodeId(_)
| RaftStoreEntry::Vote(_)
| RaftStoreEntry::Committed(_)
| RaftStoreEntry::Purged(_) => self,
}
}

/// Serialize a key-value entry into a two elt vec of vec<u8>: `[key, value]`.
#[rustfmt::skip]
pub fn serialize(kv: &RaftStoreEntry) -> Result<(sled::IVec, sled::IVec), MetaStorageError> {
Expand All @@ -260,6 +294,14 @@ impl RaftStoreEntry {
Self::Sequences { key, value } => serialize_for_sled!(Sequences, key, value),
Self::ClientLastResps { key, value } => serialize_for_sled!(ClientLastResps, key, value),
Self::LogMeta { key, value } => serialize_for_sled!(LogMeta, key, value),

RaftStoreEntry::LogEntry(_) |
RaftStoreEntry::NodeId(_)|
RaftStoreEntry::Vote(_) |
RaftStoreEntry::Committed(_)|
RaftStoreEntry::Purged(_) => {
unreachable!("V004 entries should not be serialized for sled")
}
}
}

Expand Down Expand Up @@ -297,41 +339,6 @@ impl RaftStoreEntry {
value: header,
}
}

pub fn new_node_id(node_id: NodeId) -> Self {
Self::RaftStateKV {
key: RaftStateKey::Id,
value: RaftStateValue::NodeId(node_id),
}
}

pub fn new_vote(vote: Vote) -> Self {
Self::RaftStateKV {
key: RaftStateKey::HardState,
value: RaftStateValue::HardState(vote),
}
}

pub fn new_committed(committed: Option<LogId>) -> Self {
Self::RaftStateKV {
key: RaftStateKey::Committed,
value: RaftStateValue::Committed(committed),
}
}

pub fn new_purged(purged: LogId) -> Self {
Self::LogMeta {
key: LogMetaKey::LastPurged,
value: LogMetaValue::LogId(purged),
}
}

pub fn new_log_entry(log: Entry) -> Self {
Self::Logs {
key: log.log_id.index,
value: log,
}
}
}

impl TryInto<SMEntry> for RaftStoreEntry {
Expand All @@ -351,6 +358,14 @@ impl TryInto<SMEntry> for RaftStoreEntry {
Self::RaftStateKV { .. } => {Err("SMEntry does not contain RaftStateKV".to_string())}
Self::ClientLastResps { .. } => {Err("SMEntry does not contain ClientLastResps".to_string())}
Self::LogMeta { .. } => {Err("SMEntry does not contain LogMeta".to_string())}


Self::LogEntry (_) => {Err("SMEntry does not contain LogEntry".to_string())}
Self::Vote (_) => {Err("SMEntry does not contain Vote".to_string())}
Self::NodeId (_) => {Err("SMEntry does not contain NodeId".to_string())}
Self::Committed (_) => {Err("SMEntry does not contain Committed".to_string())}
Self::Purged (_) => {Err("SMEntry does not contain Purged".to_string())}

}
}
}
2 changes: 1 addition & 1 deletion src/meta/raft-store/src/ondisk/upgrade_to_v004.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl OnDisk {
let kvs = tree.export()?;
for kv in kvs {
let ent = RaftStoreEntry::deserialize(&kv[0], &kv[1])?;
importer.import_raft_store_entry(ent)?;
importer.import_raft_store_entry(ent.upgrade())?;
}
}

Expand Down
75 changes: 43 additions & 32 deletions src/meta/raft-store/src/raft_log_v004/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use crate::raft_log_v004::codec_wrapper::Cw;
use crate::raft_log_v004::log_store_meta::LogStoreMeta;
use crate::raft_log_v004::util;
use crate::raft_log_v004::RaftLogV004;
use crate::state::RaftStateKey;
use crate::state_machine::LogMetaKey;

/// Import series of [`RaftStoreEntry`] record into [`RaftLogV004`].
///
Expand Down Expand Up @@ -51,55 +49,68 @@ impl Importer {
RaftStoreEntry::DataHeader { .. } => {
// V004 RaftLog does not store DataHeader
}
RaftStoreEntry::Logs { key: _, value } => {
let (log_id, payload) = (value.log_id, value.payload);

//////////////////////////// V004 log ////////////////////////////
RaftStoreEntry::LogEntry(log_entry) => {
let log_id = log_entry.log_id;
let payload = log_entry.payload;

self.raft_log.append([(Cw(log_id), Cw(payload))])?;
self.max_log_id = std::cmp::max(self.max_log_id, Some(value.log_id));
self.max_log_id = std::cmp::max(self.max_log_id, Some(log_id));
}
RaftStoreEntry::RaftStateKV { key, value } => match key {
RaftStateKey::Id => {
self.raft_log.save_user_data(Some(LogStoreMeta {
node_id: Some(value.node_id()),
}))?;
}
RaftStateKey::HardState => {
self.raft_log.save_vote(Cw(value.vote()))?;
}
RaftStateKey::StateMachineId => {
unreachable!("StateMachineId is removed");

RaftStoreEntry::NodeId(node_id) => {
self.raft_log
.save_user_data(Some(LogStoreMeta { node_id }))?;
}

RaftStoreEntry::Vote(vote) => {
if let Some(vote) = vote {
self.raft_log.save_vote(Cw(vote))?;
}
RaftStateKey::Committed => {
if let Some(value) = value.committed() {
self.raft_log.commit(Cw(value))?;
}
}

RaftStoreEntry::Committed(committed) => {
if let Some(committed) = committed {
self.raft_log.commit(Cw(committed))?;
}
},
RaftStoreEntry::LogMeta { key, value } => match key {
LogMetaKey::LastPurged => {
let purged = value.log_id();
}

RaftStoreEntry::Purged(purged) => {
if let Some(purged) = purged {
self.raft_log.purge(Cw(purged))?;
}
},
}

///////////////////////// V003 and before Log ////////////////////
RaftStoreEntry::Logs { .. } => {
unreachable!("V003 Logs should be written to V004 log");
}
RaftStoreEntry::RaftStateKV { .. } => {
unreachable!("V003 RaftStateKV should be written to V004 log");
}
RaftStoreEntry::LogMeta { .. } => {
unreachable!("V003 LogMeta should be written to V004 log");
}

//////////////////////////////////////////////////////////////////
//////////////////////// State machine entries ///////////////////////
RaftStoreEntry::StateMachineMeta { .. } => {
unreachable!("StateMachineMeta should be written to snapshot");
unreachable!("StateMachineMeta should be written to log");
}
RaftStoreEntry::Nodes { .. } => {
unreachable!("Nodes should be written to snapshot");
unreachable!("Nodes should be written to log");
}
RaftStoreEntry::Expire { .. } => {
unreachable!("Expire should be written to snapshot");
unreachable!("Expire should be written to log");
}
RaftStoreEntry::GenericKV { .. } => {
unreachable!("GenericKV should be written to snapshot");
unreachable!("GenericKV should be written to log");
}
RaftStoreEntry::Sequences { .. } => {
unreachable!("Sequences should be written to snapshot");
unreachable!("Sequences should be written to log");
}
RaftStoreEntry::ClientLastResps { .. } => {
unreachable!("ClientLastResps should be written to snapshot");
unreachable!("ClientLastResps should be written to log");
}
}

Expand Down
34 changes: 14 additions & 20 deletions src/meta/service/src/store/store_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_meta_raft_store::ondisk::Header;
use databend_common_meta_raft_store::ondisk::TREE_HEADER;
use databend_common_meta_raft_store::raft_log_v004;
use databend_common_meta_raft_store::raft_log_v004::util;
use databend_common_meta_raft_store::raft_log_v004::Cw;
use databend_common_meta_raft_store::raft_log_v004::RaftLogV004;
use databend_common_meta_raft_store::sm_v003::write_entry::WriteEntry;
use databend_common_meta_raft_store::sm_v003::SnapshotStoreV004;
Expand Down Expand Up @@ -378,34 +379,27 @@ impl StoreInner {

// Export raft state
{
let tree_name = "raft_state";
let tree_name = "raft_log";

if let Some(ud) = &state.user_data {
if let Some(node_id) = ud.node_id {
let entry = RaftStoreEntry::new_node_id(node_id);
yield encode_entry(tree_name, &entry)?;
}
}
let node_id = state.user_data.as_ref().and_then(|ud| ud.node_id);
let entry = RaftStoreEntry::NodeId(node_id);
yield encode_entry(tree_name, &entry)?;

if let Some(vote) = state.vote() {
let vote = (*vote).unpack();
let entry = RaftStoreEntry::new_vote(vote);
yield encode_entry(tree_name, &entry)?;
}
let vote = state.vote().map(Cw::to_inner);
let entry = RaftStoreEntry::Vote(vote);
yield encode_entry(tree_name, &entry)?;

let committed = state.committed().map(|x| (*x).unpack());
let entry = RaftStoreEntry::new_committed(committed);
let committed = state.committed().map(Cw::to_inner);
let entry = RaftStoreEntry::Committed(committed);
yield encode_entry(tree_name, &entry)?;
};

{
let tree_name = "raft_log";

let purged = state.purged().map(|x| (*x).unpack());
if let Some(purged) = purged {
let entry = RaftStoreEntry::new_purged(purged);
yield encode_entry(tree_name, &entry)?;
}
let purged = state.purged().map(Cw::to_inner);
let entry = RaftStoreEntry::Purged(purged);
yield encode_entry(tree_name, &entry)?;

for res in dump.iter() {
let (log_id, payload) = res?;
Expand All @@ -414,7 +408,7 @@ impl StoreInner {

let log_entry = Entry { log_id, payload };

let entry = RaftStoreEntry::new_log_entry(log_entry);
let entry = RaftStoreEntry::LogEntry(log_entry);
yield encode_entry(tree_name, &entry)?;
}
}
Expand Down
21 changes: 11 additions & 10 deletions src/meta/service/tests/it/grpc/metasrv_grpc_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,17 @@ async fn test_export() -> anyhow::Result<()> {

let want = vec![
r#"["header",{"DataHeader":{"key":"header","value":{"version":"V004"}}}]"#,
r#"["raft_state",{"RaftStateKV":{"key":"Id","value":{"NodeId":0}}}]"#,
r#"["raft_state",{"RaftStateKV":{"key":"HardState","value":{"HardState":{"leader_id":{"term":1,"node_id":0},"committed":true}}}}]"#,
r#"["raft_state",{"RaftStateKV":{"key":"Committed","value":{"Committed":{"leader_id":{"term":1,"node_id":0},"index":6}}}}]"#,
r#"["raft_log",{"Logs":{"key":0,"value":{"log_id":{"leader_id":{"term":0,"node_id":0},"index":0},"payload":{"Membership":{"configs":[[0]],"nodes":{"0":{}}}}}}}]"#,
r#"["raft_log",{"Logs":{"key":1,"value":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":1},"payload":"Blank"}}}]"#,
r#"["raft_log",{"Logs":{"key":2,"value":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":2},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"AddNode":{"node_id":0,"node":{"name":"0","endpoint":{"addr":"localhost","port":29000},"grpc_api_advertise_address":"127.0.0.1:29000"},"overriding":false}}}}}}}]"#,
r#"["raft_log",{"Logs":{"key":3,"value":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":3},"payload":{"Membership":{"configs":[[0]],"nodes":{"0":{}}}}}}}]"#,
r#"["raft_log",{"Logs":{"key":4,"value":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":4},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"UpsertKV":{"key":"foo","seq":{"GE":0},"value":{"Update":[102,111,111]},"value_meta":null}}}}}}}]"#,
r#"["raft_log",{"Logs":{"key":5,"value":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":5},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"UpsertKV":{"key":"bar","seq":{"GE":0},"value":{"Update":[98,97,114]},"value_meta":null}}}}}}}]"#,
r#"["raft_log",{"Logs":{"key":6,"value":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":6},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"UpsertKV":{"key":"wow","seq":{"GE":0},"value":{"Update":[119,111,119]},"value_meta":null}}}}}}}]"#,
r#"["raft_log",{"NodeId":0}]"#,
r#"["raft_log",{"Vote":{"leader_id":{"term":1,"node_id":0},"committed":true}}]"#,
r#"["raft_log",{"Committed":{"leader_id":{"term":1,"node_id":0},"index":6}}]"#,
r#"["raft_log",{"Purged":null}]"#,
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":0,"node_id":0},"index":0},"payload":{"Membership":{"configs":[[0]],"nodes":{"0":{}}}}}}]"#,
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":1},"payload":"Blank"}}]"#,
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":2},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"AddNode":{"node_id":0,"node":{"name":"0","endpoint":{"addr":"localhost","port":29000},"grpc_api_advertise_address":"127.0.0.1:29000"},"overriding":false}}}}}}]"#,
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":3},"payload":{"Membership":{"configs":[[0]],"nodes":{"0":{}}}}}}]"#,
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":4},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"UpsertKV":{"key":"foo","seq":{"GE":0},"value":{"Update":[102,111,111]},"value_meta":null}}}}}}]"#,
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":5},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"UpsertKV":{"key":"bar","seq":{"GE":0},"value":{"Update":[98,97,114]},"value_meta":null}}}}}}]"#,
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":6},"payload":{"Normal":{"txid":null,"time_ms":1111111111111,"cmd":{"UpsertKV":{"key":"wow","seq":{"GE":0},"value":{"Update":[119,111,119]},"value_meta":null}}}}}}]"#,
r#"["state_machine/0",{"Sequences":{"key":"generic-kv","value":3}}]"#,
r#"["state_machine/0",{"StateMachineMeta":{"key":"LastApplied","value":{"LogId":{"leader_id":{"term":1,"node_id":0},"index":6}}}}]"#,
r#"["state_machine/0",{"StateMachineMeta":{"key":"LastMembership","value":{"Membership":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":3},"membership":{"configs":[[0]],"nodes":{"0":{}}}}}}}]"#,
Expand Down
1 change: 1 addition & 0 deletions tests/metactl/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/_meta_dir
/exported
/grpc_exported
Loading

0 comments on commit 7be3b41

Please sign in to comment.