Added resolved timestamp uploading #14

Merged: 9 commits, merged on Feb 9, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions components/br-stream/Cargo.toml
@@ -49,6 +49,7 @@ raft = { version = "0.6.0-alpha", default-features = false, features = ["protobu
raftstore = { path = "../raftstore", default-features = false }
tikv = { path = "../../", default-features = false }
online_config = { path = "../online_config" }
resolved_ts = { path = "../resolved_ts" }

[dev-dependencies]
rand = "0.8.0"
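
The new `resolved_ts` dependency pulls in the `Resolver` type used throughout this PR. As a rough mental model, here is a toy stand-in (illustration only, not the real implementation in `components/resolved_ts`; the method names match the calls in this diff, but the exact signatures are assumptions):

```rust
use std::collections::BTreeMap;

// Toy stand-in for `resolved_ts::Resolver`, for illustration only.
pub struct Resolver {
    pub region_id: u64,
    // start_ts -> keys still locked by that transaction.
    locks: BTreeMap<u64, Vec<Vec<u8>>>,
}

impl Resolver {
    pub fn new(region_id: u64) -> Self {
        Resolver { region_id, locks: BTreeMap::new() }
    }

    // Record a lock observed in the apply stream.
    pub fn track_lock(&mut self, start_ts: u64, key: Vec<u8>) {
        self.locks.entry(start_ts).or_default().push(key);
    }

    // Forget a lock once its transaction commits or rolls back.
    pub fn untrack_lock(&mut self, start_ts: u64, key: &[u8]) {
        if let Some(keys) = self.locks.get_mut(&start_ts) {
            keys.retain(|k| k != key);
            if keys.is_empty() {
                self.locks.remove(&start_ts);
            }
        }
    }

    // The resolved ts is capped by the oldest live lock: every transaction
    // starting below the returned value is fully committed or rolled back.
    pub fn resolve(&mut self, min_ts: u64) -> u64 {
        let oldest = self.locks.keys().next().copied().unwrap_or(u64::MAX);
        min_ts.min(oldest)
    }
}
```
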
110 changes: 91 additions & 19 deletions components/br-stream/src/endpoint.rs
@@ -4,13 +4,16 @@ use std::convert::AsRef;
use std::fmt;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use engine_traits::KvEngine;

use kvproto::metapb::Region;
use raftstore::router::RaftStoreRouter;
use raftstore::store::fsm::ChangeObserver;
use resolved_ts::Resolver;
use tikv_util::time::Instant;

use crossbeam_channel::tick;
@@ -53,6 +56,7 @@ pub struct Endpoint<S: MetaStore + 'static, R, E, RT> {
regions: R,
engine: PhantomData<E>,
router: RT,
resolvers: Arc<DashMap<u64, Resolver>>,
}

impl<R, E, RT> Endpoint<EtcdStore, R, E, RT>
@@ -111,6 +115,7 @@ where
));
}

info!("the endpoint of stream backup started"; "path" => %config.streaming_path);
Endpoint {
config,
meta_client,
@@ -122,6 +127,7 @@
regions: accessor,
engine: PhantomData,
router,
resolvers: Default::default(),
}
}
}
@@ -198,7 +204,17 @@ where
fn backup_batch(&self, batch: CmdBatch) {
let mut sw = StopWatch::new();
let region_id = batch.region_id;
let kvs = ApplyEvent::from_cmd_batch(batch, /* TODO */ 0);
let mut resolver = match self.resolvers.as_ref().get_mut(&region_id) {
Some(rts) => rts,
None => {
warn!("BUG: the region isn't registered (no resolver found) but sent to backup_batch."; "region_id" => %region_id);
return;
}
};

let kvs = ApplyEvent::from_cmd_batch(batch, resolver.value_mut());
drop(resolver);

HANDLE_EVENT_DURATION_HISTOGRAM
.with_label_values(&["to_stream_event"])
.observe(sw.lap().as_secs_f64());
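
`from_cmd_batch` now takes the region's resolver so that lock lifecycle events can feed it while the batch is converted into stream events. Roughly, the expected contract looks like this (a sketch, not the actual body of `ApplyEvent::from_cmd_batch`; CF handling is simplified and the toy `Resolver` from above is assumed):

```rust
// Sketch: drive the resolver from write events while converting a batch.
// In the real code the start_ts would be decoded from the lock value.
fn feed_resolver(resolver: &mut Resolver, cf: &str, key: &[u8], put: bool, start_ts: u64) {
    match (cf, put) {
        // A put into the lock CF means a transaction now holds this key...
        ("lock", true) => resolver.track_lock(start_ts, key.to_vec()),
        // ...and deleting the lock means it committed or rolled back.
        ("lock", false) => resolver.untrack_lock(start_ts, key),
        // Writes to other CFs are plain data; they become backup events
        // but don't affect the resolved ts.
        _ => {}
    }
}
```
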
@@ -287,6 +303,7 @@ where
let start = Instant::now_coarse();
let start_ts = task.info.get_start_ts();
let ob = self.observer.clone();
let rs = self.resolvers.clone();
let success = self
.observer
.ranges
@@ -305,7 +322,11 @@
start_key.clone(),
end_key.clone(),
TimeStamp::new(start_ts),
|region_id, handle| ob.subs.register_region(region_id, handle),
|region_id, handle| {
// Note: maybe we'd better schedule a "register region" here?
ob.subs.register_region(region_id, handle);
rs.insert(region_id, Resolver::new(region_id));
},
);
match range_init_result {
Ok(stat) => {
@@ -339,26 +360,69 @@ where

pub fn on_flush(&self, task: String, store_id: u64) {
let router = self.range_router.clone();
let cli = self
.meta_client
.as_ref()
.expect("on_flush: executed from an endpoint without cli")
.clone();
self.pool.spawn(async move {
router.do_flush(&task, store_id).await;
if let Some(rts) = router.do_flush(&task, store_id).await {
if let Err(err) = cli.step_task(&task, rts).await {
err.report(format!("on flushing task {}", task));
// we can advance the progress next time.
// return early so we won't be misled by the metrics.
return;
}
metrics::STORE_CHECKPOINT_TS
// Currently, we only support one task at a time,
// so using the task name as the label is OK.
.with_label_values(&[task.as_str()])
.set(rts as _)
}
});
}
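
`do_flush` now returns the resolved ts to report for the task (and `None` when there is nothing to advance); the endpoint persists it through `step_task` and mirrors it into the gauge. The per-task value is presumably the minimum over the task's region resolvers, along these lines (a sketch using the toy `Resolver` above; the real computation lives behind the router's `do_flush`):

```rust
use dashmap::DashMap;

// Sketch: derive a task-level checkpoint from per-region resolvers.
// `min_ts` would come from a recent TSO timestamp.
fn task_resolved_ts(resolvers: &DashMap<u64, Resolver>, min_ts: u64) -> Option<u64> {
    resolvers
        .iter_mut()
        .map(|mut r| r.value_mut().resolve(min_ts))
        .min()
}
```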

/// Start observing some region.
/// This registers the region to the RaftStore.
///
/// > Note: This won't trigger an incremental scan.
/// > When a follower progresses faster than the leader and is then elected,
/// > there is a risk of losing data.
pub fn on_observe_region(&self, region: Region) {
/// This modifies some internal state and delegates the task to InitialLoader::observe_over.
fn observe_over(&self, region: &Region) -> Result<()> {
let init = self.make_initial_loader();
let handle = ObserveHandle::new();
let region_id = region.get_id();
let ob = ChangeObserver::from_cdc(region_id, handle.clone());
if let Err(e) = init.observe_over(&region, ob) {
e.report(format!("register region {} to raftstore", region.get_id()));
}
init.observe_over(region, ob)?;
self.observer.subs.register_region(region_id, handle);
self.resolvers.insert(region.id, Resolver::new(region.id));
Ok(())
}

/// Modify the observe state of some region.
/// This registers or deregisters the region in the RaftStore.
///
/// > Note: When used to start observing, this won't trigger an incremental scan.
/// > When a follower progresses faster than the leader and is then elected,
/// > there is a risk of losing data.
pub fn on_modify_observe(&self, op: ObserveOp) {
match op {
ObserveOp::Start { region } => {
if let Err(e) = self.observe_over(&region) {
e.report(format!("register region {} to raftstore", region.get_id()));
}
}
ObserveOp::Stop { region } => {
self.observer.subs.deregister_region(region.id);
self.resolvers.as_ref().remove(&region.id);
}
ObserveOp::RefreshResolver { region } => {
self.observer.subs.deregister_region(region.id);
self.resolvers.as_ref().remove(&region.id);
if let Err(e) = self.observe_over(&region) {
e.report(format!(
"register region {} to raftstore when refreshing",
region.get_id()
));
}
}
}
}

pub fn do_backup(&mut self, events: Vec<CmdBatch>) {
@@ -394,12 +458,23 @@ pub enum Task {
ChangeConfig(ConfigChange),
/// Flush the task with name.
Flush(String),
/// Start observe over the region.
ObserverRegion {
/// Change the observe status of some region.
ModifyObserve(ObserveOp),
}

#[derive(Debug)]
pub enum ObserveOp {
Start {
region: Region,
// Note: Maybe add the request for initial scanning too.
// needs_initial_scanning: bool
},
Stop {
region: Region,
},
RefreshResolver {
region: Region,
},
}

impl fmt::Debug for Task {
@@ -412,10 +487,7 @@ impl fmt::Debug for Task {
.finish(),
Self::ChangeConfig(arg0) => f.debug_tuple("ChangeConfig").field(arg0).finish(),
Self::Flush(arg0) => f.debug_tuple("Flush").field(arg0).finish(),
Self::ObserverRegion { region } => f
.debug_struct("ObserverRegion")
.field("region", region)
.finish(),
Self::ModifyObserve(op) => f.debug_tuple("ModifyObserve").field(op).finish(),
}
}
}
@@ -440,8 +512,8 @@
match task {
Task::WatchTask(task) => self.on_register(task),
Task::BatchEvent(events) => self.do_backup(events),
Task::ObserverRegion { region } => self.on_observe_region(region),
Task::Flush(task) => self.on_flush(task, self.store_id),
Task::ModifyObserve(op) => self.on_modify_observe(op),
_ => (),
}
}
4 changes: 4 additions & 0 deletions components/br-stream/src/event_loader.rs
@@ -177,13 +177,16 @@ where
}

/// Initialize the region: register it to the raftstore and the observer.
/// At the same time, perform the initial scan (an incremental scan from `start_ts`)
/// and send the generated ApplyEvents to the sink directly.
pub fn initialize_region(
&self,
region: &Region,
start_ts: TimeStamp,
cmd: ChangeObserver,
) -> Result<Statistics> {
let snap = self.observe_over(region, cmd)?;
// It is OK to sink more data than needed, so scan up to +inf TS for convenience.
let mut event_loader = EventLoader::load_from(snap, start_ts, TimeStamp::max(), region)?;
let mut stats = StatisticsSummary::default();
loop {
@@ -207,6 +210,7 @@ where
Ok(stats.stat)
}

/// Initialize a range: scan the regions whose leader is on this store and send them to [`initialize_region`].
pub fn initialize_range(
&self,
start_key: Vec<u8>,
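
`initialize_range` fans a key range out to per-region initialization, skipping regions whose leader is elsewhere. Schematically (a sketch; the real code walks a region info accessor and checks the leader's store id):

```rust
// Sketch: forward each leader region in a range to `initialize_region`.
// `regions` stands in for whatever iterator the store's region cache provides.
fn initialize_range_sketch<F>(
    regions: Vec<(u64 /* region_id */, bool /* leader_on_this_store */)>,
    mut initialize_region: F,
) where
    F: FnMut(u64),
{
    for (region_id, is_leader) in regions {
        // Only regions whose leader lives on this store are our responsibility;
        // the other stores back up the rest of the range.
        if is_leader {
            initialize_region(region_id);
        }
    }
}
```
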
6 changes: 6 additions & 0 deletions components/br-stream/src/metrics.rs
@@ -47,4 +47,10 @@ lazy_static! {
exponential_buckets(0.001, 2.0, 16).unwrap()
)
.unwrap();
pub static ref STORE_CHECKPOINT_TS: IntGaugeVec = register_int_gauge_vec!(
"tikv_stream_store_checkpoint_ts",
"The checkpoint ts (next backup ts) of task",
&["task"],
)
.unwrap();
}
58 changes: 38 additions & 20 deletions components/br-stream/src/observer.rs
@@ -2,6 +2,7 @@
use std::sync::{Arc, RwLock};

use crate::errors::Error;
use crate::try_send;
use crate::utils::SegmentTree;
use dashmap::DashMap;
use engine_traits::KvEngine;
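
`try_send!` is a new helper imported here; its definition isn't part of this diff. Presumably it wraps `scheduler.schedule` and reports the error instead of bubbling it up, along these lines (a hypothetical shape, assuming tikv_util's slog-style `warn!` is in scope):

```rust
// Hypothetical shape of the `try_send!` helper added in this PR (the real
// macro lives in the crate root and isn't shown in this diff).
#[macro_export]
macro_rules! try_send {
    ($scheduler:expr, $task:expr) => {
        if let Err(err) = $scheduler.schedule($task) {
            warn!("backup stream: failed to schedule task"; "err" => ?err);
        }
    };
}
```
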
@@ -12,7 +13,7 @@ use tikv_util::worker::Scheduler;
use tikv_util::{debug, warn};
use tikv_util::{info, HandyRwLock};

use crate::endpoint::Task;
use crate::endpoint::{ObserveOp, Task};

/// An Observer for Backup Stream.
///
@@ -52,9 +53,12 @@ impl BackupStreamObserver {
/// The internal way to register a region.
/// It delegates the initial scan and subscription changes to the endpoint.
fn register_region(&self, region: &Region) {
if let Err(err) = self.scheduler.schedule(Task::ObserverRegion {
region: region.clone(),
}) {
if let Err(err) = self
.scheduler
.schedule(Task::ModifyObserve(ObserveOp::Start {
region: region.clone(),
}))
{
Error::from(err).report(format_args!(
"failed to schedule role change for region {}",
region.get_id()
Expand Down Expand Up @@ -150,9 +154,7 @@ impl<E: KvEngine> CmdObserver<E> for BackupStreamObserver {
if cmd_batches.is_empty() {
return;
}
if let Err(e) = self.scheduler.schedule(Task::BatchEvent(cmd_batches)) {
warn!("backup stream schedule task failed"; "error" => ?e);
}
try_send!(self.scheduler, Task::BatchEvent(cmd_batches));
}

fn on_applied_current_term(&self, role: StateRole, region: &Region) {
@@ -165,8 +167,12 @@
impl RoleObserver for BackupStreamObserver {
fn on_role_change(&self, ctx: &mut ObserverContext<'_>, r: StateRole) {
if r != StateRole::Leader {
let region = ctx.region();
self.subs.deregister_region(region.get_id());
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::Stop {
region: ctx.region().clone(),
})
);
}
}
}
Expand All @@ -178,14 +184,26 @@ impl RegionChangeObserver for BackupStreamObserver {
event: RegionChangeEvent,
_role: StateRole,
) {
// No need for handling `Create` -- once it becomes leader, it would start by
// `on_applied_current_term`.
// But should we deregister and register again when the region is `Update`d?

if matches!(event, RegionChangeEvent::Destroy)
&& self.subs.should_observe(ctx.region().get_id())
{
self.subs.deregister_region(ctx.region().get_id())
match event {
RegionChangeEvent::Destroy if self.subs.should_observe(ctx.region().get_id()) => {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::Stop {
region: ctx.region().clone(),
})
);
}
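// An `Update` usually means an epoch change (e.g. a split, merge, or conf
// change), which invalidates the lock set the old resolver tracked, so the
// subscription and the resolver are both torn down and rebuilt.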
RegionChangeEvent::Update => {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::RefreshResolver {
region: ctx.region().clone(),
})
);
}
// No need for handling `Create` -- once it becomes leader, it would start by
// `on_applied_current_term`.
_ => {}
}
}
}
@@ -207,7 +225,7 @@
use tikv_util::worker::dummy_scheduler;
use tikv_util::HandyRwLock;

use crate::endpoint::Task;
use crate::endpoint::{ObserveOp, Task};

use super::BackupStreamObserver;

@@ -232,7 +250,7 @@
o.register_region(&r);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
let handle = ObserveHandle::new();
if let Task::ObserverRegion { region } = task {
if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
o.subs.register_region(region.get_id(), handle.clone())
} else {
panic!("unexpected message received: it is {:?}", task);
@@ -256,7 +274,7 @@
o.register_region(&r);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
let handle = ObserveHandle::new();
if let Task::ObserverRegion { region } = task {
if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
o.subs.register_region(region.get_id(), handle.clone())
} else {
panic!("unexpected message received: it is {:?}", task);