Skip to content

Commit

Permalink
Introducing the merge scheduler to schedule merge operation in a global
Browse files Browse the repository at this point in the history
manner.

Closes #4469 #4471
  • Loading branch information
fulmicoton committed Jan 30, 2024
1 parent a3cad8f commit 6f83d6f
Show file tree
Hide file tree
Showing 38 changed files with 941 additions and 630 deletions.
16 changes: 11 additions & 5 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,19 @@ impl<A: Actor> Inbox<A> {
self.rx.try_recv()
}

pub async fn recv_typed_message<M: 'static>(&self) -> Option<M> {
while let Ok(mut envelope) = self.rx.recv().await {
if let Some(msg) = envelope.message_typed() {
return Some(msg);
pub async fn recv_typed_message<M: 'static>(&self) -> Result<M, RecvError> {
loop {
match self.rx.recv().await {
Ok(mut envelope) => {
if let Some(msg) = envelope.message_typed() {
return Ok(msg);
}
}
Err(err) => {
return Err(err);
}
}
}
None
}

/// Destroys the inbox and returns the list of pending messages or commands
Expand Down
47 changes: 47 additions & 0 deletions quickwit/quickwit-actors/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::ops::Mul;
use std::time::Duration;

use async_trait::async_trait;
use quickwit_common::new_coolid;
use serde::Serialize;

use crate::observation::ObservationType;
Expand Down Expand Up @@ -725,3 +726,49 @@ async fn test_unsync_actor_message() {

universe.assert_quit().await;
}

/// Minimal actor used by `test_get_or_spawn` to exercise `Universe::get_or_spawn_one`.
struct FakeActorService {
// Identifier set when the instance is built (see the `Default` impl). The test
// fetches it through two separately obtained mailboxes and compares the values
// to check that both mailboxes reach the same instance.
cool_id: String,
}

/// Message asking a `FakeActorService` for its `cool_id`; the handler replies with a `String`.
#[derive(Debug)]
struct GetCoolId;

// `FakeActorService` exposes no observable state: the state type is the unit type.
impl Actor for FakeActorService {
type ObservableState = ();

// Returns `()`, i.e. there is nothing to observe.
fn observable_state(&self) {}
}

#[async_trait]
impl Handler<GetCoolId> for FakeActorService {
    type Reply = String;

    /// Replies with a copy of this instance's `cool_id`.
    ///
    /// Neither the message nor the actor context is inspected.
    async fn handle(
        &mut self,
        _message: GetCoolId,
        _actor_ctx: &ActorContext<Self>,
    ) -> Result<Self::Reply, ActorExitStatus> {
        let cool_id = self.cool_id.clone();
        Ok(cool_id)
    }
}

impl Default for FakeActorService {
    /// Builds a service whose `cool_id` is obtained from `new_coolid("fake-actor")`.
    fn default() -> Self {
        let cool_id = new_coolid("fake-actor");
        Self { cool_id }
    }
}

/// Two successive `get_or_spawn_one` calls on the same universe must hand back
/// mailboxes that reach the same `FakeActorService` instance.
#[tokio::test]
async fn test_get_or_spawn() {
    let universe = Universe::new();

    // First lookup, followed by a round trip to read the instance's cool id.
    let first_mailbox = universe.get_or_spawn_one::<FakeActorService>();
    let first_cool_id = first_mailbox.ask(GetCoolId).await.unwrap();

    // Second lookup: the cool id reported through this mailbox must be the same.
    let second_mailbox = universe.get_or_spawn_one::<FakeActorService>();
    let second_cool_id = second_mailbox.ask(GetCoolId).await.unwrap();

    assert_eq!(first_cool_id, second_cool_id);
    universe.assert_quit().await;
}
10 changes: 10 additions & 0 deletions quickwit/quickwit-actors/src/universe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ impl Universe {
self.spawn_ctx.registry.get_one::<A>()
}

/// Returns the mailbox of an actor of type `A`.
///
/// If the registry already yields a mailbox for `A`, that mailbox is returned.
/// Otherwise an `A::default()` instance is spawned through `spawn_builder()` and
/// the mailbox of the new actor is returned; the accompanying actor handle is
/// dropped before returning.
///
/// NOTE(review): the registry lookup and the spawn are two separate steps.
/// Confirm whether concurrent callers could each end up spawning an instance.
pub fn get_or_spawn_one<A: Actor + Default>(&self) -> Mailbox<A> {
    match self.spawn_ctx.registry.get_one::<A>() {
        Some(existing_mailbox) => existing_mailbox,
        None => {
            let (new_mailbox, _actor_handle) = self.spawn_builder().spawn(A::default());
            new_mailbox
        }
    }
}

pub async fn observe(&self, timeout: Duration) -> Vec<ActorObservation> {
self.spawn_ctx.registry.observe(timeout).await
}
Expand Down
16 changes: 10 additions & 6 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use anyhow::{bail, Context};
use clap::{arg, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use quickwit_actors::{ActorExitStatus, ActorHandle, Universe};
use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, Universe};
use quickwit_cluster::{ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::runtimes::RuntimesConfig;
Expand All @@ -40,7 +40,9 @@ use quickwit_config::{
VecSourceParams, CLI_INGEST_SOURCE_ID,
};
use quickwit_index_management::{clear_cache_directory, IndexService};
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergePipelineId};
use quickwit_indexing::actors::{
IndexingService, MergePipeline, MergePipelineId, MergeSchedulerService,
};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
Expand Down Expand Up @@ -451,6 +453,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let universe = Universe::new();
let merge_scheduler_service_mailbox = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id.clone(),
config.data_dir_path.clone(),
Expand All @@ -459,12 +463,12 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
merge_scheduler_service_mailbox,
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
)
.await?;
let universe = Universe::new();
let (indexing_server_mailbox, indexing_server_handle) =
universe.spawn_builder().spawn(indexing_server);
let pipeline_id = indexing_server_mailbox
Expand Down Expand Up @@ -580,10 +584,9 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let indexer_config = IndexerConfig::default();
let universe = Universe::new();
let indexer_config = IndexerConfig {
..Default::default()
};
let merge_scheduler_service: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id,
config.data_dir_path,
Expand All @@ -592,6 +595,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
cluster,
metastore,
None,
merge_scheduler_service,
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
Expand Down
131 changes: 66 additions & 65 deletions quickwit/quickwit-config/resources/tests/config/quickwit.json
Original file line number Diff line number Diff line change
@@ -1,70 +1,71 @@
# Comments are supported.
{
"version": "0.7",
"cluster_id": "quickwit-cluster",
"node_id": "my-unique-node-id",
"enabled_services": [
"janitor",
"metastore"
],
"listen_address": "0.0.0.0",
"advertise_address": "172.0.0.12",
"gossip_listen_port": 2222,
"grpc_listen_port": 3333,
"peer_seeds": [
"quickwit-searcher-0.local",
"quickwit-searcher-1.local"
],
"data_dir": "/opt/quickwit/data",
"metastore_uri": "postgres://username:password@host:port/db",
"default_index_root_uri": "s3://quickwit-indexes",
"rest": {
"listen_port": 1111,
"extra_headers": {
"x-header-1": "header-value-1",
"x-header-2": "header-value-2"
}
},
"grpc": {
"max_message_size": "10 MB"
},
"storage": {
"azure": {
"account": "quickwit-dev"
},
"s3": {
"flavor": "gcs",
"endpoint": "http://localhost:4566",
"force_path_style_access": true
}
},
"metastore": {
"postgres": {
"max_num_connections": 12
}
},
"indexer": {
"enable_otlp_endpoint": true,
"split_store_max_num_bytes": "1T",
"split_store_max_num_splits": 10000,
"max_concurrent_split_uploads": 8,
"max_merge_write_throughput": "100mb"
},
"ingest_api": {
"replication_factor": 2
},
"searcher": {
"aggregation_memory_limit": "1G",
"aggregation_bucket_limit": 500000,
"fast_field_cache_capacity": "10G",
"split_footer_cache_capacity": "1G",
"max_num_concurrent_split_streams": 120,
"max_num_concurrent_split_searches": 150
"version": "0.7",
"cluster_id": "quickwit-cluster",
"node_id": "my-unique-node-id",
"enabled_services": [
"janitor",
"metastore"
],
"listen_address": "0.0.0.0",
"advertise_address": "172.0.0.12",
"gossip_listen_port": 2222,
"grpc_listen_port": 3333,
"peer_seeds": [
"quickwit-searcher-0.local",
"quickwit-searcher-1.local"
],
"data_dir": "/opt/quickwit/data",
"metastore_uri": "postgres://username:password@host:port/db",
"default_index_root_uri": "s3://quickwit-indexes",
"rest": {
"listen_port": 1111,
"extra_headers": {
"x-header-1": "header-value-1",
"x-header-2": "header-value-2"
}
},
"grpc": {
"max_message_size": "10 MB"
},
"storage": {
"azure": {
"account": "quickwit-dev"
},
"jaeger": {
"enable_endpoint": true,
"lookback_period_hours": 24,
"max_trace_duration_secs": 600,
"max_fetch_spans": 1000
"s3": {
"flavor": "gcs",
"endpoint": "http://localhost:4566",
"force_path_style_access": true
}
},
"metastore": {
"postgres": {
"max_num_connections": 12
}
},
"indexer": {
"enable_otlp_endpoint": true,
"split_store_max_num_bytes": "1T",
"split_store_max_num_splits": 10000,
"max_concurrent_split_uploads": 8,
"max_merge_write_throughput": "100mb",
"merge_concurrency": 2
},
"ingest_api": {
"replication_factor": 2
},
"searcher": {
"aggregation_memory_limit": "1G",
"aggregation_bucket_limit": 500000,
"fast_field_cache_capacity": "10G",
"split_footer_cache_capacity": "1G",
"max_num_concurrent_split_streams": 120,
"max_num_concurrent_split_searches": 150
},
"jaeger": {
"enable_endpoint": true,
"lookback_period_hours": 24,
"max_trace_duration_secs": 600,
"max_fetch_spans": 1000
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-config/resources/tests/config/quickwit.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ split_store_max_num_bytes = "1T"
split_store_max_num_splits = 10_000
max_concurrent_split_uploads = 8
max_merge_write_throughput = "100mb"
merge_concurrency = 2


[ingest_api]
replication_factor = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ indexer:
split_store_max_num_splits: 10000
max_concurrent_split_uploads: 8
max_merge_write_throughput: 100mb
merge_concurrency: 2

ingest_api:
replication_factor: 2
Expand Down
27 changes: 27 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub struct IndexerConfig {
/// does not starve indexing itself (as it is a latency sensitive operation).
#[serde(default)]
pub max_merge_write_throughput: Option<ByteSize>,
/// Maximum number of merge or delete operations that can be executed concurrently.
/// (defaults to num_cpu / 2, with a minimum of 1).
#[serde(default = "IndexerConfig::default_merge_concurrency")]
pub merge_concurrency: NonZeroUsize,
/// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry
/// Protocol (OTLP).
#[serde(default = "IndexerConfig::default_enable_otlp_endpoint")]
Expand Down Expand Up @@ -134,6 +138,10 @@ impl IndexerConfig {
1_000
}

/// Default value for `merge_concurrency`: half of `num_cpus::get()`, or 1 when
/// that integer division yields 0.
pub fn default_merge_concurrency() -> NonZeroUsize {
    let half_num_cpus = num_cpus::get() / 2;
    match NonZeroUsize::new(half_num_cpus) {
        Some(merge_concurrency) => merge_concurrency,
        // `NonZeroUsize::new` returns `None` only for 0; fall back to 1.
        None => NonZeroUsize::new(1).unwrap(),
    }
}

fn default_cpu_capacity() -> CpuCapacity {
CpuCapacity::one_cpu_thread() * (num_cpus::get() as u32)
}
Expand All @@ -149,6 +157,7 @@ impl IndexerConfig {
max_concurrent_split_uploads: 4,
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
max_merge_write_throughput: None,
merge_concurrency: NonZeroUsize::new(3).unwrap(),
};
Ok(indexer_config)
}
Expand All @@ -163,6 +172,7 @@ impl Default for IndexerConfig {
split_store_max_num_splits: Self::default_split_store_max_num_splits(),
max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
cpu_capacity: Self::default_cpu_capacity(),
merge_concurrency: Self::default_merge_concurrency(),
max_merge_write_throughput: None,
}
}
Expand Down Expand Up @@ -476,6 +486,23 @@ mod tests {
"1500m"
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"merge_concurrency: 5"#).unwrap();
assert_eq!(
indexer_config.merge_concurrency,
NonZeroUsize::new(5).unwrap()
);
let indexer_config_json = serde_json::to_value(&indexer_config).unwrap();
assert_eq!(
indexer_config_json
.get("merge_concurrency")
.unwrap()
.as_u64()
.unwrap(),
5
);
}
{
let indexer_config: IndexerConfig =
serde_yaml::from_str(r#"cpu_capacity: 1500m"#).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ pub fn node_config_for_test() -> NodeConfig {
mod tests {
use std::env;
use std::net::Ipv4Addr;
use std::num::NonZeroU64;
use std::num::{NonZeroU64, NonZeroUsize};
use std::path::Path;

use bytesize::ByteSize;
Expand Down Expand Up @@ -554,6 +554,7 @@ mod tests {
split_store_max_num_bytes: ByteSize::tb(1),
split_store_max_num_splits: 10_000,
max_concurrent_split_uploads: 8,
merge_concurrency: NonZeroUsize::new(2).unwrap(),
cpu_capacity: IndexerConfig::default_cpu_capacity(),
enable_cooperative_indexing: false,
max_merge_write_throughput: Some(ByteSize::mb(100)),
Expand Down
Loading

0 comments on commit 6f83d6f

Please sign in to comment.