Skip to content

Commit

Permalink
metrics: Cherry pick tikv#15118 to release-6.5: add min_safe_ts, min_…
Browse files Browse the repository at this point in the history
…safe_ts_region and min_safe_ts_gap (tikv#15212)

ref tikv#15082

metrics: add min_safe_ts, min_safe_ts_region and min_safe_ts_gap

Signed-off-by: ekexium <eke@fastmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
ekexium and ti-chi-bot[bot] authored Jul 28, 2023
1 parent 6a0631c commit af2117e
Show file tree
Hide file tree
Showing 5 changed files with 3,197 additions and 2,999 deletions.
10 changes: 9 additions & 1 deletion components/resolved_ts/src/advance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use txn_types::TimeStamp;

use crate::{endpoint::Task, metrics::*};

const DEFAULT_CHECK_LEADER_TIMEOUT_DURATION: Duration = Duration::from_secs(5); // 5s
pub(crate) const DEFAULT_CHECK_LEADER_TIMEOUT_DURATION: Duration = Duration::from_secs(5); // 5s
const DEFAULT_GRPC_GZIP_COMPRESSION_LEVEL: usize = 2;
const DEFAULT_GRPC_MIN_MESSAGE_SIZE_TO_COMPRESS: usize = 4096;

Expand All @@ -61,6 +61,9 @@ pub struct AdvanceTsWorker {
/// The concurrency manager for transactions. It's needed for CDC to check
/// locks when calculating resolved_ts.
concurrency_manager: ConcurrencyManager,

// cache the last pd tso, used to approximate the next timestamp w/o an actual TSO RPC
pub(crate) last_pd_tso: Arc<std::sync::Mutex<Option<(TimeStamp, Instant)>>>,
}

impl AdvanceTsWorker {
Expand All @@ -85,6 +88,7 @@ impl AdvanceTsWorker {
advance_ts_interval,
timer: SteadyTimer::default(),
concurrency_manager,
last_pd_tso: Arc::new(std::sync::Mutex::new(None)),
}
}
}
Expand All @@ -107,9 +111,13 @@ impl AdvanceTsWorker {
self.advance_ts_interval,
));

let last_pd_tso = self.last_pd_tso.clone();
let fut = async move {
// Ignore get tso errors since we will retry every `advdance_ts_interval`.
let mut min_ts = pd_client.get_tso().await.unwrap_or_default();
if let Ok(mut last_pd_tso) = last_pd_tso.try_lock() {
*last_pd_tso = Some((min_ts, Instant::now()));
}

// Sync with concurrency manager so that it can work correctly when
// optimizations like async commit is enabled.
Expand Down
71 changes: 63 additions & 8 deletions components/resolved_ts/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ use tokio::sync::Notify;
use txn_types::{Key, TimeStamp};

use crate::{
advance::{AdvanceTsWorker, LeadershipResolver},
advance::{AdvanceTsWorker, LeadershipResolver, DEFAULT_CHECK_LEADER_TIMEOUT_DURATION},
cmd::{ChangeLog, ChangeRow},
metrics::*,
resolver::Resolver,
scanner::{ScanEntry, ScanMode, ScanTask, ScannerPool},
};

/// grace period for logging safe-ts and resolved-ts gap in slow log
const SLOW_LOG_GRACE_PERIOD_MS: u64 = 1000;

enum ResolverStatus {
Pending {
tracked_index: u64,
Expand Down Expand Up @@ -763,9 +766,18 @@ where
let store_id = self.get_or_init_store_id();
let (mut oldest_ts, mut oldest_region, mut zero_ts_count) = (u64::MAX, 0, 0);
let (mut oldest_leader_ts, mut oldest_leader_region) = (u64::MAX, 0);
let (mut oldest_safe_ts, mut oldest_safe_ts_region) = (u64::MAX, 0);
self.region_read_progress.with(|registry| {
for (region_id, read_progress) in registry {
let safe_ts = read_progress.safe_ts();
if safe_ts > 0 && safe_ts < oldest_safe_ts {
oldest_safe_ts = safe_ts;
oldest_safe_ts_region = *region_id;
}

let (leader_info, leader_store_id) = read_progress.dump_leader_info();
// this is maximum resolved-ts pushed to region_read_progress, namely candidates
// of safe_ts. It may not be the safe_ts yet
let ts = leader_info.get_read_state().get_safe_ts();
if ts == 0 {
zero_ts_count += 1;
Expand Down Expand Up @@ -803,19 +815,62 @@ where
}
}
}
// approximate a TSO from PD. It is better than local timestamp when clock skew
// exists.
let now: u64 = self
.advance_worker
.last_pd_tso
.try_lock()
.map(|opt| {
opt.map(|(pd_ts, instant)| {
pd_ts.physical() + instant.saturating_elapsed().as_millis() as u64
})
.unwrap_or_else(|| TimeStamp::physical_now())
})
.unwrap_or_else(|_| TimeStamp::physical_now());

RTS_MIN_SAFE_TS.set(oldest_safe_ts as i64);
RTS_MIN_SAFE_TS_REGION.set(oldest_safe_ts_region as i64);
let safe_ts_gap = now.saturating_sub(TimeStamp::from(oldest_safe_ts).physical());
if safe_ts_gap
> self.cfg.advance_ts_interval.as_millis()
+ DEFAULT_CHECK_LEADER_TIMEOUT_DURATION.as_millis() as u64
+ SLOW_LOG_GRACE_PERIOD_MS
{
let mut lock_num = None;
let mut min_start_ts = None;
if let Some(ob) = self.regions.get(&oldest_safe_ts_region) {
min_start_ts = ob
.resolver
.locks()
.keys()
.next()
.cloned()
.map(TimeStamp::into_inner);
lock_num = Some(ob.resolver.locks_by_key.len());
}
info!(
"the max gap of safe-ts is large";
"gap" => safe_ts_gap,
"oldest safe-ts" => ?oldest_safe_ts,
"region id" => oldest_safe_ts_region,
"advance-ts-interval" => ?self.cfg.advance_ts_interval,
"lock num" => lock_num,
"min start ts" => min_start_ts,
);
}
RTS_MIN_SAFE_TS_GAP.set(safe_ts_gap as i64);

RTS_MIN_RESOLVED_TS_REGION.set(oldest_region as i64);
RTS_MIN_RESOLVED_TS.set(oldest_ts as i64);
RTS_ZERO_RESOLVED_TS.set(zero_ts_count as i64);
RTS_MIN_RESOLVED_TS_GAP.set(
TimeStamp::physical_now().saturating_sub(TimeStamp::from(oldest_ts).physical()) as i64,
);
RTS_MIN_RESOLVED_TS_GAP
.set(now.saturating_sub(TimeStamp::from(oldest_ts).physical()) as i64);

RTS_MIN_LEADER_RESOLVED_TS_REGION.set(oldest_leader_region as i64);
RTS_MIN_LEADER_RESOLVED_TS.set(oldest_leader_ts as i64);
RTS_MIN_LEADER_RESOLVED_TS_GAP.set(
TimeStamp::physical_now().saturating_sub(TimeStamp::from(oldest_leader_ts).physical())
as i64,
);
RTS_MIN_LEADER_RESOLVED_TS_GAP
.set(now.saturating_sub(TimeStamp::from(oldest_leader_ts).physical()) as i64);

RTS_LOCK_HEAP_BYTES_GAUGE.set(lock_heap_size as i64);
RTS_REGION_RESOLVE_STATUS_GAUGE_VEC
Expand Down
25 changes: 20 additions & 5 deletions components/resolved_ts/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ lazy_static! {
.unwrap();
pub static ref RTS_MIN_RESOLVED_TS_GAP: IntGauge = register_int_gauge!(
"tikv_resolved_ts_min_resolved_ts_gap_millis",
"The minimal (non-zero) resolved ts gap for observe regions"
"The minimal (non-zero) resolved ts gap for observed regions"
)
.unwrap();
pub static ref RTS_RESOLVED_FAIL_ADVANCE_VEC: IntCounterVec = register_int_counter_vec!(
Expand Down Expand Up @@ -66,22 +66,37 @@ lazy_static! {
.unwrap();
pub static ref RTS_MIN_RESOLVED_TS: IntGauge = register_int_gauge!(
"tikv_resolved_ts_min_resolved_ts",
"The minimal (non-zero) resolved ts for observe regions"
"The minimal (non-zero) resolved ts for observed regions"
)
.unwrap();
pub static ref RTS_MIN_SAFE_TS_REGION: IntGauge = register_int_gauge!(
"tikv_resolved_ts_min_safe_ts_region",
"The region which has minimal safe ts"
)
.unwrap();
pub static ref RTS_MIN_SAFE_TS: IntGauge = register_int_gauge!(
"tikv_resolved_ts_min_safe_ts",
"The minimal (non-zero) safe ts for observed regions"
)
.unwrap();
pub static ref RTS_MIN_SAFE_TS_GAP: IntGauge = register_int_gauge!(
"tikv_resolved_ts_min_safe_ts_gap_millis",
"The minimal (non-zero) safe ts gap for observed regions"
)
.unwrap();
pub static ref RTS_ZERO_RESOLVED_TS: IntGauge = register_int_gauge!(
"tikv_resolved_ts_zero_resolved_ts",
"The number of zero resolved ts for observe regions"
"The number of zero resolved ts for observed regions"
)
.unwrap();
pub static ref RTS_LOCK_HEAP_BYTES_GAUGE: IntGauge = register_int_gauge!(
"tikv_resolved_ts_lock_heap_bytes",
"Total bytes in memory of resolved-ts observe regions's lock heap"
"Total bytes in memory of resolved-ts observed regions's lock heap"
)
.unwrap();
pub static ref RTS_REGION_RESOLVE_STATUS_GAUGE_VEC: IntGaugeVec = register_int_gauge_vec!(
"tikv_resolved_ts_region_resolve_status",
"The status of resolved-ts observe regions",
"The status of resolved-ts observed regions",
&["type"]
)
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion components/resolved_ts/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::metrics::RTS_RESOLVED_FAIL_ADVANCE_VEC;
pub struct Resolver {
region_id: u64,
// key -> start_ts
locks_by_key: HashMap<Arc<[u8]>, TimeStamp>,
pub(crate) locks_by_key: HashMap<Arc<[u8]>, TimeStamp>,
// start_ts -> locked keys.
lock_ts_heap: BTreeMap<TimeStamp, HashSet<Arc<[u8]>>>,
// The timestamps that guarantees no more commit will happen before.
Expand Down
Loading

0 comments on commit af2117e

Please sign in to comment.