Skip to content

Commit

Permalink
Merge pull request #585 from drmingdrmer/50-fix-584
Browse files Browse the repository at this point in the history
Fix: Error After change_membership: `assertion failed: value > prev`: #584
  • Loading branch information
drmingdrmer authored Oct 29, 2022
2 parents f6f14f1 + c558376 commit d041202
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 5 deletions.
2 changes: 2 additions & 0 deletions examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::uninlined_format_args)]

use std::sync::Arc;

use actix_web::middleware;
Expand Down
2 changes: 2 additions & 0 deletions examples/raft-kv-memstore/tests/cluster/main.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#![allow(clippy::uninlined_format_args)]

mod test_cluster;
2 changes: 2 additions & 0 deletions examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::uninlined_format_args)]

use std::fmt::Display;
use std::path::Path;
use std::sync::Arc;
Expand Down
2 changes: 2 additions & 0 deletions examples/raft-kv-rocksdb/tests/cluster/main.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#![allow(clippy::uninlined_format_args)]

mod test_cluster;
7 changes: 4 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(crate) async fn spawn_replication_stream(
&mut self,
target: C::NodeId,
matched: Option<LogId<C::NodeId>>,
) -> Result<ReplicationStream<C::NodeId>, N::ConnectionError> {
let target_node = self.engine.state.membership_state.effective.get_node(&target);
let membership_log_id = self.engine.state.membership_state.effective.log_id;
Expand All @@ -1007,6 +1008,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.config.clone(),
self.engine.state.last_log_id(),
self.engine.state.committed,
matched,
network,
self.storage.get_log_reader().await,
self.tx_api.clone(),
Expand Down Expand Up @@ -1611,9 +1613,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
Command::UpdateReplicationStreams { targets } => {
self.remove_all_replication().await;

// TODO: use _matched to initialize replication
for (node_id, _matched) in targets.iter() {
match self.spawn_replication_stream(*node_id).await {
for (node_id, matched) in targets.iter() {
match self.spawn_replication_stream(*node_id, *matched).await {
Ok(state) => {
if let Some(l) = &mut self.leader_data {
l.nodes.insert(*node_id, state);
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
config: Arc<Config>,
last_log: Option<LogId<C::NodeId>>,
committed: Option<LogId<C::NodeId>>,
matched: Option<LogId<C::NodeId>>,
network: N::Network,
log_reader: S::LogReader,
raft_core_tx: mpsc::UnboundedSender<RaftMsg<C, N, S>>,
Expand All @@ -158,7 +159,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
config,
target_repl_state: TargetReplState::LineRate,
committed,
matched: None,
matched,
max_possible_matched_index: last_log.index(),
raft_core_tx,
repl_rx,
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/membership/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ mod t30_step_down;
mod t40_removed_follower;
mod t45_remove_unreachable_follower;
mod t99_issue_471_adding_learner_uses_uninit_leader_id;
mod t99_issue_584_replication_state_reverted;
mod t99_new_leader_auto_commit_uniform_config;
1 change: 0 additions & 1 deletion openraft/tests/membership/t10_add_learner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::option::Option::None;
use std::sync::Arc;
use std::time::Duration;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::Config;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn t99_issue_584_replication_state_reverted() -> Result<()> {
// - Add a learner and replicate all logs to it.
// - Add the learner as a voter. When membership changes, openraft internally restarts all replication.
//
// This case asserts it does not break the internal monotonic-replication-progress guarantee.

let config = Arc::new(
Config {
max_in_snapshot_log_to_keep: 2000, // prevent snapshot
enable_tick: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;

let n = 500u64;
tracing::info!("--- write up to {} logs", n);
{
router.client_request_many(0, "foo", (n - log_index) as usize).await?;
log_index = n;

router.wait(&1, timeout()).log(Some(log_index), "replicate all logs to learner").await?;
}

tracing::info!("--- change-membership: make learner node-1 a voter. This should not panic");
{
let leader = router.get_raft_handle(&0)?;
leader.change_membership(btreeset![0, 1], true, false).await?;
log_index += 2; // 2 change_membership log

let _ = log_index;
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}

0 comments on commit d041202

Please sign in to comment.