Skip to content

Commit

Permalink
Change: remove AddLearnerResponse and AddLearnerError
Browse files Browse the repository at this point in the history
In openraft, adding a learner is done by committing a membership config
log, which is almost the same as committing any other log.

`AddLearnerResponse` contains a field `matched` to indicate the
replication state to the learner, which is not included in
`ClientWriteResponse`. This information can be retrieved via
`Raft::metrics()`.

Therefore to keep the API simple, replace `AddLearnerResponse` with
`ClientWriteResponse`.

Behavior change: adding a learner always commits a new membership config
log, regardless of whether the node already exists in the membership.
To avoid adding the same node twice, an application should first check
whether it already exists by examining `Raft::metrics()`.

- Fix: #679

Upgrade tips:

- Replace AddLearnerResponse with ClientWriteResponse
- Replace AddLearnerError with ClientWriteError

Passes the application compilation.

See the changes in examples/.
  • Loading branch information
drmingdrmer committed Feb 17, 2023
1 parent 7c5a60d commit 0161a3d
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 121 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ExampleClient {
pub async fn add_learner(
&self,
req: (ExampleNodeId, String),
) -> Result<typ::AddLearnerResponse, typ::RPCError<typ::AddLearnerError>> {
) -> Result<typ::ClientWriteResponse, typ::RPCError<typ::ClientWriteError>> {
self.send_rpc_to_leader("add-learner", Some(&req)).await
}

Expand Down
2 changes: 0 additions & 2 deletions examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ pub mod typ {
openraft::error::RPCError<ExampleNodeId, BasicNode, RaftError<E>>;

pub type ClientWriteError = openraft::error::ClientWriteError<ExampleNodeId, BasicNode>;
pub type AddLearnerError = openraft::error::AddLearnerError<ExampleNodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<ExampleNodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<ExampleNodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<ExampleNodeId, BasicNode>;

pub type AddLearnerResponse = openraft::raft::AddLearnerResponse<ExampleNodeId>;
pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<ExampleTypeConfig>;
}

Expand Down
6 changes: 2 additions & 4 deletions examples/raft-kv-rocksdb/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::Mutex;

use openraft::error::AddLearnerError;
use openraft::error::CheckIsLeaderError;
use openraft::error::ClientWriteError;
use openraft::error::ForwardToLeader;
Expand All @@ -11,7 +10,6 @@ use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::raft::AddLearnerResponse;
use openraft::raft::ClientWriteResponse;
use openraft::RaftMetrics;
use openraft::TryAsRef;
Expand Down Expand Up @@ -111,8 +109,8 @@ impl ExampleClient {
&self,
req: (ExampleNodeId, String, String),
) -> Result<
AddLearnerResponse<ExampleNodeId>,
RPCError<ExampleNodeId, ExampleNode, RaftError<ExampleNodeId, AddLearnerError<ExampleNodeId, ExampleNode>>>,
ClientWriteResponse<ExampleTypeConfig>,
RPCError<ExampleNodeId, ExampleNode, RaftError<ExampleNodeId, ClientWriteError<ExampleNodeId, ExampleNode>>>,
> {
self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await
}
Expand Down
41 changes: 2 additions & 39 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,12 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::quorum::QuorumSet;
use crate::raft::AddLearnerResponse;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::AppendEntriesTx;
use crate::raft::ClientWriteResponse;
use crate::raft::ClientWriteTx;
use crate::raft::ExternalCommand;
use crate::raft::RaftAddLearnerTx;
use crate::raft::RaftMsg;
use crate::raft::RaftRespTx;
use crate::raft::VoteRequest;
Expand Down Expand Up @@ -377,7 +375,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
&mut self,
target: C::NodeId,
node: C::Node,
tx: RaftAddLearnerTx<C::NodeId, C::Node>,
tx: ClientWriteTx<C, C::NodeId, C::Node>,
) -> Result<(), Fatal<C::NodeId>> {
if let Some(l) = &self.leader_data {
tracing::debug!(
Expand All @@ -389,47 +387,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
unreachable!("it has to be a leader!!!");
}

// Ensure the node doesn't already exist in the current config,
// in the set of new nodes already being synced, or in the nodes being removed.

let curr = &self.engine.state.membership_state.effective();
if curr.contains(&target) {
let matching = if let Some(l) = &self.engine.internal_server_state.leading() {
*l.progress.get(&target)
} else {
unreachable!("it has to be a leader!!!");
};

tracing::debug!(
"target {:?} already member or learner, can't add; matching:{:?}",
target,
matching
);

let _ = tx.send(Ok(AddLearnerResponse {
membership_log_id: self.engine.state.membership_state.effective().log_id,
matched: matching.matching,
}));
return Ok(());
}

let curr = &self.engine.state.membership_state.effective().membership;
let new_membership = curr.add_learner(target, node);

tracing::debug!(?new_membership, "new_membership with added learner: {}", target);

let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?;

tracing::debug!(
"after add target node {} as learner; last_log_id: {:?}",
target,
self.engine.state.last_log_id()
);

let _ = tx.send(Ok(AddLearnerResponse {
membership_log_id: Some(log_id),
matched: None,
}));
self.write_entry(EntryPayload::Membership(new_membership), Some(tx)).await?;

Ok(())
}
Expand Down
35 changes: 0 additions & 35 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,41 +200,6 @@ pub enum ChangeMembershipError<NID: NodeId> {
LearnerIsLagging(#[from] LearnerIsLagging<NID>),
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum AddLearnerError<NID, N>
where
NID: NodeId,
N: Node,
{
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader<NID, N>),
}

impl<NID, N> TryAsRef<ForwardToLeader<NID, N>> for AddLearnerError<NID, N>
where
NID: NodeId,
N: Node,
{
fn try_as_ref(&self) -> Option<&ForwardToLeader<NID, N>> {
let Self::ForwardToLeader(f) = self;
Some(f)
}
}

impl<NID, N> TryFrom<AddLearnerError<NID, N>> for ForwardToLeader<NID, N>
where
NID: NodeId,
N: Node,
{
type Error = AddLearnerError<NID, N>;

fn try_from(value: AddLearnerError<NID, N>) -> Result<Self, Self::Error> {
let AddLearnerError::ForwardToLeader(e) = value;
Ok(e)
}
}

/// The set of errors which may take place when initializing a pristine Raft node.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error, derive_more::TryInto)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
Expand Down
36 changes: 6 additions & 30 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::core::TickHandle;
use crate::core::VoteWiseTime;
use crate::engine::Engine;
use crate::engine::EngineConfig;
use crate::error::AddLearnerError;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::Fatal;
Expand Down Expand Up @@ -495,7 +494,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
id: C::NodeId,
node: C::Node,
blocking: bool,
) -> Result<AddLearnerResponse<C::NodeId>, RaftError<C::NodeId, AddLearnerError<C::NodeId, C::Node>>> {
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>> {
let (tx, rx) = oneshot::channel();
let resp = self.call_core(RaftMsg::AddLearner { id, node, tx }, rx).await?;

Expand All @@ -510,19 +509,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
// Otherwise, blocks until the replication to the new learner becomes up to date.

// The log id of the membership that contains the added learner.
let membership_log_id = resp.membership_log_id;

let res0 = Arc::new(std::sync::Mutex::new(resp));
let res = res0.clone();
let membership_log_id = resp.log_id;

let wait_res = self
.wait(None)
.metrics(
|metrics| match self.check_replication_upto_date(metrics, id, membership_log_id) {
Ok(matched) => {
res.lock().unwrap().matched = matched;
true
}
|metrics| match self.check_replication_upto_date(metrics, id, Some(membership_log_id)) {
Ok(_matching) => true,
// keep waiting
Err(_) => false,
},
Expand All @@ -532,11 +525,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,

tracing::info!(wait_res = debug(&wait_res), "waiting for replication to new learner");

let r = {
let x = res0.lock().unwrap();
x.clone()
};
Ok(r)
Ok(resp)
}

/// Returns Ok() with the latest known matched log id if it should quit waiting: leader change,
Expand Down Expand Up @@ -850,19 +839,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,

pub(crate) type RaftRespTx<T, E> = oneshot::Sender<Result<T, E>>;

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct AddLearnerResponse<NID: NodeId> {
/// The log id of the membership that contains the added learner.
pub membership_log_id: Option<LogId<NID>>,

/// The last log id that matches leader log.
pub matched: Option<LogId<NID>>,
}

/// TX for Add Learner Response
pub(crate) type RaftAddLearnerTx<NID, N> = RaftRespTx<AddLearnerResponse<NID>, AddLearnerError<NID, N>>;

/// TX for Install Snapshot Response
pub(crate) type InstallSnapshotTx<NID> = RaftRespTx<InstallSnapshotResponse<NID>, InstallSnapshotError>;

Expand Down Expand Up @@ -927,7 +903,7 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
node: C::Node,

/// Send the client-write response for the membership log that adds the learner.
tx: RaftAddLearnerTx<C::NodeId, C::Node>,
tx: ClientWriteTx<C, C::NodeId, C::Node>,
},

ChangeMembership {
Expand Down
5 changes: 2 additions & 3 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use memstore::Config as MemConfig;
use memstore::IntoMemClientRequest;
use memstore::MemStore;
use openraft::async_trait::async_trait;
use openraft::error::AddLearnerError;
use openraft::error::CheckIsLeaderError;
use openraft::error::ClientWriteError;
use openraft::error::InstallSnapshotError;
Expand All @@ -34,9 +33,9 @@ use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::metrics::Wait;
use openraft::raft::AddLearnerResponse;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::ClientWriteResponse;
use openraft::raft::InstallSnapshotRequest;
use openraft::raft::InstallSnapshotResponse;
use openraft::raft::VoteRequest;
Expand Down Expand Up @@ -556,7 +555,7 @@ where
&self,
leader: C::NodeId,
target: C::NodeId,
) -> Result<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId, C::Node>> {
) -> Result<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>> {
let node = self.get_raft_handle(&leader).unwrap();
node.add_learner(target, C::Node::default(), true).await.map_err(|e| e.into_api_error().unwrap())
}
Expand Down
32 changes: 25 additions & 7 deletions openraft/tests/membership/t10_add_learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::CommittedLeaderId;
use openraft::Config;
use openraft::LogId;
use openraft::Membership;
use openraft::RaftLogReader;
use openraft::StorageHelper;
use tokio::time::sleep;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;
Expand All @@ -34,10 +33,13 @@ async fn add_learner_basic() -> Result<()> {

let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {}).await?;

tracing::info!("--- re-adding leader does nothing");
tracing::info!("--- re-adding leader commits a new log but does nothing");
{
let res = router.add_learner(0, 0).await?;
assert_eq!(Some(LogId::new(CommittedLeaderId::new(1, 0), log_index)), res.matched);
log_index += 1;

assert_eq!(log_index, res.log_id.index);
router.wait(&0, timeout()).log(Some(log_index), "commit re-adding leader log").await?;
}

tracing::info!("--- add new node node-1");
Expand Down Expand Up @@ -74,7 +76,14 @@ async fn add_learner_basic() -> Result<()> {
tracing::info!("--- re-add node-1, nothing changes");
{
let res = router.add_learner(0, 1).await?;
assert_eq!(Some(LogId::new(CommittedLeaderId::new(1, 0), log_index)), res.matched);
log_index += 1;

assert_eq!(log_index, res.log_id.index);
router.wait(&0, timeout()).log(Some(log_index), "commit re-adding node-1 log").await?;

let metrics = router.get_raft_handle(&0)?.metrics().borrow().clone();
let node_ids = metrics.membership_config.membership.nodes().map(|x| *x.0).collect::<Vec<_>>();
assert_eq!(vec![0, 1], node_ids);
}

Ok(())
Expand Down Expand Up @@ -109,10 +118,19 @@ async fn add_learner_non_blocking() -> Result<()> {
router.wait(&0, timeout()).log(Some(log_index), "received 100 logs").await?;

router.new_raft_node(1).await;

// Replication problem should not block adding-learner in non-blocking mode.
router.isolate_node(1);

let raft = router.get_raft_handle(&0)?;
let res = raft.add_learner(1, (), false).await?;
raft.add_learner(1, (), false).await?;

sleep(Duration::from_millis(500)).await;

assert_eq!(None, res.matched);
let metrics = router.get_raft_handle(&0)?.metrics().borrow().clone();
let repl = metrics.replication.as_ref().unwrap();
let n1_repl = repl.data().replication.get(&1);
assert_eq!(None, n1_repl);
}

Ok(())
Expand Down

0 comments on commit 0161a3d

Please sign in to comment.