Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/4469 limit concurrent merges #4473

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,20 @@ impl<A: Actor> Inbox<A> {
self.rx.try_recv()
}

pub async fn recv_typed_message<M: 'static>(&self) -> Option<M> {
while let Ok(mut envelope) = self.rx.recv().await {
if let Some(msg) = envelope.message_typed() {
return Some(msg);
#[cfg(any(test, feature = "testsuite"))]
/// Receives envelopes until one carries a message of type `M`, and returns it.
///
/// Envelopes holding messages of any other type are silently discarded.
/// Returns `Err(RecvError)` when the underlying channel is closed/disconnected.
/// Test-only helper: gated behind `test`/`testsuite`.
pub async fn recv_typed_message<M: 'static>(&self) -> Result<M, RecvError> {
    loop {
        // `?` propagates the channel error directly, since our error type is
        // the channel's `RecvError`.
        let mut envelope = self.rx.recv().await?;
        if let Some(msg) = envelope.message_typed() {
            return Ok(msg);
        }
        // Not the type we are waiting for: drop it and keep listening.
    }
}

/// Destroys the inbox and returns the list of pending messages or commands
Expand Down
47 changes: 47 additions & 0 deletions quickwit/quickwit-actors/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::ops::Mul;
use std::time::Duration;

use async_trait::async_trait;
use quickwit_common::new_coolid;
use serde::Serialize;

use crate::observation::ObservationType;
Expand Down Expand Up @@ -725,3 +726,49 @@ async fn test_unsync_actor_message() {

universe.assert_quit().await;
}

/// Test-only actor used to verify `Universe::get_or_spawn_one` behavior.
struct FakeActorService {
    // A unique "cool id" lets the test check that the same instance is returned twice.
    cool_id: String,
}

/// Message asking a `FakeActorService` for its unique id.
#[derive(Debug)]
struct GetCoolId;

impl Actor for FakeActorService {
    // This test actor exposes no observable state.
    type ObservableState = ();

    fn observable_state(&self) {}
}

#[async_trait]
impl Handler<GetCoolId> for FakeActorService {
    type Reply = String;

    /// Replies with this instance's unique id, so tests can tell
    /// whether two mailboxes point at the same actor instance.
    async fn handle(
        &mut self,
        _get_cool_id: GetCoolId,
        _ctx: &ActorContext<Self>,
    ) -> Result<Self::Reply, ActorExitStatus> {
        let id = self.cool_id.clone();
        Ok(id)
    }
}

impl Default for FakeActorService {
    /// Builds an instance carrying a freshly generated unique id.
    fn default() -> Self {
        let cool_id = new_coolid("fake-actor");
        Self { cool_id }
    }
}

#[tokio::test]
async fn test_get_or_spawn() {
    let universe = Universe::new();
    // Two successive calls must resolve to the very same actor instance.
    let first_mailbox: Mailbox<FakeActorService> = universe.get_or_spawn_one();
    let second_mailbox: Mailbox<FakeActorService> = universe.get_or_spawn_one();
    let first_id = first_mailbox.ask(GetCoolId).await.unwrap();
    let second_id = second_mailbox.ask(GetCoolId).await.unwrap();
    assert_eq!(first_id, second_id);
    universe.assert_quit().await;
}
10 changes: 10 additions & 0 deletions quickwit/quickwit-actors/src/universe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ impl Universe {
self.spawn_ctx.registry.get_one::<A>()
}

/// Returns the mailbox of the registered actor of type `A`, spawning a
/// `Default`-constructed instance first when none is registered yet.
pub fn get_or_spawn_one<A: Actor + Default>(&self) -> Mailbox<A> {
    self.spawn_ctx.registry.get_one::<A>().unwrap_or_else(|| {
        let (mailbox, _handle) = self.spawn_builder().spawn(A::default());
        mailbox
    })
}

pub async fn observe(&self, timeout: Duration) -> Vec<ActorObservation> {
self.spawn_ctx.registry.observe(timeout).await
}
Expand Down
16 changes: 10 additions & 6 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use anyhow::{bail, Context};
use clap::{arg, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use quickwit_actors::{ActorExitStatus, ActorHandle, Universe};
use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, Universe};
use quickwit_cluster::{ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::runtimes::RuntimesConfig;
Expand All @@ -40,7 +40,9 @@ use quickwit_config::{
VecSourceParams, CLI_INGEST_SOURCE_ID,
};
use quickwit_index_management::{clear_cache_directory, IndexService};
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergePipelineId};
use quickwit_indexing::actors::{
IndexingService, MergePipeline, MergePipelineId, MergeSchedulerService,
};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
Expand Down Expand Up @@ -451,6 +453,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let universe = Universe::new();
let merge_scheduler_service_mailbox = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id.clone(),
config.data_dir_path.clone(),
Expand All @@ -459,12 +463,12 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
merge_scheduler_service_mailbox,
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
)
.await?;
let universe = Universe::new();
let (indexing_server_mailbox, indexing_server_handle) =
universe.spawn_builder().spawn(indexing_server);
let pipeline_id = indexing_server_mailbox
Expand Down Expand Up @@ -580,10 +584,9 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let indexer_config = IndexerConfig::default();
let universe = Universe::new();
let indexer_config = IndexerConfig {
..Default::default()
};
let merge_scheduler_service: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id,
config.data_dir_path,
Expand All @@ -592,6 +595,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
cluster,
metastore,
None,
merge_scheduler_service,
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
"split_store_max_num_bytes": "1T",
"split_store_max_num_splits": 10000,
"max_concurrent_split_uploads": 8,
"max_merge_write_throughput": "100mb"
"max_merge_write_throughput": "100mb",
"merge_concurrency": 2
},
"ingest_api": {
"replication_factor": 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ split_store_max_num_bytes = "1T"
split_store_max_num_splits = 10_000
max_concurrent_split_uploads = 8
max_merge_write_throughput = "100mb"
merge_concurrency = 2

[ingest_api]
replication_factor = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ indexer:
split_store_max_num_splits: 10000
max_concurrent_split_uploads: 8
max_merge_write_throughput: 100mb
merge_concurrency: 2

ingest_api:
replication_factor: 2
Expand Down
27 changes: 27 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub struct IndexerConfig {
/// does not starve indexing itself (as it is a latency sensitive operation).
#[serde(default)]
pub max_merge_write_throughput: Option<ByteSize>,
/// Maximum number of merge or delete operations that can be executed concurrently
/// (defaults to `num_cpus / 2`).
#[serde(default = "IndexerConfig::default_merge_concurrency")]
pub merge_concurrency: NonZeroUsize,
/// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry
/// Protocol (OTLP).
#[serde(default = "IndexerConfig::default_enable_otlp_endpoint")]
Expand Down Expand Up @@ -134,6 +138,10 @@ impl IndexerConfig {
1_000
}

/// Default number of concurrent merge/delete operations:
/// half the available cores, but never fewer than one.
pub fn default_merge_concurrency() -> NonZeroUsize {
    let half_num_cpus = num_cpus::get() / 2;
    NonZeroUsize::new(half_num_cpus.max(1)).expect("`max(1)` guarantees a non-zero value")
}

fn default_cpu_capacity() -> CpuCapacity {
CpuCapacity::one_cpu_thread() * (num_cpus::get() as u32)
}
Expand All @@ -149,6 +157,7 @@ impl IndexerConfig {
max_concurrent_split_uploads: 4,
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
max_merge_write_throughput: None,
merge_concurrency: NonZeroUsize::new(3).unwrap(),
};
Ok(indexer_config)
}
Expand All @@ -163,6 +172,7 @@ impl Default for IndexerConfig {
split_store_max_num_splits: Self::default_split_store_max_num_splits(),
max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
cpu_capacity: Self::default_cpu_capacity(),
merge_concurrency: Self::default_merge_concurrency(),
max_merge_write_throughput: None,
}
}
Expand Down Expand Up @@ -476,6 +486,23 @@ mod tests {
"1500m"
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"merge_concurrency: 5"#).unwrap();
assert_eq!(
indexer_config.merge_concurrency,
NonZeroUsize::new(5).unwrap()
);
let indexer_config_json = serde_json::to_value(&indexer_config).unwrap();
assert_eq!(
indexer_config_json
.get("merge_concurrency")
.unwrap()
.as_u64()
.unwrap(),
5
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"cpu_capacity: 1500m"#).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ pub fn node_config_for_test() -> NodeConfig {
mod tests {
use std::env;
use std::net::Ipv4Addr;
use std::num::NonZeroU64;
use std::num::{NonZeroU64, NonZeroUsize};
use std::path::Path;

use bytesize::ByteSize;
Expand Down Expand Up @@ -554,6 +554,7 @@ mod tests {
split_store_max_num_bytes: ByteSize::tb(1),
split_store_max_num_splits: 10_000,
max_concurrent_split_uploads: 8,
merge_concurrency: NonZeroUsize::new(2).unwrap(),
cpu_capacity: IndexerConfig::default_cpu_capacity(),
enable_cooperative_indexing: false,
max_merge_write_throughput: Some(ByteSize::mb(100)),
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/index_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
checkpoint_delta_opt: batch_builder.checkpoint_delta_opt,
publish_lock: batch_builder.publish_lock,
publish_token_opt: batch_builder.publish_token_opt,
merge_operation_opt: None,
merge_task_opt: None,
batch_parent_span: batch_builder.batch_parent_span,
};
ctx.send_message(&self.packager_mailbox, indexed_split_batch)
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,7 @@ mod tests {
merge_policy: default_merge_policy(),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
event_broker: Default::default(),
};
let merge_pipeline = MergePipeline::new(merge_pipeline_params, universe.spawn_ctx());
Expand Down
12 changes: 11 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use tokio::sync::Semaphore;
use tracing::{debug, error, info, warn};

use super::merge_pipeline::{MergePipeline, MergePipelineParams};
use super::MergePlanner;
use super::{MergePlanner, MergeSchedulerService};
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
Expand Down Expand Up @@ -121,6 +121,7 @@ pub struct IndexingService {
cluster: Cluster,
metastore: MetastoreServiceClient,
ingest_api_service_opt: Option<Mailbox<IngestApiService>>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
ingester_pool: IngesterPool,
storage_resolver: StorageResolver,
indexing_pipelines: HashMap<PipelineUid, PipelineHandle>,
Expand Down Expand Up @@ -154,6 +155,7 @@ impl IndexingService {
cluster: Cluster,
metastore: MetastoreServiceClient,
ingest_api_service_opt: Option<Mailbox<IngestApiService>>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
ingester_pool: IngesterPool,
storage_resolver: StorageResolver,
event_broker: EventBroker,
Expand Down Expand Up @@ -182,6 +184,7 @@ impl IndexingService {
cluster,
metastore,
ingest_api_service_opt,
merge_scheduler_service,
ingester_pool,
storage_resolver,
local_split_store: Arc::new(local_split_store),
Expand Down Expand Up @@ -297,6 +300,7 @@ impl IndexingService {
indexing_directory: indexing_directory.clone(),
metastore: self.metastore.clone(),
split_store: split_store.clone(),
merge_scheduler_service: self.merge_scheduler_service.clone(),
merge_policy: merge_policy.clone(),
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
Expand Down Expand Up @@ -893,6 +897,7 @@ mod tests {
init_ingest_api(universe, &queues_dir_path, &IngestApiConfig::default())
.await
.unwrap();
let merge_scheduler_mailbox: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
"test-node".to_string(),
data_dir_path.to_path_buf(),
Expand All @@ -901,6 +906,7 @@ mod tests {
cluster,
metastore,
Some(ingest_api_service),
merge_scheduler_mailbox,
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),
Expand Down Expand Up @@ -1345,6 +1351,7 @@ mod tests {
init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default())
.await
.unwrap();
let merge_scheduler_service = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
"test-node".to_string(),
data_dir_path,
Expand All @@ -1353,6 +1360,7 @@ mod tests {
cluster.clone(),
metastore.clone(),
Some(ingest_api_service),
merge_scheduler_service,
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),
Expand Down Expand Up @@ -1548,6 +1556,7 @@ mod tests {
let indexer_config = IndexerConfig::for_test().unwrap();
let num_blocking_threads = 1;
let storage_resolver = StorageResolver::unconfigured();
let merge_scheduler_service: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let mut indexing_server = IndexingService::new(
"test-ingest-api-gc-node".to_string(),
data_dir_path,
Expand All @@ -1556,6 +1565,7 @@ mod tests {
cluster.clone(),
metastore.clone(),
Some(ingest_api_service.clone()),
merge_scheduler_service,
IngesterPool::default(),
storage_resolver.clone(),
EventBroker::default(),
Expand Down
Loading
Loading