Commit

Added resolved timestamp uploading (#14)
* introduce resolved ts

Signed-off-by: Yu Juncen <yu745514916@live.com>

* fix unexpected errors

Signed-off-by: Yu Juncen <yu745514916@live.com>

* fix compile

Signed-off-by: Yu Juncen <yu745514916@live.com>

* added some metrics

Signed-off-by: Yu Juncen <yu745514916@live.com>

* reset resolver when update or delete

Signed-off-by: Yu Juncen <yu745514916@live.com>

* move some comments

Signed-off-by: Yu Juncen <yu745514916@live.com>

* make clippy happy

Signed-off-by: Yu Juncen <yu745514916@live.com>
YuJuncen authored Feb 9, 2022
1 parent 2a63578 commit 6eaa365
Showing 8 changed files with 230 additions and 62 deletions.
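
For orientation (not part of the diff): the change keeps one Resolver per observed region and derives a resolved timestamp (a timestamp such that no further commit below it can still appear) from the locks that region currently holds; at flush time that value is uploaded as the task's checkpoint. The sketch below is a minimal, hypothetical illustration of that idea only; the type and method names are invented for the example and do not mirror the real resolved_ts::Resolver API.

use std::collections::BTreeMap;

// Hypothetical per-region resolver (illustration only).
struct ToyResolver {
    // key -> start_ts of the lock currently protecting it
    locks: BTreeMap<Vec<u8>, u64>,
}

impl ToyResolver {
    fn new() -> Self {
        Self { locks: BTreeMap::new() }
    }

    // A prewrite installs a lock: remember its start_ts.
    fn track_lock(&mut self, key: Vec<u8>, start_ts: u64) {
        self.locks.insert(key, start_ts);
    }

    // A commit or rollback removes the lock.
    fn untrack_lock(&mut self, key: &[u8]) {
        self.locks.remove(key);
    }

    // The resolved ts is capped by the oldest live lock: nothing older can
    // still commit, so the backup may safely checkpoint up to this value.
    fn resolve(&self, min_ts: u64) -> u64 {
        self.locks.values().copied().min().map_or(min_ts, |oldest| oldest.min(min_ts))
    }
}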
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions components/br-stream/Cargo.toml
@@ -49,6 +49,7 @@ raft = { version = "0.6.0-alpha", default-features = false, features = ["protobu
raftstore = { path = "../raftstore", default-features = false }
tikv = { path = "../../", default-features = false }
online_config = { path = "../online_config" }
resolved_ts = { path = "../resolved_ts"}

[dev-dependencies]
rand = "0.8.0"
110 changes: 91 additions & 19 deletions components/br-stream/src/endpoint.rs
@@ -4,13 +4,16 @@ use std::convert::AsRef
use std::fmt;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use engine_traits::KvEngine;

use kvproto::metapb::Region;
use raftstore::router::RaftStoreRouter;
use raftstore::store::fsm::ChangeObserver;
use resolved_ts::Resolver;
use tikv_util::time::Instant;

use crossbeam_channel::tick;
@@ -53,6 +56,7 @@ pub struct Endpoint<S: MetaStore + 'static, R, E, RT> {
regions: R,
engine: PhantomData<E>,
router: RT,
resolvers: Arc<DashMap<u64, Resolver>>,
}

impl<R, E, RT> Endpoint<EtcdStore, R, E, RT>
Expand Down Expand Up @@ -111,6 +115,7 @@ where
));
}

info!("the endpoint of stream backup started"; "path" => %config.streaming_path);
Endpoint {
config,
meta_client,
@@ -122,6 +127,7 @@
regions: accessor,
engine: PhantomData,
router,
resolvers: Default::default(),
}
}
}
Expand Down Expand Up @@ -198,7 +204,17 @@ where
fn backup_batch(&self, batch: CmdBatch) {
let mut sw = StopWatch::new();
let region_id = batch.region_id;
let kvs = ApplyEvent::from_cmd_batch(batch, /* TODO */ 0);
let mut resolver = match self.resolvers.as_ref().get_mut(&region_id) {
Some(rts) => rts,
None => {
warn!("BUG: the region isn't registered (no resolver found) but sent to backup_batch."; "region_id" => %region_id);
return;
}
};

let kvs = ApplyEvent::from_cmd_batch(batch, resolver.value_mut());
drop(resolver);

HANDLE_EVENT_DURATION_HISTOGRAM
.with_label_values(&["to_stream_event"])
.observe(sw.lap().as_secs_f64());
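
A note on the drop(resolver) above (editorial illustration, not part of the diff): DashMap::get_mut returns a guard that holds a shard lock, so it is presumably released explicitly before the rest of the batch handling. Below is a standalone sketch of the same pattern, assuming only the dashmap crate; the function and map are made up for the example.

use dashmap::DashMap;

// Illustration of the lookup-or-bail pattern with an explicit early drop.
fn bump(counters: &DashMap<u64, u64>, region_id: u64) {
    // `get_mut` returns a RefMut guard that keeps one shard of the map locked.
    let mut entry = match counters.get_mut(&region_id) {
        Some(e) => e,
        None => return, // unknown region: nothing to do
    };
    *entry += 1;
    // Release the shard lock before any longer-running work; otherwise other
    // threads touching the same shard (even other keys) would block on it.
    drop(entry);
    // ... longer-running work would go here ...
}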
@@ -287,6 +303,7 @@ where
let start = Instant::now_coarse();
let start_ts = task.info.get_start_ts();
let ob = self.observer.clone();
let rs = self.resolvers.clone();
let success = self
.observer
.ranges
@@ -305,7 +322,11 @@
start_key.clone(),
end_key.clone(),
TimeStamp::new(start_ts),
|region_id, handle| ob.subs.register_region(region_id, handle),
|region_id, handle| {
// Note: maybe we'd better schedule a "register region" here?
ob.subs.register_region(region_id, handle);
rs.insert(region_id, Resolver::new(region_id));
},
);
match range_init_result {
Ok(stat) => {
@@ -339,26 +360,69 @@ where

pub fn on_flush(&self, task: String, store_id: u64) {
let router = self.range_router.clone();
let cli = self
.meta_client
.as_ref()
.expect("on_flush: executed from an endpoint without cli")
.clone();
self.pool.spawn(async move {
router.do_flush(&task, store_id).await;
if let Some(rts) = router.do_flush(&task, store_id).await {
if let Err(err) = cli.step_task(&task, rts).await {
err.report(format!("on flushing task {}", task));
// We can advance the progress next time.
// Return early so we won't be misled by the metrics.
return;
}
metrics::STORE_CHECKPOINT_TS
// Currently, we only support one task at a time,
// so using the task name as the label is OK.
.with_label_values(&[task.as_str()])
.set(rts as _)
}
});
}
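
The router's do_flush is not shown in this view, so how the returned rts relates to the per-region resolvers is not visible here. As a hedged illustration only (the function and map below are invented for the example), a store-level checkpoint is typically the minimum of the per-region resolved timestamps:

use dashmap::DashMap;

// Illustration only: collapse per-region resolved timestamps into a single
// store-level checkpoint. A lagging region pins the checkpoint until it
// catches up; an empty map yields None (nothing to report yet).
fn store_checkpoint(resolved: &DashMap<u64, u64>) -> Option<u64> {
    resolved.iter().map(|entry| *entry.value()).min()
}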

/// Start observe over some region.
/// This would register the region to the RaftStore.
///
/// > Note: This won't trigger an incremental scan.
/// > When the follower progresses faster than the leader and is then elected,
/// > there is a risk of losing data.
pub fn on_observe_region(&self, region: Region) {
/// This would modify some internal state and delegate the task to InitialLoader::observe_over.
fn observe_over(&self, region: &Region) -> Result<()> {
let init = self.make_initial_loader();
let handle = ObserveHandle::new();
let region_id = region.get_id();
let ob = ChangeObserver::from_cdc(region_id, handle.clone());
if let Err(e) = init.observe_over(&region, ob) {
e.report(format!("register region {} to raftstore", region.get_id()));
}
init.observe_over(region, ob)?;
self.observer.subs.register_region(region_id, handle);
self.resolvers.insert(region.id, Resolver::new(region.id));
Ok(())
}

/// Modify the observation over some region.
/// This would register the region to the RaftStore.
///
/// > Note: If this is used to start observation, it won't trigger an incremental scan.
/// > When the follower progresses faster than the leader and is then elected,
/// > there is a risk of losing data.
pub fn on_modify_observe(&self, op: ObserveOp) {
match op {
ObserveOp::Start { region } => {
if let Err(e) = self.observe_over(&region) {
e.report(format!("register region {} to raftstore", region.get_id()));
}
}
ObserveOp::Stop { region } => {
self.observer.subs.deregister_region(region.id);
self.resolvers.as_ref().remove(&region.id);
}
ObserveOp::RefreshResolver { region } => {
self.observer.subs.deregister_region(region.id);
self.resolvers.as_ref().remove(&region.id);
if let Err(e) = self.observe_over(&region) {
e.report(format!(
"register region {} to raftstore when refreshing",
region.get_id()
));
}
}
}
}

pub fn do_backup(&mut self, events: Vec<CmdBatch>) {
@@ -394,12 +458,23 @@ pub enum Task {
ChangeConfig(ConfigChange),
/// Flush the task with name.
Flush(String),
/// Start observe over the region.
ObserverRegion {
/// Change the observe status of some region.
ModifyObserve(ObserveOp),
}

#[derive(Debug)]
pub enum ObserveOp {
Start {
region: Region,
// Note: Maybe add the request for initial scanning too.
// needs_initial_scanning: bool
},
Stop {
region: Region,
},
RefreshResolver {
region: Region,
},
}

impl fmt::Debug for Task {
@@ -412,10 +487,7 @@ impl fmt::Debug for Task {
.finish(),
Self::ChangeConfig(arg0) => f.debug_tuple("ChangeConfig").field(arg0).finish(),
Self::Flush(arg0) => f.debug_tuple("Flush").field(arg0).finish(),
Self::ObserverRegion { region } => f
.debug_struct("ObserverRegion")
.field("region", region)
.finish(),
Self::ModifyObserve(op) => f.debug_tuple("ModifyObserve").field(op).finish(),
}
}
}
@@ -440,8 +512,8 @@
match task {
Task::WatchTask(task) => self.on_register(task),
Task::BatchEvent(events) => self.do_backup(events),
Task::ObserverRegion { region } => self.on_observe_region(region),
Task::Flush(task) => self.on_flush(task, self.store_id),
Task::ModifyObserve(op) => self.on_modify_observe(op),
_ => (),
}
}
4 changes: 4 additions & 0 deletions components/br-stream/src/event_loader.rs
@@ -177,13 +177,16 @@
}

/// Initialize the region: register it to the raftstore and the observer.
/// At the same time, perform the initial scanning (an incremental scan from `start_ts`)
/// and emit the corresponding ApplyEvents to the sink directly.
pub fn initialize_region(
&self,
region: &Region,
start_ts: TimeStamp,
cmd: ChangeObserver,
) -> Result<Statistics> {
let snap = self.observe_over(region, cmd)?;
// It is OK to sink more data than needed, so scan to +inf TS for convenience.
let mut event_loader = EventLoader::load_from(snap, start_ts, TimeStamp::max(), region)?;
let mut stats = StatisticsSummary::default();
loop {
@@ -207,6 +210,7 @@
Ok(stats.stat)
}
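
The body of the scan loop above is collapsed in this view. Conceptually it drains the snapshot batch by batch until nothing is left; the sketch below shows only that shape, with next_batch and sink standing in for the real EventLoader and router calls (they are not the actual API):

// Illustration of the drain-until-empty shape of an initial scan.
// `next_batch` returns at most `cap` items and an empty Vec once the
// snapshot is exhausted; `sink` forwards a batch downstream.
fn drain_scan<T>(mut next_batch: impl FnMut(usize) -> Vec<T>, mut sink: impl FnMut(Vec<T>)) {
    let cap = 1024;
    loop {
        let batch = next_batch(cap);
        if batch.is_empty() {
            break; // everything from start_ts up to +inf has been emitted
        }
        sink(batch);
    }
}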

/// Initialize a range: simply scan the regions with the leader role and send them to [`initialize_region`].
pub fn initialize_range(
&self,
start_key: Vec<u8>,
6 changes: 6 additions & 0 deletions components/br-stream/src/metrics.rs
@@ -47,4 +47,10 @@ lazy_static! {
exponential_buckets(0.001, 2.0, 16).unwrap()
)
.unwrap();
pub static ref STORE_CHECKPOINT_TS: IntGaugeVec = register_int_gauge_vec!(
"tikv_stream_store_checkpoint_ts",
"The checkpoint ts (next backup ts) of task",
&["task"],
)
.unwrap();
}
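
Since the gauge is labelled by task name, reading it back for a given task is straightforward. A small usage sketch follows (the task name and test module are made up for the example):

#[cfg(test)]
mod checkpoint_gauge_tests {
    use super::STORE_CHECKPOINT_TS;

    #[test]
    fn roundtrip() {
        // The label value is a made-up task name used only for this example.
        STORE_CHECKPOINT_TS
            .with_label_values(&["backup-stream-example"])
            .set(434_000_000_000_000);
        assert_eq!(
            STORE_CHECKPOINT_TS
                .with_label_values(&["backup-stream-example"])
                .get(),
            434_000_000_000_000
        );
    }
}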
58 changes: 38 additions & 20 deletions components/br-stream/src/observer.rs
@@ -2,6 +2,7 @@
use std::sync::{Arc, RwLock};

use crate::errors::Error;
use crate::try_send;
use crate::utils::SegmentTree;
use dashmap::DashMap;
use engine_traits::KvEngine;
Expand All @@ -12,7 +13,7 @@ use tikv_util::worker::Scheduler;
use tikv_util::{debug, warn};
use tikv_util::{info, HandyRwLock};

use crate::endpoint::Task;
use crate::endpoint::{ObserveOp, Task};

/// An Observer for Backup Stream.
///
@@ -52,9 +53,12 @@ impl BackupStreamObserver {
/// The internal way to register a region.
/// It delegates the initial scanning and modification of the subs to the endpoint.
fn register_region(&self, region: &Region) {
if let Err(err) = self.scheduler.schedule(Task::ObserverRegion {
region: region.clone(),
}) {
if let Err(err) = self
.scheduler
.schedule(Task::ModifyObserve(ObserveOp::Start {
region: region.clone(),
}))
{
Error::from(err).report(format_args!(
"failed to schedule role change for region {}",
region.get_id()
@@ -150,9 +154,7 @@ impl<E: KvEngine> CmdObserver<E> for BackupStreamObserver {
if cmd_batches.is_empty() {
return;
}
if let Err(e) = self.scheduler.schedule(Task::BatchEvent(cmd_batches)) {
warn!("backup stream schedule task failed"; "error" => ?e);
}
try_send!(self.scheduler, Task::BatchEvent(cmd_batches));
}

fn on_applied_current_term(&self, role: StateRole, region: &Region) {
@@ -165,8 +167,12 @@
impl RoleObserver for BackupStreamObserver {
fn on_role_change(&self, ctx: &mut ObserverContext<'_>, r: StateRole) {
if r != StateRole::Leader {
let region = ctx.region();
self.subs.deregister_region(region.get_id());
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::Stop {
region: ctx.region().clone(),
})
);
}
}
}
@@ -178,14 +184,26 @@ impl RegionChangeObserver for BackupStreamObserver {
event: RegionChangeEvent,
_role: StateRole,
) {
// No need for handling `Create` -- once it becomes leader, it would start by
// `on_applied_current_term`.
// But should we deregister and register again when the region is `Update`d?

if matches!(event, RegionChangeEvent::Destroy)
&& self.subs.should_observe(ctx.region().get_id())
{
self.subs.deregister_region(ctx.region().get_id())
match event {
RegionChangeEvent::Destroy if self.subs.should_observe(ctx.region().get_id()) => {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::Stop {
region: ctx.region().clone(),
})
);
}
RegionChangeEvent::Update => {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::RefreshResolver {
region: ctx.region().clone(),
})
);
}
// No need for handling `Create` -- once it becomes leader, it would start by
// `on_applied_current_term`.
_ => {}
}
}
}
Expand All @@ -207,7 +225,7 @@ mod tests {
use tikv_util::worker::dummy_scheduler;
use tikv_util::HandyRwLock;

use crate::endpoint::Task;
use crate::endpoint::{ObserveOp, Task};

use super::BackupStreamObserver;

@@ -232,7 +250,7 @@
o.register_region(&r);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
let handle = ObserveHandle::new();
if let Task::ObserverRegion { region } = task {
if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
o.subs.register_region(region.get_id(), handle.clone())
} else {
panic!("unexpected message received: it is {}", task);
Expand All @@ -256,7 +274,7 @@ mod tests {
o.register_region(&r);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
let handle = ObserveHandle::new();
if let Task::ObserverRegion { region } = task {
if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
o.subs.register_region(region.get_id(), handle.clone())
} else {
panic!("unexpected message received: it is {}", task);