Skip to content

Commit

Permalink
raftstore: Implement coprocessor observer pre_persist (tikv#12957)
Browse files Browse the repository at this point in the history
ref tikv#12849

Support coprocessor observer pre_persist

Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo committed Sep 15, 2022
1 parent 30f5313 commit 76cabb0
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 10 deletions.
35 changes: 35 additions & 0 deletions components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,41 @@ impl RegionChangeObserver for TiFlashObserver {
.handle_destroy(ob_ctx.region().get_id());
}
}
/// Decides whether the pending write for this region may be persisted.
///
/// Returns `true` only when all of the following hold:
/// * `is_finished` is `false`,
/// * `cmd` is present and carries an admin request,
/// * that admin request is `CommitMerge` or `RollbackMerge`.
///
/// Every other combination (including `cmd == None`) returns `false`.
/// The decision is logged at `info` level when persisting and at `debug`
/// level otherwise.
fn pre_persist(
    &self,
    ob_ctx: &mut ObserverContext<'_>,
    is_finished: bool,
    cmd: Option<&RaftCmdRequest>,
) -> bool {
    // `cmd` is an `Option`, so it has to be unwrapped before the request can
    // be inspected; a missing command never forces persistence.
    let should_persist = !is_finished
        && cmd.map_or(false, |req| {
            req.has_admin_request()
                && matches!(
                    req.get_admin_request().get_cmd_type(),
                    // Merge needs to get the latest apply index.
                    AdminCmdType::CommitMerge | AdminCmdType::RollbackMerge
                )
        });
    if should_persist {
        info!(
            "observe pre_persist";
            "region_id" => ob_ctx.region().get_id(),
            "peer_id" => self.peer_id,
        );
    } else {
        debug!(
            "observe pre_persist";
            "region_id" => ob_ctx.region().get_id(),
            "peer_id" => self.peer_id,
            "is_finished" => is_finished,
        );
    }
    should_persist
}
}

impl PdTaskObserver for TiFlashObserver {
Expand Down
20 changes: 20 additions & 0 deletions components/raftstore/src/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,26 @@ impl<E: KvEngine> CoprocessorHost<E> {
);
}

/// `pre_persist` is called when we want to persist data or meta for a
/// region.
///
/// For example, `finish_for` and `commit` each call `pre_persist`, with
/// `is_finished` set to `true` and `false` respectively.
///
/// Every registered region-change observer is consulted in order. The
/// persistence is rejected (returns `false`) as soon as one observer
/// returns `false`; it is accepted (returns `true`) when all observers
/// agree or when no observer is registered.
pub fn pre_persist(
    &self,
    region: &Region,
    is_finished: bool,
    cmd: Option<&RaftCmdRequest>,
) -> bool {
    let mut ob_ctx = ObserverContext::new(region);
    // `all` stops at the first observer that vetoes, so later observers are
    // not invoked after a rejection.
    self.registry
        .region_change_observers
        .iter()
        .all(|entry| entry.observer.inner().pre_persist(&mut ob_ctx, is_finished, cmd))
}

pub fn on_flush_applied_cmd_batch(
&self,
max_level: ObserveLevel,
Expand Down
11 changes: 11 additions & 0 deletions components/raftstore/src/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ pub enum RegionChangeEvent {
pub trait RegionChangeObserver: Coprocessor {
    /// Hook to call when a region changed on this TiKV.
    ///
    /// The default implementation does nothing.
    fn on_region_changed(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _event: RegionChangeEvent,
        _role: StateRole,
    ) {
    }

    /// Should be called every time before we write a WriteBatch into
    /// KvEngine.
    ///
    /// Returns `false` if we can't commit at this time. The default
    /// implementation always returns `true`, i.e. it never blocks the write.
    fn pre_persist(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _is_finished: bool,
        _cmd: Option<&RaftCmdRequest>,
    ) -> bool {
        true
    }
}

#[derive(Clone, Debug, Default)]
Expand Down
67 changes: 57 additions & 10 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,15 +594,17 @@ where
delegate: &mut ApplyDelegate<EK>,
results: VecDeque<ExecResult<EK::Snapshot>>,
) {
#[cfg(any(test, feature = "testexport"))]
{
if cfg!(feature = "compat_old_proxy") {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
if self.host.pre_persist(&delegate.region, true, None) {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, false);
} else {
debug!("do not persist when finish_for";
"region" => ?delegate.region,
"tag" => &delegate.tag,
);
}
self.commit_opt(delegate, false);
self.apply_res.push(ApplyRes {
region_id: delegate.region_id(),
apply_state: delegate.apply_state.clone(),
Expand Down Expand Up @@ -1096,8 +1098,15 @@ where
return ApplyResult::Yield;
}
}
if should_flush_to_engine(&cmd) {
apply_ctx.commit_opt(self, true);
let mut has_unflushed_data =
self.last_flush_applied_index != self.apply_state.get_applied_index();
if (has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine())
&& apply_ctx.host.pre_persist(&self.region, false, Some(&cmd))
{
// TODO(tiflash) may write apply state twice here.
// Originally use only `commit_opt`.
apply_ctx.commit(self);
if let Some(start) = self.handle_start.as_ref() {
if start.saturating_elapsed() >= apply_ctx.yield_duration {
return ApplyResult::Yield;
Expand Down Expand Up @@ -4983,6 +4992,7 @@ mod tests {
cmd_sink: Option<Arc<Mutex<Sender<CmdBatch>>>>,
filter_compact_log: Arc<AtomicBool>,
filter_consistency_check: Arc<AtomicBool>,
skip_persist_when_pre_commit: Arc<AtomicBool>,
delay_remove_ssts: Arc<AtomicBool>,
last_delete_sst_count: Arc<AtomicU64>,
last_pending_delete_sst_count: Arc<AtomicU64>,
Expand Down Expand Up @@ -5106,6 +5116,17 @@ mod tests {
fn on_applied_current_term(&self, _: raft::StateRole, _: &Region) {}
}

impl RegionChangeObserver for ApplyObserver {
    /// Test hook: allows persistence unless the
    /// `skip_persist_when_pre_commit` flag is currently set.
    fn pre_persist(
        &self,
        _ctx: &mut ObserverContext<'_>,
        _is_finished: bool,
        _cmd: Option<&RaftCmdRequest>,
    ) -> bool {
        let skip_persist = self.skip_persist_when_pre_commit.load(Ordering::SeqCst);
        !skip_persist
    }
}

#[test]
fn test_handle_raft_committed_entries() {
let (_path, engine) = create_tmp_engine("test-delegate");
Expand Down Expand Up @@ -5725,6 +5746,8 @@ mod tests {
let obs = ApplyObserver::default();
host.registry
.register_admin_observer(1, BoxAdminObserver::new(obs.clone()));
host.registry
.register_region_change_observer(1, BoxRegionChangeObserver::new(obs.clone()));
host.registry
.register_query_observer(1, BoxQueryObserver::new(obs.clone()));

Expand Down Expand Up @@ -5760,6 +5783,8 @@ mod tests {
reg.region.mut_region_epoch().set_version(3);
router.schedule_task(1, Msg::Registration(reg));

obs.skip_persist_when_pre_commit
.store(true, Ordering::SeqCst);
let mut index_id = 1;
let put_entry = EntryBuilder::new(index_id, 1)
.put(b"k1", b"v1")
Expand All @@ -5768,7 +5793,19 @@ mod tests {
.epoch(1, 3)
.build();
router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![put_entry], vec![])));
fetch_apply_res(&rx);
let apply_res = fetch_apply_res(&rx);

// We don't persist at `finish_for`, since we disabled `pre_persist`.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index() + 1
);
obs.skip_persist_when_pre_commit
.store(false, Ordering::SeqCst);

// Phase 1: we test if pre_exec will filter execution of commands correctly.
index_id += 1;
Expand All @@ -5790,6 +5827,16 @@ mod tests {
assert_eq!(apply_res.exec_res.len(), 0);
assert_eq!(apply_res.apply_state.get_truncated_state().get_index(), 0);

// We persist at `finish_for`, since we enabled `pre_persist`.
let state: RaftApplyState = engine
.get_msg_cf(CF_RAFT, &keys::apply_state_key(1))
.unwrap()
.unwrap_or_default();
assert_eq!(
apply_res.apply_state.get_applied_index(),
state.get_applied_index()
);

index_id += 1;
// Don't filter CompactLog
obs.filter_compact_log.store(false, Ordering::SeqCst);
Expand Down
1 change: 1 addition & 0 deletions components/test_raftstore/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl Simulator for NodeCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
.build(tmp.path().to_str().unwrap());
(snap_mgr, Some(tmp))
} else {
Expand Down
1 change: 1 addition & 0 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ impl ServerCluster {
.max_total_size(cfg.server.snap_max_total_size.0)
.encryption_key_manager(key_manager)
.max_per_file_size(cfg.raft_store.max_snapshot_file_raw_size.0)
.enable_multi_snapshot_files(true)
.build(tmp_str);
self.snap_mgrs.insert(node_id, snap_mgr.clone());
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
Expand Down

0 comments on commit 76cabb0

Please sign in to comment.