Skip to content

Commit

Permalink
[store] feature: sled backed raft state
Browse files Browse the repository at this point in the history
- When starting a MetaNode(and a DFS that depends on a MetaNode),
  assign a on-disk dir for it to store persisted raft state, including:
  `node_id`, `current_term`, `voted_for`.

- Add: RaftState, which is a disk backed store of raft state.

- Change: MetaStore::new() now accepts a complete Config instead of a
  single listening addr.

- Test: add new_test_context() to create a temp env for testing MetaNode
  or DFS, which includes a temp meta store dir, unique port for meta
  service etc.

- Test: all tests that invovles a MetaStore assign a temp dir for meta
  storage.
  • Loading branch information
drmingdrmer committed Jul 15, 2021
1 parent 97f77a7 commit 3e5e3ae
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 94 deletions.
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions common/exception/src/exception.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ build_exceptions! {

InvalidConfig(2301),

// meta store errors

MetaStoreDamaged(2401),
MetaStoreAlreadyExists(2401),


// TODO
// We may need to separate front-end errors from API errors (and system errors?)
Expand Down
1 change: 1 addition & 0 deletions fusestore/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ prost = "0.7"
rand = "0.8.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sled = { version = "0.34.6", features = ["event_log", "pretty_backtrace"]}
structopt = "0.3"
structopt-toml = "0.4.5"
tempfile = "3.2.0"
Expand Down
9 changes: 6 additions & 3 deletions fusestore/store/src/api/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;

use anyhow::anyhow;
use common_arrow::arrow_flight::flight_service_server::FlightServiceServer;
use common_tracing::tracing;
use tonic::transport::Server;

use crate::api::rpc::StoreFlightImpl;
Expand All @@ -29,19 +30,21 @@ impl StoreServer {
.flight_api_address
.parse::<std::net::SocketAddr>()?;

tracing::info!("flight addr: {}", addr);

// TODO(xp): add local fs dir to config and use it.
let p = tempfile::tempdir()?;
let fs = LocalFS::try_create(p.path().to_str().unwrap().into())?;

let meta_addr = format!("{}:{}", self.conf.meta_api_host, self.conf.meta_api_port);

// TODO(xp): support non-boot mode.
// for now it can only be run in single-node cluster mode.
// if !self.conf.boot {
// todo!("non-boot mode is not impl yet")
// }

let mn = MetaNode::boot(0, meta_addr.clone()).await?;
let mn = MetaNode::boot(0, &self.conf).await?;

tracing::info!("boot done");

let dfs = Dfs::create(fs, mn.clone());

Expand Down
4 changes: 4 additions & 0 deletions fusestore/store/src/configs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,8 @@ impl Config {
pub fn empty() -> Self {
Self::from_iter(&Vec::<&'static str>::new())
}

pub fn meta_api_addr(&self) -> String {
format!("{}:{}", self.meta_api_host, self.meta_api_port)
}
}
8 changes: 5 additions & 3 deletions fusestore/store/src/dfs/distributed_fs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::meta_service::GetReq;
use crate::meta_service::MetaNode;
use crate::meta_service::MetaServiceClient;
use crate::tests::assert_meta_connection;
use crate::tests::rand_local_addr;
use crate::tests::service::new_test_context;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_distributed_fs_single_node_read_all() -> anyhow::Result<()> {
Expand Down Expand Up @@ -113,8 +113,10 @@ async fn bring_up_dfs(root: &TempDir, files: HashMap<&str, &str>) -> anyhow::Res
let root = root.path().to_str().unwrap().to_string();
let fs = LocalFS::try_create(root)?;

let meta_addr = rand_local_addr();
let mn = MetaNode::boot(0, meta_addr.clone()).await?;
let tc = new_test_context();
let meta_addr = tc.config.meta_api_addr();

let mn = MetaNode::boot(0, &tc.config).await?;
assert_meta_connection(&meta_addr).await?;

let dfs = Dfs::create(fs, mn);
Expand Down
9 changes: 5 additions & 4 deletions fusestore/store/src/executor/action_handler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::executor::ActionHandler;
use crate::fs::FileSystem;
use crate::localfs::LocalFS;
use crate::meta_service::MetaNode;
use crate::tests::rand_local_addr;
use crate::tests::service::new_test_context;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_action_handler_do_pull_file() -> anyhow::Result<()> {
Expand Down Expand Up @@ -176,7 +176,7 @@ async fn test_action_handler_get_database() -> anyhow::Result<()> {
engine: DatabaseEngineType::Local,
options: Default::default(),
};
let cba = CreateDatabaseAction { plan: plan };
let cba = CreateDatabaseAction { plan };
hdlr.handle(cba).await?;
}

Expand Down Expand Up @@ -215,8 +215,9 @@ async fn bring_up_dfs_action_handler(
) -> anyhow::Result<ActionHandler> {
let fs = LocalFS::try_create(root.to_str().unwrap().to_string())?;

let meta_addr = rand_local_addr();
let mn = MetaNode::boot(0, meta_addr.clone()).await?;
let tc = new_test_context();

let mn = MetaNode::boot(0, &tc.config).await?;

let dfs = Dfs::create(fs, mn.clone());

Expand Down
33 changes: 20 additions & 13 deletions fusestore/store/src/meta_service/meta_service_impl_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ use crate::meta_service::MetaNode;
use crate::meta_service::MetaServiceClient;
use crate::meta_service::RetryableError;
use crate::tests::assert_meta_connection;
use crate::tests::rand_local_addr;
use crate::tests::service::new_test_context;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_server_add_file() -> anyhow::Result<()> {
common_tracing::init_default_tracing();

let addr = rand_local_addr();
let tc = new_test_context();
let addr = tc.config.meta_api_addr();

let _mn = MetaNode::boot(0, addr.clone()).await?;
let _mn = MetaNode::boot(0, &tc.config).await?;
assert_meta_connection(&addr).await?;

let mut client = MetaServiceClient::connect(format!("http://{}", addr)).await?;
Expand Down Expand Up @@ -61,9 +62,10 @@ async fn test_meta_server_add_file() -> anyhow::Result<()> {
async fn test_meta_server_set_file() -> anyhow::Result<()> {
common_tracing::init_default_tracing();

let addr = rand_local_addr();
let tc = new_test_context();
let addr = tc.config.meta_api_addr();

let _mn = MetaNode::boot(0, addr.clone()).await?;
let _mn = MetaNode::boot(0, &tc.config).await?;
assert_meta_connection(&addr).await?;

let mut client = MetaServiceClient::connect(format!("http://{}", addr)).await?;
Expand Down Expand Up @@ -102,9 +104,10 @@ async fn test_meta_server_add_set_get() -> anyhow::Result<()> {

common_tracing::init_default_tracing();

let addr = rand_local_addr();
let tc = new_test_context();
let addr = tc.config.meta_api_addr();

let _mn = MetaNode::boot(0, addr.clone()).await?;
let _mn = MetaNode::boot(0, &tc.config).await?;
assert_meta_connection(&addr).await?;

let mut client = MetaServiceClient::connect(format!("http://{}", addr)).await?;
Expand Down Expand Up @@ -196,9 +199,10 @@ async fn test_meta_server_add_set_get() -> anyhow::Result<()> {
async fn test_meta_server_incr_seq() -> anyhow::Result<()> {
common_tracing::init_default_tracing();

let addr = rand_local_addr();
let tc = new_test_context();
let addr = tc.config.meta_api_addr();

let _mn = MetaNode::boot(0, addr.clone()).await?;
let _mn = MetaNode::boot(0, &tc.config).await?;
assert_meta_connection(&addr).await?;

let mut client = MetaServiceClient::connect(format!("http://{}", addr)).await?;
Expand Down Expand Up @@ -234,15 +238,18 @@ async fn test_meta_cluster_write_on_non_leader() -> anyhow::Result<()> {

common_tracing::init_default_tracing();

let addr0 = rand_local_addr();
let addr1 = rand_local_addr();
let tc0 = new_test_context();
let tc1 = new_test_context();

let mn0 = MetaNode::boot(0, addr0.clone()).await?;
let addr0 = tc0.config.meta_api_addr();
let addr1 = tc1.config.meta_api_addr();

let mn0 = MetaNode::boot(0, &tc0.config).await?;
assert_meta_connection(&addr0).await?;

{
// add node 1 as non-voter
let mn1 = MetaNode::boot_non_voter(1, &addr1).await?;
let mn1 = MetaNode::boot_non_voter(1, &tc1.config).await?;
assert_meta_connection(&addr0).await?;

let resp = mn0.add_node(1, addr1.clone()).await?;
Expand Down
3 changes: 3 additions & 0 deletions fusestore/store/src/meta_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub use crate::protobuf::RaftMes;
mod meta_service_impl_test;
#[cfg(test)]
mod placement_test;
mod raft_state;
#[cfg(test)]
mod raft_state_test;
#[cfg(test)]
mod raftmeta_test;
#[cfg(test)]
Expand Down
Loading

0 comments on commit 3e5e3ae

Please sign in to comment.