From f3dd8f27129a418a0d30642f8e7da2235d37cfaf Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 19 Sep 2024 17:34:03 +0200 Subject: [PATCH] Debounce shard pruning request (#5374) * Add debounce to shard prune request * Use pruning interval as control plane cooldown * Address smaller review comments * Simplify debounce logic * Use LruCache for debounce map * Refactor and test CooldownMap * Revert grpc number reuse * Rename grpc field interval to interval_secs --- quickwit/Cargo.lock | 1 + quickwit/quickwit-control-plane/Cargo.toml | 1 + .../src/control_plane.rs | 45 +++- .../src/cooldown_map.rs | 164 ++++++++++++ .../quickwit-control-plane/src/debouncer.rs | 14 +- quickwit/quickwit-control-plane/src/lib.rs | 1 + .../src/source/queue_sources/shared_state.rs | 1 + .../src/metastore/control_plane_metastore.rs | 18 +- .../file_backed/file_backed_index/mod.rs | 3 +- .../file_backed/file_backed_index/shards.rs | 32 +-- .../src/metastore/file_backed/mod.rs | 16 +- .../src/metastore/postgres/metastore.rs | 17 +- .../quickwit-metastore/src/tests/shard.rs | 26 +- .../protos/quickwit/control_plane.proto | 3 + .../protos/quickwit/metastore.proto | 9 +- .../quickwit/quickwit.control_plane.rs | 245 ++++++++++++++++++ .../codegen/quickwit/quickwit.metastore.rs | 62 ++--- quickwit/quickwit-proto/src/getters.rs | 1 - 18 files changed, 533 insertions(+), 126 deletions(-) create mode 100644 quickwit/quickwit-control-plane/src/cooldown_map.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6ad038dc458..233fa27d139 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5865,6 +5865,7 @@ dependencies = [ "fnv", "futures", "itertools 0.13.0", + "lru", "mockall", "once_cell", "proptest", diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml index 024704736e0..a9b2da54615 100644 --- a/quickwit/quickwit-control-plane/Cargo.toml +++ b/quickwit/quickwit-control-plane/Cargo.toml @@ -17,6 +17,7 @@ bytesize = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +lru = { workspace = true } mockall = { workspace = true, optional = true } once_cell = { workspace = true } rand = { workspace = true } diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 418c538c642..b24764c8ec9 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -20,6 +20,7 @@ use std::collections::{BTreeSet, HashMap}; use std::fmt; use std::fmt::Formatter; +use std::num::NonZeroUsize; use std::time::Duration; use anyhow::Context; @@ -49,14 +50,15 @@ use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest, DeleteShardsRequest, DeleteSourceRequest, EmptyResponse, FindIndexTemplateMatchesRequest, IndexMetadataResponse, IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService, - MetastoreServiceClient, ToggleSourceRequest, UpdateIndexRequest, + MetastoreServiceClient, PruneShardsRequest, ToggleSourceRequest, UpdateIndexRequest, }; -use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid}; +use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, SourceUid}; use serde::Serialize; use serde_json::{json, Value as JsonValue}; use tokio::sync::watch; use tracing::{debug, error, info}; +use crate::cooldown_map::{CooldownMap, CooldownStatus}; use crate::debouncer::Debouncer; use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState}; use crate::ingest::ingest_controller::{IngestControllerStats, RebalanceShardsCallback}; @@ -71,13 +73,16 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur Duration::from_secs(5) }; +/// Minimum period between two identical shard pruning operations. +const PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD: Duration = Duration::from_secs(120); + /// Minimum period between two rebuild plan operations. const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2); #[derive(Debug)] struct ControlPlanLoop; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, Copy)] struct RebuildPlan; pub struct ControlPlane { @@ -94,6 +99,7 @@ pub struct ControlPlane { ingest_controller: IngestController, metastore: MetastoreServiceClient, model: ControlPlaneModel, + prune_shard_cooldown: CooldownMap<(IndexId, SourceId)>, rebuild_plan_debouncer: Debouncer, readiness_tx: watch::Sender, // Disables the control loop. This is useful for unit testing. @@ -177,6 +183,7 @@ impl ControlPlane { ingest_controller, metastore: metastore.clone(), model: Default::default(), + prune_shard_cooldown: CooldownMap::new(NonZeroUsize::new(1024).unwrap()), rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD), readiness_tx, disable_control_loop, @@ -772,6 +779,38 @@ impl Handler for ControlPlane { } } +#[async_trait] +impl Handler for ControlPlane { + type Reply = ControlPlaneResult; + + async fn handle( + &mut self, + request: PruneShardsRequest, + _ctx: &ActorContext, + ) -> Result, ActorExitStatus> { + let interval = request + .interval_secs + .map(|interval_secs| Duration::from_secs(interval_secs as u64)) + .unwrap_or_else(|| PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD); + + // A very basic debounce is enough here, missing one call to the pruning API is fine + let status = self.prune_shard_cooldown.update( + ( + request.index_uid().index_id.clone(), + request.source_id.clone(), + ), + interval, + ); + if let CooldownStatus::Ready = status { + if let Err(metastore_error) = self.metastore.prune_shards(request).await { + return convert_metastore_error(metastore_error); + }; + } + // Return ok regardless of whether the call was successful or debounced + Ok(Ok(EmptyResponse {})) + } +} + // This is neither a proxied call nor a metastore callback. #[async_trait] impl Handler for ControlPlane { diff --git a/quickwit/quickwit-control-plane/src/cooldown_map.rs b/quickwit/quickwit-control-plane/src/cooldown_map.rs new file mode 100644 index 00000000000..98acc5a8e98 --- /dev/null +++ b/quickwit/quickwit-control-plane/src/cooldown_map.rs @@ -0,0 +1,164 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::fmt::Debug; +use std::hash::Hash; +use std::num::NonZeroUsize; +use std::time::{Duration, Instant}; + +use lru::LruCache; + +/// A map that keeps track of a cooldown deadline for each of its keys. +/// +/// Internally it uses an [`LruCache`] to prune the oldest entries when the +/// capacity is reached. If the capacity is reached but the oldest entry is not +/// outdated, the capacity is extended (2x). +pub struct CooldownMap(LruCache); + +#[derive(Debug, PartialEq)] +pub enum CooldownStatus { + Ready, + InCooldown, +} + +impl CooldownMap { + pub fn new(capacity: NonZeroUsize) -> Self { + Self(LruCache::new(capacity)) + } + + /// Updates the deadline for the given key if it isn't currently in cooldown. + /// + /// The status returned is the one before the update (after an update, the + /// status is always `InCooldown`). + pub fn update(&mut self, key: K, cooldown_interval: Duration) -> CooldownStatus { + let deadline_opt = self.0.get_mut(&key); + let now = Instant::now(); + if let Some(deadline) = deadline_opt { + if *deadline > now { + CooldownStatus::InCooldown + } else { + *deadline = now + cooldown_interval; + CooldownStatus::Ready + } + } else { + let capacity: usize = self.0.cap().into(); + if self.0.len() == capacity { + if let Some((_, deadline)) = self.0.peek_lru() { + if *deadline > now { + // the oldest entry is not outdated, grow the LRU + self.0.resize(NonZeroUsize::new(capacity * 2).unwrap()); + } + } + } + self.0.push(key, now + cooldown_interval); + CooldownStatus::Ready + } + } +} +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cooldown_map_resize() { + let mut cooldown_map = CooldownMap::new(NonZeroUsize::new(2).unwrap()); + let cooldown_interval = Duration::from_secs(1); + assert_eq!( + cooldown_map.update("test_key1", cooldown_interval), + CooldownStatus::Ready + ); + assert_eq!( + cooldown_map.update("test_key1", cooldown_interval), + CooldownStatus::InCooldown + ); + assert_eq!( + cooldown_map.update("test_key2", cooldown_interval), + CooldownStatus::Ready + ); + assert_eq!( + cooldown_map.update("test_key2", cooldown_interval), + CooldownStatus::InCooldown + ); + // Hitting the capacity, the map should grow transparently + assert_eq!( + cooldown_map.update("test_key3", cooldown_interval), + CooldownStatus::Ready + ); + assert_eq!( + cooldown_map.update("test_key1", cooldown_interval), + CooldownStatus::InCooldown + ); + assert_eq!( + cooldown_map.update("test_key2", cooldown_interval), + CooldownStatus::InCooldown + ); + assert_eq!(cooldown_map.0.cap(), NonZeroUsize::new(4).unwrap()); + } + + #[test] + fn test_cooldown_map_expired() { + let mut cooldown_map = CooldownMap::new(NonZeroUsize::new(2).unwrap()); + let cooldown_interval_short = Duration::from_millis(100); + let cooldown_interval_long = Duration::from_secs(5); + + assert_eq!( + cooldown_map.update("test_key_short", cooldown_interval_short), + CooldownStatus::Ready + ); + assert_eq!( + cooldown_map.update("test_key_long", cooldown_interval_long), + CooldownStatus::Ready + ); + + std::thread::sleep(cooldown_interval_short.mul_f32(1.1)); + assert_eq!( + cooldown_map.update("test_key_short", cooldown_interval_short), + CooldownStatus::Ready + ); + assert_eq!( + cooldown_map.update("test_key_long", cooldown_interval_long), + CooldownStatus::InCooldown + ); + } + + #[test] + fn test_cooldown_map_eviction() { + let mut cooldown_map = CooldownMap::new(NonZeroUsize::new(2).unwrap()); + let cooldown_interval_short = Duration::from_millis(100); + let cooldown_interval_long = Duration::from_secs(5); + + assert_eq!( + cooldown_map.update("test_key_short", cooldown_interval_short), + CooldownStatus::Ready + ); + assert_eq!( + cooldown_map.update("test_key_long_1", cooldown_interval_long), + CooldownStatus::Ready + ); + + // after the cooldown period `test_key_short` should be evicted when adding a new key + std::thread::sleep(cooldown_interval_short.mul_f32(1.1)); + assert_eq!(cooldown_map.0.len(), 2); + assert_eq!( + cooldown_map.update("test_key_long_2", cooldown_interval_long), + CooldownStatus::Ready + ); + assert_eq!(cooldown_map.0.len(), 2); + } +} diff --git a/quickwit/quickwit-control-plane/src/debouncer.rs b/quickwit/quickwit-control-plane/src/debouncer.rs index 9b560ebf557..401fecda679 100644 --- a/quickwit/quickwit-control-plane/src/debouncer.rs +++ b/quickwit/quickwit-control-plane/src/debouncer.rs @@ -20,7 +20,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; -use quickwit_actors::{Actor, ActorContext, DeferableReplyHandler, Handler}; +use quickwit_actors::{Actor, ActorContext, Handler}; /// A debouncer is a helper to debounce events. /// @@ -93,7 +93,7 @@ impl Debouncer { fn emit_message(&self, ctx: &ActorContext) where - A: Actor + Handler + DeferableReplyHandler, + A: Actor + Handler, M: Default + std::fmt::Debug + Send + Sync + 'static, { let _ = ctx.mailbox().send_message_with_high_priority(M::default()); @@ -101,7 +101,7 @@ impl Debouncer { fn schedule_post_cooldown_callback(&self, ctx: &ActorContext) where - A: Actor + Handler + DeferableReplyHandler, + A: Actor + Handler, M: Default + std::fmt::Debug + Send + Sync + 'static, { let ctx_clone = ctx.clone(); @@ -116,12 +116,8 @@ impl Debouncer { .schedule_event(callback, self.cooldown_period); } - pub fn self_send_with_cooldown( - &self, - ctx: &ActorContext + DeferableReplyHandler>, - ) where - M: Default + std::fmt::Debug + Send + Sync + 'static, - { + pub fn self_send_with_cooldown(&self, ctx: &ActorContext>) + where M: Default + std::fmt::Debug + Send + Sync + 'static { let cooldown_state = self.accept_transition(Transition::Emit); match cooldown_state { DebouncerState::NoCooldown => { diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index 9f2b8304556..23647c69270 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -40,6 +40,7 @@ pub struct IndexerNodeInfo { pub type IndexerPool = Pool; +mod cooldown_map; mod debouncer; #[cfg(test)] mod tests; diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs index cd356e8f35d..dc73a60880e 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs @@ -100,6 +100,7 @@ impl QueueSharedState { source_id: source_id.clone(), max_age_secs, max_count, + interval_secs: Some(pruning_interval.as_secs() as u32), }) .await; if let Err(err) = result { diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 80a576219e1..c5426b7cf19 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -35,8 +35,8 @@ use quickwit_proto::metastore::{ ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, - PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, - StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, + PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, + ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; @@ -120,6 +120,12 @@ impl MetastoreService for ControlPlaneMetastore { Ok(response) } + // Proxy through the control plane to debounce queries + async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult { + self.control_plane.prune_shards(request).await?; + Ok(EmptyResponse {}) + } + // Other metastore API calls. async fn index_metadata( @@ -237,14 +243,6 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.delete_shards(request).await } - async fn prune_shards( - &self, - request: PruneShardsRequest, - ) -> MetastoreResult { - // TODO this call should go through the control plane which should apply debounce - self.metastore.prune_shards(request).await - } - // Index Template API async fn create_index_template( diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 426a91530b5..265697b0e81 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -37,7 +37,6 @@ use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, - PruneShardsResponse, }; use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId}; use serde::{Deserialize, Serialize}; @@ -667,7 +666,7 @@ impl FileBackedIndex { pub(crate) fn prune_shards( &mut self, request: PruneShardsRequest, - ) -> MetastoreResult> { + ) -> MetastoreResult> { self.get_shards_for_source_mut(&request.source_id)? .prune_shards(request) } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index 432911a6c8b..b7875bda8ca 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -26,7 +26,7 @@ use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteShardsRequest, DeleteShardsResponse, EntityKind, ListShardsSubrequest, ListShardsSubresponse, MetastoreError, MetastoreResult, - OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, PruneShardsResponse, + OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest, }; use quickwit_proto::types::{queue_id, IndexUid, Position, PublishToken, ShardId, SourceId}; use time::OffsetDateTime; @@ -242,7 +242,7 @@ impl Shards { pub(super) fn prune_shards( &mut self, request: PruneShardsRequest, - ) -> MetastoreResult> { + ) -> MetastoreResult> { let initial_shard_count = self.shards.len(); if let Some(max_age_secs) = request.max_age_secs { @@ -268,14 +268,10 @@ impl Shards { } } } - let response = PruneShardsResponse { - index_uid: request.index_uid, - source_id: request.source_id, - }; if initial_shard_count > self.shards.len() { - Ok(MutationOccurred::Yes(response)) + Ok(MutationOccurred::Yes(())) } else { - Ok(MutationOccurred::No(response)) + Ok(MutationOccurred::No(())) } } @@ -646,24 +642,22 @@ mod tests { source_id: source_id.clone(), max_age_secs: None, max_count: None, + interval_secs: None, }; - let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else { + let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else { panic!("expected `MutationOccurred::No`"); }; - assert_eq!(response.index_uid(), &index_uid); - assert_eq!(response.source_id, source_id); let request = PruneShardsRequest { index_uid: Some(index_uid.clone()), source_id: source_id.clone(), max_age_secs: Some(50), max_count: None, + interval_secs: None, }; - let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else { + let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else { panic!("expected `MutationOccurred::No`"); }; - assert_eq!(response.index_uid(), &index_uid); - assert_eq!(response.source_id, source_id); let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); shards.shards.insert( @@ -696,23 +690,21 @@ mod tests { source_id: source_id.clone(), max_age_secs: Some(150), max_count: None, + interval_secs: None, }; - let MutationOccurred::Yes(response) = shards.prune_shards(request).unwrap() else { + let MutationOccurred::Yes(()) = shards.prune_shards(request).unwrap() else { panic!("expected `MutationOccurred::Yes`"); }; - assert_eq!(response.index_uid(), &index_uid); - assert_eq!(response.source_id, source_id); let request = PruneShardsRequest { index_uid: Some(index_uid.clone()), source_id: source_id.clone(), max_age_secs: Some(150), max_count: None, + interval_secs: None, }; - let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else { + let MutationOccurred::No(()) = shards.prune_shards(request).unwrap() else { panic!("expected `MutationOccurred::No`"); }; - assert_eq!(response.index_uid(), &index_uid); - assert_eq!(response.source_id, source_id); } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 673c3a6be37..729e10b6fe0 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -58,9 +58,9 @@ use quickwit_proto::metastore::{ ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest, - OpenShardsResponse, PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, - ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, - UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid}; use quickwit_storage::Storage; @@ -892,15 +892,11 @@ impl MetastoreService for FileBackedMetastore { Ok(response) } - async fn prune_shards( - &self, - request: PruneShardsRequest, - ) -> MetastoreResult { + async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult { let index_uid = request.index_uid().clone(); - let response = self - .mutate(&index_uid, |index| index.prune_shards(request)) + self.mutate(&index_uid, |index| index.prune_shards(request)) .await?; - Ok(response) + Ok(EmptyResponse {}) } async fn list_shards(&self, request: ListShardsRequest) -> MetastoreResult { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index d5234b39837..1714ce00d65 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -46,9 +46,8 @@ use quickwit_proto::metastore::{ ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, - PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, - ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, - UpdateSplitsDeleteOpstampResponse, + PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, + UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId}; use sea_query::{Alias, Asterisk, Expr, Func, PostgresQueryBuilder, Query, UnionType}; @@ -1497,10 +1496,7 @@ impl MetastoreService for PostgresqlMetastore { Ok(response) } - async fn prune_shards( - &self, - request: PruneShardsRequest, - ) -> MetastoreResult { + async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult { const PRUNE_AGE_SHARDS_QUERY: &str = include_str!("queries/shards/prune_age.sql"); const PRUNE_COUNT_SHARDS_QUERY: &str = include_str!("queries/shards/prune_count.sql"); @@ -1523,12 +1519,7 @@ impl MetastoreService for PostgresqlMetastore { .execute(&self.connection_pool) .await?; } - - let response = PruneShardsResponse { - index_uid: request.index_uid, - source_id: request.source_id, - }; - Ok(response) + Ok(EmptyResponse {}) } // Index Template API diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index 663d8e9bb7e..f8597e84a90 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -654,10 +654,9 @@ pub async fn test_metastore_prune_shards< source_id: test_index.source_id.clone(), max_age_secs: None, max_count: None, + interval_secs: None, }; - let response = metastore.prune_shards(prune_index_request).await.unwrap(); - assert_eq!(response.index_uid(), &test_index.index_uid); - assert_eq!(response.source_id, test_index.source_id); + metastore.prune_shards(prune_index_request).await.unwrap(); let all_shards = metastore .list_all_shards(&test_index.index_uid, &test_index.source_id) .await; @@ -671,10 +670,10 @@ pub async fn test_metastore_prune_shards< source_id: test_index.source_id.clone(), max_age_secs: Some(oldest_shard_age - 350), max_count: None, + interval_secs: None, }; - let response = metastore.prune_shards(prune_index_request).await.unwrap(); - assert_eq!(response.index_uid(), &test_index.index_uid); - assert_eq!(response.source_id, test_index.source_id); + metastore.prune_shards(prune_index_request).await.unwrap(); + let mut all_shards = metastore .list_all_shards(&test_index.index_uid, &test_index.source_id) .await; @@ -691,10 +690,9 @@ pub async fn test_metastore_prune_shards< source_id: test_index.source_id.clone(), max_age_secs: None, max_count: Some(90), + interval_secs: None, }; - let response = metastore.prune_shards(prune_index_request).await.unwrap(); - assert_eq!(response.index_uid(), &test_index.index_uid); - assert_eq!(response.source_id, test_index.source_id); + metastore.prune_shards(prune_index_request).await.unwrap(); let mut all_shards = metastore .list_all_shards(&test_index.index_uid, &test_index.source_id) .await; @@ -710,10 +708,9 @@ pub async fn test_metastore_prune_shards< source_id: test_index.source_id.clone(), max_age_secs: Some(oldest_shard_age - 2950), max_count: Some(80), + interval_secs: None, }; - let response = metastore.prune_shards(prune_index_request).await.unwrap(); - assert_eq!(response.index_uid(), &test_index.index_uid); - assert_eq!(response.source_id, test_index.source_id); + metastore.prune_shards(prune_index_request).await.unwrap(); let all_shards = metastore .list_all_shards(&test_index.index_uid, &test_index.source_id) .await; @@ -725,10 +722,9 @@ pub async fn test_metastore_prune_shards< source_id: test_index.source_id.clone(), max_age_secs: Some(oldest_shard_age - 4000), max_count: Some(50), + interval_secs: None, }; - let response = metastore.prune_shards(prune_index_request).await.unwrap(); - assert_eq!(response.index_uid(), &test_index.index_uid); - assert_eq!(response.source_id, test_index.source_id); + metastore.prune_shards(prune_index_request).await.unwrap(); let all_shards = metastore .list_all_shards(&test_index.index_uid, &test_index.source_id) .await; diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index 963734cd157..285732e2669 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -68,6 +68,9 @@ service ControlPlaneService { // Asks the control plane whether the shards listed in the request should be deleted or truncated. rpc AdviseResetShards(AdviseResetShardsRequest) returns (AdviseResetShardsResponse); + + // Performs a debounced shard pruning request to the metastore. + rpc PruneShards(quickwit.metastore.PruneShardsRequest) returns (quickwit.metastore.EmptyResponse); } // Shard API diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index bf7e6a21e6e..8ae7a8a4400 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -178,7 +178,7 @@ service MetastoreService { rpc DeleteShards(DeleteShardsRequest) returns (DeleteShardsResponse); // Deletes outdated shards. This RPC deletes the shards from the metastore. - rpc PruneShards(PruneShardsRequest) returns (PruneShardsResponse); + rpc PruneShards(PruneShardsRequest) returns (EmptyResponse); rpc ListShards(ListShardsRequest) returns (ListShardsResponse); @@ -461,11 +461,8 @@ message PruneShardsRequest { optional uint32 max_age_secs = 5; // The maximum number of the shards to keep. Delete older shards first. optional uint32 max_count = 6; -} - -message PruneShardsResponse { - quickwit.common.IndexUid index_uid = 1; - string source_id = 2; + // The interval between two pruning operations, in seconds. + optional uint32 interval_secs = 7; } message ListShardsRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 29d91cf6de0..c14ef724de0 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -175,6 +175,11 @@ pub trait ControlPlaneService: std::fmt::Debug + Send + Sync + 'static { &self, request: AdviseResetShardsRequest, ) -> crate::control_plane::ControlPlaneResult; + /// Performs a debounced shard pruning request to the metastore. + async fn prune_shards( + &self, + request: super::metastore::PruneShardsRequest, + ) -> crate::control_plane::ControlPlaneResult; } #[derive(Debug, Clone)] pub struct ControlPlaneServiceClient { @@ -319,6 +324,12 @@ impl ControlPlaneService for ControlPlaneServiceClient { ) -> crate::control_plane::ControlPlaneResult { self.inner.0.advise_reset_shards(request).await } + async fn prune_shards( + &self, + request: super::metastore::PruneShardsRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.inner.0.prune_shards(request).await + } } #[cfg(any(test, feature = "testsuite"))] pub mod mock_control_plane_service { @@ -391,6 +402,14 @@ pub mod mock_control_plane_service { ) -> crate::control_plane::ControlPlaneResult { self.inner.lock().await.advise_reset_shards(request).await } + async fn prune_shards( + &self, + request: super::super::metastore::PruneShardsRequest, + ) -> crate::control_plane::ControlPlaneResult< + super::super::metastore::EmptyResponse, + > { + self.inner.lock().await.prune_shards(request).await + } } } pub type BoxFuture = std::pin::Pin< @@ -530,6 +549,23 @@ impl tower::Service for InnerControlPlaneServiceClient Box::pin(fut) } } +impl tower::Service +for InnerControlPlaneServiceClient { + type Response = super::metastore::EmptyResponse; + type Error = crate::control_plane::ControlPlaneError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: super::metastore::PruneShardsRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.prune_shards(request).await }; + Box::pin(fut) + } +} /// A tower service stack is a set of tower services. #[derive(Debug)] struct ControlPlaneServiceTowerServiceStack { @@ -575,6 +611,11 @@ struct ControlPlaneServiceTowerServiceStack { AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >, + prune_shards_svc: quickwit_common::tower::BoxService< + super::metastore::PruneShardsRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, } #[async_trait::async_trait] impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { @@ -630,6 +671,12 @@ impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { ) -> crate::control_plane::ControlPlaneResult { self.advise_reset_shards_svc.clone().ready().await?.call(request).await } + async fn prune_shards( + &self, + request: super::metastore::PruneShardsRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.prune_shards_svc.clone().ready().await?.call(request).await + } } type CreateIndexLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< @@ -711,6 +758,16 @@ type AdviseResetShardsLayer = quickwit_common::tower::BoxLayer< AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >; +type PruneShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + super::metastore::PruneShardsRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + super::metastore::PruneShardsRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, +>; #[derive(Debug, Default)] pub struct ControlPlaneServiceTowerLayerStack { create_index_layers: Vec, @@ -721,6 +778,7 @@ pub struct ControlPlaneServiceTowerLayerStack { delete_source_layers: Vec, get_or_create_open_shards_layers: Vec, advise_reset_shards_layers: Vec, + prune_shards_layers: Vec, } impl ControlPlaneServiceTowerLayerStack { pub fn stack_layer(mut self, layer: L) -> Self @@ -939,6 +997,33 @@ impl ControlPlaneServiceTowerLayerStack { crate::control_plane::ControlPlaneError, >, >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::PruneShardsRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + super::metastore::PruneShardsRequest, + Response = super::metastore::EmptyResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service< + super::metastore::PruneShardsRequest, + >>::Future: Send + 'static, { self.create_index_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); @@ -956,6 +1041,8 @@ impl ControlPlaneServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.advise_reset_shards_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.prune_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } pub fn stack_create_index_layer(mut self, layer: L) -> Self @@ -1126,6 +1213,27 @@ impl ControlPlaneServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer)); self } + pub fn stack_prune_shards_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::PruneShardsRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + super::metastore::PruneShardsRequest, + Response = super::metastore::EmptyResponse, + Error = crate::control_plane::ControlPlaneError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.prune_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } pub fn build(self, instance: T) -> ControlPlaneServiceClient where T: ControlPlaneService, @@ -1249,6 +1357,14 @@ impl ControlPlaneServiceTowerLayerStack { quickwit_common::tower::BoxService::new(inner_client.clone()), |svc, layer| layer.layer(svc), ); + let prune_shards_svc = self + .prune_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); let tower_svc_stack = ControlPlaneServiceTowerServiceStack { inner: inner_client, create_index_svc, @@ -1259,6 +1375,7 @@ impl ControlPlaneServiceTowerLayerStack { delete_source_svc, get_or_create_open_shards_svc, advise_reset_shards_svc, + prune_shards_svc, }; ControlPlaneServiceClient::new(tower_svc_stack) } @@ -1406,6 +1523,15 @@ where AdviseResetShardsResponse, crate::control_plane::ControlPlaneError, >, + > + + tower::Service< + super::metastore::PruneShardsRequest, + Response = super::metastore::EmptyResponse, + Error = crate::control_plane::ControlPlaneError, + Future = BoxFuture< + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, >, { async fn create_index( @@ -1460,6 +1586,12 @@ where ) -> crate::control_plane::ControlPlaneResult { self.clone().call(request).await } + async fn prune_shards( + &self, + request: super::metastore::PruneShardsRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.clone().call(request).await + } } #[derive(Debug, Clone)] pub struct ControlPlaneServiceGrpcClientAdapter { @@ -1611,6 +1743,20 @@ where AdviseResetShardsRequest::rpc_name(), )) } + async fn prune_shards( + &self, + request: super::metastore::PruneShardsRequest, + ) -> crate::control_plane::ControlPlaneResult { + self.inner + .clone() + .prune_shards(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + super::metastore::PruneShardsRequest::rpc_name(), + )) + } } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServerAdapter { @@ -1720,6 +1866,17 @@ for ControlPlaneServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } + async fn prune_shards( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .prune_shards(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } } /// Generated client implementations. pub mod control_plane_service_grpc_client { @@ -2061,6 +2218,37 @@ pub mod control_plane_service_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Performs a debounced shard pruning request to the metastore. + pub async fn prune_shards( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.control_plane.ControlPlaneService/PruneShards", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "quickwit.control_plane.ControlPlaneService", + "PruneShards", + ), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -2135,6 +2323,14 @@ pub mod control_plane_service_grpc_server { tonic::Response, tonic::Status, >; + /// Performs a debounced shard pruning request to the metastore. + async fn prune_shards( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServer { @@ -2602,6 +2798,55 @@ pub mod control_plane_service_grpc_server { }; Box::pin(fut) } + "/quickwit.control_plane.ControlPlaneService/PruneShards" => { + #[allow(non_camel_case_types)] + struct PruneShardsSvc(pub Arc); + impl< + T: ControlPlaneServiceGrpc, + > tonic::server::UnaryService< + super::super::metastore::PruneShardsRequest, + > for PruneShardsSvc { + type Response = super::super::metastore::EmptyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::super::metastore::PruneShardsRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).prune_shards(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PruneShardsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index e3bbe152c3c..08b12006db3 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -422,15 +422,9 @@ pub struct PruneShardsRequest { /// The maximum number of the shards to keep. Delete older shards first. #[prost(uint32, optional, tag = "6")] pub max_count: ::core::option::Option, -} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PruneShardsResponse { - #[prost(message, optional, tag = "1")] - pub index_uid: ::core::option::Option, - #[prost(string, tag = "2")] - pub source_id: ::prost::alloc::string::String, + /// The interval between two pruning operations, in seconds. + #[prost(uint32, optional, tag = "7")] + pub interval_secs: ::core::option::Option, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -933,7 +927,7 @@ pub trait MetastoreService: std::fmt::Debug + Send + Sync + 'static { async fn prune_shards( &self, request: PruneShardsRequest, - ) -> crate::metastore::MetastoreResult; + ) -> crate::metastore::MetastoreResult; async fn list_shards( &self, request: ListShardsRequest, @@ -1198,7 +1192,7 @@ impl MetastoreService for MetastoreServiceClient { async fn prune_shards( &self, request: PruneShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.inner.0.prune_shards(request).await } async fn list_shards( @@ -1398,7 +1392,7 @@ pub mod mock_metastore_service { async fn prune_shards( &self, request: super::PruneShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.inner.lock().await.prune_shards(request).await } async fn list_shards( @@ -1817,7 +1811,7 @@ impl tower::Service for InnerMetastoreServiceClient { } } impl tower::Service for InnerMetastoreServiceClient { - type Response = PruneShardsResponse; + type Response = EmptyResponse; type Error = crate::metastore::MetastoreError; type Future = BoxFuture; fn poll_ready( @@ -2050,7 +2044,7 @@ struct MetastoreServiceTowerServiceStack { >, prune_shards_svc: quickwit_common::tower::BoxService< PruneShardsRequest, - PruneShardsResponse, + EmptyResponse, crate::metastore::MetastoreError, >, list_shards_svc: quickwit_common::tower::BoxService< @@ -2227,7 +2221,7 @@ impl MetastoreService for MetastoreServiceTowerServiceStack { async fn prune_shards( &self, request: PruneShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.prune_shards_svc.clone().ready().await?.call(request).await } async fn list_shards( @@ -2506,11 +2500,11 @@ type DeleteShardsLayer = quickwit_common::tower::BoxLayer< type PruneShardsLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< PruneShardsRequest, - PruneShardsResponse, + EmptyResponse, crate::metastore::MetastoreError, >, PruneShardsRequest, - PruneShardsResponse, + EmptyResponse, crate::metastore::MetastoreError, >; type ListShardsLayer = quickwit_common::tower::BoxLayer< @@ -3195,25 +3189,25 @@ impl MetastoreServiceTowerLayerStack { L: tower::Layer< quickwit_common::tower::BoxService< PruneShardsRequest, - PruneShardsResponse, + EmptyResponse, crate::metastore::MetastoreError, >, > + Clone + Send + Sync + 'static, , >>::Service: tower::Service< PruneShardsRequest, - Response = PruneShardsResponse, + Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, <, >>::Service as tower::Service>::Future: Send + 'static, @@ -3894,13 +3888,13 @@ impl MetastoreServiceTowerLayerStack { L: tower::Layer< quickwit_common::tower::BoxService< PruneShardsRequest, - PruneShardsResponse, + EmptyResponse, crate::metastore::MetastoreError, >, > + Send + Sync + 'static, L::Service: tower::Service< PruneShardsRequest, - Response = PruneShardsResponse, + Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, @@ -4591,9 +4585,9 @@ where > + tower::Service< PruneShardsRequest, - Response = PruneShardsResponse, + Response = EmptyResponse, Error = crate::metastore::MetastoreError, - Future = BoxFuture, + Future = BoxFuture, > + tower::Service< ListShardsRequest, @@ -4782,7 +4776,7 @@ where async fn prune_shards( &self, request: PruneShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.clone().call(request).await } async fn list_shards( @@ -5201,7 +5195,7 @@ where async fn prune_shards( &self, request: PruneShardsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult { self.inner .clone() .prune_shards(request) @@ -5590,7 +5584,7 @@ for MetastoreServiceGrpcServerAdapter { async fn prune_shards( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { self.inner .0 .prune_shards(request.into_inner()) @@ -6479,10 +6473,7 @@ pub mod metastore_service_grpc_client { pub async fn prune_shards( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { + ) -> std::result::Result, tonic::Status> { self.inner .ready() .await @@ -6866,10 +6857,7 @@ pub mod metastore_service_grpc_server { async fn prune_shards( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn list_shards( &self, request: tonic::Request, @@ -8103,7 +8091,7 @@ pub mod metastore_service_grpc_server { T: MetastoreServiceGrpc, > tonic::server::UnaryService for PruneShardsSvc { - type Response = super::PruneShardsResponse; + type Response = super::EmptyResponse; type Future = BoxFuture< tonic::Response, tonic::Status, diff --git a/quickwit/quickwit-proto/src/getters.rs b/quickwit/quickwit-proto/src/getters.rs index ceda0908f03..15ca7235fbe 100644 --- a/quickwit/quickwit-proto/src/getters.rs +++ b/quickwit/quickwit-proto/src/getters.rs @@ -135,7 +135,6 @@ generate_getters! { MarkSplitsForDeletionRequest, OpenShardSubrequest, PruneShardsRequest, - PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,