Skip to content

Commit

Permalink
Change: move RaftStateMachine out of RaftStorage
Browse files Browse the repository at this point in the history
In Raft, the state machine is an independent storage component that
operates separately from the log store. As a result, accessing the log
store and accessing the state machine can be naturally parallelized.

This commit replaces the type parameter `RaftStorage` in
`Raft<.., S: RaftStorage>` with two type parameters: `RaftLogStorage` and
`RaftStateMachine`.

- Add: `RaftLogReaderExt` to provide additional log access methods based
  on a `RaftLogReader` implementation. Some of the methods are moved
  from `StorageHelper` to this trait.

- Add: `Adapter` to let application use the seperated log state machine
  framework without rewriting `RaftStorage` implementation.

- Refactor: shorten type names for the 2 example crates

### TODO

- [ ] Callback based log append is defined but is not used.

### Upgrade tip

Use an adapter to wrap `RaftStorage`:
```rust
// Before:
let store = MyRaftStorage::new();
Raft::new(..., store);

// After:
let store = MyRaftStorage::new();
let (log_store, sm) = Adaptoer::new(store);
Raft::new(..., log_store, sm);
```
  • Loading branch information
drmingdrmer committed Apr 12, 2023
1 parent 8f597f6 commit eaf45df
Show file tree
Hide file tree
Showing 65 changed files with 1,686 additions and 1,073 deletions.
18 changes: 8 additions & 10 deletions examples/raft-kv-memstore/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::sync::Arc;

use openraft::Config;

use crate::ExampleNodeId;
use crate::ExampleRaft;
use crate::ExampleStore;
use crate::NodeId;
use crate::Raft;
use crate::Store;

// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
pub struct ExampleApp {
pub id: ExampleNodeId,
pub struct App {
pub id: NodeId,
pub addr: String,
pub raft: ExampleRaft,
pub store: Arc<ExampleStore>,
pub config: Arc<Config>,
pub raft: Raft,
pub store: Arc<Store>,
pub config: Arc<openraft::Config>,
}
6 changes: 0 additions & 6 deletions examples/raft-kv-memstore/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
use clap::Parser;
use openraft::Raft;
use raft_kv_memstore::network::raft_network_impl::ExampleNetwork;
use raft_kv_memstore::start_example_raft_node;
use raft_kv_memstore::store::ExampleStore;
use raft_kv_memstore::ExampleTypeConfig;
use tracing_subscriber::EnvFilter;

pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, ExampleStore>;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
Expand Down
34 changes: 15 additions & 19 deletions examples/raft-kv-memstore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ use std::time::Duration;

use openraft::error::ForwardToLeader;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RemoteError;
use openraft::BasicNode;
use openraft::RaftMetrics;
use openraft::TryAsRef;
use reqwest::Client;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use tokio::time::timeout;

use crate::typ;
use crate::ExampleNodeId;
use crate::ExampleRequest;
use crate::NodeId;
use crate::Request;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Empty {}
Expand All @@ -27,14 +25,14 @@ pub struct ExampleClient {
/// The leader node to send request to.
///
/// All traffic should be sent to the leader in a cluster.
pub leader: Arc<Mutex<(ExampleNodeId, String)>>,
pub leader: Arc<Mutex<(NodeId, String)>>,

pub inner: Client,
pub inner: reqwest::Client,
}

impl ExampleClient {
/// Create a client with a leader node id and a node manager to get node address by node id.
pub fn new(leader_id: ExampleNodeId, leader_addr: String) -> Self {
pub fn new(leader_id: NodeId, leader_addr: String) -> Self {
Self {
leader: Arc::new(Mutex::new((leader_id, leader_addr))),
inner: reqwest::Client::new(),
Expand All @@ -49,10 +47,7 @@ impl ExampleClient {
/// will be applied to state machine.
///
/// The result of applying the request will be returned.
pub async fn write(
&self,
req: &ExampleRequest,
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
pub async fn write(&self, req: &Request) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("write", Some(req)).await
}

Expand Down Expand Up @@ -87,7 +82,7 @@ impl ExampleClient {
/// The node to add has to exist, i.e., being added with `write(ExampleRequest::AddNode{})`
pub async fn add_learner(
&self,
req: (ExampleNodeId, String),
req: (NodeId, String),
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("add-learner", Some(&req)).await
}
Expand All @@ -98,7 +93,7 @@ impl ExampleClient {
/// or an error [`LearnerNotFound`] will be returned.
pub async fn change_membership(
&self,
req: &BTreeSet<ExampleNodeId>,
req: &BTreeSet<NodeId>,
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("change-membership", Some(req)).await
}
Expand All @@ -108,7 +103,7 @@ impl ExampleClient {
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> Result<RaftMetrics<ExampleNodeId, BasicNode>, typ::RPCError> {
pub async fn metrics(&self) -> Result<RaftMetrics<NodeId, BasicNode>, typ::RPCError> {
self.do_send_rpc_to_leader("metrics", None::<&()>).await
}

Expand All @@ -118,7 +113,8 @@ impl ExampleClient {
///
/// It sends out a POST request if `req` is Some. Otherwise a GET request.
/// The remote endpoint must respond a reply in form of `Result<T, E>`.
/// An `Err` happened on remote will be wrapped in an [`RPCError::RemoteError`].
/// An `Err` happened on remote will be wrapped in an
/// [`openraft::error::RPCError::RemoteError`].
async fn do_send_rpc_to_leader<Req, Resp, Err>(
&self,
uri: &str,
Expand Down Expand Up @@ -150,22 +146,22 @@ impl ExampleClient {

let res = timeout(Duration::from_millis(3_000), fu).await;
let resp = match res {
Ok(x) => x.map_err(|e| RPCError::Network(NetworkError::new(&e)))?,
Ok(x) => x.map_err(|e| typ::RPCError::Network(NetworkError::new(&e)))?,
Err(timeout_err) => {
tracing::error!("timeout {} to url: {}", timeout_err, url);
return Err(RPCError::Network(NetworkError::new(&timeout_err)));
return Err(typ::RPCError::Network(NetworkError::new(&timeout_err)));
}
};

let res: Result<Resp, typ::RaftError<Err>> =
resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
resp.json().await.map_err(|e| typ::RPCError::Network(NetworkError::new(&e)))?;
tracing::debug!(
"<<< client recv reply from {}: {}",
url,
serde_json::to_string_pretty(&res).unwrap()
);

res.map_err(|e| RPCError::RemoteError(RemoteError::new(leader_id, e)))
res.map_err(|e| typ::RPCError::RemoteError(RemoteError::new(leader_id, e)))
}

/// Try the best to send a request to the leader.
Expand Down
56 changes: 29 additions & 27 deletions examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,54 @@ use std::sync::Arc;
use actix_web::middleware;
use actix_web::middleware::Logger;
use actix_web::web::Data;
use actix_web::App;
use actix_web::HttpServer;
use openraft::storage::Adaptor;
use openraft::BasicNode;
use openraft::Config;
use openraft::Raft;

use crate::app::ExampleApp;
use crate::app::App;
use crate::network::api;
use crate::network::management;
use crate::network::raft;
use crate::network::raft_network_impl::ExampleNetwork;
use crate::store::ExampleRequest;
use crate::store::ExampleResponse;
use crate::store::ExampleStore;
use crate::network::Network;
use crate::store::Request;
use crate::store::Response;
use crate::store::Store;

pub mod app;
pub mod client;
pub mod network;
pub mod store;

pub type ExampleNodeId = u64;
pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub ExampleTypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode, Entry = openraft::Entry<ExampleTypeConfig>
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode, Entry = openraft::Entry<TypeConfig>
);

pub type ExampleRaft = Raft<ExampleTypeConfig, ExampleNetwork, Arc<ExampleStore>>;
pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type Raft = openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;

pub mod typ {
use openraft::BasicNode;

use crate::ExampleNodeId;
use crate::ExampleTypeConfig;
use crate::NodeId;
use crate::TypeConfig;

pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<ExampleNodeId, E>;
pub type RPCError<E = openraft::error::Infallible> =
openraft::error::RPCError<ExampleNodeId, BasicNode, RaftError<E>>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<NodeId, BasicNode, RaftError<E>>;

pub type ClientWriteError = openraft::error::ClientWriteError<ExampleNodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<ExampleNodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<ExampleNodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<ExampleNodeId, BasicNode>;
pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<NodeId, BasicNode>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<ExampleTypeConfig>;
pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}

pub async fn start_example_raft_node(node_id: ExampleNodeId, http_addr: String) -> std::io::Result<()> {
pub async fn start_example_raft_node(node_id: NodeId, http_addr: String) -> std::io::Result<()> {
// Create a configuration for the raft instance.
let config = Config {
heartbeat_interval: 500,
Expand All @@ -65,18 +65,20 @@ pub async fn start_example_raft_node(node_id: ExampleNodeId, http_addr: String)
let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft data will be stored.
let store = Arc::new(ExampleStore::default());
let store = Arc::new(Store::default());

let (log_store, state_machine) = Adaptor::new(store.clone());

// Create the network layer that will connect and communicate the raft instances and
// will be used in conjunction with the store created above.
let network = ExampleNetwork {};
let network = Network {};

// Create a local raft instance.
let raft = Raft::new(node_id, config.clone(), network, store.clone()).await.unwrap();
let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine).await.unwrap();

// Create an application that will store all the instances created above, this will
// be later used on the actix-web services.
let app = Data::new(ExampleApp {
let app_data = Data::new(App {
id: node_id,
addr: http_addr.clone(),
raft,
Expand All @@ -86,11 +88,11 @@ pub async fn start_example_raft_node(node_id: ExampleNodeId, http_addr: String)

// Start the actix-web server.
let server = HttpServer::new(move || {
App::new()
actix_web::App::new()
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.wrap(middleware::Compress::default())
.app_data(app.clone())
.app_data(app_data.clone())
// raft internal RPC
.service(raft::append)
.service(raft::snapshot)
Expand Down
14 changes: 7 additions & 7 deletions examples/raft-kv-memstore/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use openraft::error::RaftError;
use openraft::BasicNode;
use web::Json;

use crate::app::ExampleApp;
use crate::store::ExampleRequest;
use crate::ExampleNodeId;
use crate::app::App;
use crate::store::Request;
use crate::NodeId;

/**
* Application API
Expand All @@ -22,13 +22,13 @@ use crate::ExampleNodeId;
* - `POST - /read` attempt to find a value from a given key.
*/
#[post("/write")]
pub async fn write(app: Data<ExampleApp>, req: Json<ExampleRequest>) -> actix_web::Result<impl Responder> {
pub async fn write(app: Data<App>, req: Json<Request>) -> actix_web::Result<impl Responder> {
let response = app.raft.client_write(req.0).await;
Ok(Json(response))
}

#[post("/read")]
pub async fn read(app: Data<ExampleApp>, req: Json<String>) -> actix_web::Result<impl Responder> {
pub async fn read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl Responder> {
let state_machine = app.store.state_machine.read().await;
let key = req.0;
let value = state_machine.data.get(&key).cloned();
Expand All @@ -38,7 +38,7 @@ pub async fn read(app: Data<ExampleApp>, req: Json<String>) -> actix_web::Result
}

#[post("/consistent_read")]
pub async fn consistent_read(app: Data<ExampleApp>, req: Json<String>) -> actix_web::Result<impl Responder> {
pub async fn consistent_read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl Responder> {
let ret = app.raft.is_leader().await;

match ret {
Expand All @@ -47,7 +47,7 @@ pub async fn consistent_read(app: Data<ExampleApp>, req: Json<String>) -> actix_
let key = req.0;
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<ExampleNodeId, CheckIsLeaderError<ExampleNodeId, BasicNode>>> =
let res: Result<String, RaftError<NodeId, CheckIsLeaderError<NodeId, BasicNode>>> =
Ok(value.unwrap_or_default());
Ok(Json(res))
}
Expand Down
23 changes: 8 additions & 15 deletions examples/raft-kv-memstore/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ use std::collections::BTreeSet;

use actix_web::get;
use actix_web::post;
use actix_web::web;
use actix_web::web::Data;
use actix_web::web::Json;
use actix_web::Responder;
use openraft::error::Infallible;
use openraft::BasicNode;
use openraft::RaftMetrics;
use web::Json;

use crate::app::ExampleApp;
use crate::ExampleNodeId;
use crate::app::App;
use crate::NodeId;

// --- Cluster management

Expand All @@ -22,10 +21,7 @@ use crate::ExampleNodeId;
/// This should be done before adding a node as a member into the cluster
/// (by calling `change-membership`)
#[post("/add-learner")]
pub async fn add_learner(
app: Data<ExampleApp>,
req: Json<(ExampleNodeId, String)>,
) -> actix_web::Result<impl Responder> {
pub async fn add_learner(app: Data<App>, req: Json<(NodeId, String)>) -> actix_web::Result<impl Responder> {
let node_id = req.0 .0;
let node = BasicNode { addr: req.0 .1.clone() };
let res = app.raft.add_learner(node_id, node, true).await;
Expand All @@ -34,17 +30,14 @@ pub async fn add_learner(

/// Changes specified learners to members, or remove members.
#[post("/change-membership")]
pub async fn change_membership(
app: Data<ExampleApp>,
req: Json<BTreeSet<ExampleNodeId>>,
) -> actix_web::Result<impl Responder> {
pub async fn change_membership(app: Data<App>, req: Json<BTreeSet<NodeId>>) -> actix_web::Result<impl Responder> {
let res = app.raft.change_membership(req.0, false).await;
Ok(Json(res))
}

/// Initialize a single-node cluster.
#[post("/init")]
pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
pub async fn init(app: Data<App>) -> actix_web::Result<impl Responder> {
let mut nodes = BTreeMap::new();
nodes.insert(app.id, BasicNode { addr: app.addr.clone() });
let res = app.raft.initialize(nodes).await;
Expand All @@ -53,9 +46,9 @@ pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {

/// Get the latest metrics of the cluster
#[get("/metrics")]
pub async fn metrics(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
pub async fn metrics(app: Data<App>) -> actix_web::Result<impl Responder> {
let metrics = app.raft.metrics().borrow().clone();

let res: Result<RaftMetrics<ExampleNodeId, BasicNode>, Infallible> = Ok(metrics);
let res: Result<RaftMetrics<NodeId, BasicNode>, Infallible> = Ok(metrics);
Ok(Json(res))
}
5 changes: 4 additions & 1 deletion examples/raft-kv-memstore/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
pub mod api;
pub mod management;
pub mod raft;
pub mod raft_network_impl;
mod raft_network_impl;

pub use raft_network_impl::Network;
pub use raft_network_impl::NetworkConnection;
Loading

0 comments on commit eaf45df

Please sign in to comment.