Foreground Quota Limiter: limit the cpu, bandwidth to provide stable QPS (tikv#12151)

close tikv#11855, ref tikv#11856

Add a global foreground quota limiter, a speed limiter that records the CPU resources consumed by various requests. When the limiter reaches its quota, the request is forced to sleep for a period of time to compensate. Statistics are collected for only part of the processing and are therefore approximate.

Signed-off-by: tonyxuqqi <tonyxuqi@outlook.com>
Signed-off-by: qi.xu <tonxuqi@outlook.com>
Signed-off-by: zhangjinpeng1987 <zhangjinpeng@pingcap.com>
Signed-off-by: Xintao <hunterlxt@live.com>

Co-authored-by: tonyxuqqi <tonyxuqi@outlook.com>
Co-authored-by: qi.xu <tonxuqi@outlook.com>
Co-authored-by: zhangjinpeng1987 <zhangjinpeng@pingcap.com>
4 people authored Mar 17, 2022
1 parent ea61ca2 commit db9420f
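
The throttling pattern this commit wires through the coprocessor path (visible in the runner.rs hunk below: `new_sample()` → `observe_cpu()` → `async_consume(sample).await`) amounts to token-bucket-style accounting of CPU time. Below is a minimal, self-contained sketch of that idea only, not TiKV's actual `tikv_util::quota_limiter`: the real limiter measures per-thread CPU time via the newly added `cpu-time` dependency and returns its delay from an async `async_consume`, whereas this sketch uses wall-clock `Instant` as a stand-in and a synchronous `consume`. All names here (`QuotaLimiter`, `Window`, the 100ms quota) are illustrative.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Hypothetical stand-in for tikv_util::quota_limiter::QuotaLimiter.
struct QuotaLimiter {
    // CPU time the foreground work may consume per wall-clock second.
    cpu_quota_per_sec: Duration,
    state: Mutex<Window>,
}

struct Window {
    start: Instant,
    consumed: Duration,
}

impl QuotaLimiter {
    fn new(cpu_quota_per_sec: Duration) -> Self {
        QuotaLimiter {
            cpu_quota_per_sec,
            state: Mutex::new(Window {
                start: Instant::now(),
                consumed: Duration::ZERO,
            }),
        }
    }

    /// Record `cost` against the current one-second window and return how
    /// long the caller should sleep to compensate (zero while under quota).
    fn consume(&self, cost: Duration) -> Duration {
        let mut w = self.state.lock().unwrap();
        if w.start.elapsed() >= Duration::from_secs(1) {
            // New accounting window: forget the previous second's usage.
            w.start = Instant::now();
            w.consumed = Duration::ZERO;
        }
        w.consumed += cost;
        w.consumed.saturating_sub(self.cpu_quota_per_sec)
    }
}

fn main() {
    let limiter = QuotaLimiter::new(Duration::from_millis(100));
    for batch in 0..3 {
        let started = Instant::now(); // the real code samples thread CPU time here
        // ... process one request batch ...
        let delay = limiter.consume(started.elapsed() + Duration::from_millis(60));
        if !delay.is_zero() {
            // Over quota: force the request to sleep, as the commit message describes.
            std::thread::sleep(delay);
        }
        println!("batch {}: throttled for {:?}", batch, delay);
    }
}
```

In the actual diff, the delay returned by `async_consume` is additionally exported through the new `tikv_non_txn_command_throttle_time_total` metric, keyed by throttle type (`dag`, `analyze_full_sampling`).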
Showing 26 changed files with 1,021 additions and 251 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock


11 changes: 11 additions & 0 deletions components/server/src/server.rs
@@ -91,6 +91,7 @@ use tikv_util::{
check_environment_variables,
config::{ensure_dir_exist, RaftDataStateMachine, VersionTrack},
math::MovingAvgU32,
quota_limiter::QuotaLimiter,
sys::{disk, register_memory_usage_high_water, SysQuota},
thread_group::GroupProperties,
time::{Instant, Monitor},
@@ -189,6 +190,7 @@ struct TiKVServer<ER: RaftEngine> {
concurrency_manager: ConcurrencyManager,
env: Arc<Environment>,
background_worker: Worker,
quota_limiter: Arc<QuotaLimiter>,
}

struct TiKVEngines<EK: KvEngine, ER: RaftEngine> {
@@ -256,6 +258,12 @@ impl<ER: RaftEngine> TiKVServer<ER> {
let latest_ts = block_on(pd_client.get_tso()).expect("failed to get timestamp from PD");
let concurrency_manager = ConcurrencyManager::new(latest_ts);

let quota_limiter = Arc::new(QuotaLimiter::new(
config.quota.foreground_cpu_time,
config.quota.foreground_write_bandwidth,
config.quota.foreground_read_bandwidth,
));

TiKVServer {
config,
cfg_controller: Some(cfg_controller),
@@ -279,6 +287,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
background_worker,
flow_info_sender: None,
flow_info_receiver: None,
quota_limiter,
}
}

@@ -666,6 +675,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
flow_controller.clone(),
pd_sender.clone(),
resource_tag_factory.clone(),
Arc::clone(&self.quota_limiter),
self.pd_client.feature_gate().clone(),
)
.unwrap_or_else(|e| fatal!("failed to create raft storage: {}", e));
@@ -776,6 +786,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
self.concurrency_manager.clone(),
engine_rocks::raw_util::to_raw_perf_level(self.config.coprocessor.perf_level),
resource_tag_factory,
Arc::clone(&self.quota_limiter),
),
coprocessor_v2::Endpoint::new(&self.config.coprocessor_v2),
self.router.clone(),
4 changes: 4 additions & 0 deletions components/test_coprocessor/src/fixture.rs
@@ -2,6 +2,8 @@

use super::*;

use std::sync::Arc;

use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::{ApiVersion, Context};

@@ -15,6 +17,7 @@ use tikv::server::Config;
use tikv::storage::kv::RocksEngine;
use tikv::storage::lock_manager::DummyLockManager;
use tikv::storage::{Engine, TestEngineBuilder, TestStorageBuilder};
use tikv_util::quota_limiter::QuotaLimiter;
use tikv_util::thread_group::GroupProperties;

#[derive(Clone)]
@@ -107,6 +110,7 @@ pub fn init_data_with_details<E: Engine>(
cm,
PerfLevel::EnableCount,
ResourceTagFactory::new_for_test(),
Arc::new(QuotaLimiter::default()),
);
(store, copr)
}
8 changes: 8 additions & 0 deletions components/test_raftstore/src/server.rs
@@ -63,6 +63,7 @@ use tikv::storage::txn::flow_controller::FlowController;
use tikv::storage::{self, Engine};
use tikv::{config::ConfigController, server::raftkv::ReplicaReadLockChecker};
use tikv_util::config::VersionTrack;
use tikv_util::quota_limiter::QuotaLimiter;
use tikv_util::time::ThreadReadId;
use tikv_util::worker::{Builder as WorkerBuilder, LazyWorker};
use tikv_util::HandyRwLock;
@@ -343,6 +344,11 @@ impl Simulator for ServerCluster {
let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner);

let mut lock_mgr = LockManager::new(&cfg.pessimistic_txn);
let quota_limiter = Arc::new(QuotaLimiter::new(
cfg.quota.foreground_cpu_time,
cfg.quota.foreground_write_bandwidth,
cfg.quota.foreground_read_bandwidth,
));
let store = create_raft_storage(
engine,
&cfg.storage,
@@ -353,6 +359,7 @@
Arc::new(FlowController::empty()),
pd_sender,
res_tag_factory.clone(),
quota_limiter.clone(),
self.pd_client.feature_gate().clone(),
)?;
self.storages.insert(node_id, raft_engine);
@@ -403,6 +410,7 @@
concurrency_manager.clone(),
PerfLevel::EnableCount,
res_tag_factory,
quota_limiter,
);
let copr_v2 = coprocessor_v2::Endpoint::new(&cfg.coprocessor_v2);
let mut server = None;
1 change: 1 addition & 0 deletions components/tidb_query_executors/Cargo.toml
@@ -24,6 +24,7 @@ collections = { path = "../collections" }
tipb = { git = "https://github.com/pingcap/tipb.git" }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }
itertools = "0.10"
futures = { version = "0.3", features = ["compat"] }

[dev-dependencies]
anyhow = "1.0"
33 changes: 25 additions & 8 deletions components/tidb_query_executors/src/runner.rs
@@ -16,12 +16,13 @@ use yatp::task::future::reschedule;

use super::interface::{BatchExecutor, ExecuteStats};
use super::*;

use tidb_query_common::execute_stats::ExecSummary;
use tidb_query_common::metrics::*;
use tidb_query_common::storage::{IntervalRange, Storage};
use tidb_query_common::Result;
use tidb_query_datatype::expr::{EvalConfig, EvalContext, EvalWarnings};
use tikv_util::metrics::{ThrottleType, NON_TXN_COMMAND_THROTTLE_TIME_COUNTER_VEC_STATIC};
use tikv_util::quota_limiter::QuotaLimiter;

// TODO: The value is chosen according to some very subjective experience, which is not tuned
// carefully. We need to benchmark to find a best value. Also we may consider accepting this value
@@ -69,6 +70,8 @@ pub struct BatchExecutorsRunner<SS> {

/// If it's a paging request, paging_size indicates to the required size for current page.
paging_size: Option<u64>,

quota_limiter: Arc<QuotaLimiter>,
}

// We assign a dummy type `()` so that we can omit the type when calling `check_supported`.
@@ -363,6 +366,7 @@ impl<SS: 'static> BatchExecutorsRunner<SS> {
stream_row_limit: usize,
is_streaming: bool,
paging_size: Option<u64>,
quota_limiter: Arc<QuotaLimiter>,
) -> Result<Self> {
let executors_len = req.get_executors().len();
let collect_exec_summary = req.get_collect_execution_summaries();
@@ -407,6 +411,7 @@
stream_row_limit,
encode_type,
paging_size,
quota_limiter,
})
}

@@ -438,13 +443,24 @@

let mut chunk = Chunk::default();

let (drained, record_len) = self.internal_handle_request(
false,
batch_size,
&mut chunk,
&mut warnings,
&mut ctx,
)?;
let mut sample = self.quota_limiter.new_sample();
let (drained, record_len) = {
let _guard = sample.observe_cpu();
self.internal_handle_request(
false,
batch_size,
&mut chunk,
&mut warnings,
&mut ctx,
)?
};

let quota_delay = self.quota_limiter.async_consume(sample).await;
if !quota_delay.is_zero() {
NON_TXN_COMMAND_THROTTLE_TIME_COUNTER_VEC_STATIC
.get(ThrottleType::dag)
.inc_by(quota_delay.as_micros() as u64);
}

if record_len > 0 {
chunks.push(chunk);
@@ -511,6 +527,7 @@ impl<SS: 'static> BatchExecutorsRunner<SS> {
// record count less than batch size and is not drained
while record_len < self.stream_row_limit && !is_drained {
let mut current_chunk = Chunk::default();
// TODO: Streaming coprocessor on TiKV is just not enabled in TiDB now.
let (drained, len) = self.internal_handle_request(
true,
batch_size.min(self.stream_row_limit - record_len),
12 changes: 12 additions & 0 deletions components/tikv_kv/src/stats.rs
@@ -232,6 +232,18 @@ impl Statistics {
}
}

pub fn cf_statistics(&self, cf: &str) -> &CfStatistics {
if cf.is_empty() {
return &self.data;
}
match cf {
CF_DEFAULT => &self.data,
CF_LOCK => &self.lock,
CF_WRITE => &self.write,
_ => unreachable!(),
}
}

pub fn write_scan_detail(&self, detail_v2: &mut ScanDetailV2) {
detail_v2.set_processed_versions(self.write.processed_keys as u64);
detail_v2.set_total_versions(self.write.total_op_count() as u64);
2 changes: 2 additions & 0 deletions components/tikv_util/Cargo.toml
@@ -34,6 +34,7 @@ num_cpus = "1"
num-traits = "0.2"
openssl = "0.10"
prometheus = { version = "0.13", features = ["nightly"] }
prometheus-static-metric = "0.5"
rand = "0.8"
rusoto_core = "0.46.0"
serde = { version = "1.0", features = ["derive"] }
@@ -54,6 +55,7 @@ tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hot
url = "2"
kvproto = { git = "https://github.com/pingcap/kvproto.git" }
protobuf = "2"
cpu-time = "1.0.0"
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }

[target.'cfg(target_os = "linux")'.dependencies]
1 change: 1 addition & 0 deletions components/tikv_util/src/lib.rs
@@ -39,6 +39,7 @@ pub mod math;
pub mod memory;
pub mod metrics;
pub mod mpsc;
pub mod quota_limiter;
pub mod stream;
pub mod sys;
pub mod thread_group;
25 changes: 25 additions & 0 deletions components/tikv_util/src/metrics/mod.rs
@@ -2,6 +2,7 @@

use lazy_static::lazy_static;
use prometheus::*;
use prometheus_static_metric::*;

#[cfg(target_os = "linux")]
mod threads_linux;
@@ -47,13 +48,37 @@ pub fn dump() -> String {
String::from_utf8(buffer).unwrap()
}

make_auto_flush_static_metric! {
// Some non-txn related types are placed here.
// ref `TXN_COMMAND_THROTTLE_TIME_COUNTER_VEC`.
pub label_enum ThrottleType {
dag,
analyze_full_sampling,
}

pub struct NonTxnCommandThrottleTimeCounterVec: LocalIntCounter {
"type" => ThrottleType,
}
}

lazy_static! {
pub static ref CRITICAL_ERROR: IntCounterVec = register_int_counter_vec!(
"tikv_critical_error_total",
"Counter of critical error.",
&["type"]
)
.unwrap();
pub static ref NON_TXN_COMMAND_THROTTLE_TIME_COUNTER_VEC: IntCounterVec =
register_int_counter_vec!(
"tikv_non_txn_command_throttle_time_total",
"Total throttle time (microsecond) of non txn processing.",
&["type"]
)
.unwrap();
pub static ref NON_TXN_COMMAND_THROTTLE_TIME_COUNTER_VEC_STATIC: NonTxnCommandThrottleTimeCounterVec = auto_flush_from!(
NON_TXN_COMMAND_THROTTLE_TIME_COUNTER_VEC,
NonTxnCommandThrottleTimeCounterVec
);
}

pub fn convert_record_pairs(m: HashMap<String, u64>) -> RecordPairVec {
(The diff for the remaining changed files was not loaded.)
