Skip to content

Commit

Permalink
snap_recovery: added some metrics (tikv#15111)
Browse files Browse the repository at this point in the history
close tikv#15110

Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
YuJuncen and ti-chi-bot[bot] committed Aug 2, 2023
1 parent 9bb7bd6 commit 456c857
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 12 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion components/snap_recovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ engine_traits = { workspace = true }
futures = { version = "0.3", features = ["executor"] }
grpcio = { workspace = true }
keys = { workspace = true }
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-6.5" }
kvproto = { workspace = true }
lazy_static = "1.4"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
pd_client = { workspace = true }
prometheus = { version = "0.13", default_features = false, features = ["nightly"] }
prometheus-static-metric = "0.5"
protobuf = { version = "2.8", features = ["bytes"] }
raft_log_engine = { workspace = true }
raftstore = { workspace = true }
Expand All @@ -30,3 +33,4 @@ tikv_alloc = { workspace = true }
tikv_util = { workspace = true }
toml = "0.5"
txn_types = { workspace = true }

1 change: 1 addition & 0 deletions components/snap_recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ pub use init_cluster::{enter_snap_recovery_mode, start_recovery};
pub use services::RecoveryService;

mod data_resolver;
mod metrics;
mod region_meta_collector;
41 changes: 41 additions & 0 deletions components/snap_recovery/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use lazy_static::*;
use prometheus::*;
use prometheus_static_metric::*;

lazy_static! {
    /// Counts, across all regions, how many times each per-region
    /// snapshot-restore event happened (see `RegionEventType` for the set
    /// of events). One counter vec, keyed by the "event" label.
    pub static ref REGION_EVENT_COUNTER: RegionEvent = register_static_int_counter_vec!(
        RegionEvent,
        "tikv_snap_restore_region_event",
        "the total count of some events that each happened to one region. (But the counter counts all regions' events.)",
        &["event"]
    )
    .unwrap();

    // NOTE: should we handle the concurrent case by adding a tid parameter?
    /// The region id whose leader we are currently waiting to finish
    /// applying to its last log; reset to 0 once all waits are done.
    /// (The gauge holds a region id, not a count.)
    pub static ref CURRENT_WAIT_APPLY_LEADER: IntGauge = register_int_gauge!(
        "tikv_current_waiting_leader_apply",
        "the region id of the leader we are currently waiting to apply to its last log."
    ).unwrap();

    /// The region id whose leader election we are currently waiting for;
    /// reset to 0 once all waits are done.
    /// (The gauge holds a region id, not a count.)
    pub static ref CURRENT_WAIT_ELECTION_LEADER: IntGauge = register_int_gauge!(
        "tikv_current_waiting_leader_election",
        "the region id of the leader we are currently waiting to be elected."
    ).unwrap();
}

make_static_metric! {
    // Label values for `tikv_snap_restore_region_event`: the lifecycle
    // events that can happen to a single region during snapshot restore.
    pub label_enum RegionEventType {
        // the region's meta was collected and sent back to BR.
        collect_meta,
        // the region was selected to campaign and become leader.
        promote_to_leader,
        // the region was kept as a follower (not selected as leader).
        keep_follower,
        // started waiting for the region's leader to apply to its last log.
        start_wait_leader_apply,
        // finished waiting for the region's leader to apply to its last log.
        finish_wait_leader_apply,
    }

    // Static (pre-resolved) counter vec keyed by the "event" label above;
    // each variant becomes a plain `IntCounter` field, avoiding a label
    // lookup on every `inc()`.
    pub struct RegionEvent : IntCounter {
        "event" => RegionEventType,
    }
}
3 changes: 3 additions & 0 deletions components/snap_recovery/src/region_meta_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use kvproto::{
use thiserror::Error;
use tikv_util::sys::thread::StdThreadBuildWrapper;

use crate::metrics::REGION_EVENT_COUNTER;

pub type Result<T> = result::Result<T, Error>;

#[allow(dead_code)]
Expand Down Expand Up @@ -146,6 +148,7 @@ impl<ER: RaftEngine> CollectWorker<ER> {
// send to br
let response = region_state.to_region_meta();

REGION_EVENT_COUNTER.collect_meta.inc();
if let Err(e) = self.tx.unbounded_send(response) {
warn!("send the region meta failure";
"err" => ?e);
Expand Down
39 changes: 28 additions & 11 deletions components/snap_recovery/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ use raftstore::{
use thiserror::Error;
use tikv_util::sys::thread::{StdThreadBuildWrapper, ThreadBuildWrapper};

use crate::{data_resolver::DataResolverManager, region_meta_collector::RegionMetaCollector};
use crate::{
data_resolver::DataResolverManager,
metrics::{CURRENT_WAIT_APPLY_LEADER, CURRENT_WAIT_ELECTION_LEADER, REGION_EVENT_COUNTER},
region_meta_collector::RegionMetaCollector,
};

pub type Result<T> = result::Result<T, Error>;

Expand Down Expand Up @@ -203,7 +207,7 @@ impl<ER: RaftEngine> RecoverData for RecoveryService<ER> {
}
.map(|res: Result<()>| match res {
Ok(_) => {
info!("collect region meta done");
debug!("collect region meta done");
}
Err(e) => {
error!("rcollect region meta failure"; "error" => ?e);
Expand All @@ -229,7 +233,10 @@ impl<ER: RaftEngine> RecoverData for RecoveryService<ER> {
while let Some(req) = stream.next().await {
let req = req.map_err(|e| eprintln!("rpc recv fail: {}", e)).unwrap();
if req.as_leader {
REGION_EVENT_COUNTER.promote_to_leader.inc();
leaders.push(req.region_id);
} else {
REGION_EVENT_COUNTER.keep_follower.inc();
}
}

Expand All @@ -242,8 +249,7 @@ impl<ER: RaftEngine> RecoverData for RecoveryService<ER> {
"err" => ?e);
continue;
} else {
info!("region starts to campaign";
"region_id" => region_id);
debug!("region starts to campaign"; "region_id" => region_id);
}

let (tx, rx) = sync_channel(1);
Expand All @@ -260,27 +266,32 @@ impl<ER: RaftEngine> RecoverData for RecoveryService<ER> {
rxs.push(Some(rx));
}

info!("send assign leader request done"; "count" => %leaders.len());

// leader is campaign and be ensured as leader
for (_rid, rx) in leaders.iter().zip(rxs) {
for (rid, rx) in leaders.iter().zip(rxs) {
if let Some(rx) = rx {
CURRENT_WAIT_ELECTION_LEADER.set(*rid as _);
match rx.recv() {
Ok(_id) => {
info!("leader is assigned for region");
Ok(id) => {
debug!("leader is assigned for region"; "region_id" => %id);
}
Err(e) => {
error!("check leader failed"; "error" => ?e);
}
}
}
}
CURRENT_WAIT_ELECTION_LEADER.set(0);

info!("all region leader assigned done");
info!("all region leader assigned done"; "count" => %leaders.len());

let now = Instant::now();
// wait apply to the last log
let mut rx_apply = Vec::with_capacity(leaders.len());
for &region_id in &leaders {
let (tx, rx) = sync_channel(1);
REGION_EVENT_COUNTER.start_wait_leader_apply.inc();
let wait_apply = SnapshotRecoveryWaitApplySyncer::new(region_id, tx.clone());
if let Err(e) = raft_router.significant_send(
region_id,
Expand All @@ -296,22 +307,25 @@ impl<ER: RaftEngine> RecoverData for RecoveryService<ER> {
}

// leader apply to last log
for (_rid, rx) in leaders.iter().zip(rx_apply) {
for (rid, rx) in leaders.iter().zip(rx_apply) {
if let Some(rx) = rx {
CURRENT_WAIT_APPLY_LEADER.set(*rid as _);
match rx.recv() {
Ok(region_id) => {
info!("leader apply to last log"; "error" => region_id);
debug!("leader apply to last log"; "region_id" => region_id);
}
Err(e) => {
error!("leader failed to apply to last log"; "error" => ?e);
}
}
REGION_EVENT_COUNTER.finish_wait_leader_apply.inc();
}
}
CURRENT_WAIT_APPLY_LEADER.set(0);

info!(
"all region leader apply to last log";
"spent_time" => now.elapsed().as_secs(),
"spent_time" => now.elapsed().as_secs(), "count" => %leaders.len(),
);

let mut resp = RecoverRegionResponse::default();
Expand All @@ -337,6 +351,9 @@ impl<ER: RaftEngine> RecoverData for RecoveryService<ER> {
info!("wait_apply start");
let task = async move {
let now = Instant::now();
// FIXME: this function will exit once the first region finished apply.
// BUT for the flashback resolve KV implementation, that is fine because the
// raft log stats is consistent.
let (tx, rx) = sync_channel(1);
RecoveryService::wait_apply_last(router, tx.clone());
match rx.recv() {
Expand Down

0 comments on commit 456c857

Please sign in to comment.