From b5e056363311ba0dc157a12525028d386bfc10e2 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Tue, 20 Feb 2024 20:37:57 +0800 Subject: [PATCH] service/kv: Support sending health feedback via BatchCommandResponse (#16498) ref tikv/tikv#16297 Support sending health feedback information to the client via BatchCommandResponse Signed-off-by: MyonKeminta Signed-off-by: dbsid --- Cargo.lock | 2 +- components/health_controller/src/lib.rs | 6 +- components/health_controller/src/reporters.rs | 20 ++- src/server/config.rs | 7 ++ src/server/server.rs | 8 ++ src/server/service/kv.rs | 119 +++++++++++++++++- tests/integrations/config/mod.rs | 1 + tests/integrations/config/test-custom.toml | 1 + tests/integrations/server/kv_service.rs | 27 ++++ 9 files changed, 187 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d007ae9125b..ac9b460722b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2838,7 +2838,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#05a3758a1d248e7678a6a7f0c5578c3ca1ee5796" +source = "git+https://github.com/pingcap/kvproto.git#a554af8ee11ffdf0adbbf0efac11220e416ddacf" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/components/health_controller/src/lib.rs b/components/health_controller/src/lib.rs index 4e5504932e2..baf7f794b85 100644 --- a/components/health_controller/src/lib.rs +++ b/components/health_controller/src/lib.rs @@ -100,7 +100,7 @@ impl HealthControllerInner { let health_service = HealthService::default(); health_service.set_serving_status("", grpcio_health::ServingStatus::NotServing); Self { - raftstore_slow_score: AtomicU64::new(1), + raftstore_slow_score: AtomicU64::new(f64::to_bits(1.0)), raftstore_slow_trend: RollingRetriever::new(), health_service, @@ -329,6 +329,10 @@ mod tests { #[test] fn test_health_controller_update_service_status() { let h = HealthController::new(); + + // Initial value of slow score + assert_eq!(h.get_raftstore_slow_score(), 1.0); + assert_eq!( h.get_serving_status(), grpcio_health::ServingStatus::NotServing diff --git a/components/health_controller/src/reporters.rs b/components/health_controller/src/reporters.rs index c80bb96057c..96514cf5414 100644 --- a/components/health_controller/src/reporters.rs +++ b/components/health_controller/src/reporters.rs @@ -54,7 +54,7 @@ impl RaftstoreReporter { const MODULE_NAME: &'static str = "raftstore"; pub fn new(health_controller: &HealthController, cfg: RaftstoreReporterConfig) -> Self { - RaftstoreReporter { + Self { health_controller_inner: health_controller.inner.clone(), slow_score: SlowScore::new(cfg.inspect_interval), slow_trend: SlowTrendStatistics::new(cfg), @@ -242,3 +242,21 @@ impl SlowTrendStatistics { self.slow_cause.record(latency, Instant::now()); } } + +/// A reporter that can set states directly, for testing purposes. +pub struct TestReporter { + health_controller_inner: Arc, +} + +impl TestReporter { + pub fn new(health_controller: &HealthController) -> Self { + Self { + health_controller_inner: health_controller.inner.clone(), + } + } + + pub fn set_raftstore_slow_score(&self, slow_score: f64) { + self.health_controller_inner + .update_raftstore_slow_score(slow_score); + } +} diff --git a/src/server/config.rs b/src/server/config.rs index 4e66e5802c0..55d384e238a 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -183,6 +183,12 @@ pub struct Config { #[doc(hidden)] pub simplify_metrics: bool, + #[doc(hidden)] + #[online_config(skip)] + /// Minimum interval to send health feedback information in each + /// `BatchCommands` gRPC stream. 0 to disable sending health feedback. + pub health_feedback_interval: ReadableDuration, + // Server labels to specify some attributes about this server. #[online_config(skip)] pub labels: HashMap, @@ -264,6 +270,7 @@ impl Default for Config { // Go tikv client uses 4 as well. forward_max_connections_per_address: 4, simplify_metrics: false, + health_feedback_interval: ReadableDuration::secs(1), } } } diff --git a/src/server/server.rs b/src/server/server.rs index c54a0cb75be..8f3808f990c 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -186,6 +186,12 @@ where let lazy_worker = snap_worker.lazy_build("snap-handler"); let raft_ext = storage.get_engine().raft_extension(); + let health_feedback_interval = if cfg.value().health_feedback_interval.0.is_zero() { + None + } else { + Some(cfg.value().health_feedback_interval.0) + }; + let proxy = Proxy::new(security_mgr.clone(), &env, Arc::new(cfg.value().clone())); let kv_service = KvService::new( store_id, @@ -200,6 +206,8 @@ where proxy, cfg.value().reject_messages_on_memory_ratio, resource_manager, + health_controller.clone(), + health_feedback_interval, ); let builder_factory = Box::new(BuilderFactory::new( kv_service, diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 875293be50e..2151e37d1c2 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -1,7 +1,14 @@ // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0. // #[PerformanceCriticalPath]: TiKV gRPC APIs implementation -use std::{mem, sync::Arc, time::Duration}; +use std::{ + mem, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; use api_version::KvFormat; use fail::fail_point; @@ -15,6 +22,7 @@ use grpcio::{ ClientStreamingSink, DuplexSink, Error as GrpcError, RequestStream, Result as GrpcResult, RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, UnarySink, WriteFlags, }; +use health_controller::HealthController; use kvproto::{coprocessor::*, kvrpcpb::*, mpp::*, raft_serverpb::*, tikvpb::*}; use protobuf::RepeatedField; use raft::eraftpb::MessageType; @@ -89,6 +97,10 @@ pub struct Service { reject_messages_on_memory_ratio: f64, resource_manager: Option>, + + health_controller: HealthController, + health_feedback_interval: Option, + health_feedback_seq: Arc, } impl Drop for Service { @@ -112,6 +124,9 @@ impl Clone for Service Service { proxy: Proxy, reject_messages_on_memory_ratio: f64, resource_manager: Option>, + health_controller: HealthController, + health_feedback_interval: Option, ) -> Self { + let now_unix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; Service { store_id, gc_worker, @@ -145,6 +166,9 @@ impl Service { proxy, reject_messages_on_memory_ratio, resource_manager, + health_controller, + health_feedback_interval, + health_feedback_seq: Arc::new(AtomicU64::new(now_unix)), } } @@ -935,6 +959,12 @@ impl Tikv for Service { let pool_size = storage.get_normal_pool_size(); let batch_builder = BatcherBuilder::new(self.enable_req_batch, pool_size); let resource_manager = self.resource_manager.clone(); + let mut health_feedback_attacher = HealthFeedbackAttacher::new( + self.store_id, + self.health_controller.clone(), + self.health_feedback_seq.clone(), + self.health_feedback_interval, + ); let request_handler = stream.try_for_each(move |mut req| { let request_ids = req.take_request_ids(); let requests: Vec<_> = req.take_requests().into(); @@ -978,6 +1008,7 @@ impl Tikv for Service { GRPC_RESP_BATCH_COMMANDS_SIZE.observe(r.request_ids.len() as f64); // TODO: per thread load is more reasonable for batching. r.set_transport_layer_load(grpc_thread_load.total_load() as u64); + health_feedback_attacher.attach_if_needed(&mut r); GrpcResult::<(BatchCommandsResponse, WriteFlags)>::Ok(( r, WriteFlags::default().buffer_hint(false), @@ -2431,6 +2462,56 @@ fn needs_reject_raft_append(reject_messages_on_memory_ratio: f64) -> bool { false } +struct HealthFeedbackAttacher { + store_id: u64, + health_controller: HealthController, + last_feedback_time: Option, + seq: Arc, + feedback_interval: Option, +} + +impl HealthFeedbackAttacher { + fn new( + store_id: u64, + health_controller: HealthController, + seq: Arc, + feedback_interval: Option, + ) -> Self { + Self { + store_id, + health_controller, + last_feedback_time: None, + seq, + feedback_interval, + } + } + + fn attach_if_needed(&mut self, resp: &mut BatchCommandsResponse) { + let feedback_interval = match self.feedback_interval { + Some(i) => i, + None => return, + }; + + let now = Instant::now_coarse(); + + if let Some(last_feedback_time) = self.last_feedback_time + && now - last_feedback_time < feedback_interval + { + return; + } + + self.attach(resp, now); + } + + fn attach(&mut self, resp: &mut BatchCommandsResponse, now: Instant) { + self.last_feedback_time = Some(now); + let feedback = resp.mut_health_feedback(); + feedback.set_store_id(self.store_id); + feedback.set_feedback_seq_no(self.seq.fetch_add(1, Ordering::Relaxed)); + feedback.set_slow_score(self.health_controller.get_raftstore_slow_score() as i32); + } +} + #[cfg(test)] mod tests { use std::thread; @@ -2486,4 +2567,40 @@ mod tests { poll_future_notify(task); assert_eq!(block_on(rx1).unwrap(), 200); } + + #[test] + fn test_health_feedback_attacher() { + let health_controller = HealthController::new(); + let test_reporter = health_controller::reporters::TestReporter::new(&health_controller); + let seq = Arc::new(AtomicU64::new(1)); + + let mut a = HealthFeedbackAttacher::new(1, health_controller.clone(), seq.clone(), None); + let mut resp = BatchCommandsResponse::default(); + a.attach_if_needed(&mut resp); + assert!(!resp.has_health_feedback()); + + let mut a = + HealthFeedbackAttacher::new(1, health_controller, seq, Some(Duration::from_secs(1))); + resp = BatchCommandsResponse::default(); + a.attach_if_needed(&mut resp); + assert!(resp.has_health_feedback()); + assert_eq!(resp.get_health_feedback().get_store_id(), 1); + assert_eq!(resp.get_health_feedback().get_feedback_seq_no(), 1); + assert_eq!(resp.get_health_feedback().get_slow_score(), 1); + + // Skips attaching feedback because last attaching was just done. + test_reporter.set_raftstore_slow_score(50.); + resp = BatchCommandsResponse::default(); + a.attach_if_needed(&mut resp); + assert!(!resp.has_health_feedback()); + + // Simulate elapsing enough time by changing the recorded last update time. + *a.last_feedback_time.as_mut().unwrap() -= Duration::from_millis(1001); + a.attach_if_needed(&mut resp); + assert!(resp.has_health_feedback()); + assert_eq!(resp.get_health_feedback().get_store_id(), 1); + // Seq no increased. + assert_eq!(resp.get_health_feedback().get_feedback_seq_no(), 2); + assert_eq!(resp.get_health_feedback().get_slow_score(), 50); + } } diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 155e2133c21..16c391120f6 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -123,6 +123,7 @@ fn test_serde_custom_tikv_config() { forward_max_connections_per_address: 5, reject_messages_on_memory_ratio: 0.8, simplify_metrics: false, + health_feedback_interval: ReadableDuration::secs(2), ..Default::default() }; value.readpool = ReadPoolConfig { diff --git a/tests/integrations/config/test-custom.toml b/tests/integrations/config/test-custom.toml index 3d6a9075db9..45caf552fb0 100644 --- a/tests/integrations/config/test-custom.toml +++ b/tests/integrations/config/test-custom.toml @@ -84,6 +84,7 @@ background-thread-count = 999 end-point-slow-log-threshold = "1s" forward-max-connections-per-address = 5 reject-messages-on-memory-ratio = 0.8 +health-feedback-interval = "2s" [server.labels] a = "b" diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs index 88bb589461f..80cc7e0b200 100644 --- a/tests/integrations/server/kv_service.rs +++ b/tests/integrations/server/kv_service.rs @@ -1758,6 +1758,33 @@ fn test_batch_commands() { rx.recv_timeout(Duration::from_secs(1)).unwrap(); } +#[test_case(test_raftstore::must_new_cluster_and_kv_client)] +#[test_case(test_raftstore_v2::must_new_cluster_and_kv_client)] +fn test_health_feedback() { + let (_cluster, client, _ctx) = new_cluster(); + let (mut sender, mut receiver) = client.batch_commands().unwrap(); + + let mut batch_req = BatchCommandsRequest::default(); + batch_req.mut_requests().push(Default::default()); + batch_req.mut_request_ids().push(1); + + block_on(sender.send((batch_req.clone(), WriteFlags::default()))).unwrap(); + let resp = block_on(receiver.next()).unwrap().unwrap(); + assert!(resp.has_health_feedback()); + + block_on(sender.send((batch_req.clone(), WriteFlags::default()))).unwrap(); + let resp = block_on(receiver.next()).unwrap().unwrap(); + assert!(!resp.has_health_feedback()); + + thread::sleep(Duration::from_millis(1100)); + block_on(sender.send((batch_req, WriteFlags::default()))).unwrap(); + let resp = block_on(receiver.next()).unwrap().unwrap(); + assert!(resp.has_health_feedback()); + + block_on(sender.close()).unwrap(); + block_on(receiver.for_each(|_| future::ready(()))); +} + #[test_case(test_raftstore::must_new_cluster_and_kv_client)] #[test_case(test_raftstore_v2::must_new_cluster_and_kv_client)] fn test_empty_commands() {