Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: check region epoch strictly #4125

Merged
merged 14 commits into from
Feb 1, 2019
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ impl<T: Simulator> Cluster<T> {
}

let resp = result.unwrap();
if resp.get_header().get_error().has_stale_epoch() {
if resp.get_header().get_error().has_epoch_not_match() {
warn!("seems split, let's retry");
sleep_ms(100);
continue;
Expand Down Expand Up @@ -832,7 +832,7 @@ impl<T: Simulator> Cluster<T> {
let mut resp = write_resp.response;
if resp.get_header().has_error() {
let error = resp.get_header().get_error();
if error.has_stale_epoch()
if error.has_epoch_not_match()
|| error.has_not_leader()
|| error.has_stale_command()
{
Expand Down
2 changes: 1 addition & 1 deletion components/test_raftstore/src/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ fn check_stale_region(region: &metapb::Region, check_region: &metapb::Region) ->
}

Err(box_err!(
"stale epoch {:?}, we are now {:?}",
"epoch not match {:?}, we are now {:?}",
check_epoch,
epoch
))
Expand Down
8 changes: 4 additions & 4 deletions src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ quick_error! {
display("TikvRPC {:?}", err)
}
NotLeader(new_leader: Option<Peer>) {}
StaleEpoch(new_regions: Vec<Region>) {}
EpochNotMatch(current_regions: Vec<Region>) {}
UpdateRegion(new_region: RegionInfo) {}
ImportJobFailed(tag: String) {
display("{}", tag)
Expand All @@ -124,9 +124,9 @@ impl From<errorpb::Error> for Error {
} else {
Error::NotLeader(None)
}
} else if err.has_stale_epoch() {
let mut error = err.take_stale_epoch();
Error::StaleEpoch(error.take_new_regions().to_vec())
} else if err.has_epoch_not_match() {
let mut error = err.take_epoch_not_match();
Error::EpochNotMatch(error.take_current_regions().to_vec())
} else {
Error::TikvRPC(err)
}
Expand Down
21 changes: 12 additions & 9 deletions src/import/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,21 +320,24 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
region.leader = new_leader;
Err(Error::UpdateRegion(region))
}
Err(Error::StaleEpoch(new_regions)) => {
let new_region = new_regions
Err(Error::EpochNotMatch(current_regions)) => {
let current_region = current_regions
.iter()
.find(|&r| self.sst.inside_region(r))
.cloned();
match new_region {
Some(new_region) => {
match current_region {
Some(current_region) => {
let new_leader = region
.leader
.and_then(|p| find_region_peer(&new_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(new_region, new_leader)))
.and_then(|p| find_region_peer(&current_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(
current_region,
new_leader,
)))
}
None => {
warn!("{} stale epoch {:?}", self.tag, new_regions);
Err(Error::StaleEpoch(new_regions))
warn!("{} epoch not match {:?}", self.tag, current_region);
Err(Error::EpochNotMatch(current_regions))
}
}
}
Expand Down Expand Up @@ -373,7 +376,7 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
Ok(())
} else {
match Error::from(resp.take_error()) {
e @ Error::NotLeader(_) | e @ Error::StaleEpoch(_) => return Err(e),
e @ Error::NotLeader(_) | e @ Error::EpochNotMatch(_) => return Err(e),
e => Err(e),
}
}
Expand Down
24 changes: 15 additions & 9 deletions src/import/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,24 @@ impl<Client: ImportClient> PrepareRangeJob<Client> {
region.leader = new_leader;
Err(Error::UpdateRegion(region))
}
Err(Error::StaleEpoch(new_regions)) => {
let new_region = new_regions.iter().find(|&r| self.need_split(r)).cloned();
match new_region {
Some(new_region) => {
Err(Error::EpochNotMatch(current_regions)) => {
let current_region = current_regions
.iter()
.find(|&r| self.need_split(r))
.cloned();
match current_region {
Some(current_region) => {
let new_leader = region
.leader
.and_then(|p| find_region_peer(&new_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(new_region, new_leader)))
.and_then(|p| find_region_peer(&current_region, p.get_store_id()));
Err(Error::UpdateRegion(RegionInfo::new(
current_region,
new_leader,
)))
}
None => {
warn!("{} stale epoch {:?}", self.tag, new_regions);
Err(Error::StaleEpoch(new_regions))
warn!("{} epoch not match {:?}", self.tag, current_regions);
Err(Error::EpochNotMatch(current_regions))
}
}
}
Expand Down Expand Up @@ -231,7 +237,7 @@ impl<Client: ImportClient> PrepareRangeJob<Client> {
Ok(resp)
} else {
match Error::from(resp.take_region_error()) {
e @ Error::NotLeader(_) | e @ Error::StaleEpoch(_) => return Err(e),
e @ Error::NotLeader(_) | e @ Error::EpochNotMatch(_) => return Err(e),
e => Err(e),
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/raftstore/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ quick_error! {
description("request timeout")
display("Timeout {}", msg)
}
StaleEpoch(msg: String, new_regions: Vec<metapb::Region>) {
description("region is stale")
display("StaleEpoch {}", msg)
EpochNotMatch(msg: String, new_regions: Vec<metapb::Region>) {
overvenus marked this conversation as resolved.
Show resolved Hide resolved
description("region epoch is not match")
display("EpochNotMatch {}", msg)
}
StaleCommand {
description("stale command")
Expand Down Expand Up @@ -186,10 +186,10 @@ impl Into<errorpb::Error> for Error {
.mut_key_not_in_region()
.set_end_key(region.get_end_key().to_vec());
}
Error::StaleEpoch(_, new_regions) => {
let mut e = errorpb::StaleEpoch::new();
e.set_new_regions(RepeatedField::from_vec(new_regions));
errorpb.set_stale_epoch(e);
Error::EpochNotMatch(_, new_regions) => {
let mut e = errorpb::EpochNotMatch::new();
e.set_current_regions(RepeatedField::from_vec(new_regions));
errorpb.set_epoch_not_match(e);
}
Error::StaleCommand => {
errorpb.set_stale_command(errorpb::StaleCommand::new());
Expand Down
31 changes: 15 additions & 16 deletions src/raftstore/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ impl ApplyDelegate {
///
/// An apply operation can fail in the following situations:
/// 1. it encounters an error that will occur on all stores, it can continue
/// applying next entry safely, like stale epoch for example;
/// applying next entry safely, like epoch not match for example;
/// 2. it encounters an error that may not occur on all stores, in this case
/// we should try to apply the entry again or panic. Considering that this
/// usually due to disk operation fail, which is rare, so just panic is ok.
Expand All @@ -905,7 +905,7 @@ impl ApplyDelegate {
// clear dirty values.
ctx.wb_mut().rollback_to_save_point().unwrap();
match e {
Error::StaleEpoch(..) => debug!("{} stale epoch err: {:?}", self.tag, e),
Error::EpochNotMatch(..) => debug!("{} epoch not match err: {:?}", self.tag, e),
_ => error!("{} execute raft command err: {:?}", self.tag, e),
}
(cmd_resp::new_error(e), ApplyResult::None)
Expand Down Expand Up @@ -986,7 +986,7 @@ impl ApplyDelegate {
ctx: &mut ApplyContext,
req: RaftCmdRequest,
) -> Result<(RaftCmdResponse, ApplyResult)> {
// Include region for stale epoch after merge may cause key not in range.
// Include region for epoch not match after merge may cause key not in range.
let include_region =
req.get_header().get_region_epoch().get_version() >= self.last_merge_version;
check_region_epoch(&req, &self.region, include_region)?;
Expand Down Expand Up @@ -1995,7 +1995,7 @@ fn check_sst_for_ingestion(sst: &SSTMeta, region: &Region) -> Result<()> {
|| epoch.get_version() != region_epoch.get_version()
{
let error = format!("{:?} != {:?}", epoch, region_epoch);
return Err(Error::StaleEpoch(error, vec![region.clone()]));
return Err(Error::EpochNotMatch(error, vec![region.clone()]));
}

let range = sst.get_range();
Expand Down Expand Up @@ -3180,7 +3180,7 @@ mod tests {
.build();
router.schedule_task(1, Msg::apply(Apply::new(1, 2, vec![put_entry])));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(resp.get_header().get_error().has_stale_epoch());
assert!(resp.get_header().get_error().has_epoch_not_match());
let apply_res = fetch_apply_res(&rx);
assert_eq!(apply_res.applied_index_term, 2);
assert_eq!(apply_res.apply_state.get_applied_index(), 3);
Expand Down Expand Up @@ -3290,12 +3290,12 @@ mod tests {
.ingest_sst(&meta1)
.epoch(0, 3)
.build();
let ingest_stale_epoch = EntryBuilder::new(11, 3)
let ingest_epoch_not_match = EntryBuilder::new(11, 3)
.capture_resp(&router, 3, 1, capture_tx.clone())
.ingest_sst(&meta2)
.epoch(0, 3)
.build();
let entries = vec![put_ok, ingest_ok, ingest_stale_epoch];
let entries = vec![put_ok, ingest_ok, ingest_epoch_not_match];
router.schedule_task(1, Msg::apply(Apply::new(1, 3, entries)));
let resp = capture_rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(!resp.get_header().has_error(), "{:?}", resp);
Expand Down Expand Up @@ -3528,9 +3528,6 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// All requests should be checked.
assert!(error_msg(&resp).contains("id count"), "{:?}", resp);

let mut new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
let checker = SplitResultChecker {
db: &engines.kv,
origin_peers: &peers,
Expand All @@ -3544,11 +3541,11 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Split should succeed.
assert!(!resp.get_header().has_error(), "{:?}", resp);
let mut new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
checker.check(b"", b"k1", 8, &[9, 10, 11], true);
checker.check(b"k1", b"k5", 1, &[3, 5, 7], false);

new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
splits.mut_requests().clear();
splits
.mut_requests()
Expand All @@ -3557,11 +3554,11 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Right derive should be respected.
assert!(!resp.get_header().has_error(), "{:?}", resp);
new_version = epoch.borrow().get_version() + 1;
epoch.borrow_mut().set_version(new_version);
checker.check(b"k4", b"k5", 12, &[13, 14, 15], true);
checker.check(b"k1", b"k4", 1, &[3, 5, 7], false);

new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
splits.mut_requests().clear();
splits
.mut_requests()
Expand All @@ -3573,12 +3570,12 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Right derive should be respected.
assert!(!resp.get_header().has_error(), "{:?}", resp);
new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
checker.check(b"k1", b"k2", 16, &[17, 18, 19], true);
checker.check(b"k2", b"k3", 20, &[21, 22, 23], true);
checker.check(b"k3", b"k4", 1, &[3, 5, 7], false);

new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
splits.mut_requests().clear();
splits
.mut_requests()
Expand All @@ -3590,6 +3587,8 @@ mod tests {
let resp = exec_split(&router, splits.clone());
// Right derive should be respected.
assert!(!resp.get_header().has_error(), "{:?}", resp);
new_version = epoch.borrow().get_version() + 2;
epoch.borrow_mut().set_version(new_version);
checker.check(b"k3", b"k31", 1, &[3, 5, 7], false);
checker.check(b"k31", b"k32", 24, &[25, 26, 27], true);
checker.check(b"k32", b"k4", 28, &[29, 30, 31], true);
Expand Down
8 changes: 4 additions & 4 deletions src/raftstore/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}

match util::check_region_epoch(msg, self.fsm.peer.region(), true) {
Err(Error::StaleEpoch(msg, mut new_regions)) => {
Err(Error::EpochNotMatch(msg, mut new_regions)) => {
// Attach the region which might be split from the current region. But it doesn't
// matter if the region is not split from the current region. If the region meta
// received by the TiKV driver is newer than the meta cached in the driver, the meta is
Expand All @@ -1936,8 +1936,8 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
if let Some(sibling_region) = sibling_region {
new_regions.push(sibling_region);
}
self.ctx.raft_metrics.invalid_proposal.stale_epoch += 1;
Err(Error::StaleEpoch(msg, new_regions))
self.ctx.raft_metrics.invalid_proposal.epoch_not_match += 1;
Err(Error::EpochNotMatch(msg, new_regions))
}
Err(e) => Err(e),
Ok(()) => Ok(None),
Expand Down Expand Up @@ -2230,7 +2230,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
region.get_region_epoch(),
epoch
);
return Err(Error::StaleEpoch(
return Err(Error::EpochNotMatch(
format!(
"{} epoch changed {:?} != {:?}, retry later",
self.fsm.peer.tag, latest_epoch, epoch
Expand Down
12 changes: 6 additions & 6 deletions src/raftstore/store/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ pub struct RaftInvalidProposeMetrics {
pub not_leader: u64,
pub mismatch_peer_id: u64,
pub stale_command: u64,
pub stale_epoch: u64,
pub epoch_not_match: u64,
}

impl Default for RaftInvalidProposeMetrics {
Expand All @@ -323,7 +323,7 @@ impl Default for RaftInvalidProposeMetrics {
not_leader: 0,
mismatch_peer_id: 0,
stale_command: 0,
stale_epoch: 0,
epoch_not_match: 0,
}
}
}
Expand Down Expand Up @@ -360,11 +360,11 @@ impl RaftInvalidProposeMetrics {
.inc_by(self.stale_command as i64);
self.stale_command = 0;
}
if self.stale_epoch > 0 {
if self.epoch_not_match > 0 {
RAFT_INVALID_PROPOSAL_COUNTER_VEC
.with_label_values(&["stale_epoch"])
.inc_by(self.stale_epoch as i64);
self.stale_epoch = 0;
.with_label_values(&["epoch_not_match"])
.inc_by(self.epoch_not_match as i64);
self.epoch_not_match = 0;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2108,7 +2108,7 @@ impl ReadExecutor {
pub fn execute(&mut self, msg: &RaftCmdRequest, region: &metapb::Region) -> ReadResponse {
if self.check_epoch {
if let Err(e) = check_region_epoch(msg, region, true) {
debug!("[region {}] stale epoch err: {:?}", region.get_id(), e);
debug!("[region {}] epoch not match err: {:?}", region.get_id(), e);
return ReadResponse {
response: cmd_resp::new_error(e),
snapshot: None,
Expand Down
Loading