From 7313311cf1070c64f1db8e00b431d66e98d61094 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Tue, 25 Jan 2022 17:46:01 +0800 Subject: [PATCH 1/7] introduce resolved ts Signed-off-by: Yu Juncen --- Cargo.lock | 1 + components/br-stream/Cargo.toml | 1 + components/br-stream/src/endpoint.rs | 37 +++++++++++- components/br-stream/src/event_loader.rs | 4 ++ components/br-stream/src/metrics.rs | 5 +- components/br-stream/src/observer.rs | 2 +- components/br-stream/src/router.rs | 72 ++++++++++++++++++------ components/br-stream/src/utils.rs | 15 +++-- 8 files changed, 109 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eec447c79b..04fd9bb77a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,6 +432,7 @@ dependencies = [ "raftstore", "rand 0.8.3", "regex", + "resolved_ts", "slog", "slog-global", "thiserror", diff --git a/components/br-stream/Cargo.toml b/components/br-stream/Cargo.toml index 860906f3a6..a3bfcd1c20 100644 --- a/components/br-stream/Cargo.toml +++ b/components/br-stream/Cargo.toml @@ -47,6 +47,7 @@ raft = { version = "0.6.0-alpha", default-features = false, features = ["protobu raftstore = { path = "../raftstore", default-features = false } tikv = { path = "../../", default-features = false } online_config = { path = "../online_config" } +resolved_ts = { path = "../resolved_ts"} [dev-dependencies] rand = "0.8.0" diff --git a/components/br-stream/src/endpoint.rs b/components/br-stream/src/endpoint.rs index 3f85d576ff..544ab1fd4d 100644 --- a/components/br-stream/src/endpoint.rs +++ b/components/br-stream/src/endpoint.rs @@ -4,13 +4,16 @@ use std::convert::AsRef; use std::fmt; use std::marker::PhantomData; use std::path::PathBuf; +use std::sync::Arc; +use dashmap::DashMap; use engine_traits::KvEngine; use kvproto::metapb::Region; use raftstore::router::RaftStoreRouter; use raftstore::store::fsm::ChangeObserver; +use resolved_ts::Resolver; use tikv_util::time::Instant; use tokio::io::Result as TokioResult; use tokio::runtime::Runtime; @@ -51,6 +54,7 @@ pub struct Endpoint { regions: R, engine: PhantomData, router: RT, + resolvers: Arc>, } impl Endpoint @@ -104,6 +108,7 @@ where }); } + info!("the endpoint of stream backup started"; "path" => %config.streaming_path); Endpoint { config, meta_client, @@ -115,6 +120,7 @@ where regions: accessor, engine: PhantomData, router, + resolvers: Default::default(), } } } @@ -159,7 +165,17 @@ where fn backup_batch(&self, batch: CmdBatch) { let mut sw = StopWatch::new(); let region_id = batch.region_id; - let kvs = ApplyEvent::from_cmd_batch(batch, /* TODO */ 0); + let mut resolver = match self.resolvers.as_ref().get_mut(®ion_id) { + Some(rts) => rts, + None => { + warn!("BUG: the region isn't registered (no resolver found) but sent to backup_batch."; "region_id" => %region_id); + return; + } + }; + + let kvs = ApplyEvent::from_cmd_batch(batch, resolver.value_mut()); + drop(resolver); + HANDLE_EVENT_DURATION_HISTOGRAM .with_label_values(&["to_stream_event"]) .observe(sw.lap().as_secs_f64()); @@ -248,6 +264,7 @@ where let start = Instant::now_coarse(); let start_ts = task.info.get_start_ts(); let ob = self.observer.clone(); + let rs = self.resolvers.clone(); let success = self .observer .ranges @@ -266,7 +283,11 @@ where start_key.clone(), end_key.clone(), TimeStamp::new(start_ts), - |region_id, handle| ob.subs.register_region(region_id, handle), + |region_id, handle| { + // Note: maybe we'd better schedule a "register region" here? 
+                        ob.subs.register_region(region_id, handle);
+                        rs.insert(region_id, Resolver::new(region_id));
+                    },
                 );
                 match range_init_result {
                     Ok(stat) => {
@@ -300,8 +321,17 @@ where
     pub fn on_flush(&self, task: String, store_id: u64) {
         let router = self.range_router.clone();
+        let cli = self
+            .meta_client
+            .as_ref()
+            .expect("on_flush: executed from an endpoint without cli")
+            .clone();
         self.pool.spawn(async move {
-            router.do_flush(&task, store_id).await;
+            if let Some(rts) = router.do_flush(&task, store_id).await {
+                if let Err(err) = cli.step_task(&task, rts).await {
+                    err.report(format!("on flushing task {}", task));
+                }
+            }
         });
     }

@@ -320,6 +350,7 @@ where
             e.report(format!("register region {} to raftstore", region.get_id()));
         }
         self.observer.subs.register_region(region_id, handle);
+        self.resolvers.insert(region.id, Resolver::new(region.id));
     }

     pub fn do_backup(&mut self, events: Vec<CmdBatch>) {
diff --git a/components/br-stream/src/event_loader.rs b/components/br-stream/src/event_loader.rs
index ac73eab03f..8345802070 100644
--- a/components/br-stream/src/event_loader.rs
+++ b/components/br-stream/src/event_loader.rs
@@ -177,6 +177,8 @@ where
     }

     /// Initialize the region: register it to the raftstore and the observer.
+    /// At the same time, perform the initial scan (an incremental scan from `start_ts`)
+    /// and send the corresponding `ApplyEvent`s to the sink directly.
     pub fn initialize_region(
         &self,
         region: &Region,
@@ -184,6 +186,7 @@
         cmd: ChangeObserver,
     ) -> Result<Statistics> {
         let snap = self.observe_over(region, cmd)?;
+        // It is OK to sink more data than needed, so scan to +inf TS for convenience.
         let mut event_loader = EventLoader::load_from(snap, start_ts, TimeStamp::max(), region)?;
         let mut stats = StatisticsSummary::default();
         loop {
@@ -207,6 +210,7 @@ where
         Ok(stats.stat)
     }

+    /// Initialize a range: it simply scans the regions with the leader role and sends them to [`initialize_region`].
     pub fn initialize_range(
         &self,
         start_key: Vec<u8>,
diff --git a/components/br-stream/src/metrics.rs b/components/br-stream/src/metrics.rs
index 991fc3313d..ff5e5e8760 100644
--- a/components/br-stream/src/metrics.rs
+++ b/components/br-stream/src/metrics.rs
@@ -40,10 +40,11 @@ lazy_static! {
         &["type"]
     )
     .unwrap();
-    pub static ref ON_EVENT_COST_HISTOGRAM : HistogramVec = register_histogram_vec!(
+    pub static ref ON_EVENT_COST_HISTOGRAM: HistogramVec = register_histogram_vec!(
         "tikv_stream_on_event_duration_seconds",
         "The time cost of handling events.",
         &["stage"],
         exponential_buckets(0.001, 2.0, 16).unwrap()
-    ).unwrap();
+    )
+    .unwrap();
 }
diff --git a/components/br-stream/src/observer.rs b/components/br-stream/src/observer.rs
index d2c1a8168e..9b67619002 100644
--- a/components/br-stream/src/observer.rs
+++ b/components/br-stream/src/observer.rs
@@ -1,6 +1,6 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
 use std::sync::{Arc, RwLock};

-// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
 use crate::errors::Error;
 use crate::utils::SegmentTree;
 use dashmap::DashMap;
diff --git a/components/br-stream/src/router.rs b/components/br-stream/src/router.rs
index 8c6fadea42..b0c9b9df3c 100644
--- a/components/br-stream/src/router.rs
+++ b/components/br-stream/src/router.rs
@@ -11,6 +11,7 @@ use std::{
 };

 use crate::{
+    annotate,
     codec::Encoder,
     endpoint::Task,
     errors::Error,
@@ -21,7 +22,7 @@ use crate::{

 use super::errors::Result;

-use engine_traits::{CfName, CF_DEFAULT, CF_WRITE};
+use engine_traits::{CfName, CF_DEFAULT, CF_LOCK, CF_WRITE};
 use external_storage::{BackendConfig, UnpinReader};
 use external_storage_export::{create_storage, ExternalStorage};

@@ -34,6 +35,7 @@ use kvproto::{
 use openssl::hash::{Hasher, MessageDigest};
 use protobuf::Message;
 use raftstore::coprocessor::CmdBatch;
+use resolved_ts::Resolver;
 use slog_global::debug;
 use tidb_query_datatype::codec::table::decode_table_id;

@@ -47,7 +49,7 @@ use tikv_util::{
 use tokio::io::AsyncWriteExt;
 use tokio::sync::Mutex;
 use tokio::{fs::remove_file, fs::File};
-use txn_types::{Key, TimeStamp};
+use txn_types::{Key, Lock, TimeStamp};

 #[derive(Debug)]
 pub struct ApplyEvent {
@@ -64,7 +66,7 @@ impl ApplyEvent {
     /// Assuming the resolved ts of the region is `resolved_ts`.
     /// Note: the resolved ts cannot be advanced if there is no command,
     /// maybe we also need to update resolved_ts when flushing?
-    pub fn from_cmd_batch(cmd: CmdBatch, resolved_ts: u64) -> Vec<Self> {
+    pub fn from_cmd_batch(cmd: CmdBatch, resolver: &mut Resolver) -> Vec<Self> {
         let region_id = cmd.region_id;
         let mut result = vec![];
         for req in cmd
@@ -87,6 +89,7 @@ impl ApplyEvent {
             .flat_map(|mut cmd| cmd.request.take_requests().into_iter())
         {
             let cmd_type = req.get_cmd_type();
+
             let (key, value, cf) = match utils::request_to_triple(req) {
                 Either::Left(t) => t,
                 Either::Right(req) => {
@@ -95,12 +98,40 @@ impl ApplyEvent {
                     continue;
                 }
             };
+            if cf == CF_LOCK {
+                match Lock::parse(&value).map_err(|err| {
+                    annotate!(
+                        err,
+                        "failed to parse lock (value = {})",
+                        utils::redact(&value)
+                    )
+                }) {
+                    Ok(lock) => match cmd_type {
+                        CmdType::Put => resolver.track_lock(lock.ts, key, None),
+                        CmdType::Delete => resolver.untrack_lock(&key, None),
+                        _ => continue,
+                    },
+                    Err(err) => err.report(format!("region id = {}", region_id)),
+                };
+
+                continue;
+            }
+            // use the key ts as min_ts would be safe.
+            // - if it is uncommitted, the lock would be tracked, preventing the resolved ts
+            //   from being advanced incorrectly.
+            // - if it is committed, it is (hopefully) safe to advance the resolved ts to it.
+            //   (Would something like 1PC break this?)
+            // note: maybe get this ts from PD? The current implementation cannot advance the
+            //   resolved ts if there are no writes.
+            let region_resolved_ts = resolver
+                .resolve(Key::decode_ts_from(&key).unwrap_or_default())
+                .into_inner();
             let item = Self {
                 key,
                 value,
                 cf,
                 region_id,
-                region_resolved_ts: resolved_ts,
+                region_resolved_ts,
                 cmd_type,
             };
             if item.should_record() {
@@ -340,14 +371,23 @@ impl RouterInner {
         Ok(())
     }

-    pub async fn do_flush(&self, task_name: &str, store_id: u64) {
+    /// Flush the specified task. On success, return the min resolved ts of this flush;
+    /// return `None` if the flush failed.
+    pub async fn do_flush(&self, task_name: &str, store_id: u64) -> Option<u64> {
         debug!("backup stream do flush"; "task" => task_name);
-        if let Some(task_info) = self.tasks.lock().await.get(task_name) {
-            if let Err(e) = task_info.do_flush(store_id).await {
-                warn!("backup steam do flush fail"; "err" => ?e);
+        match self.tasks.lock().await.get(task_name) {
+            Some(task_info) => {
+                let result = task_info.do_flush(store_id).await;
+                if let Err(ref e) = result {
+                    warn!("backup stream do flush fail"; "err" => ?e);
+                }
+
+                // set false to flushing whether success or fail
+                task_info.set_flushing_status(false);
+
+                result.ok().flatten()
             }
-            // set false to flushing whether success or fail
-            task_info.set_flushing_status(false);
+            _ => None,
         }
     }
 }
@@ -458,8 +498,6 @@ pub struct StreamTaskInfo {
     min_resolved_ts: TimeStamp,
     /// Total size of all temporary files in byte.
     total_size: AtomicUsize,
-    /// Whether those files are already requested to be flushed.
-    ///
     /// This should only be set to `true` by `compare_and_set(current=false, value=ture)`.
     /// The thread who setting it to `true` takes the responsibility of sending the request to the
     /// scheduler for flushing the files then.
@@ -497,7 +535,6 @@ impl StreamTaskInfo {
         })
     }

-    // TODO: make a file-level lock for getting rid of the &mut.
     /// Append a event to the files. This wouldn't trigger `fsync` syscall.
     /// i.e. No guarantee of persistence.
     pub async fn on_event(&self, kv: ApplyEvent) -> Result<()> {
@@ -636,10 +673,12 @@ impl StreamTaskInfo {
         Ok(())
     }

-    pub async fn do_flush(&self, store_id: u64) -> Result<()> {
+    /// Execute the flush: copy local files to external storage.
+    /// If it succeeds, return the min resolved ts of this flush.
+    pub async fn do_flush(&self, store_id: u64) -> Result<Option<u64>> {
         // do nothing if not flushing status.
         if !self.is_flushing() {
-            return Ok(());
+            return Ok(None);
         }

         // generage meta data and prepare to flush to storage
         let metadata_info = self
             .move_to_flushing_files()
             .await
             .generate_metadata(store_id)
             .await?;
+        let rts = metadata_info.min_resolved_ts;

         // flush log file to storage.
         self.flush_log().await?;
@@ -657,7 +697,7 @@ impl StreamTaskInfo {

         // clear flushing files
         self.clear_flushing_files().await;
-        Ok(())
+        Ok(Some(rts))
     }
 }
diff --git a/components/br-stream/src/utils.rs b/components/br-stream/src/utils.rs
index 59b8a107d2..0f6c32094c 100644
--- a/components/br-stream/src/utils.rs
+++ b/components/br-stream/src/utils.rs
@@ -6,7 +6,10 @@ use std::{
     time::Duration,
 };

-use crate::errors::{Error, Result};
+use crate::{
+    annotate,
+    errors::{Error, Result},
+};

 use engine_traits::CF_DEFAULT;
 use futures::{channel::mpsc, executor::block_on, StreamExt};
@@ -28,11 +31,11 @@ pub fn wrap_key(v: Vec<u8>) -> Vec<u8> {

 /// decode ts from a key, and transform the error to the crate error.
 pub fn get_ts(key: &Key) -> Result<TimeStamp> {
     key.decode_ts().map_err(|err| {
-        Error::Other(box_err!(
-            "failed to get ts from key '{}': {}",
-            log_wrappers::Value::key(key.as_encoded().as_slice()),
-            err
-        ))
+        annotate!(
+            err,
+            "failed to get ts from key '{}'",
+            redact(&key.as_encoded())
+        )
     })
 }

From 229b627fd2e0d95a40c272a6fdae83a9b51bf9d0 Mon Sep 17 00:00:00 2001
From: Yu Juncen
Date: Tue, 25 Jan 2022 18:36:37 +0800
Subject: [PATCH 2/7] fix unexpected errors

Signed-off-by: Yu Juncen
---
 components/br-stream/src/router.rs | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/components/br-stream/src/router.rs b/components/br-stream/src/router.rs
index b0c9b9df3c..c08156f844 100644
--- a/components/br-stream/src/router.rs
+++ b/components/br-stream/src/router.rs
@@ -99,22 +99,22 @@ impl ApplyEvent {
                 }
             };
             if cf == CF_LOCK {
-                match Lock::parse(&value).map_err(|err| {
-                    annotate!(
-                        err,
-                        "failed to parse lock (value = {})",
-                        utils::redact(&value)
-                    )
-                }) {
-                    Ok(lock) => match cmd_type {
-                        CmdType::Put => resolver.track_lock(lock.ts, key, None),
-                        CmdType::Delete => resolver.untrack_lock(&key, None),
-                        _ => continue,
-                    },
-                    Err(err) => err.report(format!("region id = {}", region_id)),
-                };
-
-                continue;
+                match cmd_type {
+                    CmdType::Put => {
+                        match Lock::parse(&value).map_err(|err| {
+                            annotate!(
+                                err,
+                                "failed to parse lock (value = {})",
+                                utils::redact(&value)
+                            )
+                        }) {
+                            Ok(lock) => resolver.track_lock(lock.ts, key, None),
+                            Err(err) => err.report(format!("region id = {}", region_id)),
+                        }
+                    }
+                    CmdType::Delete => resolver.untrack_lock(&key, None),
+                    _ => continue,
+                }
             }
             // use the key ts as min_ts would be safe.

From f7e0f7390cbb45f07fd7f236fb74b45aa12af26d Mon Sep 17 00:00:00 2001
From: Yu Juncen
Date: Tue, 25 Jan 2022 18:41:37 +0800
Subject: [PATCH 3/7] fix compile

Signed-off-by: Yu Juncen
---
 components/br-stream/src/router.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/components/br-stream/src/router.rs b/components/br-stream/src/router.rs
index c08156f844..0a98d7b715 100644
--- a/components/br-stream/src/router.rs
+++ b/components/br-stream/src/router.rs
@@ -113,8 +113,9 @@ impl ApplyEvent {
                         }
                     }
                     CmdType::Delete => resolver.untrack_lock(&key, None),
-                    _ => continue,
+                    _ => {}
                 }
+                continue;
             }
             // use the key ts as min_ts would be safe.

From ff8b106de764f3c74bcfd62100dac0281d3b29a3 Mon Sep 17 00:00:00 2001
From: Yu Juncen
Date: Wed, 26 Jan 2022 10:10:25 +0800
Subject: [PATCH 4/7] added some metrics

Signed-off-by: Yu Juncen
---
 components/br-stream/src/endpoint.rs | 8 ++++++++
 components/br-stream/src/metrics.rs  | 6 ++++++
 2 files changed, 14 insertions(+)

diff --git a/components/br-stream/src/endpoint.rs b/components/br-stream/src/endpoint.rs
index 544ab1fd4d..ef152fd288 100644
--- a/components/br-stream/src/endpoint.rs
+++ b/components/br-stream/src/endpoint.rs
@@ -330,7 +330,15 @@ where
             if let Some(rts) = router.do_flush(&task, store_id).await {
                 if let Err(err) = cli.step_task(&task, rts).await {
                     err.report(format!("on flushing task {}", task));
+                    // we can advance the progress next time.
+                    // return early so we won't be misled by the metrics.
+                    return;
                 }
+                metrics::STORE_CHECKPOINT_TS
+                    // Currently, we only support one task at a time,
+                    // so using the task as the label is OK.
+ .with_label_values(&[task.as_str()]) + .set(rts as _) } }); } diff --git a/components/br-stream/src/metrics.rs b/components/br-stream/src/metrics.rs index ff5e5e8760..de917ada8e 100644 --- a/components/br-stream/src/metrics.rs +++ b/components/br-stream/src/metrics.rs @@ -47,4 +47,10 @@ lazy_static! { exponential_buckets(0.001, 2.0, 16).unwrap() ) .unwrap(); + pub static ref STORE_CHECKPOINT_TS: IntGaugeVec = register_int_gauge_vec!( + "tikv_stream_store_checkpoint_ts", + "The checkpoint ts (next backup ts) of task", + &["task"], + ) + .unwrap(); } From d0a818f3a27b8cc78a1f9ebfd60862428acd33fe Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Wed, 26 Jan 2022 15:08:41 +0800 Subject: [PATCH 5/7] reset resolver when update or delete Signed-off-by: Yu Juncen --- components/br-stream/src/endpoint.rs | 65 +++++++++++++++++++++------- components/br-stream/src/observer.rs | 52 +++++++++++++++------- components/br-stream/src/utils.rs | 23 ++++++++++ 3 files changed, 108 insertions(+), 32 deletions(-) diff --git a/components/br-stream/src/endpoint.rs b/components/br-stream/src/endpoint.rs index ef152fd288..ec1fa72937 100644 --- a/components/br-stream/src/endpoint.rs +++ b/components/br-stream/src/endpoint.rs @@ -344,21 +344,46 @@ where } /// Start observe over some region. - /// This would register the region to the RaftStore. - /// - /// > Note: This won't trigger a incremental scanning. - /// > When the follower progress faster than leader and then be elected, - /// > there is a risk of losing data. - pub fn on_observe_region(&self, region: Region) { + /// This would modify some internal state, and delegate the task to InitialLoader::observe_over. + fn observe_over(&self, region: &Region) -> Result<()> { let init = self.make_initial_loader(); let handle = ObserveHandle::new(); let region_id = region.get_id(); let ob = ChangeObserver::from_cdc(region_id, handle.clone()); - if let Err(e) = init.observe_over(®ion, ob) { - e.report(format!("register region {} to raftstore", region.get_id())); - } + init.observe_over(®ion, ob)?; self.observer.subs.register_region(region_id, handle); self.resolvers.insert(region.id, Resolver::new(region.id)); + Ok(()) + } + + /// Modify observe over some region. + /// This would register the region to the RaftStore. + /// + /// > Note: If using this to start observe, this won't trigger a incremental scanning. + /// > When the follower progress faster than leader and then be elected, + /// > there is a risk of losing data. + pub fn on_modify_observe(&self, op: ObserveOp) { + match op { + ObserveOp::Start { region } => { + if let Err(e) = self.observe_over(®ion) { + e.report(format!("register region {} to raftstore", region.get_id())); + } + } + ObserveOp::Stop { region } => { + self.observer.subs.deregister_region(region.id); + self.resolvers.as_ref().remove(®ion.id); + } + ObserveOp::RefreshResolver { region } => { + self.observer.subs.deregister_region(region.id); + self.resolvers.as_ref().remove(®ion.id); + if let Err(e) = self.observe_over(®ion) { + e.report(format!( + "register region {} to raftstore when refreshing", + region.get_id() + )); + } + } + } } pub fn do_backup(&mut self, events: Vec) { @@ -394,12 +419,23 @@ pub enum Task { ChangeConfig(ConfigChange), /// Flush the task with name. Flush(String), - /// Start observe over the region. - ObserverRegion { + /// Change the observe status of some region. + ModifyObserve(ObserveOp), +} + +#[derive(Debug)] +pub enum ObserveOp { + Start { region: Region, // Note: Maybe add the request for initial scanning too. 
// needs_initial_scanning: bool }, + Stop { + region: Region, + }, + RefreshResolver { + region: Region, + }, } impl fmt::Debug for Task { @@ -412,10 +448,7 @@ impl fmt::Debug for Task { .finish(), Self::ChangeConfig(arg0) => f.debug_tuple("ChangeConfig").field(arg0).finish(), Self::Flush(arg0) => f.debug_tuple("Flush").field(arg0).finish(), - Self::ObserverRegion { region } => f - .debug_struct("ObserverRegion") - .field("region", region) - .finish(), + Self::ModifyObserve(op) => f.debug_tuple("ModifyObserve").field(op).finish(), } } } @@ -440,8 +473,8 @@ where match task { Task::WatchTask(task) => self.on_register(task), Task::BatchEvent(events) => self.do_backup(events), - Task::ObserverRegion { region } => self.on_observe_region(region), Task::Flush(task) => self.on_flush(task, self.store_id), + Task::ModifyObserve(op) => self.on_modify_observe(op), _ => (), } } diff --git a/components/br-stream/src/observer.rs b/components/br-stream/src/observer.rs index 9b67619002..27b8dcd74b 100644 --- a/components/br-stream/src/observer.rs +++ b/components/br-stream/src/observer.rs @@ -2,6 +2,7 @@ use std::sync::{Arc, RwLock}; use crate::errors::Error; +use crate::try_send; use crate::utils::SegmentTree; use dashmap::DashMap; use engine_traits::KvEngine; @@ -12,7 +13,7 @@ use tikv_util::worker::Scheduler; use tikv_util::{debug, warn}; use tikv_util::{info, HandyRwLock}; -use crate::endpoint::Task; +use crate::endpoint::{ObserveOp, Task}; /// An Observer for Backup Stream. /// @@ -52,9 +53,12 @@ impl BackupStreamObserver { /// The internal way to register a region. /// It delegate the initial scanning and modify of the subs to the endpoint. fn register_region(&self, region: &Region) { - if let Err(err) = self.scheduler.schedule(Task::ObserverRegion { - region: region.clone(), - }) { + if let Err(err) = self + .scheduler + .schedule(Task::ModifyObserve(ObserveOp::Start { + region: region.clone(), + })) + { Error::from(err).report(format_args!( "failed to schedule role change for region {}", region.get_id() @@ -150,9 +154,7 @@ impl CmdObserver for BackupStreamObserver { if cmd_batches.is_empty() { return; } - if let Err(e) = self.scheduler.schedule(Task::BatchEvent(cmd_batches)) { - warn!("backup stream schedule task failed"; "error" => ?e); - } + try_send!(self.scheduler, Task::BatchEvent(cmd_batches)); } fn on_applied_current_term(&self, role: StateRole, region: &Region) { @@ -165,8 +167,12 @@ impl CmdObserver for BackupStreamObserver { impl RoleObserver for BackupStreamObserver { fn on_role_change(&self, ctx: &mut ObserverContext<'_>, r: StateRole) { if r != StateRole::Leader { - let region = ctx.region(); - self.subs.deregister_region(region.get_id()); + try_send!( + self.scheduler, + Task::ModifyObserve(ObserveOp::Stop { + region: ctx.region().clone(), + }) + ); } } } @@ -182,10 +188,24 @@ impl RegionChangeObserver for BackupStreamObserver { // `on_applied_current_term`. // But should we deregister and register again when the region is `Update`d? 
- if matches!(event, RegionChangeEvent::Destroy) - && self.subs.should_observe(ctx.region().get_id()) - { - self.subs.deregister_region(ctx.region().get_id()) + match event { + RegionChangeEvent::Destroy if self.subs.should_observe(ctx.region().get_id()) => { + try_send!( + self.scheduler, + Task::ModifyObserve(ObserveOp::Stop { + region: ctx.region().clone(), + }) + ); + } + RegionChangeEvent::Update => { + try_send!( + self.scheduler, + Task::ModifyObserve(ObserveOp::RefreshResolver { + region: ctx.region().clone(), + }) + ); + } + _ => {} } } } @@ -207,7 +227,7 @@ mod tests { use tikv_util::worker::dummy_scheduler; use tikv_util::HandyRwLock; - use crate::endpoint::Task; + use crate::endpoint::{ObserveOp, Task}; use super::BackupStreamObserver; @@ -232,7 +252,7 @@ mod tests { o.register_region(&r); let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap(); let handle = ObserveHandle::new(); - if let Task::ObserverRegion { region } = task { + if let Task::ModifyObserve(ObserveOp::Start { region }) = task { o.subs.register_region(region.get_id(), handle.clone()) } else { panic!("unexpected message received: it is {}", task); @@ -256,7 +276,7 @@ mod tests { o.register_region(&r); let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap(); let handle = ObserveHandle::new(); - if let Task::ObserverRegion { region } = task { + if let Task::ModifyObserve(ObserveOp::Start { region }) = task { o.subs.register_region(region.get_id(), handle.clone()) } else { panic!("unexpected message received: it is {}", task); diff --git a/components/br-stream/src/utils.rs b/components/br-stream/src/utils.rs index 0f6c32094c..3b0c1433b5 100644 --- a/components/br-stream/src/utils.rs +++ b/components/br-stream/src/utils.rs @@ -207,6 +207,29 @@ pub fn request_to_triple(mut req: Request) -> Either<(Vec, Vec, String), Either::Left((key, value, cf)) } +/// `try_send!(s: Scheduler, task: T)` tries to send a task to the scheduler, +/// once meet an error, would report it, with the current file and line (so it is made as a macro). +/// returns whether it success. +#[macro_export(crate)] +macro_rules! try_send { + ($s: expr, $task: expr) => { + match $s.schedule($task) { + Err(err) => { + $crate::errors::Error::from(err).report(concat!( + "[", + file!(), + ":", + line!(), + "]", + "failed to schedule task" + )); + false + } + Ok(_) => true, + } + }; +} + #[cfg(test)] mod test { use super::SegmentTree; From 6a81258f0cf702369ea979579505746c4100a680 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Mon, 7 Feb 2022 11:19:45 +0800 Subject: [PATCH 6/7] move some comments Signed-off-by: Yu Juncen --- components/br-stream/src/observer.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/components/br-stream/src/observer.rs b/components/br-stream/src/observer.rs index 27b8dcd74b..ec034b2cea 100644 --- a/components/br-stream/src/observer.rs +++ b/components/br-stream/src/observer.rs @@ -184,10 +184,6 @@ impl RegionChangeObserver for BackupStreamObserver { event: RegionChangeEvent, _role: StateRole, ) { - // No need for handling `Create` -- once it becomes leader, it would start by - // `on_applied_current_term`. - // But should we deregister and register again when the region is `Update`d? - match event { RegionChangeEvent::Destroy if self.subs.should_observe(ctx.region().get_id()) => { try_send!( @@ -205,6 +201,8 @@ impl RegionChangeObserver for BackupStreamObserver { }) ); } + // No need for handling `Create` -- once it becomes leader, it would start by + // `on_applied_current_term`. 
_ => {} } } From 72fb1a198e82501a8f93f6a332cb471ca12e7716 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Tue, 8 Feb 2022 10:18:10 +0800 Subject: [PATCH 7/7] make clippy happy Signed-off-by: Yu Juncen --- components/br-stream/src/endpoint.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/br-stream/src/endpoint.rs b/components/br-stream/src/endpoint.rs index 0e896db8e6..7946c0ff6c 100644 --- a/components/br-stream/src/endpoint.rs +++ b/components/br-stream/src/endpoint.rs @@ -389,7 +389,7 @@ where let handle = ObserveHandle::new(); let region_id = region.get_id(); let ob = ChangeObserver::from_cdc(region_id, handle.clone()); - init.observe_over(®ion, ob)?; + init.observe_over(region, ob)?; self.observer.subs.register_region(region_id, handle); self.resolvers.insert(region.id, Resolver::new(region.id)); Ok(())