Skip to content

Commit

Permalink
config: add memory limit options for the compaction scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Dec 12, 2022
1 parent c8fe634 commit f429adc
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
use async_trait::async_trait;
use common_types::{request_id::RequestId, time::Timestamp};
use common_util::{
config::ReadableDuration,
config::{ReadableDuration, ReadableSize},
define_result,
runtime::{JoinHandle, Runtime},
time::DurationExt,
Expand Down Expand Up @@ -61,6 +61,7 @@ pub struct SchedulerConfig {
pub schedule_interval: ReadableDuration,
pub max_ongoing_tasks: usize,
pub max_unflushed_duration: ReadableDuration,
pub memory_limit: ReadableSize,
}

// TODO(boyan), a better default value?
Expand All @@ -76,6 +77,7 @@ impl Default for SchedulerConfig {
max_ongoing_tasks: MAX_GOING_COMPACTION_TASKS,
// flush_interval default is 5h.
max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)),
memory_limit: ReadableSize::gb(4),
}
}
}
Expand Down Expand Up @@ -305,7 +307,7 @@ impl SchedulerImpl {
request_buf: RwLock::new(RequestQueue::default()),
}),
running: running.clone(),
memory_limit: MemoryLimit::new(4 * 1024 * 1024),
memory_limit: MemoryLimit::new(config.memory_limit.as_bytes() as usize),
};

let handle = runtime.spawn(async move {
Expand Down Expand Up @@ -447,7 +449,7 @@ impl ScheduleWorker {
waiter_notifier: WaiterNotifier,
token: MemoryUsageToken,
) {
// Mark files are in compaction.
// Mark files being in compaction.
compaction_task.mark_files_being_compacted(true);

let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();
Expand Down

0 comments on commit f429adc

Please sign in to comment.