From 0c1c5d3267418ee5f9d5f64de6cbf895c1a383ed Mon Sep 17 00:00:00 2001 From: tabokie Date: Fri, 11 Nov 2022 14:55:18 +0800 Subject: [PATCH 1/2] add pd worker Signed-off-by: tabokie --- Cargo.lock | 4 + components/raftstore-v2/Cargo.toml | 4 + components/raftstore-v2/src/batch/store.rs | 28 +- components/raftstore-v2/src/lib.rs | 2 + components/raftstore-v2/src/worker/mod.rs | 5 + .../raftstore-v2/src/worker/pd/heartbeat.rs | 154 +++++++++ components/raftstore-v2/src/worker/pd/mod.rs | 197 ++++++++++++ .../raftstore-v2/src/worker/pd/split.rs | 99 ++++++ .../src/worker/pd/store_heartbeat.rs | 293 ++++++++++++++++++ .../tests/integrations/cluster.rs | 9 +- 10 files changed, 790 insertions(+), 5 deletions(-) create mode 100644 components/raftstore-v2/src/worker/mod.rs create mode 100644 components/raftstore-v2/src/worker/pd/heartbeat.rs create mode 100644 components/raftstore-v2/src/worker/pd/mod.rs create mode 100644 components/raftstore-v2/src/worker/pd/split.rs create mode 100644 components/raftstore-v2/src/worker/pd/store_heartbeat.rs diff --git a/Cargo.lock b/Cargo.lock index a1b238d0148..986a30c72d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4280,15 +4280,18 @@ dependencies = [ "error_code", "fail", "file_system", + "fs2", "futures 0.3.15", "keys", "kvproto", "log_wrappers", "pd_client", + "prometheus", "protobuf", "raft", "raft-proto", "raftstore", + "resource_metering", "slog", "slog-global", "smallvec", @@ -4299,6 +4302,7 @@ dependencies = [ "time", "tracker", "txn_types", + "yatp", ] [[package]] diff --git a/components/raftstore-v2/Cargo.toml b/components/raftstore-v2/Cargo.toml index 8bb91b40bb9..aa92a4f9f83 100644 --- a/components/raftstore-v2/Cargo.toml +++ b/components/raftstore-v2/Cargo.toml @@ -36,21 +36,25 @@ engine_traits = { workspace = true } error_code = { workspace = true } fail = "0.5" file_system = { workspace = true } +fs2 = "0.4" futures = { version = "0.3", features = ["compat"] } keys = { workspace = true } kvproto = { git = "https://github.com/pingcap/kvproto.git" } log_wrappers = { workspace = true } pd_client = { workspace = true } +prometheus = { version = "0.13", features = ["nightly"] } protobuf = { version = "2.8", features = ["bytes"] } raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] } raft-proto = { version = "0.7.0" } raftstore = { workspace = true } +resource_metering = { workspace = true } slog = "2.3" smallvec = "1.4" tikv_util = { workspace = true } time = "0.1" tracker = { workspace = true } txn_types = { workspace = true } +yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" } [dev-dependencies] engine_test = { workspace = true } diff --git a/components/raftstore-v2/src/batch/store.rs b/components/raftstore-v2/src/batch/store.rs index 76d4fd16bea..25b03ff652f 100644 --- a/components/raftstore-v2/src/batch/store.rs +++ b/components/raftstore-v2/src/batch/store.rs @@ -18,6 +18,7 @@ use kvproto::{ metapb::Store, raft_serverpb::{PeerState, RaftMessage}, }; +use pd_client::PdClient; use raft::INVALID_ID; use raftstore::store::{ fsm::store::PeerTickBatch, local_metrics::RaftMetrics, Config, ReadRunner, ReadTask, @@ -42,6 +43,7 @@ use crate::{ fsm::{PeerFsm, PeerFsmDelegate, SenderFsmPair, StoreFsm, StoreFsmDelegate, StoreMeta}, raft::Storage, router::{PeerMsg, PeerTick, StoreMsg}, + worker::{PdRunner, PdTask}, Error, Result, }; @@ -216,6 +218,7 @@ struct StorePollerBuilder { trans: T, router: StoreRouter, read_scheduler: Scheduler>, + pd_scheduler: Scheduler, write_senders: WriteSenders, apply_pool: 
FuturePool, logger: Logger, @@ -231,6 +234,7 @@ impl StorePollerBuilder { trans: T, router: StoreRouter, read_scheduler: Scheduler>, + pd_scheduler: Scheduler, store_writers: &mut StoreWriters, logger: Logger, store_meta: Arc>>, @@ -253,6 +257,7 @@ impl StorePollerBuilder { trans, router, read_scheduler, + pd_scheduler, apply_pool, logger, write_senders: store_writers.senders(), @@ -336,6 +341,7 @@ where struct Workers { /// Worker for fetching raft logs asynchronously async_read_worker: Worker, + pd_worker: Worker, store_writers: StoreWriters, } @@ -343,6 +349,7 @@ impl Default for Workers { fn default() -> Self { Self { async_read_worker: Worker::new("async-read-worker"), + pd_worker: Worker::new("pd-worker"), store_writers: StoreWriters::default(), } } @@ -356,19 +363,26 @@ pub struct StoreSystem { } impl StoreSystem { - pub fn start( + pub fn start( &mut self, store_id: u64, cfg: Arc>, raft_engine: ER, tablet_factory: Arc>, trans: T, + pd_client: Arc, router: &StoreRouter, store_meta: Arc>>, ) -> Result<()> where T: Transport + 'static, + C: PdClient + 'static, { + let router_clone = router.clone(); + // pd_client.handle_reconnect(move || { + // router_clone.broadcast_normal(|| PeerMsg::HeartbeatPd); + // }); + let mut workers = Workers::default(); workers .store_writers @@ -377,6 +391,17 @@ impl StoreSystem { "async-read-worker", ReadRunner::new(router.clone(), raft_engine.clone()), ); + let pd_scheduler = workers.pd_worker.start( + "pd-worker", + PdRunner::new( + pd_client, + raft_engine.clone(), + tablet_factory.clone(), + router.clone(), + workers.pd_worker.remote(), + self.logger.clone(), + ), + ); let mut builder = StorePollerBuilder::new( cfg.clone(), @@ -386,6 +411,7 @@ impl StoreSystem { trans, router.clone(), read_scheduler, + pd_scheduler, &mut workers.store_writers, self.logger.clone(), store_meta.clone(), diff --git a/components/raftstore-v2/src/lib.rs b/components/raftstore-v2/src/lib.rs index 2f30ee9873d..4441b74ca5a 100644 --- a/components/raftstore-v2/src/lib.rs +++ b/components/raftstore-v2/src/lib.rs @@ -24,6 +24,7 @@ #![allow(unused)] #![feature(let_else)] #![feature(array_windows)] +#![feature(div_duration)] mod batch; mod bootstrap; @@ -32,6 +33,7 @@ mod operation; mod raft; pub mod router; mod tablet; +mod worker; pub(crate) use batch::StoreContext; pub use batch::{create_store_batch_system, StoreRouter, StoreSystem}; diff --git a/components/raftstore-v2/src/worker/mod.rs b/components/raftstore-v2/src/worker/mod.rs new file mode 100644 index 00000000000..8c164c9cf96 --- /dev/null +++ b/components/raftstore-v2/src/worker/mod.rs @@ -0,0 +1,5 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +mod pd; + +pub use pd::{Runner as PdRunner, Task as PdTask}; diff --git a/components/raftstore-v2/src/worker/pd/heartbeat.rs b/components/raftstore-v2/src/worker/pd/heartbeat.rs new file mode 100644 index 00000000000..1403cb49bcb --- /dev/null +++ b/components/raftstore-v2/src/worker/pd/heartbeat.rs @@ -0,0 +1,154 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
+ +use std::time::Duration; + +use engine_traits::{KvEngine, RaftEngine}; +use kvproto::{metapb, pdpb}; +use pd_client::{PdClient, RegionStat}; +use slog::debug; +use tikv_util::{store::QueryStats, time::UnixSecs}; + +use super::Runner; + +pub struct Task { + pub term: u64, + pub region: metapb::Region, + pub peer: metapb::Peer, + pub down_peers: Vec, + pub pending_peers: Vec, + pub written_bytes: u64, + pub written_keys: u64, + pub approximate_size: Option, + pub approximate_keys: Option, + pub wait_data_peers: Vec, + // TODO: RegionReplicationStatus +} + +#[derive(Default)] +pub struct PeerStat { + pub read_bytes: u64, + pub read_keys: u64, + pub query_stats: QueryStats, + // last_region_report_attributes records the state of the last region heartbeat + pub last_region_report_read_bytes: u64, + pub last_region_report_read_keys: u64, + pub last_region_report_query_stats: QueryStats, + pub last_region_report_written_bytes: u64, + pub last_region_report_written_keys: u64, + pub last_region_report_ts: UnixSecs, + // last_store_report_attributes records the state of the last store heartbeat + pub last_store_report_read_bytes: u64, + pub last_store_report_read_keys: u64, + pub last_store_report_query_stats: QueryStats, + pub approximate_keys: u64, + pub approximate_size: u64, +} + +impl Runner +where + EK: KvEngine, + ER: RaftEngine, + T: PdClient + 'static, +{ + pub fn handle_heartbeat(&mut self, task: Task) { + // HACK! In order to keep the compatible of protos, we use 0 to identify + // the size uninitialized regions, and use 1 to identify the empty regions. + // + // See tikv/tikv#11114 for details. + let approximate_size = match task.approximate_size { + Some(0) => 1, + Some(v) => v, + None => 0, // size uninitialized + }; + let approximate_keys = task.approximate_keys.unwrap_or_default(); + let region_id = task.region.get_id(); + + let peer_stat = self + .region_peers + .entry(region_id) + .or_insert_with(PeerStat::default); + peer_stat.approximate_size = approximate_size; + peer_stat.approximate_keys = approximate_keys; + + let read_bytes_delta = peer_stat.read_bytes - peer_stat.last_region_report_read_bytes; + let read_keys_delta = peer_stat.read_keys - peer_stat.last_region_report_read_keys; + let written_bytes_delta = task.written_bytes - peer_stat.last_region_report_written_bytes; + let written_keys_delta = task.written_keys - peer_stat.last_region_report_written_keys; + let query_stats = peer_stat + .query_stats + .sub_query_stats(&peer_stat.last_region_report_query_stats); + let mut last_report_ts = peer_stat.last_region_report_ts; + if last_report_ts.is_zero() { + last_report_ts = self.start_ts; + } + peer_stat.last_region_report_written_bytes = task.written_bytes; + peer_stat.last_region_report_written_keys = task.written_keys; + peer_stat.last_region_report_read_bytes = peer_stat.read_bytes; + peer_stat.last_region_report_read_keys = peer_stat.read_keys; + peer_stat.last_region_report_query_stats = peer_stat.query_stats.clone(); + let unix_secs_now = UnixSecs::now(); + peer_stat.last_region_report_ts = unix_secs_now; + + // Calculate the CPU usage since the last region heartbeat. + let cpu_usage = { + // Take out the region CPU record. + let cpu_time_duration = Duration::from_millis( + self.region_cpu_records.remove(®ion_id).unwrap_or(0) as u64, + ); + let interval_second = unix_secs_now.into_inner() - last_report_ts.into_inner(); + // Keep consistent with the calculation of cpu_usages in a store heartbeat. 
+ // See components/tikv_util/src/metrics/threads_linux.rs for more details. + if interval_second > 0 { + ((cpu_time_duration.as_secs_f64() * 100.0) / interval_second as f64) as u64 + } else { + 0 + } + }; + + let region_stat = RegionStat { + down_peers: task.down_peers, + pending_peers: task.pending_peers, + written_bytes: written_bytes_delta, + written_keys: written_keys_delta, + read_bytes: read_bytes_delta, + read_keys: read_keys_delta, + query_stats: query_stats.0, + approximate_size, + approximate_keys, + last_report_ts, + cpu_usage, + }; + self.store_stat + .region_bytes_written + .observe(region_stat.written_bytes as f64); + self.store_stat + .region_keys_written + .observe(region_stat.written_keys as f64); + self.store_stat + .region_bytes_read + .observe(region_stat.read_bytes as f64); + self.store_stat + .region_keys_read + .observe(region_stat.read_keys as f64); + + let resp = self.pd_client.region_heartbeat( + task.term, + task.region.clone(), + task.peer, + region_stat, + None, + ); + let logger = self.logger.clone(); + let f = async move { + if let Err(e) = resp.await { + debug!( + logger, + "failed to send heartbeat"; + "region_id" => task.region.get_id(), + "err" => ?e + ); + } + }; + self.remote.spawn(f); + } +} diff --git a/components/raftstore-v2/src/worker/pd/mod.rs b/components/raftstore-v2/src/worker/pd/mod.rs new file mode 100644 index 00000000000..85a83ccee19 --- /dev/null +++ b/components/raftstore-v2/src/worker/pd/mod.rs @@ -0,0 +1,197 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{ + fmt::{self, Display, Formatter}, + sync::Arc, +}; + +use collections::HashMap; +use engine_traits::{KvEngine, RaftEngine, TabletFactory}; +use kvproto::{ + metapb, pdpb, + raft_cmdpb::{AdminRequest, RaftCmdRequest}, +}; +use pd_client::PdClient; +use raftstore::store::util::KeysInfoFormatter; +use slog::{error, info, Logger}; +use tikv_util::{time::UnixSecs, worker::Runnable}; +use yatp::{task::future::TaskCell, Remote}; + +mod heartbeat; +mod split; +mod store_heartbeat; + +pub use heartbeat::Task as HeartbeatTask; + +use crate::{batch::StoreRouter, router::PeerMsg}; + +pub enum Task { + Heartbeat(HeartbeatTask), + StoreHeartbeat { + stats: pdpb::StoreStats, + // TODO: StoreReport, StoreDrAutoSyncStatus + }, + DestroyPeer { + region_id: u64, + }, + AskBatchSplit { + region: metapb::Region, + split_keys: Vec>, + peer: metapb::Peer, + right_derive: bool, + }, + ReportBatchSplit { + regions: Vec, + }, +} + +impl Display for Task { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match *self { + Task::Heartbeat(ref hb_task) => write!( + f, + "heartbeat for region {:?}, leader {}", + hb_task.region, + hb_task.peer.get_id(), + ), + Task::StoreHeartbeat { ref stats, .. } => { + write!(f, "store heartbeat stats: {:?}", stats) + } + Task::DestroyPeer { ref region_id } => { + write!(f, "destroy peer of region {}", region_id) + } + Task::AskBatchSplit { + ref region, + ref split_keys, + .. 
+ } => write!( + f, + "ask split region {} with {}", + region.get_id(), + KeysInfoFormatter(split_keys.iter()) + ), + Task::ReportBatchSplit { ref regions } => write!(f, "report split {:?}", regions), + } + } +} + +pub struct Runner +where + EK: KvEngine, + ER: RaftEngine, + T: PdClient + 'static, +{ + pd_client: Arc, + raft_engine: ER, + tablet_factory: Arc>, + router: StoreRouter, + + remote: Remote, + + region_peers: HashMap, + + start_ts: UnixSecs, + store_stat: store_heartbeat::StoreStat, + + region_cpu_records: HashMap, + + logger: Logger, +} + +impl Runner +where + EK: KvEngine, + ER: RaftEngine, + T: PdClient + 'static, +{ + pub fn new( + pd_client: Arc, + raft_engine: ER, + tablet_factory: Arc>, + router: StoreRouter, + remote: Remote, + logger: Logger, + ) -> Self { + Self { + pd_client, + raft_engine, + tablet_factory, + router, + remote, + region_peers: HashMap::default(), + start_ts: UnixSecs::zero(), + store_stat: store_heartbeat::StoreStat::default(), + region_cpu_records: HashMap::default(), + logger, + } + } +} +impl Runnable for Runner +where + EK: KvEngine, + ER: RaftEngine, + T: PdClient + 'static, +{ + type Task = Task; + + fn run(&mut self, task: Task) { + match task { + Task::Heartbeat(task) => self.handle_heartbeat(task), + Task::StoreHeartbeat { stats } => self.handle_store_heartbeat(stats), + Task::DestroyPeer { region_id } => self.handle_destroy_peer(region_id), + Task::AskBatchSplit { + region, + split_keys, + peer, + right_derive, + } => self.handle_ask_batch_split(region, split_keys, peer, right_derive), + Task::ReportBatchSplit { regions } => self.handle_report_batch_split(regions), + _ => unimplemented!(), + } + } +} + +impl Runner +where + EK: KvEngine, + ER: RaftEngine, + T: PdClient + 'static, +{ + fn handle_destroy_peer(&mut self, region_id: u64) { + match self.region_peers.remove(®ion_id) { + None => {} + Some(_) => { + info!(self.logger, "remove peer statistic record in pd"; "region_id" => region_id) + } + } + } +} + +pub fn send_admin_request( + logger: &Logger, + router: &StoreRouter, + region_id: u64, + epoch: metapb::RegionEpoch, + peer: metapb::Peer, + request: AdminRequest, +) where + EK: KvEngine, + ER: RaftEngine, +{ + let cmd_type = request.get_cmd_type(); + + let mut req = RaftCmdRequest::default(); + req.mut_header().set_region_id(region_id); + req.mut_header().set_region_epoch(epoch); + req.mut_header().set_peer(peer); + req.set_admin_request(request); + + let (msg, _) = PeerMsg::raft_command(req); + if let Err(e) = router.send(region_id, msg) { + error!( + logger, + "send request failed"; + "region_id" => region_id, "cmd_type" => ?cmd_type, "err" => ?e, + ); + } +} diff --git a/components/raftstore-v2/src/worker/pd/split.rs b/components/raftstore-v2/src/worker/pd/split.rs new file mode 100644 index 00000000000..7bcb3326eb6 --- /dev/null +++ b/components/raftstore-v2/src/worker/pd/split.rs @@ -0,0 +1,99 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
+ +use engine_traits::{KvEngine, RaftEngine}; +use kvproto::{ + metapb, pdpb, + raft_cmdpb::{AdminCmdType, AdminRequest, SplitRequest}, +}; +use pd_client::PdClient; +use slog::{info, warn}; + +use super::{send_admin_request, Runner}; + +fn new_batch_split_region_request( + split_keys: Vec>, + ids: Vec, + right_derive: bool, +) -> AdminRequest { + let mut req = AdminRequest::default(); + req.set_cmd_type(AdminCmdType::BatchSplit); + req.mut_splits().set_right_derive(right_derive); + let mut requests = Vec::with_capacity(ids.len()); + for (mut id, key) in ids.into_iter().zip(split_keys) { + let mut split = SplitRequest::default(); + split.set_split_key(key); + split.set_new_region_id(id.get_new_region_id()); + split.set_new_peer_ids(id.take_new_peer_ids()); + requests.push(split); + } + req.mut_splits().set_requests(requests.into()); + req +} + +impl Runner +where + EK: KvEngine, + ER: RaftEngine, + T: PdClient + 'static, +{ + pub fn handle_ask_batch_split( + &mut self, + mut region: metapb::Region, + split_keys: Vec>, + peer: metapb::Peer, + right_derive: bool, + ) { + if split_keys.is_empty() { + info!(self.logger, "empty split key, skip ask batch split"; + "region_id" => region.get_id()); + return; + } + let resp = self + .pd_client + .ask_batch_split(region.clone(), split_keys.len()); + let router = self.router.clone(); + let logger = self.logger.clone(); + let f = async move { + match resp.await { + Ok(mut resp) => { + info!( + logger, + "try to batch split region"; + "region_id" => region.get_id(), + "new_region_ids" => ?resp.get_ids(), + "region" => ?region, + ); + + let req = new_batch_split_region_request( + split_keys, + resp.take_ids().into(), + right_derive, + ); + let region_id = region.get_id(); + let epoch = region.take_region_epoch(); + send_admin_request(&logger, &router, region_id, epoch, peer, req); + } + Err(e) => { + warn!( + logger, + "ask batch split failed"; + "region_id" => region.get_id(), + "err" => ?e, + ); + } + } + }; + self.remote.spawn(f); + } + + pub fn handle_report_batch_split(&mut self, regions: Vec) { + let resp = self.pd_client.report_batch_split(regions); + let logger = self.logger.clone(); + let f = async move { + if let Err(e) = resp.await { + warn!(logger, "report split failed"; "err" => ?e); + } + }; + self.remote.spawn(f); + } +} diff --git a/components/raftstore-v2/src/worker/pd/store_heartbeat.rs b/components/raftstore-v2/src/worker/pd/store_heartbeat.rs new file mode 100644 index 00000000000..1caa96a5225 --- /dev/null +++ b/components/raftstore-v2/src/worker/pd/store_heartbeat.rs @@ -0,0 +1,293 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
+ +use std::cmp; + +use collections::HashMap; +use engine_traits::{KvEngine, RaftEngine}; +use fail::fail_point; +use kvproto::pdpb; +use pd_client::{ + metrics::{ + REGION_READ_BYTES_HISTOGRAM, REGION_READ_KEYS_HISTOGRAM, REGION_WRITTEN_BYTES_HISTOGRAM, + REGION_WRITTEN_KEYS_HISTOGRAM, STORE_SIZE_GAUGE_VEC, + }, + PdClient, +}; +use prometheus::local::LocalHistogram; +use slog::{error, warn}; +use tikv_util::{metrics::RecordPairVec, store::QueryStats, time::UnixSecs, topn::TopN}; + +use super::Runner; + +const HOTSPOT_REPORT_CAPACITY: usize = 1000; + +fn hotspot_key_report_threshold() -> u64 { + const HOTSPOT_KEY_RATE_THRESHOLD: u64 = 128; + fail_point!("mock_hotspot_threshold", |_| { 0 }); + HOTSPOT_KEY_RATE_THRESHOLD * 10 +} + +fn hotspot_byte_report_threshold() -> u64 { + const HOTSPOT_BYTE_RATE_THRESHOLD: u64 = 8 * 1024; + fail_point!("mock_hotspot_threshold", |_| { 0 }); + HOTSPOT_BYTE_RATE_THRESHOLD * 10 +} + +fn hotspot_query_num_report_threshold() -> u64 { + const HOTSPOT_QUERY_RATE_THRESHOLD: u64 = 128; + fail_point!("mock_hotspot_threshold", |_| { 0 }); + HOTSPOT_QUERY_RATE_THRESHOLD * 10 +} + +pub struct StoreStat { + pub engine_total_bytes_read: u64, + pub engine_total_keys_read: u64, + pub engine_total_query_num: QueryStats, + pub engine_last_total_bytes_read: u64, + pub engine_last_total_keys_read: u64, + pub engine_last_query_num: QueryStats, + pub last_report_ts: UnixSecs, + + pub region_bytes_read: LocalHistogram, + pub region_keys_read: LocalHistogram, + pub region_bytes_written: LocalHistogram, + pub region_keys_written: LocalHistogram, + + pub store_cpu_usages: RecordPairVec, + pub store_read_io_rates: RecordPairVec, + pub store_write_io_rates: RecordPairVec, +} + +impl Default for StoreStat { + fn default() -> StoreStat { + StoreStat { + region_bytes_read: REGION_READ_BYTES_HISTOGRAM.local(), + region_keys_read: REGION_READ_KEYS_HISTOGRAM.local(), + region_bytes_written: REGION_WRITTEN_BYTES_HISTOGRAM.local(), + region_keys_written: REGION_WRITTEN_KEYS_HISTOGRAM.local(), + + last_report_ts: UnixSecs::zero(), + engine_total_bytes_read: 0, + engine_total_keys_read: 0, + engine_last_total_bytes_read: 0, + engine_last_total_keys_read: 0, + engine_total_query_num: QueryStats::default(), + engine_last_query_num: QueryStats::default(), + + store_cpu_usages: RecordPairVec::default(), + store_read_io_rates: RecordPairVec::default(), + store_write_io_rates: RecordPairVec::default(), + } + } +} + +#[derive(Default, Clone)] +struct PeerCmpReadStat { + pub region_id: u64, + pub report_stat: u64, +} + +impl Ord for PeerCmpReadStat { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.report_stat.cmp(&other.report_stat) + } +} + +impl Eq for PeerCmpReadStat {} + +impl PartialEq for PeerCmpReadStat { + fn eq(&self, other: &Self) -> bool { + self.report_stat == other.report_stat + } +} + +impl PartialOrd for PeerCmpReadStat { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.report_stat.cmp(&other.report_stat)) + } +} + +fn collect_report_read_peer_stats( + capacity: usize, + mut report_read_stats: HashMap, + mut stats: pdpb::StoreStats, +) -> pdpb::StoreStats { + if report_read_stats.len() < capacity * 3 { + for (_, read_stat) in report_read_stats { + stats.peer_stats.push(read_stat); + } + return stats; + } + let mut keys_topn_report = TopN::new(capacity); + let mut bytes_topn_report = TopN::new(capacity); + let mut stats_topn_report = TopN::new(capacity); + for read_stat in report_read_stats.values() { + let mut cmp_stat = PeerCmpReadStat::default(); + 
cmp_stat.region_id = read_stat.region_id; + let mut key_cmp_stat = cmp_stat.clone(); + key_cmp_stat.report_stat = read_stat.read_keys; + keys_topn_report.push(key_cmp_stat); + let mut byte_cmp_stat = cmp_stat.clone(); + byte_cmp_stat.report_stat = read_stat.read_bytes; + bytes_topn_report.push(byte_cmp_stat); + let mut query_cmp_stat = cmp_stat.clone(); + query_cmp_stat.report_stat = get_read_query_num(read_stat.get_query_stats()); + stats_topn_report.push(query_cmp_stat); + } + + for x in keys_topn_report { + if let Some(report_stat) = report_read_stats.remove(&x.region_id) { + stats.peer_stats.push(report_stat); + } + } + + for x in bytes_topn_report { + if let Some(report_stat) = report_read_stats.remove(&x.region_id) { + stats.peer_stats.push(report_stat); + } + } + + for x in stats_topn_report { + if let Some(report_stat) = report_read_stats.remove(&x.region_id) { + stats.peer_stats.push(report_stat); + } + } + stats +} + +fn get_read_query_num(stat: &pdpb::QueryStats) -> u64 { + stat.get_get() + stat.get_coprocessor() + stat.get_scan() +} + +impl Runner +where + EK: KvEngine, + ER: RaftEngine, + T: PdClient + 'static, +{ + pub fn handle_store_heartbeat(&mut self, mut stats: pdpb::StoreStats) { + let mut report_peers = HashMap::default(); + for (region_id, region_peer) in &mut self.region_peers { + let read_bytes = region_peer.read_bytes - region_peer.last_store_report_read_bytes; + let read_keys = region_peer.read_keys - region_peer.last_store_report_read_keys; + let query_stats = region_peer + .query_stats + .sub_query_stats(®ion_peer.last_store_report_query_stats); + region_peer.last_store_report_read_bytes = region_peer.read_bytes; + region_peer.last_store_report_read_keys = region_peer.read_keys; + region_peer + .last_store_report_query_stats + .fill_query_stats(®ion_peer.query_stats); + if read_bytes < hotspot_byte_report_threshold() + && read_keys < hotspot_key_report_threshold() + && query_stats.get_read_query_num() < hotspot_query_num_report_threshold() + { + continue; + } + let mut read_stat = pdpb::PeerStat::default(); + read_stat.set_region_id(*region_id); + read_stat.set_read_keys(read_keys); + read_stat.set_read_bytes(read_bytes); + read_stat.set_query_stats(query_stats.0); + report_peers.insert(*region_id, read_stat); + } + + stats = collect_report_read_peer_stats(HOTSPOT_REPORT_CAPACITY, report_peers, stats); + let (capacity, used_size, available) = self.collect_engine_size().unwrap_or_default(); + if available == 0 { + warn!(self.logger, "no available space"); + } + + stats.set_capacity(capacity); + stats.set_used_size(used_size); + stats.set_available(available); + stats.set_bytes_read( + self.store_stat.engine_total_bytes_read - self.store_stat.engine_last_total_bytes_read, + ); + stats.set_keys_read( + self.store_stat.engine_total_keys_read - self.store_stat.engine_last_total_keys_read, + ); + + self.store_stat + .engine_total_query_num + .add_query_stats(stats.get_query_stats()); // add write query stat + let res = self + .store_stat + .engine_total_query_num + .sub_query_stats(&self.store_stat.engine_last_query_num); + stats.set_query_stats(res.0); + + stats.set_cpu_usages(self.store_stat.store_cpu_usages.clone().into()); + stats.set_read_io_rates(self.store_stat.store_read_io_rates.clone().into()); + stats.set_write_io_rates(self.store_stat.store_write_io_rates.clone().into()); + + let mut interval = pdpb::TimeInterval::default(); + interval.set_start_timestamp(self.store_stat.last_report_ts.into_inner()); + stats.set_interval(interval); + 
self.store_stat.engine_last_total_bytes_read = self.store_stat.engine_total_bytes_read; + self.store_stat.engine_last_total_keys_read = self.store_stat.engine_total_keys_read; + self.store_stat + .engine_last_query_num + .fill_query_stats(&self.store_stat.engine_total_query_num); + self.store_stat.last_report_ts = UnixSecs::now(); + self.store_stat.region_bytes_written.flush(); + self.store_stat.region_keys_written.flush(); + self.store_stat.region_bytes_read.flush(); + self.store_stat.region_keys_read.flush(); + + STORE_SIZE_GAUGE_VEC + .with_label_values(&["capacity"]) + .set(capacity as i64); + STORE_SIZE_GAUGE_VEC + .with_label_values(&["available"]) + .set(available as i64); + STORE_SIZE_GAUGE_VEC + .with_label_values(&["used"]) + .set(used_size as i64); + + // TODO: slow score + + let router = self.router.clone(); + let resp = self.pd_client.store_heartbeat(stats, None, None); + let logger = self.logger.clone(); + let f = async move { + if let Err(e) = resp.await { + error!(logger, "store heartbeat failed"; "err" => ?e); + } + }; + self.remote.spawn(f); + } + + /// Returns (capacity, used, available). + fn collect_engine_size(&self) -> Option<(u64, u64, u64)> { + let disk_stats = match fs2::statvfs(self.tablet_factory.tablets_path()) { + Err(e) => { + error!( + self.logger, + "get disk stat for rocksdb failed"; + "engine_path" => self.tablet_factory.tablets_path().display(), + "err" => ?e + ); + return None; + } + Ok(stats) => stats, + }; + let disk_cap = disk_stats.total_space(); + // TODO: custom capacity. + let capacity = disk_cap; + // TODO: accurate snapshot size and kv engines size. + let snap_size = 0; + let kv_size = 0; + let used_size = snap_size + + kv_size + + self + .raft_engine + .get_engine_size() + .expect("raft engine used size"); + let mut available = capacity.checked_sub(used_size).unwrap_or_default(); + // We only care about rocksdb SST file size, so we should check disk available + // here. 
+ available = cmp::min(available, disk_stats.available_space()); + Some((capacity, used_size, available)) + } +} diff --git a/components/raftstore-v2/tests/integrations/cluster.rs b/components/raftstore-v2/tests/integrations/cluster.rs index 1d458d7a73e..92b23c8b7cc 100644 --- a/components/raftstore-v2/tests/integrations/cluster.rs +++ b/components/raftstore-v2/tests/integrations/cluster.rs @@ -160,7 +160,7 @@ pub struct RunningState { impl RunningState { fn new( - pd_client: &RpcClient, + pd_client: &Arc, path: &Path, cfg: Arc>, transport: TestTransport, @@ -179,7 +179,7 @@ impl RunningState { let raft_engine = engine_test::raft::new_engine(&format!("{}", path.join("raft").display()), None) .unwrap(); - let mut bootstrap = Bootstrap::new(&raft_engine, 0, pd_client, logger.clone()); + let mut bootstrap = Bootstrap::new(&raft_engine, 0, pd_client.as_ref(), logger.clone()); let store_id = bootstrap.bootstrap_store().unwrap(); let mut store = Store::default(); store.set_id(store_id); @@ -214,6 +214,7 @@ impl RunningState { raft_engine.clone(), factory.clone(), transport.clone(), + pd_client.clone(), router.store_router(), store_meta.clone(), ) @@ -239,7 +240,7 @@ impl Drop for RunningState { } pub struct TestNode { - pd_client: RpcClient, + pd_client: Arc, path: TempDir, running_state: Option, logger: Logger, @@ -247,7 +248,7 @@ pub struct TestNode { impl TestNode { fn with_pd(pd_server: &test_pd::Server, logger: Logger) -> TestNode { - let pd_client = test_pd::util::new_client(pd_server.bind_addrs(), None); + let pd_client = Arc::new(test_pd::util::new_client(pd_server.bind_addrs(), None)); let path = TempDir::new().unwrap(); TestNode { From 38778ad06ea7a951ba03ba25335b307c84d5e903 Mon Sep 17 00:00:00 2001 From: tabokie Date: Fri, 11 Nov 2022 16:59:17 +0800 Subject: [PATCH 2/2] schedule pd heartbeat ticks Signed-off-by: tabokie --- components/raftstore-v2/src/batch/store.rs | 2 + components/raftstore-v2/src/fsm/peer.rs | 2 +- components/raftstore-v2/src/fsm/store.rs | 40 +++++- components/raftstore-v2/src/operation/mod.rs | 1 + components/raftstore-v2/src/operation/pd.rs | 136 ++++++++++++++++++ .../raftstore-v2/src/operation/ready/mod.rs | 1 + components/raftstore-v2/src/worker/mod.rs | 2 +- 7 files changed, 175 insertions(+), 9 deletions(-) create mode 100644 components/raftstore-v2/src/operation/pd.rs diff --git a/components/raftstore-v2/src/batch/store.rs b/components/raftstore-v2/src/batch/store.rs index 25b03ff652f..a82eefab61a 100644 --- a/components/raftstore-v2/src/batch/store.rs +++ b/components/raftstore-v2/src/batch/store.rs @@ -71,6 +71,7 @@ pub struct StoreContext { pub tablet_factory: Arc>, pub apply_pool: FuturePool, pub read_scheduler: Scheduler>, + pub pd_scheduler: Scheduler, } /// A [`PollHandler`] that handles updates of [`StoreFsm`]s and [`PeerFsm`]s. 
@@ -330,6 +331,7 @@ where tablet_factory: self.tablet_factory.clone(), apply_pool: self.apply_pool.clone(), read_scheduler: self.read_scheduler.clone(), + pd_scheduler: self.pd_scheduler.clone(), }; let cfg_tracker = self.cfg.clone().tracker("raftstore".to_string()); StorePoller::new(poll_ctx, cfg_tracker) diff --git a/components/raftstore-v2/src/fsm/peer.rs b/components/raftstore-v2/src/fsm/peer.rs index 7083a9e529c..25bbee28660 100644 --- a/components/raftstore-v2/src/fsm/peer.rs +++ b/components/raftstore-v2/src/fsm/peer.rs @@ -193,9 +193,9 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER, fn on_tick(&mut self, tick: PeerTick) { match tick { PeerTick::Raft => self.on_raft_tick(), + PeerTick::PdHeartbeat => self.on_pd_heartbeat(), PeerTick::RaftLogGc => unimplemented!(), PeerTick::SplitRegionCheck => unimplemented!(), - PeerTick::PdHeartbeat => unimplemented!(), PeerTick::CheckMerge => unimplemented!(), PeerTick::CheckPeerStaleState => unimplemented!(), PeerTick::EntryCacheEvict => unimplemented!(), diff --git a/components/raftstore-v2/src/fsm/store.rs b/components/raftstore-v2/src/fsm/store.rs index 3be571bdfbc..b109f6ebf43 100644 --- a/components/raftstore-v2/src/fsm/store.rs +++ b/components/raftstore-v2/src/fsm/store.rs @@ -1,13 +1,18 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use batch_system::Fsm; use collections::HashMap; use engine_traits::{KvEngine, RaftEngine}; +use futures::{compat::Future01CompatExt, FutureExt}; use raftstore::store::{Config, ReadDelegate}; -use slog::{o, Logger}; -use tikv_util::mpsc::{self, LooseBoundedSender, Receiver}; +use slog::{info, o, Logger}; +use tikv_util::{ + future::poll_future_notify, + is_zero_duration, + mpsc::{self, LooseBoundedSender, Receiver}, +}; use crate::{ batch::StoreContext, @@ -74,7 +79,7 @@ impl Store { } pub struct StoreFsm { - store: Store, + pub store: Store, receiver: Receiver, } @@ -118,8 +123,8 @@ impl Fsm for StoreFsm { } pub struct StoreFsmDelegate<'a, EK: KvEngine, ER: RaftEngine, T> { - fsm: &'a mut StoreFsm, - store_ctx: &'a mut StoreContext, + pub fsm: &'a mut StoreFsm, + pub store_ctx: &'a mut StoreContext, } impl<'a, EK: KvEngine, ER: RaftEngine, T> StoreFsmDelegate<'a, EK, ER, T> { @@ -139,8 +144,29 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T> StoreFsmDelegate<'a, EK, ER, T> { ); } + pub fn schedule_tick(&mut self, tick: StoreTick, timeout: Duration) { + if !is_zero_duration(&timeout) { + let mb = self.store_ctx.router.control_mailbox(); + let logger = self.fsm.store.logger().clone(); + let delay = self.store_ctx.timer.delay(timeout).compat().map(move |_| { + if let Err(e) = mb.force_send(StoreMsg::Tick(tick)) { + info!( + logger, + "failed to schedule store tick, are we shutting down?"; + "tick" => ?tick, + "err" => ?e + ); + } + }); + poll_future_notify(delay); + } + } + fn on_tick(&mut self, tick: StoreTick) { - unimplemented!() + match tick { + StoreTick::PdStoreHeartbeat => self.on_pd_store_heartbeat(), + _ => unimplemented!(), + } } pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec) { diff --git a/components/raftstore-v2/src/operation/mod.rs b/components/raftstore-v2/src/operation/mod.rs index 5b19db91b71..762de212119 100644 --- a/components/raftstore-v2/src/operation/mod.rs +++ b/components/raftstore-v2/src/operation/mod.rs @@ -2,6 +2,7 @@ mod command; mod life; +mod pd; mod query; mod ready; diff --git a/components/raftstore-v2/src/operation/pd.rs 
b/components/raftstore-v2/src/operation/pd.rs new file mode 100644 index 00000000000..2c26a07746f --- /dev/null +++ b/components/raftstore-v2/src/operation/pd.rs @@ -0,0 +1,136 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +//! This module implements the interactions with pd. + +use std::cmp; + +use engine_traits::{KvEngine, RaftEngine}; +use fail::fail_point; +use kvproto::pdpb; +use raftstore::store::Transport; +use slog::error; + +use crate::{ + batch::StoreContext, + fsm::{PeerFsmDelegate, Store, StoreFsmDelegate}, + raft::Peer, + router::{PeerTick, StoreTick}, + worker::{PdHeartbeatTask, PdTask}, +}; + +impl<'a, EK: KvEngine, ER: RaftEngine, T> StoreFsmDelegate<'a, EK, ER, T> { + pub fn on_pd_store_heartbeat(&mut self) { + self.fsm.store.store_heartbeat_pd(self.store_ctx); + self.schedule_tick( + StoreTick::PdStoreHeartbeat, + self.store_ctx.cfg.pd_store_heartbeat_tick_interval.0, + ); + } +} + +impl Store { + pub fn store_heartbeat_pd(&mut self, ctx: &mut StoreContext) + where + EK: KvEngine, + ER: RaftEngine, + { + let mut stats = pdpb::StoreStats::default(); + + stats.set_store_id(self.store_id()); + { + let meta = ctx.store_meta.lock().unwrap(); + stats.set_region_count(meta.tablet_caches.len() as u32); + } + + stats.set_sending_snap_count(0); + stats.set_receiving_snap_count(0); + + stats.set_start_time(self.start_time().unwrap() as u32); + + stats.set_bytes_written(0); + stats.set_keys_written(0); + stats.set_is_busy(false); + + // stats.set_query_stats(query_stats); + + let task = PdTask::StoreHeartbeat { stats }; + if let Err(e) = ctx.pd_scheduler.schedule(task) { + error!(self.logger(), "notify pd failed"; + "store_id" => self.store_id(), + "err" => ?e + ); + } + } +} + +impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER, T> { + pub fn on_pd_heartbeat(&mut self) { + self.fsm.peer_mut().heartbeat_pd(self.store_ctx); + self.schedule_tick(PeerTick::PdHeartbeat); + } +} + +impl Peer { + pub fn heartbeat_pd(&mut self, ctx: &mut StoreContext) { + let task = PdTask::Heartbeat(PdHeartbeatTask { + term: self.term(), + region: self.region().clone(), + down_peers: Vec::new(), + peer: self.peer().clone(), + pending_peers: Vec::new(), + written_bytes: 0, + written_keys: 0, + approximate_size: None, + approximate_keys: None, + wait_data_peers: Vec::new(), + }); + if let Err(e) = ctx.pd_scheduler.schedule(task) { + error!( + self.logger, + "failed to notify pd"; + "region_id" => self.region_id(), + "peer_id" => self.peer_id(), + "err" => ?e, + ); + return; + } + fail_point!("schedule_check_split"); + } + + pub fn destroy_peer_pd(&mut self, ctx: &mut StoreContext) { + let task = PdTask::DestroyPeer { + region_id: self.region_id(), + }; + if let Err(e) = ctx.pd_scheduler.schedule(task) { + error!( + self.logger, + "failed to notify pd"; + "region_id" => self.region_id(), + "peer_id" => self.peer_id(), + "err" => %e, + ); + } + } + + pub fn ask_batch_split_pd( + &mut self, + ctx: &mut StoreContext, + split_keys: Vec>, + ) { + let task = PdTask::AskBatchSplit { + region: self.region().clone(), + split_keys, + peer: self.peer().clone(), + right_derive: ctx.cfg.right_derive_when_split, + }; + if let Err(e) = ctx.pd_scheduler.schedule(task) { + error!( + self.logger, + "failed to notify pd to split"; + "region_id" => self.region_id(), + "peer_id" => self.peer_id(), + "err" => %e, + ); + } + } +} diff --git a/components/raftstore-v2/src/operation/ready/mod.rs b/components/raftstore-v2/src/operation/ready/mod.rs index 62cb42ef253..91cc875764e 
100644
--- a/components/raftstore-v2/src/operation/ready/mod.rs
+++ b/components/raftstore-v2/src/operation/ready/mod.rs
@@ -41,6 +41,7 @@ use crate::{
     raft::{Peer, Storage},
     router::{ApplyTask, PeerTick},
 };
+
 impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER, T> {
     /// Raft relies on periodic ticks to keep the state machine sync with other
     /// peers.
diff --git a/components/raftstore-v2/src/worker/mod.rs b/components/raftstore-v2/src/worker/mod.rs
index 8c164c9cf96..bc5bc5945b4 100644
--- a/components/raftstore-v2/src/worker/mod.rs
+++ b/components/raftstore-v2/src/worker/mod.rs
@@ -2,4 +2,4 @@
 
 mod pd;
 
-pub use pd::{Runner as PdRunner, Task as PdTask};
+pub use pd::{HeartbeatTask as PdHeartbeatTask, Runner as PdRunner, Task as PdTask};
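
A few notes on the new PD worker follow, with small illustrative sketches in Rust; none of the code below is part of the patch itself.

The region heartbeat in components/raftstore-v2/src/worker/pd/heartbeat.rs reuses 0 in the approximate_size field to mean "size not yet computed", so a region that is known to be empty is reported as 1 instead (see tikv/tikv#11114). A minimal sketch of that mapping, using a hypothetical helper name:

// Hypothetical helper, for illustration only: encode the locally tracked
// approximate size into the value reported to PD. `None` means the size has
// not been computed yet; `Some(0)` means the region is known to be empty.
fn encode_approximate_size(size: Option<u64>) -> u64 {
    match size {
        None => 0,    // size uninitialized
        Some(0) => 1, // empty region; 0 would read as "uninitialized"
        Some(v) => v,
    }
}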
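
The cpu_usage attached to each region heartbeat is the CPU time recorded for that region since the last report, expressed as a percentage of one core over the reporting interval, consistent with how the store-level cpu_usages are computed. A standalone sketch of the same arithmetic (the helper name is assumed, not part of the patch):

use std::time::Duration;

// For example, 1,500 ms of CPU time over a 60 s interval reports as 2
// (2.5% of one core, truncated to an integer).
fn region_cpu_usage(cpu_time: Duration, interval_secs: u64) -> u64 {
    if interval_secs > 0 {
        ((cpu_time.as_secs_f64() * 100.0) / interval_secs as f64) as u64
    } else {
        0
    }
}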
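
In components/raftstore-v2/src/worker/pd/store_heartbeat.rs, a region's read statistics are only attached to the store heartbeat when at least one delta since the last store report crosses a hotspot threshold, and collect_report_read_peer_stats further caps the report at 3 * HOTSPOT_REPORT_CAPACITY entries using per-dimension top-N lists. A simplified sketch of the filtering predicate (threshold values copied from the patch; the helper itself is illustrative and ignores the fail point):

// Thresholds as defined in store_heartbeat.rs (rate threshold * 10).
const HOTSPOT_KEY_REPORT_THRESHOLD: u64 = 128 * 10;
const HOTSPOT_BYTE_REPORT_THRESHOLD: u64 = 8 * 1024 * 10;
const HOTSPOT_QUERY_NUM_REPORT_THRESHOLD: u64 = 128 * 10;

// Illustrative helper: should this region's read stats be included in the
// next store heartbeat?
fn is_hot_read_region(read_bytes: u64, read_keys: u64, read_query_num: u64) -> bool {
    read_bytes >= HOTSPOT_BYTE_REPORT_THRESHOLD
        || read_keys >= HOTSPOT_KEY_REPORT_THRESHOLD
        || read_query_num >= HOTSPOT_QUERY_NUM_REPORT_THRESHOLD
}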