diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index 3d4041c13..33a653cc8 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -43,7 +43,7 @@ use super::cmd_worker::CEEventTxApi; use crate::{ cmd::{Command, ProposeId}, error::{ApplyConfChangeError, ProposeError}, - log_entry::LogEntry, + log_entry::{EntryData, LogEntry}, members::{ClusterInfo, Member, ServerId}, role_change::RoleChange, rpc::{ConfChange, ConfChangeType, IdSet, ReadState}, @@ -548,6 +548,7 @@ impl RawCurp { let prev_last_log_index = log_w.last_log_index(); self.recover_from_spec_pools(&mut st_w, &mut log_w, spec_pools); + self.recover_ucp_from_log(&mut log_w); let last_log_index = log_w.last_log_index(); self.become_leader(&mut st_w); @@ -1009,6 +1010,7 @@ impl RawCurp { let spec_pools = cst.sps.drain().collect(); let mut log_w = RwLockUpgradableReadGuard::upgrade(log); self.recover_from_spec_pools(st, &mut log_w, spec_pools); + self.recover_ucp_from_log(&mut log_w); self.become_leader(st); None } else { @@ -1140,6 +1142,20 @@ impl RawCurp { } } + /// Recover the ucp from uncommitted log entries + fn recover_ucp_from_log(&self, log: &mut Log) { + let mut ucp_l = self.ctx.ucp.lock(); + + for i in log.commit_index + 1..=log.last_log_index() { + let entry = log.get(i).unwrap_or_else(|| { + unreachable!("system corrupted, get a `None` value on log[{i}]") + }); + if let EntryData::Command(ref cmd) = entry.entry_data { + let _ignore = ucp_l.insert(cmd.id().clone(), Arc::clone(cmd)); + } + } + } + /// Apply new logs fn apply(&self, log: &mut Log) { for i in (log.last_as + 1)..=log.commit_index { diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs index 7000f604b..9addb1c71 100644 --- a/curp/src/server/raw_curp/tests.rs +++ b/curp/src/server/raw_curp/tests.rs @@ -557,6 +557,37 @@ fn recover_from_spec_pools_will_pick_the_correct_cmds() { }); } +#[traced_test] +#[test] +fn recover_ucp_from_logs_will_pick_the_correct_cmds() { + let curp = { + let mut exe_tx = MockCEEventTxApi::::default(); + exe_tx + .expect_send_reset() + .returning(|_| oneshot::channel().1); + Arc::new(RawCurp::new_test(5, exe_tx, mock_role_change())) + }; + curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1); + + let cmd0 = Arc::new(TestCommand::new_put(vec![1], 1)); + let cmd1 = Arc::new(TestCommand::new_put(vec![2], 1)); + let cmd2 = Arc::new(TestCommand::new_put(vec![3], 1)); + curp.push_cmd(Arc::clone(&cmd0)); + curp.push_cmd(Arc::clone(&cmd1)); + curp.push_cmd(Arc::clone(&cmd2)); + curp.log.map_write(|mut log_w| log_w.commit_index = 1); + + curp.recover_ucp_from_log(&mut *curp.log.write()); + + curp.ctx.ucp.map_lock(|ucp| { + let mut ids: Vec<_> = ucp.values().map(|c| c.id()).collect(); + assert_eq!(ids.len(), 2); + ids.sort(); + assert_eq!(ids[0], cmd1.id()); + assert_eq!(ids[1], cmd2.id()); + }); +} + /*************** tests for leader retires **************/ /// To ensure #331 is fixed diff --git a/simulation/src/curp_group.rs b/simulation/src/curp_group.rs index 7f13982a1..c7076cdfd 100644 --- a/simulation/src/curp_group.rs +++ b/simulation/src/curp_group.rs @@ -312,26 +312,79 @@ impl CurpGroup { .unwrap() } - // Disconnect the node from the network. + /// Disconnect the node from the network. pub fn disable_node(&self, id: ServerId) { - let handle = madsim::runtime::Handle::current(); let net = madsim::net::NetSim::current(); - let Some(node) = handle.get_node(id.to_string()) else { - panic!("no node with name {id} in the simulator") - }; + let node = Self::get_node_handle(id); net.clog_node(node.id()); } - // Reconnect the node to the network. + /// Reconnect the node to the network. pub fn enable_node(&self, id: ServerId) { - let handle = madsim::runtime::Handle::current(); let net = madsim::net::NetSim::current(); - let Some(node) = handle.get_node(id.to_string()) else { - panic!("no node with name {id} the simulator") - }; + let node = Self::get_node_handle(id); net.unclog_node(node.id()); } + /// Disconnect the network link between two nodes + pub fn clog_link_nodes(&self, fst: ServerId, snd: ServerId) { + let node_fst = Self::get_node_handle(fst); + let node_snd = Self::get_node_handle(snd); + Self::clog_bidirectional(&node_fst, &node_snd); + } + + /// Reconnect the network link between two nodes + pub fn unclog_link_nodes(&self, fst: ServerId, snd: ServerId) { + let node_fst = Self::get_node_handle(fst); + let node_snd = Self::get_node_handle(snd); + Self::unclog_bidirectional(&node_fst, &node_snd); + } + + /// Disconnect the network link between the client and a list of nodes + /// Note: This will affect SimClient + pub fn clog_link_client_nodes<'a>(&'a self, server_ids: impl Iterator) { + let client_node = &self.client_node; + for server_id in server_ids { + let server_node = Self::get_node_handle(*server_id); + Self::clog_bidirectional(client_node, &server_node); + } + } + + /// Reconnect the network link between the client and a list of nodes + pub fn unclog_link_client_nodes<'a>(&'a self, server_ids: impl Iterator) { + let client_node = &self.client_node; + for server_id in server_ids { + let server_node = Self::get_node_handle(*server_id); + Self::unclog_bidirectional(client_node, &server_node); + } + } + + /// Clog the network bidirectionally + fn clog_bidirectional(node_fst: &NodeHandle, node_snd: &NodeHandle) { + let net = madsim::net::NetSim::current(); + let id_fst = node_fst.id(); + let id_snd = node_snd.id(); + net.clog_link(id_fst, id_snd); + net.clog_link(id_snd, id_fst); + } + + /// Unclog the network bidirectionally + fn unclog_bidirectional(node_fst: &NodeHandle, node_snd: &NodeHandle) { + let net = madsim::net::NetSim::current(); + let id_fst = node_fst.id(); + let id_snd = node_snd.id(); + net.unclog_link(id_fst, id_snd); + net.unclog_link(id_snd, id_fst); + } + + /// Get the server node handle from ServerId + fn get_node_handle(id: ServerId) -> NodeHandle { + let handle = madsim::runtime::Handle::current(); + handle + .get_node(id.to_string()) + .expect("no node with name {id} the simulator") + } + pub async fn get_connect(&self, id: &ServerId) -> SimProtocolClient { let addr = self .all_members diff --git a/simulation/tests/it/curp/server_election.rs b/simulation/tests/it/curp/server_election.rs index f3770b02f..453df9344 100644 --- a/simulation/tests/it/curp/server_election.rs +++ b/simulation/tests/it/curp/server_election.rs @@ -154,3 +154,43 @@ async fn propose_after_reelect() { group.stop().await; } + +// Verifies that #438 has been fixed +// This will likely to fail without a fix +#[madsim::test] +async fn conflict_should_detected_in_new_leader() { + init_logger(); + + let group = CurpGroup::new(3).await; + let client = group.new_client(ClientConfig::default()).await; + let leader1 = group.get_leader().await.0; + + // client only propose to leader + group.clog_link_client_nodes(group.nodes.keys().filter(|id| **id != leader1)); + assert_eq!( + client + .propose(TestCommand::new_put(vec![0], 0), true) + .await + .unwrap() + .0 + .values, + vec![] + ); + + // re-elect a new leader + group.disable_node(leader1); + group.unclog_link_client_nodes(group.nodes.keys().filter(|id| **id != leader1)); + let (_leader, _term) = wait_for_election(&group).await; + + assert_eq!( + client + .propose(TestCommand::new_get(vec![0]), true) + .await + .unwrap() + .0 + .values, + vec![0] + ); + + group.stop().await; +}