Skip to content

Commit

Permalink
raft: leader reponnds to learner read index message
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Apr 18, 2019
1 parent a726ccc commit 3c983cc
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
9 changes: 8 additions & 1 deletion src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,12 +1675,19 @@ impl<T: Storage> Raft<T> {
}
}
}
} else {
} else if m.get_from() == INVALID_ID || m.get_from() == self.id {
let rs = ReadState {
index: self.raft_log.committed,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
let mut to_send = Message::new();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.set_index(self.raft_log.committed);
to_send.set_entries(m.take_entries());
self.send(to_send);
}
return Ok(());
}
Expand Down
69 changes: 69 additions & 0 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,75 @@ fn test_read_only_option_safe() {
}
}

#[test]
fn test_read_only_with_learner() {
setup_for_test();
let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage());
let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage());

let mut nt = Network::new(vec![Some(a), Some(b)]);

// we can not let system choose the value of randomizedElectionTimeout
// otherwise it will introduce some uncertainty into this test case
// we need to ensure randomizedElectionTimeout > electionTimeout here
let b_election_timeout = nt.peers[&2].get_election_timeout();
nt.peers
.get_mut(&2)
.unwrap()
.set_randomized_election_timeout(b_election_timeout + 1);

for _ in 0..b_election_timeout {
nt.peers.get_mut(&2).unwrap().tick();
}
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

assert_eq!(nt.peers[&1].state, StateRole::Leader);
assert_eq!(nt.peers[&2].state, StateRole::Follower);

let mut tests = vec![
(1, 10, 11, "ctx1"),
(2, 10, 21, "ctx2"),
(1, 10, 31, "ctx3"),
(2, 10, 41, "ctx4"),
];

for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() {
for _ in 0..proposals {
nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);
}

let e = new_entry(0, 0, Some(wctx));
nt.send(vec![new_message_with_entries(
id,
id,
MessageType::MsgReadIndex,
vec![e],
)]);

let read_states: Vec<ReadState> = nt
.peers
.get_mut(&id)
.unwrap()
.read_states
.drain(..)
.collect();
if read_states.is_empty() {
panic!("#{}: read_states is empty, want non-empty", i);
}
let rs = &read_states[0];
if rs.index != wri {
panic!("#{}: read_index = {}, want {}", i, rs.index, wri)
}
let vec_wctx = wctx.as_bytes().to_vec();
if rs.request_ctx != vec_wctx {
panic!(
"#{}: request_ctx = {:?}, want {:?}",
i, rs.request_ctx, vec_wctx
)
}
}
}

#[test]
fn test_read_only_option_lease() {
setup_for_test();
Expand Down

0 comments on commit 3c983cc

Please sign in to comment.