Skip to content

Commit

Permalink
Merge pull request #728 from drmingdrmer/meta-seq
Browse files Browse the repository at this point in the history
[store/meta] feature: generate unique auto-incr number
  • Loading branch information
databend-bot authored Jun 4, 2021
2 parents 934aa55 + ce75d1e commit d919229
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 30 deletions.
15 changes: 15 additions & 0 deletions fusestore/store/src/meta_service/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub struct Meta {
/// The file names stored in this cluster
pub keys: BTreeMap<String, String>,

/// storage of auto-incremental number.
pub sequences: BTreeMap<String, u64>,

// cluster nodes, key distribution etc.
pub slots: Vec<Slot>,
pub nodes: HashMap<NodeId, Node>,
Expand Down Expand Up @@ -74,6 +77,7 @@ impl MetaBuilder {

let mut m = Meta {
keys: BTreeMap::new(),
sequences: BTreeMap::new(),
slots: Vec::with_capacity(initial_slots as usize),
nodes: HashMap::new(),
replication,
Expand Down Expand Up @@ -113,6 +117,17 @@ impl Meta {
Ok((prev, Some(value.clone())).into())
}

Cmd::IncrSeq { ref key } => {
let prev = self.sequences.get(key);
let curr = match prev {
Some(v) => v + 1,
None => 1,
};
self.sequences.insert(key.clone(), curr);
tracing::info!("applied IncrSeq: {}={}", key, curr);
Ok(curr.into())
}

Cmd::AddNode {
ref node_id,
ref node,
Expand Down
41 changes: 31 additions & 10 deletions fusestore/store/src/meta_service/meta_service_impl_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,13 @@ use crate::meta_service::Cmd;
use crate::meta_service::GetReq;
use crate::meta_service::MetaNode;
use crate::meta_service::MetaServiceClient;
use crate::meta_service::MetaServiceImpl;
use crate::meta_service::MetaServiceServer;
use crate::tests::rand_local_addr;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_server_set_get() -> anyhow::Result<()> {
let addr = rand_local_addr();

let rst = MetaNode::boot(0, addr.clone()).await;
assert!(rst.is_ok());
let mn = rst.unwrap();

let meta_srv_impl = MetaServiceImpl::create(mn);
let meta_srv = MetaServiceServer::new(meta_srv_impl);

serve_grpc!(addr, meta_srv);
let _mn = MetaNode::boot(0, addr.clone()).await?;

let mut client = MetaServiceClient::connect(format!("http://{}", addr)).await?;

Expand Down Expand Up @@ -107,3 +98,33 @@ async fn test_meta_server_set_get() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_server_incr_seq() -> anyhow::Result<()> {
let addr = rand_local_addr();

let _mn = MetaNode::boot(0, addr.clone()).await?;

let mut client = MetaServiceClient::connect(format!("http://{}", addr)).await?;

let cases = crate::meta_service::raftmeta_test::cases_incr_seq();

for (name, txid, k, want) in cases.iter() {
let req = ClientRequest {
txid: txid.clone(),
cmd: Cmd::IncrSeq { key: k.to_string() },
};
let rst = client.write(req).await?.into_inner();
let resp: ClientResponse = rst.into();
match resp {
ClientResponse::Seq { seq } => {
assert_eq!(*want, seq, "{}", name);
}
_ => {
panic!("not Seq")
}
}
}

Ok(())
}
34 changes: 34 additions & 0 deletions fusestore/store/src/meta_service/meta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// SPDX-License-Identifier: Apache-2.0.

use crate::meta_service::meta::Replication;
use crate::meta_service::ClientRequest;
use crate::meta_service::ClientResponse;
use crate::meta_service::Cmd;
use crate::meta_service::Meta;
use crate::meta_service::Node;
use crate::meta_service::Slot;
Expand Down Expand Up @@ -90,3 +93,34 @@ fn test_meta_builder() -> anyhow::Result<()> {
assert_eq!(8, n);
Ok(())
}

// TODO test apply:AddFile,SetFile,AddNode

#[test]
fn test_meta_apply_incr_seq() -> anyhow::Result<()> {
let mut m = Meta::builder().build()?;

for i in 0..3 {
// incr "foo"

let resp = m.apply(&ClientRequest {
txid: None,
cmd: Cmd::IncrSeq {
key: "foo".to_string(),
},
})?;
assert_eq!(ClientResponse::Seq { seq: i + 1 }, resp);

// incr "bar"

let resp = m.apply(&ClientRequest {
txid: None,
cmd: Cmd::IncrSeq {
key: "bar".to_string(),
},
})?;
assert_eq!(ClientResponse::Seq { seq: i + 1 }, resp);
}

Ok(())
}
27 changes: 24 additions & 3 deletions fusestore/store/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,16 @@ const ERR_INCONSISTENT_LOG: &str =
/// A Cmd is committed by raft leader before being applied.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Cmd {
// AKA put-if-absent. add a key-value record only when key is absent.
/// AKA put-if-absent. add a key-value record only when key is absent.
AddFile { key: String, value: String },
// Override the record with key.

/// Override the record with key.
SetFile { key: String, value: String },
// Add node if absent

/// Increment the sequence number generator specified by `key` and returns the new value.
IncrSeq { key: String },

/// Add node if absent
AddNode { node_id: NodeId, node: Node },
}

Expand All @@ -75,6 +80,9 @@ impl fmt::Display for Cmd {
Cmd::SetFile { key, value } => {
write!(f, "setfile:{}={}", key, value)
}
Cmd::IncrSeq { key } => {
write!(f, "incr_seq:{}", key)
}
Cmd::AddNode { node_id, node } => {
write!(f, "addnode:{}={}", node_id, node)
}
Expand Down Expand Up @@ -171,6 +179,9 @@ pub enum ClientResponse {
// The value after applying a ClientRequest.
result: Option<String>,
},
Seq {
seq: u64,
},
Node {
prev: Option<Node>,
result: Option<Node>,
Expand All @@ -185,19 +196,22 @@ impl From<ClientResponse> for RaftMes {
RaftMes { data }
}
}

impl From<RaftMes> for ClientResponse {
fn from(msg: RaftMes) -> Self {
let resp: ClientResponse = serde_json::from_str(&msg.data).expect("fail to deserialize");
resp
}
}

impl From<tonic::Response<RaftMes>> for ClientResponse {
fn from(v: tonic::Response<RaftMes>) -> Self {
let mes = v.into_inner();
let resp: ClientResponse = serde_json::from_str(&mes.data).expect("fail to deserialize");
resp
}
}

impl From<(Option<String>, Option<String>)> for ClientResponse {
fn from(v: (Option<String>, Option<String>)) -> Self {
ClientResponse::String {
Expand All @@ -206,6 +220,13 @@ impl From<(Option<String>, Option<String>)> for ClientResponse {
}
}
}

impl From<u64> for ClientResponse {
fn from(seq: u64) -> Self {
ClientResponse::Seq { seq }
}
}

impl From<(Option<Node>, Option<Node>)> for ClientResponse {
fn from(v: (Option<Node>, Option<Node>)) -> Self {
ClientResponse::Node {
Expand Down
45 changes: 45 additions & 0 deletions fusestore/store/src/meta_service/raftmeta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,28 @@ use crate::meta_service::NodeId;
use crate::meta_service::RaftTxId;
use crate::tests::Seq;

// test cases fro Cmd::IncrSeq:
// case_name, txid, key, want
pub fn cases_incr_seq() -> Vec<(&'static str, Option<RaftTxId>, &'static str, u64)> {
vec![
("incr on none", Some(RaftTxId::new("foo", 1)), "k1", 1),
("incr on existent", Some(RaftTxId::new("foo", 2)), "k1", 2),
(
"dup: same serial, even with diff key, got the previous result",
Some(RaftTxId::new("foo", 2)),
"k2",
2,
),
(
"diff client, same serial, not a dup request",
Some(RaftTxId::new("bar", 2)),
"k2",
1,
),
("no txid, no de-dup", None, "k2", 2),
]
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_state_machine_apply_add_file() -> anyhow::Result<()> {
crate::tests::init_tracing();
Expand Down Expand Up @@ -167,6 +189,29 @@ async fn test_state_machine_apply_set_file() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_state_machine_apply_incr_seq() -> anyhow::Result<()> {
crate::tests::init_tracing();

let cases = cases_incr_seq();

let mut sm = MemStoreStateMachine::default();
for (name, txid, k, want) in cases.iter() {
let resp = sm.apply(5, &ClientRequest {
txid: txid.clone(),
cmd: Cmd::IncrSeq { key: k.to_string() },
});
assert_eq!(
ClientResponse::Seq { seq: *want },
resp.unwrap(),
"{}",
name
);
}

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_node_boot() -> anyhow::Result<()> {
// - Start a single node meta service cluster.
Expand Down
17 changes: 0 additions & 17 deletions fusestore/store/src/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,3 @@ pub fn rand_local_addr() -> String {
let addr = format!("127.0.0.1:{}", port);
return addr;
}

macro_rules! serve_grpc {
($addr:expr, $srv:expr) => {
let addr = $addr.parse::<std::net::SocketAddr>()?;

let srv = tonic::transport::Server::builder().add_service($srv);

tokio::spawn(async move {
srv.serve(addr)
.await
.map_err(|e| anyhow::anyhow!("Flight service error: {:?}", e))?;
Ok::<(), anyhow::Error>(())
});

tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
};
}

0 comments on commit d919229

Please sign in to comment.