Commit b5e0563

service/kv: Support sending health feedback via BatchCommandsResponse (tikv#16498)

ref tikv#16297

Support sending health feedback information to the client via BatchCommandsResponse

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: dbsid <chenhuansheng@pingcap.com>
MyonKeminta authored and dbsid committed Mar 24, 2024
1 parent 4a01ee4 commit b5e0563
Showing 9 changed files with 187 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

6 changes: 5 additions & 1 deletion components/health_controller/src/lib.rs
@@ -100,7 +100,7 @@ impl HealthControllerInner {
         let health_service = HealthService::default();
         health_service.set_serving_status("", grpcio_health::ServingStatus::NotServing);
         Self {
-            raftstore_slow_score: AtomicU64::new(1),
+            raftstore_slow_score: AtomicU64::new(f64::to_bits(1.0)),
             raftstore_slow_trend: RollingRetriever::new(),
 
             health_service,
@@ -329,6 +329,10 @@ mod tests {
     #[test]
     fn test_health_controller_update_service_status() {
         let h = HealthController::new();
+
+        // Initial value of slow score
+        assert_eq!(h.get_raftstore_slow_score(), 1.0);
+
         assert_eq!(
             h.get_serving_status(),
             grpcio_health::ServingStatus::NotServing
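Note on the one-line fix above: the slow score is an f64 stored in an AtomicU64 as its raw IEEE-754 bit pattern, so the old AtomicU64::new(1) initialized the score to the float whose bits are 1 (a subnormal around 5e-324), not to 1.0. A minimal standalone sketch of the pattern, assuming readers decode with f64::from_bits (the SlowScoreCell name is illustrative, not TiKV's):

use std::sync::atomic::{AtomicU64, Ordering};

// Store an f64 in an AtomicU64 by its bit pattern.
struct SlowScoreCell(AtomicU64);

impl SlowScoreCell {
    fn new(v: f64) -> Self {
        Self(AtomicU64::new(f64::to_bits(v)))
    }
    fn set(&self, v: f64) {
        self.0.store(f64::to_bits(v), Ordering::Relaxed);
    }
    fn get(&self) -> f64 {
        f64::from_bits(self.0.load(Ordering::Relaxed))
    }
}

fn main() {
    let cell = SlowScoreCell::new(1.0);
    assert_eq!(cell.get(), 1.0);
    // The old initializer was equivalent to this, which does not decode to 1.0:
    assert_ne!(f64::from_bits(1), 1.0);
}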
20 changes: 19 additions & 1 deletion components/health_controller/src/reporters.rs
@@ -54,7 +54,7 @@ impl RaftstoreReporter {
     const MODULE_NAME: &'static str = "raftstore";
 
     pub fn new(health_controller: &HealthController, cfg: RaftstoreReporterConfig) -> Self {
-        RaftstoreReporter {
+        Self {
             health_controller_inner: health_controller.inner.clone(),
             slow_score: SlowScore::new(cfg.inspect_interval),
             slow_trend: SlowTrendStatistics::new(cfg),
@@ -242,3 +242,21 @@ impl SlowTrendStatistics {
         self.slow_cause.record(latency, Instant::now());
     }
 }
+
+/// A reporter that can set states directly, for testing purposes.
+pub struct TestReporter {
+    health_controller_inner: Arc<HealthControllerInner>,
+}
+
+impl TestReporter {
+    pub fn new(health_controller: &HealthController) -> Self {
+        Self {
+            health_controller_inner: health_controller.inner.clone(),
+        }
+    }
+
+    pub fn set_raftstore_slow_score(&self, slow_score: f64) {
+        self.health_controller_inner
+            .update_raftstore_slow_score(slow_score);
+    }
+}
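The new TestReporter lets tests drive health state without a real raftstore; the unit test added to src/server/service/kv.rs below uses it exactly this way. A short usage sketch under that assumption:

use health_controller::{reporters::TestReporter, HealthController};

fn main() {
    let controller = HealthController::new();
    let reporter = TestReporter::new(&controller);

    // Push a slow score straight into the shared inner state.
    reporter.set_raftstore_slow_score(50.0);
    assert_eq!(controller.get_raftstore_slow_score(), 50.0);
}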
7 changes: 7 additions & 0 deletions src/server/config.rs
@@ -183,6 +183,12 @@ pub struct Config {
     #[doc(hidden)]
     pub simplify_metrics: bool,
 
+    #[doc(hidden)]
+    #[online_config(skip)]
+    /// Minimum interval to send health feedback information in each
+    /// `BatchCommands` gRPC stream. 0 to disable sending health feedback.
+    pub health_feedback_interval: ReadableDuration,
+
     // Server labels to specify some attributes about this server.
     #[online_config(skip)]
     pub labels: HashMap<String, String>,
@@ -264,6 +270,7 @@ impl Default for Config {
             // Go tikv client uses 4 as well.
             forward_max_connections_per_address: 4,
             simplify_metrics: false,
+            health_feedback_interval: ReadableDuration::secs(1),
         }
     }
 }
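For operators, the new setting belongs to the [server] section, as the test-custom.toml change below shows. A hedged configuration sketch (key name and default taken from this diff; "0s" disables the feature per the doc comment above):

[server]
# Minimum interval between health feedback messages on each BatchCommands
# stream. Defaults to "1s"; "0s" disables sending health feedback.
health-feedback-interval = "1s"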
8 changes: 8 additions & 0 deletions src/server/server.rs
@@ -186,6 +186,12 @@ where
         let lazy_worker = snap_worker.lazy_build("snap-handler");
         let raft_ext = storage.get_engine().raft_extension();
 
+        let health_feedback_interval = if cfg.value().health_feedback_interval.0.is_zero() {
+            None
+        } else {
+            Some(cfg.value().health_feedback_interval.0)
+        };
+
         let proxy = Proxy::new(security_mgr.clone(), &env, Arc::new(cfg.value().clone()));
         let kv_service = KvService::new(
             store_id,
@@ -200,6 +206,8 @@
             proxy,
             cfg.value().reject_messages_on_memory_ratio,
             resource_manager,
+            health_controller.clone(),
+            health_feedback_interval,
         );
         let builder_factory = Box::new(BuilderFactory::new(
             kv_service,
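The construction above maps a configured interval of zero to None, which switches the feature off end to end (attach_if_needed in kv.rs below returns early when the interval is None). An equivalent compact sketch of that mapping, with a hypothetical helper name:

use std::time::Duration;

// A zero configured interval means "disabled", represented as None.
fn effective_interval(configured: Duration) -> Option<Duration> {
    (!configured.is_zero()).then_some(configured)
}

fn main() {
    assert_eq!(effective_interval(Duration::ZERO), None);
    assert_eq!(
        effective_interval(Duration::from_secs(1)),
        Some(Duration::from_secs(1))
    );
}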
119 changes: 118 additions & 1 deletion src/server/service/kv.rs
@@ -1,7 +1,14 @@
 // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.
 
 // #[PerformanceCriticalPath]: TiKV gRPC APIs implementation
-use std::{mem, sync::Arc, time::Duration};
+use std::{
+    mem,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
 
 use api_version::KvFormat;
 use fail::fail_point;
@@ -15,6 +22,7 @@ use grpcio::{
     ClientStreamingSink, DuplexSink, Error as GrpcError, RequestStream, Result as GrpcResult,
     RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, UnarySink, WriteFlags,
 };
+use health_controller::HealthController;
 use kvproto::{coprocessor::*, kvrpcpb::*, mpp::*, raft_serverpb::*, tikvpb::*};
 use protobuf::RepeatedField;
 use raft::eraftpb::MessageType;
@@ -89,6 +97,10 @@ pub struct Service<E: Engine, L: LockManager, F: KvFormat> {
     reject_messages_on_memory_ratio: f64,
 
     resource_manager: Option<Arc<ResourceGroupManager>>,
+
+    health_controller: HealthController,
+    health_feedback_interval: Option<Duration>,
+    health_feedback_seq: Arc<AtomicU64>,
 }
 
 impl<E: Engine, L: LockManager, F: KvFormat> Drop for Service<E, L, F> {
@@ -112,6 +124,9 @@ impl<E: Engine + Clone, L: LockManager + Clone, F: KvFormat> Clone for Service<E
             proxy: self.proxy.clone(),
             reject_messages_on_memory_ratio: self.reject_messages_on_memory_ratio,
             resource_manager: self.resource_manager.clone(),
+            health_controller: self.health_controller.clone(),
+            health_feedback_seq: self.health_feedback_seq.clone(),
+            health_feedback_interval: self.health_feedback_interval,
         }
     }
 }
@@ -131,7 +146,13 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
         proxy: Proxy,
         reject_messages_on_memory_ratio: f64,
         resource_manager: Option<Arc<ResourceGroupManager>>,
+        health_controller: HealthController,
+        health_feedback_interval: Option<Duration>,
     ) -> Self {
+        let now_unix = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;
         Service {
             store_id,
             gc_worker,
@@ -145,6 +166,9 @@ impl<E: Engine, L: LockManager, F: KvFormat> Service<E, L, F> {
             proxy,
             reject_messages_on_memory_ratio,
             resource_manager,
+            health_controller,
+            health_feedback_interval,
+            health_feedback_seq: Arc::new(AtomicU64::new(now_unix)),
         }
     }
 
@@ -935,6 +959,12 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
         let pool_size = storage.get_normal_pool_size();
         let batch_builder = BatcherBuilder::new(self.enable_req_batch, pool_size);
         let resource_manager = self.resource_manager.clone();
+        let mut health_feedback_attacher = HealthFeedbackAttacher::new(
+            self.store_id,
+            self.health_controller.clone(),
+            self.health_feedback_seq.clone(),
+            self.health_feedback_interval,
+        );
         let request_handler = stream.try_for_each(move |mut req| {
             let request_ids = req.take_request_ids();
             let requests: Vec<_> = req.take_requests().into();
@@ -978,6 +1008,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
                 GRPC_RESP_BATCH_COMMANDS_SIZE.observe(r.request_ids.len() as f64);
                 // TODO: per thread load is more reasonable for batching.
                 r.set_transport_layer_load(grpc_thread_load.total_load() as u64);
+                health_feedback_attacher.attach_if_needed(&mut r);
                 GrpcResult::<(BatchCommandsResponse, WriteFlags)>::Ok((
                     r,
                     WriteFlags::default().buffer_hint(false),
@@ -2431,6 +2462,56 @@ fn needs_reject_raft_append(reject_messages_on_memory_ratio: f64) -> bool {
     false
 }
 
+struct HealthFeedbackAttacher {
+    store_id: u64,
+    health_controller: HealthController,
+    last_feedback_time: Option<Instant>,
+    seq: Arc<AtomicU64>,
+    feedback_interval: Option<Duration>,
+}
+
+impl HealthFeedbackAttacher {
+    fn new(
+        store_id: u64,
+        health_controller: HealthController,
+        seq: Arc<AtomicU64>,
+        feedback_interval: Option<Duration>,
+    ) -> Self {
+        Self {
+            store_id,
+            health_controller,
+            last_feedback_time: None,
+            seq,
+            feedback_interval,
+        }
+    }
+
+    fn attach_if_needed(&mut self, resp: &mut BatchCommandsResponse) {
+        let feedback_interval = match self.feedback_interval {
+            Some(i) => i,
+            None => return,
+        };
+
+        let now = Instant::now_coarse();
+
+        if let Some(last_feedback_time) = self.last_feedback_time
+            && now - last_feedback_time < feedback_interval
+        {
+            return;
+        }
+
+        self.attach(resp, now);
+    }
+
+    fn attach(&mut self, resp: &mut BatchCommandsResponse, now: Instant) {
+        self.last_feedback_time = Some(now);
+        let feedback = resp.mut_health_feedback();
+        feedback.set_store_id(self.store_id);
+        feedback.set_feedback_seq_no(self.seq.fetch_add(1, Ordering::Relaxed));
+        feedback.set_slow_score(self.health_controller.get_raftstore_slow_score() as i32);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::thread;
Expand Down Expand Up @@ -2486,4 +2567,40 @@ mod tests {
poll_future_notify(task);
assert_eq!(block_on(rx1).unwrap(), 200);
}

#[test]
fn test_health_feedback_attacher() {
let health_controller = HealthController::new();
let test_reporter = health_controller::reporters::TestReporter::new(&health_controller);
let seq = Arc::new(AtomicU64::new(1));

let mut a = HealthFeedbackAttacher::new(1, health_controller.clone(), seq.clone(), None);
let mut resp = BatchCommandsResponse::default();
a.attach_if_needed(&mut resp);
assert!(!resp.has_health_feedback());

let mut a =
HealthFeedbackAttacher::new(1, health_controller, seq, Some(Duration::from_secs(1)));
resp = BatchCommandsResponse::default();
a.attach_if_needed(&mut resp);
assert!(resp.has_health_feedback());
assert_eq!(resp.get_health_feedback().get_store_id(), 1);
assert_eq!(resp.get_health_feedback().get_feedback_seq_no(), 1);
assert_eq!(resp.get_health_feedback().get_slow_score(), 1);

// Skips attaching feedback because last attaching was just done.
test_reporter.set_raftstore_slow_score(50.);
resp = BatchCommandsResponse::default();
a.attach_if_needed(&mut resp);
assert!(!resp.has_health_feedback());

// Simulate elapsing enough time by changing the recorded last update time.
*a.last_feedback_time.as_mut().unwrap() -= Duration::from_millis(1001);
a.attach_if_needed(&mut resp);
assert!(resp.has_health_feedback());
assert_eq!(resp.get_health_feedback().get_store_id(), 1);
// Seq no increased.
assert_eq!(resp.get_health_feedback().get_feedback_seq_no(), 2);
assert_eq!(resp.get_health_feedback().get_slow_score(), 50);
}
}
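Taken together: HealthFeedbackAttacher piggybacks a HealthFeedback message onto at most one BatchCommandsResponse per feedback interval on each stream, and the shared sequence counter is seeded with the current unix time in milliseconds, plausibly so that feedback_seq_no keeps growing across server restarts and clients can simply keep the highest-seq feedback they have seen. A hedged sketch of client-side consumption; the getters match the ones exercised by the tests above, while the StoreHealth bookkeeping is hypothetical:

use kvproto::tikvpb::BatchCommandsResponse;

// Hypothetical client-side record of the latest feedback from one store.
struct StoreHealth {
    last_seq: u64,
    slow_score: i32,
}

fn observe(resp: &BatchCommandsResponse, health: &mut Option<StoreHealth>) {
    if !resp.has_health_feedback() {
        // Most responses carry no feedback (at most one per interval).
        return;
    }
    let fb = resp.get_health_feedback();
    // Keep only the newest feedback, identified by its sequence number.
    if health
        .as_ref()
        .map_or(true, |h| fb.get_feedback_seq_no() > h.last_seq)
    {
        *health = Some(StoreHealth {
            last_seq: fb.get_feedback_seq_no(),
            slow_score: fb.get_slow_score(),
        });
    }
}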
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
@@ -123,6 +123,7 @@ fn test_serde_custom_tikv_config() {
         forward_max_connections_per_address: 5,
         reject_messages_on_memory_ratio: 0.8,
         simplify_metrics: false,
+        health_feedback_interval: ReadableDuration::secs(2),
         ..Default::default()
     };
     value.readpool = ReadPoolConfig {
1 change: 1 addition & 0 deletions tests/integrations/config/test-custom.toml
@@ -84,6 +84,7 @@ background-thread-count = 999
 end-point-slow-log-threshold = "1s"
 forward-max-connections-per-address = 5
 reject-messages-on-memory-ratio = 0.8
+health-feedback-interval = "2s"
 
 [server.labels]
 a = "b"
27 changes: 27 additions & 0 deletions tests/integrations/server/kv_service.rs
@@ -1758,6 +1758,33 @@ fn test_batch_commands() {
     rx.recv_timeout(Duration::from_secs(1)).unwrap();
 }
 
+#[test_case(test_raftstore::must_new_cluster_and_kv_client)]
+#[test_case(test_raftstore_v2::must_new_cluster_and_kv_client)]
+fn test_health_feedback() {
+    let (_cluster, client, _ctx) = new_cluster();
+    let (mut sender, mut receiver) = client.batch_commands().unwrap();
+
+    let mut batch_req = BatchCommandsRequest::default();
+    batch_req.mut_requests().push(Default::default());
+    batch_req.mut_request_ids().push(1);
+
+    block_on(sender.send((batch_req.clone(), WriteFlags::default()))).unwrap();
+    let resp = block_on(receiver.next()).unwrap().unwrap();
+    assert!(resp.has_health_feedback());
+
+    block_on(sender.send((batch_req.clone(), WriteFlags::default()))).unwrap();
+    let resp = block_on(receiver.next()).unwrap().unwrap();
+    assert!(!resp.has_health_feedback());
+
+    thread::sleep(Duration::from_millis(1100));
+    block_on(sender.send((batch_req, WriteFlags::default()))).unwrap();
+    let resp = block_on(receiver.next()).unwrap().unwrap();
+    assert!(resp.has_health_feedback());
+
+    block_on(sender.close()).unwrap();
+    block_on(receiver.for_each(|_| future::ready(())));
+}
+
 #[test_case(test_raftstore::must_new_cluster_and_kv_client)]
 #[test_case(test_raftstore_v2::must_new_cluster_and_kv_client)]
 fn test_empty_commands() {
