diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index c0999d82315..90f95e7b8b3 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -340,13 +340,20 @@ impl Inbox { self.rx.try_recv() } - pub async fn recv_typed_message(&self) -> Option { - while let Ok(mut envelope) = self.rx.recv().await { - if let Some(msg) = envelope.message_typed() { - return Some(msg); + #[cfg(any(test, feature = "testsuite"))] + pub async fn recv_typed_message(&self) -> Result { + loop { + match self.rx.recv().await { + Ok(mut envelope) => { + if let Some(msg) = envelope.message_typed() { + return Ok(msg); + } + } + Err(err) => { + return Err(err); + } } } - None } /// Destroys the inbox and returns the list of pending messages or commands diff --git a/quickwit/quickwit-actors/src/tests.rs b/quickwit/quickwit-actors/src/tests.rs index 6a916b364e1..4e006ce3984 100644 --- a/quickwit/quickwit-actors/src/tests.rs +++ b/quickwit/quickwit-actors/src/tests.rs @@ -23,6 +23,7 @@ use std::ops::Mul; use std::time::Duration; use async_trait::async_trait; +use quickwit_common::new_coolid; use serde::Serialize; use crate::observation::ObservationType; @@ -725,3 +726,49 @@ async fn test_unsync_actor_message() { universe.assert_quit().await; } + +struct FakeActorService { + // We use a cool id to make sure in the test that we get twice the same instance. + cool_id: String, +} + +#[derive(Debug)] +struct GetCoolId; + +impl Actor for FakeActorService { + type ObservableState = (); + + fn observable_state(&self) {} +} + +#[async_trait] +impl Handler for FakeActorService { + type Reply = String; + + async fn handle( + &mut self, + _: GetCoolId, + _ctx: &ActorContext, + ) -> Result { + Ok(self.cool_id.clone()) + } +} + +impl Default for FakeActorService { + fn default() -> Self { + FakeActorService { + cool_id: new_coolid("fake-actor"), + } + } +} + +#[tokio::test] +async fn test_get_or_spawn() { + let universe = Universe::new(); + let mailbox1: Mailbox = universe.get_or_spawn_one(); + let id1 = mailbox1.ask(GetCoolId).await.unwrap(); + let mailbox2: Mailbox = universe.get_or_spawn_one(); + let id2 = mailbox2.ask(GetCoolId).await.unwrap(); + assert_eq!(id1, id2); + universe.assert_quit().await; +} diff --git a/quickwit/quickwit-actors/src/universe.rs b/quickwit/quickwit-actors/src/universe.rs index 04869a351a8..e907feb29e8 100644 --- a/quickwit/quickwit-actors/src/universe.rs +++ b/quickwit/quickwit-actors/src/universe.rs @@ -89,6 +89,16 @@ impl Universe { self.spawn_ctx.registry.get_one::() } + pub fn get_or_spawn_one(&self) -> Mailbox { + if let Some(actor_mailbox) = self.spawn_ctx.registry.get_one::() { + actor_mailbox + } else { + let actor_default = A::default(); + let (mailbox, _handler) = self.spawn_builder().spawn(actor_default); + mailbox + } + } + pub async fn observe(&self, timeout: Duration) -> Vec { self.spawn_ctx.registry.observe(timeout).await } diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 81c8c4b3aa4..f22026494d9 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -29,7 +29,7 @@ use anyhow::{bail, Context}; use clap::{arg, ArgMatches, Command}; use colored::{ColoredString, Colorize}; use humantime::format_duration; -use quickwit_actors::{ActorExitStatus, ActorHandle, Universe}; +use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, Universe}; use quickwit_cluster::{ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig}; use quickwit_common::pubsub::EventBroker; use quickwit_common::runtimes::RuntimesConfig; @@ -40,7 +40,9 @@ use quickwit_config::{ VecSourceParams, CLI_INGEST_SOURCE_ID, }; use quickwit_index_management::{clear_cache_directory, IndexService}; -use quickwit_indexing::actors::{IndexingService, MergePipeline, MergePipelineId}; +use quickwit_indexing::actors::{ + IndexingService, MergePipeline, MergePipelineId, MergeSchedulerService, +}; use quickwit_indexing::models::{ DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline, }; @@ -451,6 +453,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< runtimes_config, &HashSet::from_iter([QuickwitService::Indexer]), )?; + let universe = Universe::new(); + let merge_scheduler_service_mailbox = universe.get_or_spawn_one(); let indexing_server = IndexingService::new( config.node_id.clone(), config.data_dir_path.clone(), @@ -459,12 +463,12 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< cluster, metastore, None, + merge_scheduler_service_mailbox, IngesterPool::default(), storage_resolver, EventBroker::default(), ) .await?; - let universe = Universe::new(); let (indexing_server_mailbox, indexing_server_handle) = universe.spawn_builder().spawn(indexing_server); let pipeline_id = indexing_server_mailbox @@ -580,10 +584,9 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { runtimes_config, &HashSet::from_iter([QuickwitService::Indexer]), )?; + let indexer_config = IndexerConfig::default(); let universe = Universe::new(); - let indexer_config = IndexerConfig { - ..Default::default() - }; + let merge_scheduler_service: Mailbox = universe.get_or_spawn_one(); let indexing_server = IndexingService::new( config.node_id, config.data_dir_path, @@ -592,6 +595,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { cluster, metastore, None, + merge_scheduler_service, IngesterPool::default(), storage_resolver, EventBroker::default(), diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.json b/quickwit/quickwit-config/resources/tests/config/quickwit.json index 828ea718d59..fe4ed3050e5 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.json @@ -48,7 +48,8 @@ "split_store_max_num_bytes": "1T", "split_store_max_num_splits": 10000, "max_concurrent_split_uploads": 8, - "max_merge_write_throughput": "100mb" + "max_merge_write_throughput": "100mb", + "merge_concurrency": 2 }, "ingest_api": { "replication_factor": 2 diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.toml b/quickwit/quickwit-config/resources/tests/config/quickwit.toml index 49f582fc0e0..1c6cb0b57f8 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.toml @@ -39,6 +39,7 @@ split_store_max_num_bytes = "1T" split_store_max_num_splits = 10_000 max_concurrent_split_uploads = 8 max_merge_write_throughput = "100mb" +merge_concurrency = 2 [ingest_api] replication_factor = 2 diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/config/quickwit.yaml index 4581efed515..9a829d2306f 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.yaml @@ -43,6 +43,7 @@ indexer: split_store_max_num_splits: 10000 max_concurrent_split_uploads: 8 max_merge_write_throughput: 100mb + merge_concurrency: 2 ingest_api: replication_factor: 2 diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 8f2ed5d5c0e..4f788ea7c48 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -96,6 +96,10 @@ pub struct IndexerConfig { /// does not starve indexing itself (as it is a latency sensitive operation). #[serde(default)] pub max_merge_write_throughput: Option, + /// Maximum number of merge or delete operation that can be executed concurrently. + /// (defaults to num_cpu / 2). + #[serde(default = "IndexerConfig::default_merge_concurrency")] + pub merge_concurrency: NonZeroUsize, /// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry /// Protocol (OTLP). #[serde(default = "IndexerConfig::default_enable_otlp_endpoint")] @@ -134,6 +138,10 @@ impl IndexerConfig { 1_000 } + pub fn default_merge_concurrency() -> NonZeroUsize { + NonZeroUsize::new(num_cpus::get() / 2).unwrap_or(NonZeroUsize::new(1).unwrap()) + } + fn default_cpu_capacity() -> CpuCapacity { CpuCapacity::one_cpu_thread() * (num_cpus::get() as u32) } @@ -149,6 +157,7 @@ impl IndexerConfig { max_concurrent_split_uploads: 4, cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32, max_merge_write_throughput: None, + merge_concurrency: NonZeroUsize::new(3).unwrap(), }; Ok(indexer_config) } @@ -163,6 +172,7 @@ impl Default for IndexerConfig { split_store_max_num_splits: Self::default_split_store_max_num_splits(), max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(), cpu_capacity: Self::default_cpu_capacity(), + merge_concurrency: Self::default_merge_concurrency(), max_merge_write_throughput: None, } } @@ -476,6 +486,23 @@ mod tests { "1500m" ); } + { + let indexer_config: IndexerConfig = + serde_yaml::from_str(r#"merge_concurrency: 5"#).unwrap(); + assert_eq!( + indexer_config.merge_concurrency, + NonZeroUsize::new(5).unwrap() + ); + let indexer_config_json = serde_json::to_value(&indexer_config).unwrap(); + assert_eq!( + indexer_config_json + .get("merge_concurrency") + .unwrap() + .as_u64() + .unwrap(), + 5 + ); + } { let indexer_config: IndexerConfig = serde_yaml::from_str(r#"cpu_capacity: 1500m"#).unwrap(); diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 240ba88ef03..c0b3bfb8434 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -455,7 +455,7 @@ pub fn node_config_for_test() -> NodeConfig { mod tests { use std::env; use std::net::Ipv4Addr; - use std::num::NonZeroU64; + use std::num::{NonZeroU64, NonZeroUsize}; use std::path::Path; use bytesize::ByteSize; @@ -554,6 +554,7 @@ mod tests { split_store_max_num_bytes: ByteSize::tb(1), split_store_max_num_splits: 10_000, max_concurrent_split_uploads: 8, + merge_concurrency: NonZeroUsize::new(2).unwrap(), cpu_capacity: IndexerConfig::default_cpu_capacity(), enable_cooperative_indexing: false, max_merge_write_throughput: Some(ByteSize::mb(100)), diff --git a/quickwit/quickwit-indexing/src/actors/index_serializer.rs b/quickwit/quickwit-indexing/src/actors/index_serializer.rs index 47960384a45..d9a28cac8a4 100644 --- a/quickwit/quickwit-indexing/src/actors/index_serializer.rs +++ b/quickwit/quickwit-indexing/src/actors/index_serializer.rs @@ -95,7 +95,7 @@ impl Handler for IndexSerializer { checkpoint_delta_opt: batch_builder.checkpoint_delta_opt, publish_lock: batch_builder.publish_lock, publish_token_opt: batch_builder.publish_token_opt, - merge_operation_opt: None, + merge_task_opt: None, batch_parent_span: batch_builder.batch_parent_span, }; ctx.send_message(&self.packager_mailbox, indexed_split_batch) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 769d8c4e946..f581ae20456 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -898,6 +898,7 @@ mod tests { merge_policy: default_merge_policy(), max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, + merge_scheduler_service: universe.get_or_spawn_one(), event_broker: Default::default(), }; let merge_pipeline = MergePipeline::new(merge_pipeline_params, universe.spawn_ctx()); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 81d2553ea44..e23e90340d4 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -57,7 +57,7 @@ use tokio::sync::Semaphore; use tracing::{debug, error, info, warn}; use super::merge_pipeline::{MergePipeline, MergePipelineParams}; -use super::MergePlanner; +use super::{MergePlanner, MergeSchedulerService}; use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline}; use crate::source::{AssignShards, Assignment}; use crate::split_store::{LocalSplitStore, SplitStoreQuota}; @@ -121,6 +121,7 @@ pub struct IndexingService { cluster: Cluster, metastore: MetastoreServiceClient, ingest_api_service_opt: Option>, + merge_scheduler_service: Mailbox, ingester_pool: IngesterPool, storage_resolver: StorageResolver, indexing_pipelines: HashMap, @@ -154,6 +155,7 @@ impl IndexingService { cluster: Cluster, metastore: MetastoreServiceClient, ingest_api_service_opt: Option>, + merge_scheduler_service: Mailbox, ingester_pool: IngesterPool, storage_resolver: StorageResolver, event_broker: EventBroker, @@ -182,6 +184,7 @@ impl IndexingService { cluster, metastore, ingest_api_service_opt, + merge_scheduler_service, ingester_pool, storage_resolver, local_split_store: Arc::new(local_split_store), @@ -297,6 +300,7 @@ impl IndexingService { indexing_directory: indexing_directory.clone(), metastore: self.metastore.clone(), split_store: split_store.clone(), + merge_scheduler_service: self.merge_scheduler_service.clone(), merge_policy: merge_policy.clone(), merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, @@ -893,6 +897,7 @@ mod tests { init_ingest_api(universe, &queues_dir_path, &IngestApiConfig::default()) .await .unwrap(); + let merge_scheduler_mailbox: Mailbox = universe.get_or_spawn_one(); let indexing_server = IndexingService::new( "test-node".to_string(), data_dir_path.to_path_buf(), @@ -901,6 +906,7 @@ mod tests { cluster, metastore, Some(ingest_api_service), + merge_scheduler_mailbox, IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), @@ -1345,6 +1351,7 @@ mod tests { init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) .await .unwrap(); + let merge_scheduler_service = universe.get_or_spawn_one(); let indexing_server = IndexingService::new( "test-node".to_string(), data_dir_path, @@ -1353,6 +1360,7 @@ mod tests { cluster.clone(), metastore.clone(), Some(ingest_api_service), + merge_scheduler_service, IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), @@ -1548,6 +1556,7 @@ mod tests { let indexer_config = IndexerConfig::for_test().unwrap(); let num_blocking_threads = 1; let storage_resolver = StorageResolver::unconfigured(); + let merge_scheduler_service: Mailbox = universe.get_or_spawn_one(); let mut indexing_server = IndexingService::new( "test-ingest-api-gc-node".to_string(), data_dir_path, @@ -1556,6 +1565,7 @@ mod tests { cluster.clone(), metastore.clone(), Some(ingest_api_service.clone()), + merge_scheduler_service, IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 030180d4a6f..8dfa0de312a 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -85,19 +85,19 @@ impl Actor for MergeExecutor { impl Handler for MergeExecutor { type Reply = (); - #[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_operation.merge_parent_span.id(), skip_all)] + #[instrument(level = "info", name = "merge_executor", parent = merge_scratch.merge_task.merge_parent_span.id(), skip_all)] async fn handle( &mut self, merge_scratch: MergeScratch, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { let start = Instant::now(); - let merge_op = merge_scratch.merge_operation; - let indexed_split_opt: Option = match merge_op.operation_type { + let merge_task = merge_scratch.merge_task; + let indexed_split_opt: Option = match merge_task.operation_type { MergeOperationType::Merge => Some( self.process_merge( - merge_op.merge_split_id.clone(), - merge_op.splits.clone(), + merge_task.merge_split_id.clone(), + merge_task.splits.clone(), merge_scratch.tantivy_dirs, merge_scratch.merge_scratch_directory, ctx, @@ -106,14 +106,14 @@ impl Handler for MergeExecutor { ), MergeOperationType::DeleteAndMerge => { assert_eq!( - merge_op.splits.len(), + merge_task.splits.len(), 1, "Delete tasks can be applied only on one split." ); assert_eq!(merge_scratch.tantivy_dirs.len(), 1); - let split_with_docs_to_delete = merge_op.splits[0].clone(); + let split_with_docs_to_delete = merge_task.splits[0].clone(); self.process_delete_and_merge( - merge_op.merge_split_id.clone(), + merge_task.merge_split_id.clone(), split_with_docs_to_delete, merge_scratch.tantivy_dirs, merge_scratch.merge_scratch_directory, @@ -126,7 +126,7 @@ impl Handler for MergeExecutor { info!( merged_num_docs = %indexed_split.split_attrs.num_docs, elapsed_secs = %start.elapsed().as_secs_f32(), - operation_type = %merge_op.operation_type, + operation_type = %merge_task.operation_type, "merge-operation-success" ); ctx.send_message( @@ -136,8 +136,8 @@ impl Handler for MergeExecutor { checkpoint_delta_opt: Default::default(), publish_lock: PublishLock::default(), publish_token_opt: None, - batch_parent_span: merge_op.merge_parent_span.clone(), - merge_operation_opt: Some(merge_op), + batch_parent_span: merge_task.merge_parent_span.clone(), + merge_task_opt: Some(merge_task), }, ) .await?; @@ -566,10 +566,10 @@ mod tests { DeleteQuery, ListSplitsRequest, PublishSplitsRequest, StageSplitsRequest, }; use serde_json::Value as JsonValue; - use tantivy::{Document, Inventory, ReloadPolicy, TantivyDocument}; + use tantivy::{Document, ReloadPolicy, TantivyDocument}; use super::*; - use crate::merge_policy::MergeOperation; + use crate::merge_policy::{MergeOperation, MergeTask}; use crate::{get_tantivy_directory_from_split_bundle, new_split_id, TestSandbox}; #[tokio::test] @@ -623,11 +623,10 @@ mod tests { .await?; tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap()) } - let merge_ops_inventory = Inventory::new(); - let merge_operation = - merge_ops_inventory.track(MergeOperation::new_merge_operation(split_metas)); + let merge_operation = MergeOperation::new_merge_operation(split_metas); + let merge_task = MergeTask::from_merge_operation_for_test(merge_operation); let merge_scratch = MergeScratch { - merge_operation, + merge_task, tantivy_dirs, merge_scratch_directory, downloaded_splits_directory, @@ -772,12 +771,10 @@ mod tests { .copy_to_file(Path::new(&split_filename), &dest_filepath) .await?; let tantivy_dir = get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap(); - let merge_ops_inventory = Inventory::new(); - let merge_operation = merge_ops_inventory.track( - MergeOperation::new_delete_and_merge_operation(new_split_metadata), - ); + let merge_operation = MergeOperation::new_delete_and_merge_operation(new_split_metadata); + let merge_task = MergeTask::from_merge_operation_for_test(merge_operation); let merge_scratch = MergeScratch { - merge_operation, + merge_task, tantivy_dirs: vec![tantivy_dir], merge_scratch_directory, downloaded_splits_directory, diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 04c28aaba3c..244f176dab0 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -40,6 +40,7 @@ use quickwit_proto::metastore::{ use time::OffsetDateTime; use tracing::{debug, error, info, instrument}; +use super::MergeSchedulerService; use crate::actors::indexing_pipeline::wait_duration_before_retry; use crate::actors::merge_split_downloader::MergeSplitDownloader; use crate::actors::publisher::PublisherType; @@ -228,7 +229,6 @@ impl MergePipeline { let published_splits_metadata = ctx .protect_future(published_splits_stream.collect_splits_metadata()) .await?; - info!( num_splits = published_splits_metadata.len(), "loaded list of published splits" @@ -244,6 +244,11 @@ impl MergePipeline { let (merge_publisher_mailbox, merge_publisher_handler) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) + .set_backpressure_micros_counter( + crate::metrics::INDEXER_METRICS + .backpressure_micros + .with_label_values(["merge_publisher"]), + ) .spawn(merge_publisher); // Merge uploader @@ -288,6 +293,11 @@ impl MergePipeline { let (merge_executor_mailbox, merge_executor_handler) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) + .set_backpressure_micros_counter( + crate::metrics::INDEXER_METRICS + .backpressure_micros + .with_label_values(["merge_executor"]), + ) .spawn(merge_executor); let merge_split_downloader = MergeSplitDownloader { @@ -302,7 +312,7 @@ impl MergePipeline { .set_backpressure_micros_counter( crate::metrics::INDEXER_METRICS .backpressure_micros - .with_label_values(["MergeSplitDownloader"]), + .with_label_values(["merge_split_downloader"]), ) .spawn(merge_split_downloader); @@ -312,6 +322,7 @@ impl MergePipeline { published_splits_metadata, self.params.merge_policy.clone(), merge_split_downloader_mailbox, + self.params.merge_scheduler_service.clone(), ); let (_, merge_planner_handler) = ctx .spawn_actor() @@ -357,6 +368,9 @@ impl MergePipeline { handles.merge_planner.refresh_observe(); handles.merge_uploader.refresh_observe(); handles.merge_publisher.refresh_observe(); + let num_ongoing_merges = crate::metrics::INDEXER_METRICS + .ongoing_merge_operations + .get(); self.statistics = self .previous_generations_statistics .clone() @@ -366,13 +380,7 @@ impl MergePipeline { ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts) - .set_ongoing_merges( - handles - .merge_planner - .last_observation() - .ongoing_merge_operations - .len(), - ); + .set_ongoing_merges(usize::try_from(num_ongoing_merges).unwrap_or(0)); } async fn perform_health_check( @@ -455,6 +463,7 @@ pub struct MergePipelineParams { pub doc_mapper: Arc, pub indexing_directory: TempDirectory, pub metastore: MetastoreServiceClient, + pub merge_scheduler_service: Mailbox, pub split_store: IndexingSplitStore, pub merge_policy: Arc, pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. @@ -515,6 +524,7 @@ mod tests { doc_mapper: Arc::new(default_doc_mapper_for_test()), indexing_directory: TempDirectory::for_test(), metastore: MetastoreServiceClient::from(metastore), + merge_scheduler_service: universe.get_or_spawn_one(), split_store, merge_policy: default_merge_policy(), max_concurrent_split_uploads: 2, diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 79f57afc3aa..5fcc84f73fe 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -17,14 +17,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::cmp::Reverse; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; -use itertools::Itertools; -use quickwit_actors::channel_with_priority::TrySendError; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; use quickwit_metastore::SplitMetadata; use quickwit_proto::indexing::IndexingPipelineId; @@ -33,9 +30,10 @@ use tantivy::Inventory; use time::OffsetDateTime; use tracing::{info, warn}; +use super::MergeSchedulerService; +use crate::actors::merge_scheduler_service::schedule_merge; use crate::actors::MergeSplitDownloader; use crate::merge_policy::MergeOperation; -use crate::metrics::INDEXER_METRICS; use crate::models::NewSplits; use crate::MergePolicy; @@ -61,13 +59,16 @@ pub struct MergePlanner { /// We incrementally build this set, by adding new splits to it. /// When it becomes too large, we entirely rebuild it. known_split_ids: HashSet, + known_split_ids_recompute_attempt_id: usize, merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, + merge_scheduler_service: Mailbox, /// Inventory of ongoing merge operations. If everything goes well, /// a merge operation is dropped after the publish of the merged split. - /// Used for observability. + /// + /// It is used to GC the known_split_ids set. ongoing_merge_operations_inventory: Inventory, /// We use the actor start_time as a way to identify incarnations. @@ -75,27 +76,14 @@ pub struct MergePlanner { /// Since we recycle the mailbox of the merge planner, this incarnation /// makes it possible to ignore messages that where emitted from the previous /// instantiation. - /// - /// In particular, it is necessary to avoid ever increasing the number - /// `RefreshMetrics` loop, every time the `MergePlanner` is respawned. incarnation_started_at: Instant, } #[async_trait] impl Actor for MergePlanner { - type ObservableState = MergePlannerState; - - fn observable_state(&self) -> Self::ObservableState { - let ongoing_merge_operations = self - .ongoing_merge_operations_inventory - .list() - .iter() - .map(|tracked_operation| tracked_operation.as_ref().clone()) - .collect_vec(); - MergePlannerState { - ongoing_merge_operations, - } - } + type ObservableState = (); + + fn observable_state(&self) -> Self::ObservableState {} fn name(&self) -> String { "MergePlanner".to_string() @@ -106,13 +94,6 @@ impl Actor for MergePlanner { } async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { - self.handle( - RefreshMetrics { - incarnation_started_at: self.incarnation_started_at, - }, - ctx, - ) - .await?; // We do not call the handle method directly and instead queue the message in order to drain // the recycled mailbox and get a consolidated vision of the set of published // splits, before scheduling any merge operation. See #3847 for more details. @@ -159,23 +140,11 @@ impl Handler for MergePlanner { ) -> Result<(), ActorExitStatus> { self.record_splits_if_necessary(new_splits.new_splits); self.send_merge_ops(ctx).await?; - if self.known_split_ids.len() >= self.num_known_splits_rebuild_threshold() { - self.known_split_ids = self.rebuild_known_split_ids(); - } self.recompute_known_splits_if_necessary(); Ok(()) } } -fn max_merge_ops(merge_op: &MergeOperation) -> usize { - merge_op - .splits_as_slice() - .iter() - .map(|split_metadata| split_metadata.num_merge_ops) - .max() - .unwrap_or(0) -} - impl MergePlanner { pub fn queue_capacity() -> QueueCapacity { // We cannot have a Queue capacity of 0 here because `try_send_self` @@ -188,6 +157,7 @@ impl MergePlanner { published_splits: Vec, merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, + merge_scheduler_service: Mailbox, ) -> MergePlanner { let published_splits: Vec = published_splits .into_iter() @@ -195,10 +165,13 @@ impl MergePlanner { .collect(); let mut merge_planner = MergePlanner { known_split_ids: Default::default(), + known_split_ids_recompute_attempt_id: 0, partitioned_young_splits: Default::default(), merge_policy, merge_split_downloader_mailbox, + merge_scheduler_service, ongoing_merge_operations_inventory: Inventory::default(), + incarnation_started_at: Instant::now(), }; merge_planner.record_splits_if_necessary(published_splits); @@ -206,8 +179,7 @@ impl MergePlanner { } fn rebuild_known_split_ids(&self) -> HashSet { - let mut known_split_ids: HashSet = - HashSet::with_capacity(self.num_known_splits_rebuild_threshold()); + let mut known_split_ids: HashSet = HashSet::default(); // Add splits that in `partitioned_young_splits`. for young_split_partition in self.partitioned_young_splits.values() { for split in young_split_partition { @@ -242,43 +214,13 @@ impl MergePlanner { true } + // No need to rebuild every time, we do once out of 100 times. fn recompute_known_splits_if_necessary(&mut self) { - if self.known_split_ids.len() >= self.num_known_splits_rebuild_threshold() { + self.known_split_ids_recompute_attempt_id += 1; + if self.known_split_ids_recompute_attempt_id % 100 == 0 { self.known_split_ids = self.rebuild_known_split_ids(); + self.known_split_ids_recompute_attempt_id = 0; } - if cfg!(test) { - let merge_operation = self.ongoing_merge_operations_inventory.list(); - let mut young_splits = HashSet::new(); - for (&partition_id, young_splits_in_partition) in &self.partitioned_young_splits { - for split_metadata in young_splits_in_partition { - assert_eq!(split_metadata.partition_id, partition_id); - young_splits.insert(split_metadata.split_id()); - } - } - for merge_op in merge_operation { - assert!(!self.known_split_ids.contains(&merge_op.merge_split_id)); - for split_in_merge in merge_op.splits_as_slice() { - assert!(self.known_split_ids.contains(split_in_merge.split_id())); - } - } - assert!(self.known_split_ids.len() <= self.num_known_splits_rebuild_threshold() + 1); - } - } - - /// Whenever the number of known splits exceeds this threshold, we rebuild the `known_split_ids` - /// set. - /// - /// We have this function to return a number that is higher than 2 times the len of - /// `known_split_ids` after a rebuild to get amortization. - fn num_known_splits_rebuild_threshold(&self) -> usize { - // The idea behind this formula is that we expect the max legitimate of splits after a - // rebuild to be `num_young_splits` + `num_splits_merge`. - // The capacity of `partioned_young_splits` is a good upper bound for the number of - // partition. - // - // We can expect a maximum of 100 ongoing splits in merge per partition. (We oversize this - // because it actually depends on the merge factor. - 1 + self.num_young_splits() + (1 + self.partitioned_young_splits.capacity()) * 20 } // Record a split. This function does NOT check if the split is mature or not, or if the split @@ -331,70 +273,24 @@ impl MergePlanner { Ok(merge_operations) } - fn num_young_splits(&self) -> usize { - self.partitioned_young_splits - .values() - .map(|splits| splits.len()) - .sum() - } - async fn send_merge_ops(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { - // We do not want to simply schedule all available merge operations here. + // We identify all of the merge operations we want to run and leave it + // to the merge scheduler to decide in which order these should be scheduled. // - // The reason is that in presence of partitioning, it is very possible - // to receive a set of splits opening the opportunity to run a lot "large" merge - // operations at the same time. - // - // These large merge operation will in turn prevent small merge operations - // from being executed, when in fact small merge operations should be executed - // in priority. - // - // As an alternative approach, this function push merge operations until it starts - // experience some push back, and then just "loops". - let mut merge_ops = self.compute_merge_ops(ctx).await?; - // We run smaller merges in priority. - merge_ops.sort_by_cached_key(|merge_op| Reverse(max_merge_ops(merge_op))); - while let Some(merge_operation) = merge_ops.pop() { - info!(merge_operation=?merge_operation, "planned merge operation"); + // The merge scheduler has the merit of knowing about merge operations from other + // index as well. + let merge_ops = self.compute_merge_ops(ctx).await?; + for merge_operation in merge_ops { + info!(merge_operation=?merge_operation, "schedule merge operation"); let tracked_merge_operation = self .ongoing_merge_operations_inventory .track(merge_operation); - if let Err(try_send_err) = self - .merge_split_downloader_mailbox - .try_send_message(tracked_merge_operation) - { - match try_send_err { - TrySendError::Disconnected => { - return Err(ActorExitStatus::DownstreamClosed); - } - TrySendError::Full(merge_op) => { - ctx.send_message(&self.merge_split_downloader_mailbox, merge_op) - .await?; - break; - } - } - } - } - if !merge_ops.is_empty() { - // We experienced some push back and decided to stop queueing too - // many merge operation. (For more detail see #2348) - // - // We need to re-record the related split, so that we - // perform a merge in the future. - for merge_op in merge_ops { - for split in merge_op.splits { - self.record_split(split); - } - } - // We try_self_send a `PlanMerge` message in order to ensure that - // progress on our merges. - // - // If `try_send_self_message` returns an error, it means that the - // the self queue is full, which means that the `PlanMerge` - // message is not really needed anyway. - let _ignored_result = ctx.try_send_self_message(PlanMerge { - incarnation_started_at: self.incarnation_started_at, - }); + schedule_merge( + &self.merge_scheduler_service, + tracked_merge_operation, + self.merge_split_downloader_mailbox.clone(), + ) + .await? } Ok(()) } @@ -407,43 +303,11 @@ fn belongs_to_pipeline(pipeline_id: &IndexingPipelineId, split: &SplitMetadata) && pipeline_id.node_id == split.node_id } -#[derive(Debug)] -struct RefreshMetrics { - incarnation_started_at: Instant, -} - #[derive(Debug)] struct PlanMerge { incarnation_started_at: Instant, } -#[async_trait] -impl Handler for MergePlanner { - type Reply = (); - - async fn handle( - &mut self, - refresh_metric: RefreshMetrics, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - if self.incarnation_started_at != refresh_metric.incarnation_started_at { - // This message was emitted by a different incarnation. - // (See `Self::incarnation_started_at`) - return Ok(()); - } - INDEXER_METRICS - .ongoing_merge_operations - .set(self.ongoing_merge_operations_inventory.list().len() as i64); - ctx.schedule_self_msg( - *quickwit_actors::HEARTBEAT, - RefreshMetrics { - incarnation_started_at: self.incarnation_started_at, - }, - ); - Ok(()) - } -} - #[derive(Clone, Debug, Serialize)] pub struct MergePlannerState { pub(crate) ongoing_merge_operations: Vec, @@ -463,12 +327,11 @@ mod tests { use quickwit_metastore::{SplitMaturity, SplitMetadata}; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::types::{IndexUid, PipelineUid}; - use tantivy::TrackedObject; use time::OffsetDateTime; use crate::actors::MergePlanner; use crate::merge_policy::{ - merge_policy_from_settings, MergeOperation, MergePolicy, StableLogMergePolicy, + merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy, }; use crate::models::NewSplits; @@ -521,6 +384,7 @@ mod tests { Vec::new(), merge_policy, merge_split_downloader_mailbox, + universe.get_or_spawn_one(), ); let (merge_planner_mailbox, merge_planner_handle) = @@ -561,8 +425,7 @@ mod tests { }; merge_planner_mailbox.send_message(message).await?; merge_planner_handle.process_pending_and_observe().await; - let operations = merge_split_downloader_inbox - .drain_for_test_typed::>(); + let operations = merge_split_downloader_inbox.drain_for_test_typed::(); assert_eq!(operations.len(), 2); let mut merge_operations = operations.into_iter().sorted_by(|left_op, right_op| { left_op.splits[0] @@ -581,155 +444,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_merge_planner_priority() -> anyhow::Result<()> { - let universe = Universe::with_accelerated_time(); - let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = - universe.create_test_mailbox(); - let index_uid = IndexUid::new_with_random_ulid("test-index"); - let pipeline_id = IndexingPipelineId { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - node_id: "test-node".to_string(), - pipeline_uid: PipelineUid::default(), - }; - let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { - merge_factor: 2, - max_merge_factor: 2, - max_merge_ops: 3, - ..Default::default() - }; - let indexing_settings = IndexingSettings { - merge_policy: MergePolicyConfig::ConstWriteAmplification(merge_policy_config), - ..Default::default() - }; - let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); - let merge_planner = MergePlanner::new( - pipeline_id, - Vec::new(), - merge_policy, - merge_split_downloader_mailbox, - ); - let (merge_planner_mailbox, merge_planner_handle) = - universe.spawn_builder().spawn(merge_planner); - // send 4 splits, offering 2 merge opportunities. - let message = NewSplits { - new_splits: vec![ - split_metadata_for_test(&index_uid, "2_a", 2, 100, 2), - split_metadata_for_test(&index_uid, "2_b", 2, 100, 2), - split_metadata_for_test(&index_uid, "1_a", 1, 10, 1), - split_metadata_for_test(&index_uid, "1_b", 1, 10, 1), - ], - }; - merge_planner_mailbox.send_message(message).await?; - merge_planner_handle.process_pending_and_observe().await; - let merge_ops: Vec> = - merge_split_downloader_inbox.drain_for_test_typed(); - assert_eq!(merge_ops.len(), 2); - assert_eq!(merge_ops[0].splits_as_slice()[0].num_merge_ops, 1); - assert_eq!(merge_ops[1].splits_as_slice()[0].num_merge_ops, 2); - universe.assert_quit().await; - Ok(()) - } - - #[tokio::test] - async fn test_merge_planner_priority_only_queue_up_to_capacity() { - let universe = Universe::with_accelerated_time(); - let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe - .spawn_ctx() - .create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2)); - let index_uid = IndexUid::new_with_random_ulid("test-index"); - let pipeline_id = IndexingPipelineId { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - node_id: "test-node".to_string(), - pipeline_uid: PipelineUid::default(), - }; - let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { - merge_factor: 2, - max_merge_factor: 2, - max_merge_ops: 3, - ..Default::default() - }; - let indexing_settings = IndexingSettings { - merge_policy: MergePolicyConfig::ConstWriteAmplification(merge_policy_config), - ..Default::default() - }; - let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); - let merge_planner = MergePlanner::new( - pipeline_id, - Vec::new(), - merge_policy, - merge_split_downloader_mailbox, - ); - let universe = Universe::with_accelerated_time(); - let (merge_planner_mailbox, _) = universe.spawn_builder().spawn(merge_planner); - tokio::task::spawn(async move { - // Sending 20 splits offering 10 split opportunities - let messages_with_merge_ops2 = NewSplits { - new_splits: (0..10) - .flat_map(|partition_id| { - [ - split_metadata_for_test( - &index_uid, - &format!("{partition_id}_a_large"), - partition_id, - 1_000_000, - 2, - ), - split_metadata_for_test( - &index_uid, - &format!("{partition_id}_b_large"), - partition_id, - 1_000_000, - 2, - ), - ] - }) - .collect(), - }; - merge_planner_mailbox - .send_message(messages_with_merge_ops2) - .await - .unwrap(); - let messages_with_merge_ops1 = NewSplits { - new_splits: (0..10) - .flat_map(|partition_id| { - [ - split_metadata_for_test( - &index_uid, - &format!("{partition_id}_a_small"), - partition_id, - 100_000, - 1, - ), - split_metadata_for_test( - &index_uid, - &format!("{partition_id}_b_small"), - partition_id, - 100_000, - 1, - ), - ] - }) - .collect(), - }; - merge_planner_mailbox - .send_message(messages_with_merge_ops1) - .await - .unwrap(); - }); - tokio::task::spawn_blocking(move || { - let mut merge_ops: Vec> = Vec::new(); - while merge_ops.len() < 20 { - merge_ops.extend(merge_split_downloader_inbox.drain_for_test_typed()); - } - }) - .await - .unwrap(); - universe.assert_quit().await; - } - #[tokio::test] async fn test_merge_planner_spawns_merge_over_existing_splits_on_startup() -> anyhow::Result<()> { @@ -770,16 +484,17 @@ mod tests { pre_existing_splits.clone(), merge_policy, merge_split_downloader_mailbox, + universe.get_or_spawn_one(), ); let (merge_planner_mailbox, merge_planner_handle) = universe.spawn_builder().spawn(merge_planner); // We wait for the first merge ops. If we sent the Quit message right away, it would have // been queue before first `PlanMerge` message. - let merge_op = merge_split_downloader_inbox - .recv_typed_message::>() + let merge_task_res = merge_split_downloader_inbox + .recv_typed_message::() .await; - assert!(merge_op.is_some()); + assert!(merge_task_res.is_ok()); // We make sure that the known splits filtering set filters out splits are currently in // merge. @@ -791,8 +506,7 @@ mod tests { let _ = merge_planner_handle.process_pending_and_observe().await; - let merge_ops = - merge_split_downloader_inbox.drain_for_test_typed::>(); + let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::(); assert!(merge_ops.is_empty()); @@ -800,8 +514,7 @@ mod tests { let (exit_status, _last_state) = merge_planner_handle.join().await; assert!(matches!(exit_status, ActorExitStatus::Quit)); - let merge_ops = - merge_split_downloader_inbox.drain_for_test_typed::>(); + let merge_ops = merge_split_downloader_inbox.drain_for_test_typed::(); assert!(merge_ops.is_empty()); universe.assert_quit().await; Ok(()) @@ -858,6 +571,7 @@ mod tests { pre_existing_splits.clone(), merge_policy, merge_split_downloader_mailbox, + universe.get_or_spawn_one(), ); let (merge_planner_mailbox, merge_planner_handle) = universe.spawn_builder().spawn(merge_planner); @@ -865,10 +579,9 @@ mod tests { merge_planner_mailbox.send_message(Command::Quit).await?; let (exit_status, _last_state) = merge_planner_handle.join().await; assert!(matches!(exit_status, ActorExitStatus::Quit)); - let merge_ops = - merge_split_downloader_inbox.drain_for_test_typed::>(); + let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::(); - assert!(merge_ops.is_empty()); + assert!(merge_tasks.is_empty()); universe.assert_quit().await; Ok(()) } @@ -914,8 +627,8 @@ mod tests { pre_existing_splits.clone(), merge_policy, merge_split_downloader_mailbox, + universe.get_or_spawn_one(), ); - let universe = Universe::with_accelerated_time(); // We create a fake old mailbox that contains two new splits and a PlanMerge message from an // old incarnation. This could happen in real life if the merge pipeline failed @@ -935,87 +648,18 @@ mod tests { // sent in the initialize method. // Instead, we wait for the first merge ops. - let merge_ops = merge_split_downloader_inbox - .recv_typed_message::>() + let merge_task_res = merge_split_downloader_inbox + .recv_typed_message::() .await; - assert!(merge_ops.is_some()); + assert!(merge_task_res.is_ok()); // At this point, our merge has been initialized. merge_planner_mailbox.send_message(Command::Quit).await?; let (exit_status, _last_state) = merge_planner_handle.join().await; assert!(matches!(exit_status, ActorExitStatus::Quit)); - let merge_ops = - merge_split_downloader_inbox.drain_for_test_typed::>(); - assert!(merge_ops.is_empty()); - - universe.assert_quit().await; - Ok(()) - } - - #[tokio::test] - async fn test_merge_planner_known_splits_set_size_stays_bounded() -> anyhow::Result<()> { - let universe = Universe::with_accelerated_time(); - let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe - .spawn_ctx() - .create_mailbox("MergeSplitDownloader", QueueCapacity::Unbounded); - let index_uid = IndexUid::new_with_random_ulid("test-index"); - let pipeline_id = IndexingPipelineId { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - node_id: "test-node".to_string(), - pipeline_uid: PipelineUid::default(), - }; - let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { - merge_factor: 2, - max_merge_factor: 2, - max_merge_ops: 3, - ..Default::default() - }; - let indexing_settings = IndexingSettings { - merge_policy: MergePolicyConfig::ConstWriteAmplification(merge_policy_config), - ..Default::default() - }; - let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); - let merge_planner = MergePlanner::new( - pipeline_id, - Vec::new(), - merge_policy, - merge_split_downloader_mailbox, - ); - let universe = Universe::with_accelerated_time(); - - // We spawn our merge planner with this recycled mailbox. - let (merge_planner_mailbox, merge_planner_handle) = - universe.spawn_builder().spawn(merge_planner); - - for j in 0..100 { - for i in 0..10 { - merge_planner_mailbox - .ask(NewSplits { - new_splits: vec![split_metadata_for_test( - &index_uid, - &format!("split_{}", j * 10 + i), - 0, - 1_000_000, - 1, - )], - }) - .await - .unwrap(); - } - // We drain the merge_ops to make sure merge ops are dropped (as if merges where - // successful) and that we are properly testing that the known_splits_set is - // bounded. - let merge_ops = merge_split_downloader_inbox - .drain_for_test_typed::>(); - assert_eq!(merge_ops.len(), 5); - } - - // At this point, our merge has been initialized. - merge_planner_mailbox.send_message(Command::Quit).await?; - let (exit_status, _last_state) = merge_planner_handle.join().await; - assert!(matches!(exit_status, ActorExitStatus::Quit)); + let merge_tasks = merge_split_downloader_inbox.drain_for_test_typed::(); + assert!(merge_tasks.is_empty()); universe.assert_quit().await; Ok(()) diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs new file mode 100644 index 00000000000..2ba1efb7e37 --- /dev/null +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -0,0 +1,462 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::cmp::Reverse; +use std::collections::binary_heap::PeekMut; +use std::collections::BinaryHeap; +use std::sync::Arc; + +use anyhow::Context; +use async_trait::async_trait; +use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox}; +use tantivy::TrackedObject; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tracing::error; + +use super::MergeSplitDownloader; +use crate::merge_policy::{MergeOperation, MergeTask}; + +pub struct MergePermit { + _semaphore_permit: Option, + merge_scheduler_mailbox: Option>, +} + +impl MergePermit { + #[cfg(test)] + pub fn for_test() -> MergePermit { + MergePermit { + _semaphore_permit: None, + merge_scheduler_mailbox: None, + } + } +} + +impl Drop for MergePermit { + fn drop(&mut self) { + let Some(merge_scheduler_mailbox) = self.merge_scheduler_mailbox.take() else { + return; + }; + if merge_scheduler_mailbox + .send_message_with_high_priority(PermitReleased) + .is_err() + { + error!("merge scheduler service is dead"); + } + } +} + +pub async fn schedule_merge( + merge_scheduler_service: &Mailbox, + merge_operation: TrackedObject, + merge_split_downloader_mailbox: Mailbox, +) -> anyhow::Result<()> { + let schedule_merge = ScheduleMerge::new(merge_operation, merge_split_downloader_mailbox); + // TODO add backpressure. + merge_scheduler_service + .ask(schedule_merge) + .await + .context("failed to acquire permit")?; + Ok(()) +} + +struct ScheduledMerge { + score: u64, + id: u64, //< just for total ordering. + merge_operation: TrackedObject, + split_downloader_mailbox: Mailbox, +} + +impl ScheduledMerge { + fn order_key(&self) -> (u64, Reverse) { + (self.score, std::cmp::Reverse(self.id)) + } +} + +impl Eq for ScheduledMerge {} + +impl PartialEq for ScheduledMerge { + fn eq(&self, other: &Self) -> bool { + self.cmp(other).is_eq() + } +} + +impl PartialOrd for ScheduledMerge { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ScheduledMerge { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.order_key().cmp(&other.order_key()) + } +} + +/// The merge scheduler service is in charge of keeping track of all scheduled merge operations, +/// and schedule them in the best possible order, respecting the `merge_concurrency` limit. +/// +/// This actor is not supervised and should stay as simple as possible. +/// In particular, +/// - the `ScheduleMerge` handler should reply in microseconds. +/// - the task should never be dropped before reaching its `split_downloader_mailbox` destination +/// as it would break the consistency of `MergePlanner` with the metastore (ie: several splits will +/// never be merged). +pub struct MergeSchedulerService { + merge_semaphore: Arc, + merge_concurrency: usize, + pending_merge_queue: BinaryHeap, + next_merge_id: u64, + pending_merge_bytes: u64, +} + +impl Default for MergeSchedulerService { + fn default() -> MergeSchedulerService { + MergeSchedulerService::new(3) + } +} + +impl MergeSchedulerService { + pub fn new(merge_concurrency: usize) -> MergeSchedulerService { + let merge_semaphore = Arc::new(Semaphore::new(merge_concurrency)); + MergeSchedulerService { + merge_semaphore, + merge_concurrency, + pending_merge_queue: BinaryHeap::default(), + next_merge_id: 0, + pending_merge_bytes: 0, + } + } + + fn schedule_pending_merges(&mut self, ctx: &ActorContext) { + // We schedule as many pending merges as we can, + // until there are no permits available or merges to schedule. + loop { + let merge_semaphore = self.merge_semaphore.clone(); + let Some(next_merge) = self.pending_merge_queue.peek_mut() else { + // No merge to schedule. + break; + }; + let Ok(semaphore_permit) = Semaphore::try_acquire_owned(merge_semaphore) else { + // No permit available right away. + break; + }; + let merge_permit = MergePermit { + _semaphore_permit: Some(semaphore_permit), + merge_scheduler_mailbox: Some(ctx.mailbox().clone()), + }; + let ScheduledMerge { + merge_operation, + split_downloader_mailbox, + .. + } = PeekMut::pop(next_merge); + let merge_task = MergeTask { + merge_operation, + _merge_permit: merge_permit, + }; + self.pending_merge_bytes -= merge_task.merge_operation.total_num_bytes(); + crate::metrics::INDEXER_METRICS + .pending_merge_operations + .set(self.pending_merge_queue.len() as i64); + crate::metrics::INDEXER_METRICS + .pending_merge_bytes + .set(self.pending_merge_bytes as i64); + match split_downloader_mailbox.try_send_message(merge_task) { + Ok(_) => {} + Err(quickwit_actors::TrySendError::Full(_)) => { + // The split downloader mailbox has an unbounded queue capacity, + error!("split downloader queue is full: please report"); + } + Err(quickwit_actors::TrySendError::Disconnected) => { + // It means the split downloader is dead. + // This is fine, the merge pipeline has probably been restarted. + } + } + } + let num_merges = + self.merge_concurrency as i64 - self.merge_semaphore.available_permits() as i64; + crate::metrics::INDEXER_METRICS + .ongoing_merge_operations + .set(num_merges); + } +} + +#[async_trait] +impl Actor for MergeSchedulerService { + type ObservableState = (); + + fn observable_state(&self) {} + + async fn initialize(&mut self, _ctx: &ActorContext) -> Result<(), ActorExitStatus> { + Ok(()) + } +} + +#[derive(Debug)] +struct ScheduleMerge { + score: u64, + merge_operation: TrackedObject, + split_downloader_mailbox: Mailbox, +} + +/// The higher, the sooner we will execute the merge operation. +/// A good merge operation +/// - strongly reduces the number splits +/// - is light. +fn score_merge_operation(merge_operation: &MergeOperation) -> u64 { + let total_num_bytes: u64 = merge_operation.total_num_bytes(); + if total_num_bytes == 0 { + // Silly corner case that should never happen. + return u64::MAX; + } + // We will remove splits.len() and add 1 merge splits. + let delta_num_splits = (merge_operation.splits.len() - 1) as u64; + // We use integer arithmetic to avoid `f64 are not ordered` silliness. + (delta_num_splits << 48) + .checked_div(total_num_bytes) + .unwrap_or(1u64) +} + +impl ScheduleMerge { + pub fn new( + merge_operation: TrackedObject, + split_downloader_mailbox: Mailbox, + ) -> ScheduleMerge { + let score = score_merge_operation(&merge_operation); + ScheduleMerge { + score, + merge_operation, + split_downloader_mailbox, + } + } +} + +#[async_trait] +impl Handler for MergeSchedulerService { + type Reply = (); + + async fn handle( + &mut self, + schedule_merge: ScheduleMerge, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + let ScheduleMerge { + score, + merge_operation, + split_downloader_mailbox, + } = schedule_merge; + let merge_id = self.next_merge_id; + self.next_merge_id += 1; + let scheduled_merge = ScheduledMerge { + score, + id: merge_id, + merge_operation, + split_downloader_mailbox, + }; + self.pending_merge_bytes += scheduled_merge.merge_operation.total_num_bytes(); + self.pending_merge_queue.push(scheduled_merge); + crate::metrics::INDEXER_METRICS + .pending_merge_operations + .set(self.pending_merge_queue.len() as i64); + crate::metrics::INDEXER_METRICS + .pending_merge_bytes + .set(self.pending_merge_bytes as i64); + self.schedule_pending_merges(ctx); + Ok(()) + } +} + +#[derive(Debug)] +struct PermitReleased; + +#[async_trait] +impl Handler for MergeSchedulerService { + type Reply = (); + + async fn handle( + &mut self, + _: PermitReleased, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + self.schedule_pending_merges(ctx); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use quickwit_actors::Universe; + use quickwit_metastore::SplitMetadata; + use tantivy::Inventory; + use tokio::time::timeout; + + use super::*; + use crate::merge_policy::{MergeOperation, MergeTask}; + + fn build_merge_operation(num_splits: usize, num_bytes_per_split: u64) -> MergeOperation { + let splits: Vec = std::iter::repeat_with(|| SplitMetadata { + footer_offsets: num_bytes_per_split..num_bytes_per_split, + ..Default::default() + }) + .take(num_splits) + .collect(); + MergeOperation::new_merge_operation(splits) + } + + #[test] + fn test_score_merge_operation() { + let score_merge_operation_aux = |num_splits, num_bytes_per_split| { + let merge_operation = build_merge_operation(num_splits, num_bytes_per_split); + score_merge_operation(&merge_operation) + }; + assert!(score_merge_operation_aux(10, 10_000_000) < score_merge_operation_aux(10, 999_999)); + assert!( + score_merge_operation_aux(10, 10_000_000) > score_merge_operation_aux(9, 10_000_000) + ); + assert_eq!( + // 9 - 1 = 8 splits removed. + score_merge_operation_aux(9, 10_000_000), + // 5 - 1 = 4 splits removed. + score_merge_operation_aux(5, 10_000_000 * 9 / 10) + ); + } + + #[tokio::test] + async fn test_merge_schedule_service_prioritize() { + let universe = Universe::new(); + let (merge_scheduler_service, _) = universe + .spawn_builder() + .spawn(MergeSchedulerService::new(2)); + let inventory = Inventory::new(); + + let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = + universe.create_test_mailbox(); + { + let large_merge_operation = build_merge_operation(10, 4_000_000); + let tracked_large_merge_operation = inventory.track(large_merge_operation); + schedule_merge( + &merge_scheduler_service, + tracked_large_merge_operation, + merge_split_downloader_mailbox.clone(), + ) + .await + .unwrap(); + } + { + let large_merge_operation2 = build_merge_operation(10, 3_000_000); + let tracked_large_merge_operation2 = inventory.track(large_merge_operation2); + schedule_merge( + &merge_scheduler_service, + tracked_large_merge_operation2, + merge_split_downloader_mailbox.clone(), + ) + .await + .unwrap(); + } + { + let large_merge_operation2 = build_merge_operation(10, 5_000_000); + let tracked_large_merge_operation2 = inventory.track(large_merge_operation2); + schedule_merge( + &merge_scheduler_service, + tracked_large_merge_operation2, + merge_split_downloader_mailbox.clone(), + ) + .await + .unwrap(); + } + { + let large_merge_operation2 = build_merge_operation(10, 2_000_000); + let tracked_large_merge_operation2 = inventory.track(large_merge_operation2); + schedule_merge( + &merge_scheduler_service, + tracked_large_merge_operation2, + merge_split_downloader_mailbox.clone(), + ) + .await + .unwrap(); + } + { + let large_merge_operation2 = build_merge_operation(10, 1_000_000); + let tracked_large_merge_operation2 = inventory.track(large_merge_operation2); + schedule_merge( + &merge_scheduler_service, + tracked_large_merge_operation2, + merge_split_downloader_mailbox.clone(), + ) + .await + .unwrap(); + } + { + let merge_task: MergeTask = merge_split_downloader_inbox + .recv_typed_message::() + .await + .unwrap(); + assert_eq!( + merge_task.merge_operation.splits[0].footer_offsets.end, + 4_000_000 + ); + let merge_task2: MergeTask = merge_split_downloader_inbox + .recv_typed_message::() + .await + .unwrap(); + assert_eq!( + merge_task2.merge_operation.splits[0].footer_offsets.end, + 3_000_000 + ); + assert!(timeout( + Duration::from_millis(200), + merge_split_downloader_inbox.recv_typed_message::() + ) + .await + .is_err()); + } + { + let merge_task: MergeTask = merge_split_downloader_inbox + .recv_typed_message::() + .await + .unwrap(); + assert_eq!( + merge_task.merge_operation.splits[0].footer_offsets.end, + 1_000_000 + ); + } + { + let merge_task: MergeTask = merge_split_downloader_inbox + .recv_typed_message::() + .await + .unwrap(); + assert_eq!( + merge_task.merge_operation.splits[0].footer_offsets.end, + 2_000_000 + ); + } + { + let merge_task: MergeTask = merge_split_downloader_inbox + .recv_typed_message::() + .await + .unwrap(); + assert_eq!( + merge_task.merge_operation.splits[0].footer_offsets.end, + 5_000_000 + ); + } + universe.assert_quit().await; + } +} diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 0e80c23e4a9..09a782f72c1 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -24,11 +24,11 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu use quickwit_common::io::IoControls; use quickwit_common::temp_dir::{self, TempDirectory}; use quickwit_metastore::SplitMetadata; -use tantivy::{Directory, TrackedObject}; +use tantivy::Directory; use tracing::{debug, info, instrument}; use super::MergeExecutor; -use crate::merge_policy::MergeOperation; +use crate::merge_policy::MergeTask; use crate::models::MergeScratch; use crate::split_store::IndexingSplitStore; @@ -45,7 +45,7 @@ impl Actor for MergeSplitDownloader { fn observable_state(&self) -> Self::ObservableState {} fn queue_capacity(&self) -> QueueCapacity { - QueueCapacity::Bounded(1) + QueueCapacity::Unbounded } fn name(&self) -> String { @@ -54,17 +54,17 @@ impl Actor for MergeSplitDownloader { } #[async_trait] -impl Handler> for MergeSplitDownloader { +impl Handler for MergeSplitDownloader { type Reply = (); #[instrument( name = "merge_split_downloader", - parent = merge_operation.merge_parent_span.id(), + parent = merge_task.merge_parent_span.id(), skip_all, )] async fn handle( &mut self, - merge_operation: TrackedObject, + merge_task: MergeTask, ctx: &ActorContext, ) -> Result<(), quickwit_actors::ActorExitStatus> { let merge_scratch_directory = temp_dir::Builder::default() @@ -78,13 +78,13 @@ impl Handler> for MergeSplitDownloader { .map_err(|error| anyhow::anyhow!(error))?; let tantivy_dirs = self .download_splits( - merge_operation.splits_as_slice(), + merge_task.splits_as_slice(), downloaded_splits_directory.path(), ctx, ) .await?; let msg = MergeScratch { - merge_operation, + merge_task, merge_scratch_directory, downloaded_splits_directory, tantivy_dirs, @@ -139,9 +139,9 @@ mod tests { use quickwit_actors::Universe; use quickwit_common::split_file; use quickwit_storage::{PutPayload, RamStorageBuilder, SplitPayloadBuilder}; - use tantivy::Inventory; use super::*; + use crate::merge_policy::MergeOperation; use crate::new_split_id; #[tokio::test] @@ -179,10 +179,10 @@ mod tests { }; let (merge_split_downloader_mailbox, merge_split_downloader_handler) = universe.spawn_builder().spawn(merge_split_downloader); - let inventory = Inventory::new(); - let merge_operation = inventory.track(MergeOperation::new_merge_operation(splits_to_merge)); + let merge_operation: MergeOperation = MergeOperation::new_merge_operation(splits_to_merge); + let merge_task = MergeTask::from_merge_operation_for_test(merge_operation); merge_split_downloader_mailbox - .send_message(merge_operation) + .send_message(merge_task) .await?; merge_split_downloader_handler .process_pending_and_observe() @@ -195,8 +195,8 @@ mod tests { .unwrap() .downcast::() .unwrap(); - assert_eq!(merge_scratch.merge_operation.splits_as_slice().len(), 10); - for split in merge_scratch.merge_operation.splits_as_slice() { + assert_eq!(merge_scratch.merge_task.splits_as_slice().len(), 10); + for split in merge_scratch.merge_task.splits_as_slice() { let split_filename = split_file(split.split_id()); let split_filepath = merge_scratch .downloaded_splits_directory diff --git a/quickwit/quickwit-indexing/src/actors/mod.rs b/quickwit/quickwit-indexing/src/actors/mod.rs index e85a4289ef8..6c7befd41ba 100644 --- a/quickwit/quickwit-indexing/src/actors/mod.rs +++ b/quickwit/quickwit-indexing/src/actors/mod.rs @@ -25,6 +25,7 @@ mod indexing_service; mod merge_executor; mod merge_pipeline; mod merge_planner; +mod merge_scheduler_service; mod merge_split_downloader; mod packager; mod publisher; @@ -43,6 +44,7 @@ pub use indexing_service::{ pub use merge_executor::{combine_partition_ids, merge_split_attrs, MergeExecutor}; pub use merge_pipeline::MergePipeline; pub use merge_planner::MergePlanner; +pub use merge_scheduler_service::{schedule_merge, MergePermit, MergeSchedulerService}; pub use merge_split_downloader::MergeSplitDownloader; pub use packager::Packager; pub use publisher::{Publisher, PublisherCounters, PublisherType}; diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index d6f24e6fb37..07459147f37 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -159,7 +159,7 @@ impl Handler for Packager { batch.checkpoint_delta_opt, batch.publish_lock, batch.publish_token_opt, - batch.merge_operation_opt, + batch.merge_task_opt, batch.batch_parent_span, ), ) @@ -573,7 +573,7 @@ mod tests { checkpoint_delta_opt: IndexCheckpointDelta::for_test("source_id", 10..20).into(), publish_lock: PublishLock::default(), publish_token_opt: None, - merge_operation_opt: None, + merge_task_opt: None, batch_parent_span: Span::none(), }) .await?; diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index 435b568a2a4..5011d54b10c 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -251,7 +251,7 @@ mod tests { }), publish_lock: PublishLock::default(), publish_token_opt: None, - merge_operation: None, + merge_task: None, parent_span: tracing::Span::none(), }) .await @@ -322,7 +322,7 @@ mod tests { }), publish_lock: PublishLock::default(), publish_token_opt: None, - merge_operation: None, + merge_task: None, parent_span: tracing::Span::none(), }) .await @@ -386,7 +386,7 @@ mod tests { checkpoint_delta_opt: None, publish_lock: PublishLock::default(), publish_token_opt: None, - merge_operation: None, + merge_task: None, parent_span: Span::none(), }; assert!(publisher_mailbox @@ -428,7 +428,7 @@ mod tests { checkpoint_delta_opt: None, publish_lock, publish_token_opt: None, - merge_operation: None, + merge_task: None, parent_span: Span::none(), }) .await diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index ee691f322ff..38df82ef04c 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -38,14 +38,13 @@ use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; use quickwit_proto::types::{IndexUid, PublishToken}; use quickwit_storage::SplitPayloadBuilder; use serde::Serialize; -use tantivy::TrackedObject; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Semaphore, SemaphorePermit}; use tracing::{debug, info, instrument, warn, Instrument, Span}; use crate::actors::sequencer::{Sequencer, SequencerCommand}; use crate::actors::Publisher; -use crate::merge_policy::{MergeOperation, MergePolicy}; +use crate::merge_policy::{MergePolicy, MergeTask}; use crate::metrics::INDEXER_METRICS; use crate::models::{ create_split_metadata, EmptySplit, PackagedSplit, PackagedSplitBatch, PublishLock, SplitsUpdate, @@ -373,7 +372,7 @@ impl Handler for Uploader { batch.checkpoint_delta_opt, batch.publish_lock, batch.publish_token_opt, - batch.merge_operation_opt, + batch.merge_task_opt, batch.batch_parent_span, ); @@ -416,7 +415,7 @@ impl Handler for Uploader { checkpoint_delta_opt: Some(empty_split.checkpoint_delta), publish_lock: empty_split.publish_lock, publish_token_opt: empty_split.publish_token_opt, - merge_operation: None, + merge_task: None, parent_span: empty_split.batch_parent_span, }; @@ -431,7 +430,7 @@ fn make_publish_operation( checkpoint_delta_opt: Option, publish_lock: PublishLock, publish_token_opt: Option, - merge_operation: Option>, + merge_task: Option, parent_span: Span, ) -> SplitsUpdate { assert!(!packaged_splits_and_metadatas.is_empty()); @@ -449,7 +448,7 @@ fn make_publish_operation( checkpoint_delta_opt, publish_lock, publish_token_opt, - merge_operation, + merge_task, parent_span, } } diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index ec87f01a132..53a0b37b3a9 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -29,6 +29,7 @@ use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_storage::StorageResolver; use tracing::info; +use crate::actors::MergeSchedulerService; pub use crate::actors::{ IndexingError, IndexingPipeline, IndexingPipelineParams, IndexingService, PublisherType, Sequencer, SplitsUpdateMailbox, @@ -70,13 +71,15 @@ pub async fn start_indexing_service( num_blocking_threads: usize, cluster: Cluster, metastore: MetastoreServiceClient, - ingest_api_service: Mailbox, ingester_pool: IngesterPool, storage_resolver: StorageResolver, event_broker: EventBroker, ) -> anyhow::Result> { info!("starting indexer service"); - + let ingest_api_service_mailbox = universe.get_one::(); + let (merge_scheduler_mailbox, _) = universe.spawn_builder().spawn(MergeSchedulerService::new( + config.indexer_config.merge_concurrency.get(), + )); // Spawn indexing service. let indexing_service = IndexingService::new( config.node_id.clone(), @@ -85,13 +88,13 @@ pub async fn start_indexing_service( num_blocking_threads, cluster, metastore.clone(), - Some(ingest_api_service), + ingest_api_service_mailbox, + merge_scheduler_mailbox, ingester_pool, storage_resolver, event_broker, ) .await?; let (indexing_service, _) = universe.spawn_builder().spawn(indexing_service); - Ok(indexing_service) } diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index cbda92b5c0a..462a95ac944 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -22,6 +22,7 @@ mod nop_merge_policy; mod stable_log_merge_policy; use std::fmt; +use std::ops::Deref; use std::sync::Arc; pub(crate) use const_write_amplification::ConstWriteAmplificationMergePolicy; @@ -32,8 +33,10 @@ use quickwit_config::IndexingSettings; use quickwit_metastore::{SplitMaturity, SplitMetadata}; use serde::Serialize; pub(crate) use stable_log_merge_policy::StableLogMergePolicy; +use tantivy::TrackedObject; use tracing::{info_span, Span}; +use crate::actors::MergePermit; use crate::new_split_id; #[derive(Clone, Debug, PartialEq, Eq, Serialize)] @@ -48,6 +51,37 @@ impl fmt::Display for MergeOperationType { } } +pub struct MergeTask { + pub merge_operation: TrackedObject, + pub(crate) _merge_permit: MergePermit, +} + +impl MergeTask { + #[cfg(test)] + pub fn from_merge_operation_for_test(merge_operation: MergeOperation) -> MergeTask { + let inventory = tantivy::Inventory::default(); + let tracked_merge_operation = inventory.track(merge_operation); + MergeTask { + merge_operation: tracked_merge_operation, + _merge_permit: MergePermit::for_test(), + } + } +} + +impl fmt::Debug for MergeTask { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.merge_operation.as_ref().fmt(f) + } +} + +impl Deref for MergeTask { + type Target = MergeOperation; + + fn deref(&self) -> &Self::Target { + self.merge_operation.as_ref() + } +} + #[derive(Clone, Serialize)] pub struct MergeOperation { #[serde(skip_serializing)] @@ -70,6 +104,13 @@ impl MergeOperation { } } + pub fn total_num_bytes(&self) -> u64 { + self.splits + .iter() + .map(|split: &SplitMetadata| split.footer_offsets.end) + .sum() + } + pub fn new_delete_and_merge_operation(split: SplitMetadata) -> Self { let merge_split_id = new_split_id(); let merge_parent_span = info_span!("delete", merge_split_id=%merge_split_id, split_ids=?split.split_id(), typ=%MergeOperationType::DeleteAndMerge); @@ -172,11 +213,12 @@ pub mod tests { use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::types::{IndexUid, PipelineUid}; use rand::seq::SliceRandom; - use tantivy::TrackedObject; use time::OffsetDateTime; use super::*; - use crate::actors::{merge_split_attrs, MergePlanner, MergeSplitDownloader}; + use crate::actors::{ + merge_split_attrs, MergePlanner, MergeSchedulerService, MergeSplitDownloader, + }; use crate::models::{create_split_metadata, NewSplits}; fn pow_of_10(n: usize) -> usize { @@ -360,7 +402,7 @@ pub mod tests { check_final_configuration: CheckFn, ) -> anyhow::Result> { let universe = Universe::new(); - let (merge_op_mailbox, merge_op_inbox) = + let (merge_task_mailbox, merge_task_inbox) = universe.create_test_mailbox::(); let pipeline_id = IndexingPipelineId { index_uid: IndexUid::new_with_random_ulid("test-index"), @@ -372,7 +414,8 @@ pub mod tests { pipeline_id, Vec::new(), merge_policy.clone(), - merge_op_mailbox, + merge_task_mailbox, + universe.get_or_spawn_one::(), ); let mut split_index: HashMap = HashMap::default(); let (merge_planner_mailbox, merge_planner_handler) = @@ -388,12 +431,11 @@ pub mod tests { loop { let obs = merge_planner_handler.process_pending_and_observe().await; assert_eq!(obs.obs_type, quickwit_actors::ObservationType::Alive); - let merge_ops = - merge_op_inbox.drain_for_test_typed::>(); - if merge_ops.is_empty() { + let merge_tasks = merge_task_inbox.drain_for_test_typed::(); + if merge_tasks.is_empty() { break; } - let new_splits: Vec = merge_ops + let new_splits: Vec = merge_tasks .into_iter() .map(|merge_op| apply_merge(&merge_policy, &mut split_index, &merge_op)) .collect(); diff --git a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs index 84367bcc701..e971abb71d4 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs @@ -699,6 +699,24 @@ mod tests { } } + #[tokio::test] + async fn test_simulate_stable_log_merge_planner_edge_case() { + let merge_policy = StableLogMergePolicy::default(); + let batch_num_docs = vec![ + 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, + ]; + aux_test_simulate_merge_planner_num_docs( + Arc::new(merge_policy.clone()), + &batch_num_docs, + |splits| { + let num_docs = splits.iter().map(|split| split.num_docs as u64).sum(); + assert!(splits.len() <= merge_policy.max_num_splits_worst_case(num_docs)); + }, + ) + .await + .unwrap(); + } + #[tokio::test] async fn test_simulate_stable_log_merge_planner_ideal_case() -> anyhow::Result<()> { let merge_policy = StableLogMergePolicy::default(); diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index 071a605efd7..e24cc3872d0 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -28,6 +28,8 @@ pub struct IndexerMetrics { pub backpressure_micros: IntCounterVec<1>, pub available_concurrent_upload_permits: IntGaugeVec<1>, pub ongoing_merge_operations: IntGauge, + pub pending_merge_operations: IntGauge, + pub pending_merge_bytes: IntGauge, } impl Default for IndexerMetrics { @@ -65,6 +67,16 @@ impl Default for IndexerMetrics { "Number of ongoing merge operations", "quickwit_indexing", ), + pending_merge_operations: new_gauge( + "pending_merge_operations", + "Number of pending merge operations", + "quickwit_indexing", + ), + pending_merge_bytes: new_gauge( + "pending_merge_bytes", + "Number of pending merge bytes", + "quickwit_indexing", + ), } } } diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 5e48f9ebd60..dfd985a30e4 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -26,11 +26,11 @@ use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::types::{IndexUid, PublishToken}; use tantivy::directory::MmapDirectory; -use tantivy::{IndexBuilder, TrackedObject}; +use tantivy::IndexBuilder; use tracing::{instrument, Span}; use crate::controlled_directory::ControlledDirectory; -use crate::merge_policy::MergeOperation; +use crate::merge_policy::MergeTask; use crate::models::{PublishLock, SplitAttrs}; use crate::new_split_id; @@ -157,11 +157,11 @@ pub struct IndexedSplitBatch { pub checkpoint_delta_opt: Option, pub publish_lock: PublishLock, pub publish_token_opt: Option, - /// A [`MergeOperation`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` + /// A [`MergeTask`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` /// in the `MergePipeline` or `DeleteTaskPipeline`. /// See planners docs to understand the usage. /// If `None`, the split batch was built in the `IndexingPipeline`. - pub merge_operation_opt: Option>, + pub merge_task_opt: Option, pub batch_parent_span: Span, } diff --git a/quickwit/quickwit-indexing/src/models/merge_scratch.rs b/quickwit/quickwit-indexing/src/models/merge_scratch.rs index 146d98ed1c4..8a718d440b1 100644 --- a/quickwit/quickwit-indexing/src/models/merge_scratch.rs +++ b/quickwit/quickwit-indexing/src/models/merge_scratch.rs @@ -18,15 +18,15 @@ // along with this program. If not, see . use quickwit_common::temp_dir::TempDirectory; -use tantivy::{Directory, TrackedObject}; +use tantivy::Directory; -use crate::merge_policy::MergeOperation; +use crate::merge_policy::MergeTask; #[derive(Debug)] pub struct MergeScratch { - /// A [`MergeOperation`] tracked by either the `MergePlannner` or the `DeleteTaksPlanner` + /// A [`MergeTask`] tracked by either the `MergePlannner` or the `DeleteTaksPlanner` /// See planners docs to understand the usage. - pub merge_operation: TrackedObject, + pub merge_task: MergeTask, /// Scratch directory for computing the merge. pub merge_scratch_directory: TempDirectory, pub downloaded_splits_directory: TempDirectory, diff --git a/quickwit/quickwit-indexing/src/models/packaged_split.rs b/quickwit/quickwit-indexing/src/models/packaged_split.rs index 0214ec14ee2..078c81a75cc 100644 --- a/quickwit/quickwit-indexing/src/models/packaged_split.rs +++ b/quickwit/quickwit-indexing/src/models/packaged_split.rs @@ -24,10 +24,9 @@ use itertools::Itertools; use quickwit_common::temp_dir::TempDirectory; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_proto::types::{IndexUid, PublishToken, SplitId}; -use tantivy::TrackedObject; use tracing::Span; -use crate::merge_policy::MergeOperation; +use crate::merge_policy::MergeTask; use crate::models::{PublishLock, SplitAttrs}; pub struct PackagedSplit { @@ -66,11 +65,11 @@ pub struct PackagedSplitBatch { pub checkpoint_delta_opt: Option, pub publish_lock: PublishLock, pub publish_token_opt: Option, - /// A [`MergeOperation`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` + /// A [`MergeTask`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` /// in the `MergePipeline` or `DeleteTaskPipeline`. /// See planners docs to understand the usage. /// If `None`, the split batch was built in the `IndexingPipeline`. - pub merge_operation_opt: Option>, + pub merge_task_opt: Option, pub batch_parent_span: Span, } @@ -84,7 +83,7 @@ impl PackagedSplitBatch { checkpoint_delta_opt: Option, publish_lock: PublishLock, publish_token_opt: Option, - merge_operation_opt: Option>, + merge_task_opt: Option, batch_parent_span: Span, ) -> Self { assert!(!splits.is_empty()); @@ -100,7 +99,7 @@ impl PackagedSplitBatch { checkpoint_delta_opt, publish_lock, publish_token_opt, - merge_operation_opt, + merge_task_opt, batch_parent_span, } } diff --git a/quickwit/quickwit-indexing/src/models/publisher_message.rs b/quickwit/quickwit-indexing/src/models/publisher_message.rs index c79a3792339..5e3ce1dedae 100644 --- a/quickwit/quickwit-indexing/src/models/publisher_message.rs +++ b/quickwit/quickwit-indexing/src/models/publisher_message.rs @@ -23,10 +23,9 @@ use itertools::Itertools; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_metastore::SplitMetadata; use quickwit_proto::types::{IndexUid, PublishToken}; -use tantivy::TrackedObject; use tracing::Span; -use crate::merge_policy::MergeOperation; +use crate::merge_policy::MergeTask; use crate::models::PublishLock; pub struct SplitsUpdate { @@ -36,11 +35,11 @@ pub struct SplitsUpdate { pub checkpoint_delta_opt: Option, pub publish_lock: PublishLock, pub publish_token_opt: Option, - /// A [`MergeOperation`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` + /// A [`MergeTask`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` /// in the `MergePipeline` or `DeleteTaskPipeline`. /// See planners docs to understand the usage. /// If `None`, the split batch was built in the `IndexingPipeline`. - pub merge_operation: Option>, + pub merge_task: Option, pub parent_span: Span, } diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index aad549fbe29..bf91c453d56 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -109,6 +109,7 @@ impl TestSandbox { .into(); let storage = storage_resolver.resolve(&index_uri).await?; let universe = Universe::with_accelerated_time(); + let merge_scheduler_mailbox = universe.get_or_spawn_one(); let queues_dir_path = temp_dir.path().join(QUEUES_DIR_NAME); let ingest_api_service = init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()).await?; @@ -120,6 +121,7 @@ impl TestSandbox { cluster, metastore.clone(), Some(ingest_api_service), + merge_scheduler_mailbox, IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), diff --git a/quickwit/quickwit-jaeger/src/integration_tests.rs b/quickwit/quickwit-jaeger/src/integration_tests.rs index 0136a4d16cf..95412178fc7 100644 --- a/quickwit/quickwit-jaeger/src/integration_tests.rs +++ b/quickwit/quickwit-jaeger/src/integration_tests.rs @@ -27,6 +27,7 @@ use quickwit_cluster::{create_cluster_for_test, ChannelTransport, Cluster}; use quickwit_common::pubsub::EventBroker; use quickwit_common::uri::Uri; use quickwit_config::{IndexerConfig, IngestApiConfig, JaegerConfig, SearcherConfig, SourceConfig}; +use quickwit_indexing::actors::MergeSchedulerService; use quickwit_indexing::models::SpawnPipeline; use quickwit_indexing::IndexingService; use quickwit_ingest::{ @@ -346,6 +347,7 @@ async fn indexer_for_test( cluster, metastore, Some(ingester_service), + universe.get_or_spawn_one::(), ingester_pool, storage_resolver, EventBroker::default(), diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 278a8efd22b..75a0f737ef2 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -23,7 +23,8 @@ use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Supervisor, SupervisorState, + Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, + SupervisorState, }; use quickwit_common::io::IoControls; use quickwit_common::pubsub::EventBroker; @@ -31,8 +32,8 @@ use quickwit_common::temp_dir::{self}; use quickwit_common::uri::Uri; use quickwit_config::build_doc_mapper; use quickwit_indexing::actors::{ - MergeExecutor, MergeSplitDownloader, Packager, Publisher, PublisherCounters, Uploader, - UploaderCounters, UploaderType, + MergeExecutor, MergeSchedulerService, MergeSplitDownloader, Packager, Publisher, + PublisherCounters, Uploader, UploaderCounters, UploaderType, }; use quickwit_indexing::merge_policy::merge_policy_from_settings; use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox}; @@ -86,6 +87,7 @@ pub struct DeleteTaskPipeline { handles: Option, max_concurrent_split_uploads: usize, state: DeleteTaskPipelineState, + merge_scheduler_service: Mailbox, event_broker: EventBroker, } @@ -127,6 +129,7 @@ impl Actor for DeleteTaskPipeline { } impl DeleteTaskPipeline { + #[allow(clippy::too_many_arguments)] pub fn new( index_uid: IndexUid, metastore: MetastoreServiceClient, @@ -134,6 +137,7 @@ impl DeleteTaskPipeline { index_storage: Arc, delete_service_task_dir: PathBuf, max_concurrent_split_uploads: usize, + merge_scheduler_service: Mailbox, event_broker: EventBroker, ) -> Self { Self { @@ -145,6 +149,7 @@ impl DeleteTaskPipeline { handles: Default::default(), max_concurrent_split_uploads, state: DeleteTaskPipelineState::default(), + merge_scheduler_service, event_broker, } } @@ -230,6 +235,7 @@ impl DeleteTaskPipeline { self.metastore.clone(), self.search_job_placer.clone(), downloader_mailbox, + self.merge_scheduler_service.clone(), ); let (_, task_planner_supervisor_handler) = ctx.spawn_actor().supervise(task_planner); self.handles = Some(DeletePipelineHandle { @@ -279,9 +285,10 @@ impl Handler for DeleteTaskPipeline { #[cfg(test)] mod tests { use async_trait::async_trait; - use quickwit_actors::Handler; + use quickwit_actors::{Handler, Universe}; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; + use quickwit_indexing::actors::MergeSchedulerService; use quickwit_indexing::TestSandbox; use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitState}; use quickwit_proto::metastore::{DeleteQuery, ListSplitsRequest, MetastoreService}; @@ -336,6 +343,8 @@ mod tests { ) .await .unwrap(); + let universe: &Universe = test_sandbox.universe(); + let merge_scheduler_service = universe.get_or_spawn_one::(); let index_uid = test_sandbox.index_uid(); let docs = vec![ serde_json::json!({"body": "info", "ts": 0 }), @@ -386,19 +395,16 @@ mod tests { test_sandbox.storage(), delete_service_task_dir.path().into(), 4, + merge_scheduler_service, EventBroker::default(), ); - let (pipeline_mailbox, pipeline_handler) = - test_sandbox.universe().spawn_builder().spawn(pipeline); + let (pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); // Ensure that the message sent by initialize method is processed. let _ = pipeline_handler.process_pending_and_observe().await.state; // Pipeline will first fail and we need to wait a OBSERVE_PIPELINE_INTERVAL * some number // for the pipeline state to be updated. - test_sandbox - .universe() - .sleep(OBSERVE_PIPELINE_INTERVAL * 5) - .await; + universe.sleep(OBSERVE_PIPELINE_INTERVAL * 5).await; let pipeline_state = pipeline_handler.process_pending_and_observe().await.state; assert_eq!(pipeline_state.delete_task_planner.metrics.num_errors, 1); assert_eq!(pipeline_state.downloader.metrics.num_errors, 0); @@ -440,6 +446,8 @@ mod tests { let test_sandbox = TestSandbox::create(index_id, doc_mapping_yaml, "{}", &["body"]) .await .unwrap(); + let universe: &Universe = test_sandbox.universe(); + let merge_scheduler_mailbox = universe.get_or_spawn_one::(); let metastore = test_sandbox.metastore(); let mut mock_search_service = MockSearchService::new(); mock_search_service @@ -468,16 +476,13 @@ mod tests { test_sandbox.storage(), delete_service_task_dir.path().into(), 4, + merge_scheduler_mailbox, EventBroker::default(), ); - let (_pipeline_mailbox, pipeline_handler) = - test_sandbox.universe().spawn_builder().spawn(pipeline); + let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); pipeline_handler.quit().await; - let observations = test_sandbox - .universe() - .observe(OBSERVE_PIPELINE_INTERVAL) - .await; + let observations = universe.observe(OBSERVE_PIPELINE_INTERVAL).await; assert!(observations.into_iter().all( |observation| observation.type_name != std::any::type_name::() )); diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index 82669ccdf95..d592f9c2a22 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -28,7 +28,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu use quickwit_common::extract_time_range; use quickwit_common::uri::Uri; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; -use quickwit_indexing::actors::MergeSplitDownloader; +use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader}; use quickwit_indexing::merge_policy::MergeOperation; use quickwit_metastore::{split_tag_filter, split_time_range_filter, ListSplitsResponseExt, Split}; use quickwit_proto::metastore::{ @@ -83,6 +83,7 @@ pub struct DeleteTaskPlanner { metastore: MetastoreServiceClient, search_job_placer: SearchJobPlacer, merge_split_downloader_mailbox: Mailbox, + merge_scheduler_service: Mailbox, /// Inventory of ongoing delete operations. If everything goes well, /// a merge operation is dropped after the publish of the split that underwent /// the delete operation. @@ -127,6 +128,7 @@ impl DeleteTaskPlanner { metastore: MetastoreServiceClient, search_job_placer: SearchJobPlacer, merge_split_downloader_mailbox: Mailbox, + merge_scheduler_service: Mailbox, ) -> Self { Self { index_uid, @@ -135,6 +137,7 @@ impl DeleteTaskPlanner { metastore, search_job_placer, merge_split_downloader_mailbox, + merge_scheduler_service, ongoing_delete_operations_inventory: Inventory::new(), } } @@ -200,9 +203,10 @@ impl DeleteTaskPlanner { let tracked_delete_operation = self .ongoing_delete_operations_inventory .track(delete_operation); - ctx.send_message( - &self.merge_split_downloader_mailbox, + schedule_merge( + &self.merge_scheduler_service, tracked_delete_operation, + self.merge_split_downloader_mailbox.clone(), ) .await?; JANITOR_METRICS @@ -422,7 +426,7 @@ impl Handler for DeleteTaskPlanner { #[cfg(test)] mod tests { use quickwit_config::build_doc_mapper; - use quickwit_indexing::merge_policy::MergeOperation; + use quickwit_indexing::merge_policy::MergeTask; use quickwit_indexing::TestSandbox; use quickwit_metastore::{ IndexMetadataResponseExt, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, @@ -431,7 +435,6 @@ mod tests { use quickwit_proto::metastore::{DeleteQuery, IndexMetadataRequest, ListSplitsRequest}; use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse}; use quickwit_search::{searcher_pool_for_test, MockSearchService}; - use tantivy::TrackedObject; use super::*; @@ -458,6 +461,7 @@ mod tests { &["body"], ) .await?; + let universe = test_sandbox.universe(); let docs = [ serde_json::json!({"body": "info", "ts": 0 }), serde_json::json!({"body": "info", "ts": 0 }), @@ -537,12 +541,14 @@ mod tests { let searcher_pool = searcher_pool_for_test([("127.0.0.1:1000", mock_search_service)]); let search_job_placer = SearchJobPlacer::new(searcher_pool); let (downloader_mailbox, downloader_inbox) = test_sandbox.universe().create_test_mailbox(); + let (merge_split_downloader_mailbox, _) = universe.create_test_mailbox(); let delete_planner_executor = DeleteTaskPlanner::new( index_uid.clone(), index_config.index_uri.clone(), doc_mapper_str, metastore.clone(), search_job_placer, + merge_split_downloader_mailbox, downloader_mailbox, ); let (delete_planner_mailbox, delete_planner_handle) = test_sandbox @@ -550,8 +556,7 @@ mod tests { .spawn_builder() .spawn(delete_planner_executor); delete_planner_handle.process_pending_and_observe().await; - let downloader_msgs: Vec> = - downloader_inbox.drain_for_test_typed(); + let downloader_msgs: Vec = downloader_inbox.drain_for_test_typed(); assert_eq!(downloader_msgs.len(), 1); // The last split will undergo a delete operation. assert_eq!( @@ -585,8 +590,7 @@ mod tests { .ask(PlanDeleteOperations) .await .unwrap(); - let downloader_last_msgs = - downloader_inbox.drain_for_test_typed::>(); + let downloader_last_msgs = downloader_inbox.drain_for_test_typed::(); assert_eq!(downloader_last_msgs.len(), 1); assert_eq!( downloader_last_msgs[0].splits[0].split_id(), diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs index 514e1fb69a1..ff943ad91c2 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs @@ -22,10 +22,11 @@ use std::path::PathBuf; use std::time::Duration; use async_trait::async_trait; -use quickwit_actors::{Actor, ActorContext, ActorExitStatus, ActorHandle, Handler}; +use quickwit_actors::{Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox}; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::{self}; use quickwit_config::IndexConfig; +use quickwit_indexing::actors::MergeSchedulerService; use quickwit_metastore::{IndexMetadataResponseExt, ListIndexesMetadataResponseExt}; use quickwit_proto::metastore::{ IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -61,6 +62,7 @@ pub struct DeleteTaskService { pipeline_handles_by_index_uid: HashMap>, max_concurrent_split_uploads: usize, event_broker: EventBroker, + merge_scheduler_service: Mailbox, } impl DeleteTaskService { @@ -70,6 +72,7 @@ impl DeleteTaskService { storage_resolver: StorageResolver, data_dir_path: PathBuf, max_concurrent_split_uploads: usize, + merge_scheduler_service: Mailbox, event_broker: EventBroker, ) -> anyhow::Result { let delete_service_task_path = data_dir_path.join(DELETE_SERVICE_TASK_DIR_NAME); @@ -82,6 +85,7 @@ impl DeleteTaskService { delete_service_task_dir, pipeline_handles_by_index_uid: Default::default(), max_concurrent_split_uploads, + merge_scheduler_service, event_broker, }) } @@ -180,6 +184,7 @@ impl DeleteTaskService { index_storage, self.delete_service_task_dir.clone(), self.max_concurrent_split_uploads, + self.merge_scheduler_service.clone(), self.event_broker.clone(), ); let (_pipeline_mailbox, pipeline_handler) = ctx.spawn_actor().spawn(pipeline); @@ -212,6 +217,7 @@ impl Handler for DeleteTaskService { #[cfg(test)] mod tests { + use quickwit_actors::Universe; use quickwit_common::pubsub::EventBroker; use quickwit_indexing::TestSandbox; use quickwit_proto::metastore::{ @@ -242,20 +248,20 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let temp_dir = tempfile::tempdir().unwrap(); let data_dir_path = temp_dir.path().to_path_buf(); + let universe: &Universe = test_sandbox.universe(); let delete_task_service = DeleteTaskService::new( metastore.clone(), search_job_placer, StorageResolver::unconfigured(), data_dir_path, 4, + universe.get_or_spawn_one(), EventBroker::default(), ) .await .unwrap(); - let (_delete_task_service_mailbox, delete_task_service_handler) = test_sandbox - .universe() - .spawn_builder() - .spawn(delete_task_service); + let (_delete_task_service_mailbox, delete_task_service_handler) = + universe.spawn_builder().spawn(delete_task_service); let state = delete_task_service_handler .process_pending_and_observe() .await; @@ -283,32 +289,20 @@ mod tests { }) .await .unwrap(); - test_sandbox - .universe() - .sleep(UPDATE_PIPELINES_INTERVAL * 2) - .await; + universe.sleep(UPDATE_PIPELINES_INTERVAL * 2).await; let state_after_deletion = delete_task_service_handler .process_pending_and_observe() .await; assert_eq!(state_after_deletion.num_running_pipelines, 0); - assert!(test_sandbox - .universe() - .get_one::() - .is_some()); - let actors_observations = test_sandbox - .universe() - .observe(UPDATE_PIPELINES_INTERVAL) - .await; + assert!(universe.get_one::().is_some()); + let actors_observations = universe.observe(UPDATE_PIPELINES_INTERVAL).await; assert!( actors_observations .into_iter() .any(|observation| observation.type_name == std::any::type_name::()) ); - assert!(test_sandbox - .universe() - .get_one::() - .is_some()); + assert!(universe.get_one::().is_some()); test_sandbox.assert_quit().await; Ok(()) } diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index 9aa52b70789..70783d860db 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -22,6 +22,7 @@ use quickwit_actors::{Mailbox, Universe}; use quickwit_common::pubsub::EventBroker; use quickwit_config::NodeConfig; +use quickwit_indexing::actors::MergeSchedulerService; use quickwit_metastore::SplitInfo; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::SearchJobPlacer; @@ -64,6 +65,7 @@ pub async fn start_janitor_service( storage_resolver, config.data_dir_path.clone(), config.indexer_config.max_concurrent_split_uploads, + universe.get_or_spawn_one::(), event_broker, ) .await?; diff --git a/quickwit/quickwit-lambda/src/indexer/ingest.rs b/quickwit/quickwit-lambda/src/indexer/ingest.rs index e1a37bec90f..9808ff15662 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest.rs @@ -38,7 +38,7 @@ use quickwit_config::{ SourceConfig, SourceInputFormat, SourceParams, TransformConfig, CLI_INGEST_SOURCE_ID, }; use quickwit_index_management::{clear_cache_directory, IndexService}; -use quickwit_indexing::actors::{IndexingService, MergePipelineId}; +use quickwit_indexing::actors::{IndexingService, MergePipelineId, MergeSchedulerService}; use quickwit_indexing::models::{ DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline, }; @@ -197,6 +197,11 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { runtimes_config, &HashSet::from_iter([QuickwitService::Indexer]), )?; + let merge_scheduler_service = + MergeSchedulerService::new(indexer_config.merge_concurrency.get()); + let universe = Universe::new(); + let (merge_scheduler_service_mailbox, _) = + universe.spawn_builder().spawn(merge_scheduler_service); let indexing_server = IndexingService::new( config.node_id.clone(), config.data_dir_path.clone(), @@ -205,12 +210,12 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { cluster, metastore, None, + merge_scheduler_service_mailbox, IngesterPool::default(), storage_resolver, EventBroker::default(), ) .await?; - let universe = Universe::new(); let (indexing_server_mailbox, indexing_server_handle) = universe.spawn_builder().spawn(indexing_server); let pipeline_id = indexing_server_mailbox diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index b83bacc86cf..15b9b0cae7e 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -133,8 +133,9 @@ pub struct SplitMetadata { /// this split. pub num_merge_ops: usize, } + impl fmt::Debug for SplitMetadata { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut debug_struct = f.debug_struct("SplitMetadata"); debug_struct.field("split_id", &self.split_id); debug_struct.field("index_uid", &self.index_uid); @@ -219,8 +220,8 @@ impl SplitMetadata { #[cfg(any(test, feature = "testsuite"))] /// Returns an instance of `SplitMetadata` for testing. - pub fn for_test(split_id: String) -> Self { - Self { + pub fn for_test(split_id: String) -> SplitMetadata { + SplitMetadata { split_id, ..Default::default() } diff --git a/quickwit/quickwit-query/src/tokenizers/chinese_compatible.rs b/quickwit/quickwit-query/src/tokenizers/chinese_compatible.rs index 7f995070e7e..6757850d793 100644 --- a/quickwit/quickwit-query/src/tokenizers/chinese_compatible.rs +++ b/quickwit/quickwit-query/src/tokenizers/chinese_compatible.rs @@ -205,7 +205,7 @@ mod tests { }, ]; - assert_eq!(dbg!(res), dbg!(expected)); + assert_eq!(res, expected); } #[test] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index aeef3b3b8c8..a5c074c7301 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -77,8 +77,8 @@ use quickwit_indexing::models::ShardPositionsService; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ setup_local_shards_update_listener, start_ingest_api_service, wait_for_ingester_decommission, - GetMemoryCapacity, IngestApiService, IngestRequest, IngestRouter, IngestServiceClient, - Ingester, IngesterPool, LocalShardsUpdate, + GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, + LocalShardsUpdate, }; use quickwit_jaeger::JaegerService; use quickwit_janitor::{start_janitor_service, JanitorService}; @@ -387,16 +387,12 @@ pub async fn serve_quickwit( let ingest_service = start_ingest_client_if_needed(&node_config, &universe, &cluster).await?; let indexing_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) { - let ingest_api_service: Mailbox = universe - .get_one() - .context("Ingest API Service should have been started.")?; let indexing_service = start_indexing_service( &universe, &node_config, runtimes_config.num_threads_blocking, cluster.clone(), metastore_through_control_plane.clone(), - ingest_api_service.clone(), ingester_pool.clone(), storage_resolver.clone(), event_broker.clone(),