Skip to content

Commit

Permalink
txn-file,storage: Add low priority scheduler for txn file commands (tikv#1859)
Browse files Browse the repository at this point in the history

* add low priority scheduler

Signed-off-by: Ping Yu <yuping@pingcap.cn>

* make low priority pool optional

Signed-off-by: Ping Yu <yuping@pingcap.cn>

---------

Signed-off-by: Ping Yu <yuping@pingcap.cn>
  • Loading branch information
pingyu authored Sep 19, 2024
1 parent fdc680f commit af2b8ee
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 5 deletions.
7 changes: 7 additions & 0 deletions src/storage/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct Config {
pub scheduler_concurrency: usize,
pub scheduler_worker_pool_size: usize,
#[online_config(skip)]
pub scheduler_low_priority_worker_pool_size: Option<usize>,
#[online_config(skip)]
pub scheduler_pending_write_threshold: ReadableSize,
#[online_config(skip)]
// Reserve disk space to make tikv would have enough space to compact when disk is full.
Expand Down Expand Up @@ -86,6 +88,7 @@ impl Default for Config {
max_key_size: DEFAULT_MAX_KEY_SIZE,
scheduler_concurrency: DEFAULT_SCHED_CONCURRENCY,
scheduler_worker_pool_size: (cpu_num / 2.).clamp(1., 8.) as usize,
scheduler_low_priority_worker_pool_size: None,
scheduler_pending_write_threshold: ReadableSize::mb(DEFAULT_SCHED_PENDING_WRITE_MB),
reserve_space: ReadableSize::gb(DEFAULT_RESERVED_SPACE_GB),
reserve_raft_space: ReadableSize::gb(DEFAULT_RESERVED_RAFT_SPACE_GB),
Expand Down Expand Up @@ -133,6 +136,10 @@ impl Config {
).into()
);
}
if self.scheduler_low_priority_worker_pool_size.is_none() {
let pool_size = std::cmp::max(1, self.scheduler_worker_pool_size / 4);
self.scheduler_low_priority_worker_pool_size = Some(pool_size);
}
self.flow_control.validate()?;
self.io_rate_limit.validate()?;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/txn/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,9 @@ impl Command {
if self.command_ext().is_sys_cmd() {
return CommandPri::High;
}
if let Command::TxnFile(txn_file) = self {
return txn_file.priority();
}
self.command_ext().get_ctx().get_priority()
}

Expand Down
11 changes: 10 additions & 1 deletion src/storage/txn/commands/txn_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use kvengine::{
Iterator, SnapAccess, UserMeta, LOCK_CF, WRITE_CF,
};
use kvenginepb::TxnFileRef;
use kvproto::kvrpcpb::WriteConflictReason;
use kvproto::kvrpcpb::{CommandPri, WriteConflictReason};
use log_wrappers::Value as LogValue;
use protobuf::Message;
use tikv_kv::{Snapshot, WriteData};
Expand Down Expand Up @@ -65,6 +65,15 @@ impl TxnFileCommand {
})
}

/// Returns the scheduling priority for this txn-file command.
///
/// Prewrite is forced to `CommandPri::Low`: it is a high-throughput but
/// latency-insensitive workload, so it should not compete with
/// latency-critical commands in the normal/high priority pools. Every
/// other inner command keeps its own priority.
pub fn priority(&self) -> CommandPri {
    let inner = self.inner_cmd.as_ref().unwrap();
    match &**inner {
        Command::Prewrite(_) => CommandPri::Low,
        other => other.priority(),
    }
}

fn build_prewrite_txn_file_ref(req: &Prewrite, snap: &SnapAccess) -> TxnFileRef {
let mut lock = txn_types::Lock::new(
LockType::Put,
Expand Down
30 changes: 26 additions & 4 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ struct SchedulerInner<L: LockManager> {
// high priority commands and system commands will be delivered to this pool
high_priority_pool: SchedPool,

// low priority pool for high throughput but latency insensitive workloads.
// TODO: Remove `Option`.
low_priority_pool: Option<SchedPool>,

// used to control write flow
running_write_bytes: CachePadded<AtomicUsize>,

Expand Down Expand Up @@ -453,6 +457,19 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {

let lock_wait_queues = LockWaitQueues::new(lock_mgr.clone());

let low_priority_pool_size = config
.scheduler_low_priority_worker_pool_size
.unwrap_or_default();
let low_priority_pool = (low_priority_pool_size > 0).then(|| {
SchedPool::new(
engine.clone(),
low_priority_pool_size,
reporter.clone(),
feature_gate.clone(),
"sched-low-pri-pool",
)
});

let inner = Arc::new(SchedulerInner {
task_slots,
id_alloc: AtomicU64::new(0).into(),
Expand All @@ -474,6 +491,7 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
feature_gate.clone(),
"sched-high-pri-pool",
),
low_priority_pool,
lock_mgr,
concurrency_manager,
pipelined_pessimistic_lock: dynamic_configs.pipelined_pessimistic_lock,
Expand Down Expand Up @@ -691,10 +709,14 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {

// pub for test
pub fn get_sched_pool(&self, priority: CommandPri) -> &SchedPool {
if priority == CommandPri::High {
&self.inner.high_priority_pool
} else {
&self.inner.worker_pool
match priority {
CommandPri::Normal => &self.inner.worker_pool,
CommandPri::High => &self.inner.high_priority_pool,
CommandPri::Low => self
.inner
.low_priority_pool
.as_ref()
.unwrap_or(&self.inner.worker_pool),
}
}

Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ fn test_serde_custom_tikv_config() {
max_key_size: 4096,
scheduler_concurrency: 123,
scheduler_worker_pool_size: 1,
scheduler_low_priority_worker_pool_size: None,
scheduler_pending_write_threshold: ReadableSize::kb(123),
reserve_space: ReadableSize::gb(10),
reserve_raft_space: ReadableSize::gb(2),
Expand Down
3 changes: 3 additions & 0 deletions tests/random/test_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use test_pd_client::{PdClientExt, PdWrapper};
use tikv_util::{
config::{ReadableDuration, ReadableSize},
info,
sys::SysQuota,
time::Instant,
warn,
};
Expand Down Expand Up @@ -317,6 +318,7 @@ fn prepare_cluster(
let mut rng = rand::thread_rng();
let nodes = alloc_node_id_vec(nodes_count);
let dfs_config = Arc::new(dfs_config.clone());
let cpu_cores = SysQuota::cpu_cores_quota() as usize;

let big_region_size_keyspaces = (0..initial_keyspace_count as u32)
.choose_multiple(&mut rng, BIG_REGION_SIZE_KEYSPACE_COUNT);
Expand Down Expand Up @@ -355,6 +357,7 @@ fn prepare_cluster(
conf.kvengine.flush_split_l0 = true;
conf.kvengine.per_keyspace_configs = per_keyspace_configs.clone();
conf.storage.flow_control.enable = true;
conf.storage.scheduler_worker_pool_size = cpu_cores;
};
let pd_wrapper = PdWrapper::new_test(1, security_conf, None);
let mut cluster = ServerCluster::new_opt(nodes, update_conf_fn, pd_wrapper);
Expand Down
3 changes: 3 additions & 0 deletions tests/random/test_tidb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tikv::config::TikvConfig;
use tikv_util::{
config::{ReadableDuration, ReadableSize},
info,
sys::SysQuota,
time::Instant,
};

Expand Down Expand Up @@ -367,6 +368,7 @@ fn prepare_cluster(
let mut rng = rand::thread_rng();
let nodes = alloc_node_id_vec(nodes_count);
let dfs_config = Arc::new(dfs_config.clone());
let cpu_cores = SysQuota::cpu_cores_quota() as usize;
let update_conf_fn = move |node_id: u16, conf: &mut TikvConfig| {
conf.dfs = (*dfs_config).clone();
conf.coprocessor.region_split_size = REGION_SIZE;
Expand All @@ -387,6 +389,7 @@ fn prepare_cluster(
conf.kvengine.compaction_tombs_count = 100;
conf.kvengine.max_del_range_delay = ReadableDuration(Duration::from_secs(3));
conf.storage.flow_control.enable = true;
conf.storage.scheduler_worker_pool_size = cpu_cores;

if use_remote_cop {
let cop_worker_url = tikv_worker_cop_url(node_id % TIKV_WORKERS_COUNT as u16);
Expand Down

0 comments on commit af2b8ee

Please sign in to comment.