ctl: add get_region_read_progress (release-6.5) (tikv#15204)
ref tikv#15082

Signed-off-by: ekexium <eke@fastmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
ekexium and ti-chi-bot[bot] authored Jul 31, 2023
1 parent af2117e commit 9bb7bd6
Showing 11 changed files with 362 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

18 changes: 18 additions & 0 deletions cmd/tikv-ctl/src/cmd.rs
@@ -627,6 +627,24 @@ pub enum Cmd {
/// hex end key
end: String,
},
/// Get diagnosis info about resolved-ts and safe-ts
GetRegionReadProgress {
#[structopt(short = "r", long)]
/// The target region id
region: u64,

#[structopt(long)]
/// When specified, logs the locks associated with the transaction that
/// has the smallest 'start_ts' in the resolver, i.e. the transaction
/// that is preventing the 'resolved_ts' from advancing.
log: bool,

#[structopt(long, requires = "log")]
/// Lower bound on the target transaction's start_ts: only a transaction
/// whose start_ts is greater than or equal to this value will be
/// recorded in TiKV logs.
min_start_ts: Option<u64>,
},
}

#[derive(StructOpt)]
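For orientation, a hedged sketch of how the new subcommand would be invoked, assuming structopt's default kebab-case naming for the variant and flags, and a remote tikv-ctl connection (host, region id, and timestamp are illustrative; note that --min-start-ts requires --log per the requires attribute above):

    tikv-ctl --host 127.0.0.1:20160 get-region-read-progress -r 14 --log --min-start-ts 443394000000000000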
79 changes: 79 additions & 0 deletions cmd/tikv-ctl/src/executor.rs
@@ -671,6 +671,8 @@ pub trait DebugExecutor {
_start_ts: u64,
_commit_ts: u64,
) -> Result<(), KeyRange>;

fn get_region_read_progress(&self, region_id: u64, log: bool, min_start_ts: u64);
}

impl DebugExecutor for DebugClient {
@@ -912,6 +914,78 @@ impl DebugExecutor for DebugClient {
}
}
}

fn get_region_read_progress(&self, region_id: u64, log: bool, min_start_ts: u64) {
let mut req = GetRegionReadProgressRequest::default();
req.set_region_id(region_id);
req.set_log_locks(log);
req.set_min_start_ts(min_start_ts);
let opt = grpcio::CallOption::default().timeout(Duration::from_secs(10));
let resp = self
.get_region_read_progress_opt(&req, opt)
.unwrap_or_else(|e| perror_and_exit("DebugClient::get_region_read_progress", e));
if !resp.get_error().is_empty() {
println!("error: {}", resp.get_error());
}
let fields = [
("Region read progress:", "".to_owned()),
("exist", resp.get_region_read_progress_exist().to_string()),
("safe_ts", resp.get_safe_ts().to_string()),
("applied_index", resp.get_applied_index().to_string()),
("read_state.ts", resp.get_read_state_ts().to_string()),
(
"read_state.apply_index",
resp.get_read_state_apply_index().to_string(),
),
(
"pending front item (oldest) ts",
resp.get_pending_front_ts().to_string(),
),
(
"pending front item (oldest) applied index",
resp.get_pending_front_applied_index().to_string(),
),
(
"pending back item (latest) ts",
resp.get_pending_back_ts().to_string(),
),
(
"pending back item (latest) applied index",
resp.get_pending_back_applied_index().to_string(),
),
("paused", resp.get_region_read_progress_paused().to_string()),
("discarding", resp.get_discard().to_string()),
// TODO: figure out the performance impact here before implementing it.
// (
// "duration to last update_safe_ts",
// format!("{} ms", resp.get_duration_to_last_update_safe_ts_ms()),
// ),
// (
// "duration to last consume_leader_info",
// format!("{} ms", resp.get_duration_to_last_consume_leader_ms()),
// ),
("Resolver:", "".to_owned()),
("exist", resp.get_resolver_exist().to_string()),
("resolved_ts", resp.get_resolved_ts().to_string()),
(
"tracked index",
resp.get_resolver_tracked_index().to_string(),
),
("number of locks", resp.get_num_locks().to_string()),
(
"number of transactions",
resp.get_num_transactions().to_string(),
),
("stopped", resp.get_resolver_stopped().to_string()),
];
for (name, value) in &fields {
if value.is_empty() {
println!("{}", name);
} else {
println!(" {}: {}, ", name, value);
}
}
}
}
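Given the field table and printing loop above, the command's output takes roughly this shape: section headers ("Region read progress:", "Resolver:") carry an empty value string and print flush-left, while every other field prints indented with a trailing comma. All values below are illustrative:

    Region read progress:
     exist: true,
     safe_ts: 443394455827120129,
     applied_index: 81,
     ...
    Resolver:
     exist: true,
     resolved_ts: 443394455827120129,
     tracked index: 81,
     number of locks: 0,
     number of transactions: 0,
     stopped: false,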

impl<ER, E, L, K> DebugExecutor for Debugger<ER, E, L, K>
@@ -1163,6 +1237,11 @@ where
) -> Result<(), KeyRange> {
unimplemented!("only available for remote mode");
}

fn get_region_read_progress(&self, _region_id: u64, _log: bool, _min_start_ts: u64) {
println!("only available for remote mode");
tikv_util::logger::exit_process_gracefully(-1);
}
}

fn handle_engine_error(err: EngineError) -> ! {
21 changes: 16 additions & 5 deletions cmd/tikv-ctl/src/main.rs
@@ -5,11 +5,6 @@
#[macro_use]
extern crate log;

mod cmd;
mod executor;
mod fork_readonly_tikv;
mod util;

use std::{
borrow::ToOwned,
fs::{self, File, OpenOptions},
@@ -58,6 +53,11 @@ use txn_types::Key;

use crate::{cmd::*, executor::*, util::*};

mod cmd;
mod executor;
mod fork_readonly_tikv;
mod util;

fn main() {
let opt = Opt::from_args();

@@ -639,6 +639,17 @@ fn main() {
debug_executor.dump_cluster_info();
}
Cmd::ResetToVersion { version } => debug_executor.reset_to_version(version),
Cmd::GetRegionReadProgress {
region,
log,
min_start_ts,
} => {
debug_executor.get_region_read_progress(
region,
log,
min_start_ts.unwrap_or_default(),
);
}
_ => {
unreachable!()
}
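Note that an omitted --min-start-ts reaches the executor as min_start_ts.unwrap_or_default(), i.e. 0, so --log then samples the locks of the oldest transaction tracked by the resolver.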
20 changes: 20 additions & 0 deletions components/raftstore/src/store/util.rs
@@ -1563,6 +1563,26 @@ impl RegionReadProgressCore {
pub fn get_local_leader_info(&self) -> &LocalLeaderInfo {
&self.leader_info
}

pub fn applied_index(&self) -> u64 {
self.applied_index
}

pub fn paused(&self) -> bool {
self.pause
}

pub fn pending_items(&self) -> &VecDeque<ReadState> {
&self.pending_items
}

pub fn read_state(&self) -> &ReadState {
&self.read_state
}

pub fn discarding(&self) -> bool {
self.discard
}
}

/// Represent the duration of all stages of raftstore recorded by one
40 changes: 40 additions & 0 deletions components/resolved_ts/src/endpoint.rs
@@ -599,6 +599,29 @@ where
meta.store_id
})
}

fn handle_get_diagnosis_info(
&self,
region_id: u64,
log_locks: bool,
min_start_ts: u64,
callback: tikv::server::service::ResolvedTsDiagnosisCallback,
) {
if let Some(r) = self.regions.get(&region_id) {
if log_locks {
r.resolver.log_locks(min_start_ts);
}
callback(Some((
r.resolver.stopped(),
r.resolver.resolved_ts().into_inner(),
r.resolver.tracked_index(),
r.resolver.num_locks(),
r.resolver.num_transactions(),
)));
} else {
callback(None);
}
}
}

pub enum Task {
@@ -634,6 +657,12 @@ pub enum Task {
ChangeConfig {
change: ConfigChange,
},
GetDiagnosisInfo {
region_id: u64,
log_locks: bool,
min_start_ts: u64,
callback: tikv::server::service::ResolvedTsDiagnosisCallback,
},
}

impl fmt::Debug for Task {
@@ -691,6 +720,11 @@ impl fmt::Debug for Task {
.field("name", &"change_config")
.field("change", &change)
.finish(),
Task::GetDiagnosisInfo { region_id, .. } => de
.field("name", &"get_diagnosis_info")
.field("region_id", &region_id)
.field("callback", &"callback")
.finish(),
}
}
}
@@ -734,6 +768,12 @@
apply_index,
} => self.handle_scan_locks(region_id, observe_id, entries, apply_index),
Task::ChangeConfig { change } => self.handle_change_config(change),
Task::GetDiagnosisInfo {
region_id,
log_locks,
min_start_ts,
callback,
} => self.handle_get_diagnosis_info(region_id, log_locks, min_start_ts, callback),
}
}
}
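The ResolvedTsDiagnosisCallback type referenced above is defined in tikv::server::service and is not part of this diff. Judging purely from its call sites, callback(Some((stopped, resolved_ts, tracked_index, num_locks, num_transactions))) and callback(None), it is presumably shaped roughly like the following hedged reconstruction, not the actual declaration:

    // Hypothetical shape inferred from the call sites in
    // handle_get_diagnosis_info; the real alias lives in tikv::server::service.
    pub type ResolvedTsDiagnosisCallback =
        Box<dyn FnOnce(Option<(bool, u64, u64, u64, u64)>) + Send>;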
39 changes: 39 additions & 0 deletions components/resolved_ts/src/resolver.rs
@@ -8,6 +8,8 @@ use txn_types::TimeStamp;

use crate::metrics::RTS_RESOLVED_FAIL_ADVANCE_VEC;

const MAX_NUMBER_OF_LOCKS_IN_LOG: usize = 10;

// Resolver resolves timestamps that guarantee no more commit will happen before
// the timestamp.
pub struct Resolver {
@@ -74,6 +76,14 @@ impl Resolver {
self.resolved_ts
}

pub fn tracked_index(&self) -> u64 {
self.tracked_index
}

pub fn stopped(&self) -> bool {
self.stopped
}

pub fn size(&self) -> usize {
self.locks_by_key.keys().map(|k| k.len()).sum::<usize>()
+ self
@@ -190,6 +200,35 @@ impl Resolver {

self.resolved_ts
}

pub(crate) fn log_locks(&self, min_start_ts: u64) {
// Log the locks of the transaction with the smallest start_ts >= min_start_ts.
if let Some((start_ts, keys)) = self
.lock_ts_heap
.range(TimeStamp::new(min_start_ts)..)
.next()
{
let keys_for_log = keys
.iter()
.map(|key| log_wrappers::Value::key(key))
.take(MAX_NUMBER_OF_LOCKS_IN_LOG)
.collect::<Vec<_>>();
info!(
"locks with the minimum start_ts in resolver";
"region_id" => self.region_id,
"start_ts" => start_ts,
"sampled keys" => ?keys_for_log,
);
}
}

pub(crate) fn num_locks(&self) -> u64 {
self.locks_by_key.len() as u64
}

pub(crate) fn num_transactions(&self) -> u64 {
self.lock_ts_heap.len() as u64
}
}

#[cfg(test)]
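log_locks depends on lock_ts_heap being an ordered map keyed by start_ts, so .range(min_start_ts..).next() yields the oldest tracked transaction at or above the threshold (with the default threshold of 0, the transaction currently holding resolved_ts back). A minimal standalone sketch of that lookup pattern, using only the standard library and made-up map contents:

    use std::collections::BTreeMap;

    fn main() {
        // start_ts -> locked keys, mirroring the shape of `lock_ts_heap`.
        let mut lock_ts_heap: BTreeMap<u64, Vec<&str>> = BTreeMap::new();
        lock_ts_heap.insert(100, vec!["k1"]);
        lock_ts_heap.insert(200, vec!["k2", "k3"]);
        lock_ts_heap.insert(300, vec!["k4"]);

        // The smallest key >= 150 is 200: the oldest transaction at or
        // above the requested lower bound.
        let oldest = lock_ts_heap.range(150..).next();
        assert_eq!(oldest, Some((&200, &vec!["k2", "k3"])));
    }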
21 changes: 21 additions & 0 deletions components/server/src/server.rs
@@ -81,6 +81,7 @@ use raftstore::{
},
RaftRouterCompactedEventSender,
};
use resolved_ts::Task;
use security::SecurityManager;
use service::{service_event::ServiceEvent, service_manager::GrpcServiceManager};
use snap_recovery::RecoveryService;
@@ -271,6 +272,7 @@ struct TikvServer<ER: RaftEngine, F: KvFormat> {
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
tablet_factory: Option<Arc<dyn TabletFactory<RocksEngine> + Send + Sync>>,
br_snap_recovery_mode: bool, // use for br snapshot recovery
resolved_ts_scheduler: Option<Scheduler<Task>>,
grpc_service_mgr: GrpcServiceManager,
}

@@ -426,6 +428,7 @@
causal_ts_provider,
tablet_factory: None,
br_snap_recovery_mode: is_recovering_marked,
resolved_ts_scheduler: None,
grpc_service_mgr: GrpcServiceManager::new(tx),
}
}
@@ -1194,6 +1197,7 @@
server.env(),
self.security_mgr.clone(),
);
self.resolved_ts_scheduler = Some(rts_worker.scheduler());
rts_worker.start_with_timer(rts_endpoint);
self.to_stop.push(rts_worker);
}
@@ -1249,10 +1253,27 @@
}

// Debug service.
let resolved_ts_scheduler = Arc::new(self.resolved_ts_scheduler.clone());
let debug_service = DebugService::new(
servers.debugger.clone(),
servers.server.get_debug_thread_pool().clone(),
engines.engine.raft_extension().clone(),
self.engines.as_ref().unwrap().store_meta.clone(),
Arc::new(
move |region_id, log_locks, min_start_ts, callback| -> bool {
if let Some(s) = resolved_ts_scheduler.as_ref() {
let res = s.schedule(Task::GetDiagnosisInfo {
region_id,
log_locks,
min_start_ts,
callback,
});
res.is_ok()
} else {
false
}
},
),
);
if servers
.server
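Rather than handing DebugService the scheduler directly, the constructor receives an Arc'd closure that reports whether the diagnosis task was accepted; this keeps the debug service decoupled from the resolved-ts worker, which is never started when resolved-ts is disabled. A minimal sketch of the same pattern, with hypothetical stand-in types:

    use std::sync::Arc;

    // Stand-ins for Scheduler<Task> and the diagnosis request (hypothetical).
    struct Scheduler;
    impl Scheduler {
        fn schedule(&self, _region_id: u64) -> Result<(), ()> {
            Ok(())
        }
    }

    type DiagnosisHook = Arc<dyn Fn(u64) -> bool + Send + Sync>;

    fn make_hook(scheduler: Option<Scheduler>) -> DiagnosisHook {
        Arc::new(move |region_id| match &scheduler {
            // Scheduling can still fail if the worker's queue is gone.
            Some(s) => s.schedule(region_id).is_ok(),
            // No resolved-ts worker was started (resolved-ts disabled).
            None => false,
        })
    }

    fn main() {
        let hook = make_hook(Some(Scheduler));
        assert!(hook(14));
        let no_worker = make_hook(None);
        assert!(!no_worker(14));
    }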