feat: support memory usage limit on background compaction #476

Merged (3 commits) on Dec 13, 2022
11 changes: 11 additions & 0 deletions analytic_engine/src/compaction/mod.rs
@@ -329,6 +329,17 @@ impl CompactionTask {
}
}
}

// Estimate the total size of the input files.
pub fn estimate_total_input_size(&self) -> usize {
let total_input_size: u64 = self
.compaction_inputs
.iter()
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
.sum();

total_input_size as usize
}
}

pub struct PickerManager {
272 changes: 233 additions & 39 deletions analytic_engine/src/compaction/scheduler.rs
@@ -15,7 +15,7 @@ use std::{
use async_trait::async_trait;
use common_types::{request_id::RequestId, time::Timestamp};
use common_util::{
config::ReadableDuration,
config::{ReadableDuration, ReadableSize},
define_result,
runtime::{JoinHandle, Runtime},
time::DurationExt,
@@ -26,7 +26,7 @@ use snafu::{ResultExt, Snafu};
use table_engine::table::TableId;
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
mpsc::{self, error::SendError, Receiver, Sender},
Mutex,
},
time,
@@ -37,7 +37,12 @@ use crate::{
metrics::COMPACTION_PENDING_REQUEST_GAUGE, picker::PickerContext, CompactionTask,
PickerManager, TableCompactionRequest, WaitError, WaiterNotifier,
},
instance::{flush_compaction::TableFlushOptions, Instance, SpaceStore},
instance::{
flush_compaction::{self, TableFlushOptions},
write_worker::CompactionNotifier,
Instance, SpaceStore,
},
table::data::TableDataRef,
TableOptions,
};

@@ -56,6 +61,7 @@ pub struct SchedulerConfig {
pub schedule_interval: ReadableDuration,
pub max_ongoing_tasks: usize,
pub max_unflushed_duration: ReadableDuration,
pub memory_limit: ReadableSize,
}

// TODO(boyan), a better default value?
@@ -71,6 +77,7 @@ impl Default for SchedulerConfig {
max_ongoing_tasks: MAX_GOING_COMPACTION_TASKS,
// flush_interval default is 5h.
max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)),
memory_limit: ReadableSize::gb(4),
}
}
}
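As a quick illustration (not part of this diff), the new `memory_limit` knob can be overridden in code via struct update syntax; `SchedulerConfig`, `ReadableSize::gb`, and `as_bytes` all appear in the hunks of this file, while the 8 GB value is only an example:

```rust
// Sketch only: raise the compaction memory budget, keeping the other defaults.
let config = SchedulerConfig {
    memory_limit: ReadableSize::gb(8),
    ..Default::default()
};
// The scheduler turns this into a byte budget, as done in SchedulerImpl::new below:
// MemoryLimit::new(config.memory_limit.as_bytes() as usize)
```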
@@ -134,6 +141,63 @@ impl<K: Eq + Hash + Clone, V> RequestQueue<K, V> {

type RequestBuf = RwLock<RequestQueue<TableId, TableCompactionRequest>>;

/// Combined with [`MemoryUsageToken`], [`MemoryLimit`] provides a mechanism to
/// impose a limit on the memory usage.
#[derive(Clone, Debug)]
struct MemoryLimit {
usage: Arc<AtomicUsize>,
// TODO: support adjusting this threshold dynamically.
limit: usize,
}

/// The token for tracking memory usage, which should not derive `Clone`.
/// When the token is dropped, its applied memory is subtracted from the global memory usage.
#[derive(Debug)]
struct MemoryUsageToken {
global_usage: Arc<AtomicUsize>,
applied_usage: usize,
}

impl Drop for MemoryUsageToken {
fn drop(&mut self) {
self.global_usage
.fetch_sub(self.applied_usage, Ordering::Relaxed);
}
}

impl MemoryLimit {
fn new(limit: usize) -> Self {
Self {
usage: Arc::new(AtomicUsize::new(0)),
limit,
}
}

/// Try to apply a token covering `bytes`; returns `None` if the memory limit would be exceeded.
fn try_apply_token(&self, bytes: usize) -> Option<MemoryUsageToken> {
let token = self.apply_token(bytes);
if self.is_exceeded() {
None
} else {
Some(token)
}
}

fn apply_token(&self, bytes: usize) -> MemoryUsageToken {
self.usage.fetch_add(bytes, Ordering::Relaxed);

MemoryUsageToken {
global_usage: self.usage.clone(),
applied_usage: bytes,
}
}

#[inline]
fn is_exceeded(&self) -> bool {
self.usage.load(Ordering::Relaxed) > self.limit
}
}
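A minimal usage sketch (not part of this diff) of the token mechanism defined above, assuming code inside this module where the private `usage` field is visible: a successful `try_apply_token` holds its share of the budget until the returned token is dropped, while a rejected request rolls its usage back immediately.

```rust
// Sketch only: exercise MemoryLimit / MemoryUsageToken as defined above.
let limit = MemoryLimit::new(100);

// Applying within the budget returns a token and bumps the shared counter.
let token = limit.try_apply_token(40).expect("within budget");
assert_eq!(limit.usage.load(Ordering::Relaxed), 40);

// A request that would push usage past the limit is rejected; the token created
// inside try_apply_token is dropped right away, so the usage rolls back to 40.
assert!(limit.try_apply_token(80).is_none());
assert_eq!(limit.usage.load(Ordering::Relaxed), 40);

// Dropping the held token releases its share of the budget.
drop(token);
assert_eq!(limit.usage.load(Ordering::Relaxed), 0);
```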

struct OngoingTaskLimit {
ongoing_tasks: AtomicUsize,
/// Buffer to hold pending requests
@@ -243,6 +307,7 @@ impl SchedulerImpl {
request_buf: RwLock::new(RequestQueue::default()),
}),
running: running.clone(),
memory_limit: MemoryLimit::new(config.memory_limit.as_bytes() as usize),
};

let handle = runtime.spawn(async move {
@@ -306,6 +371,7 @@ struct ScheduleWorker {
max_ongoing_tasks: usize,
limit: Arc<OngoingTaskLimit>,
running: Arc<AtomicBool>,
memory_limit: MemoryLimit,
}

#[inline]
@@ -358,15 +424,15 @@ impl ScheduleWorker {
self.limit.request_buf_len()
);
} else {
self.do_table_compaction_request(compact_req).await;
self.handle_table_compaction_request(compact_req).await;
}
}
ScheduleTask::Schedule => {
if self.max_ongoing_tasks > ongoing {
let pending = self.limit.drain_requests(self.max_ongoing_tasks - ongoing);
let len = pending.len();
for compact_req in pending {
self.do_table_compaction_request(compact_req).await;
self.handle_table_compaction_request(compact_req).await;
}
debug!("Scheduled {} pending compaction tasks.", len);
}
@@ -375,40 +441,15 @@
};
}

async fn do_table_compaction_request(&self, compact_req: TableCompactionRequest) {
let table_data = compact_req.table_data;
let compaction_notifier = compact_req.compaction_notifier;
let waiter_notifier = WaiterNotifier::new(compact_req.waiter);

let table_options = table_data.table_options();
let compaction_strategy = table_options.compaction_strategy;
let picker = self.picker_manager.get_picker(compaction_strategy);
let picker_ctx = match new_picker_context(&table_options) {
Some(v) => v,
None => {
warn!("No valid context can be created, compaction request will be ignored, table_id:{}, table_name:{}",
table_data.id, table_data.name);
return;
}
};
let version = table_data.current_version();

// Pick compaction task.
let compaction_task = version.pick_for_compaction(picker_ctx, &picker);
let compaction_task = match compaction_task {
Ok(v) => v,
Err(e) => {
error!(
"Compaction scheduler failed to pick compaction, table:{}, table_id:{}, err:{}",
table_data.name, table_data.id, e
);
// Now the error of picking compaction is considered not fatal and not sent to
// compaction notifier.
return;
}
};

// Mark files are in compaction.
async fn do_table_compaction_task(
&self,
table_data: TableDataRef,
compaction_task: CompactionTask,
compaction_notifier: CompactionNotifier,
waiter_notifier: WaiterNotifier,
token: MemoryUsageToken,
) {
// Mark the files as being compacted.
compaction_task.mark_files_being_compacted(true);

let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty();
@@ -425,6 +466,9 @@ impl ScheduleWorker {
let request_id = RequestId::next_id();
// Do actual costly compact job in background.
self.runtime.spawn(async move {
// Hold the token until the compaction finishes; it is released when dropped.
let _token = token;

let res = space_store
.compact_table(runtime, &table_data, request_id, &compaction_task)
.await;
@@ -470,6 +514,91 @@ impl ScheduleWorker {
});
}

// Try to apply a memory usage token for the task. Returns `None` if applying it
// would push the current memory usage over the limit.
fn try_apply_memory_usage_token_for_task(
&self,
task: &CompactionTask,
) -> Option<MemoryUsageToken> {
let input_size = task.estimate_total_input_size();
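// Rough heuristic: estimate the memory usage as twice the total input size.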
let estimate_memory_usage = input_size * 2;
self.memory_limit.try_apply_token(estimate_memory_usage)
}

async fn handle_table_compaction_request(&self, compact_req: TableCompactionRequest) {
let table_data = compact_req.table_data.clone();
let table_options = table_data.table_options();
let compaction_strategy = table_options.compaction_strategy;
let picker = self.picker_manager.get_picker(compaction_strategy);
let picker_ctx = match new_picker_context(&table_options) {
Some(v) => v,
None => {
warn!("No valid context can be created, compaction request will be ignored, table_id:{}, table_name:{}",
table_data.id, table_data.name);
return;
}
};
let version = table_data.current_version();

// Pick compaction task.
let compaction_task = version.pick_for_compaction(picker_ctx, &picker);
let compaction_task = match compaction_task {
Ok(v) => v,
Err(e) => {
error!(
"Compaction scheduler failed to pick compaction, table:{}, table_id:{}, err:{}",
table_data.name, table_data.id, e
);
// For now, an error while picking a compaction task is considered non-fatal and
// is not sent to the compaction notifier.
return;
}
};

let token = match self.try_apply_memory_usage_token_for_task(&compaction_task) {
Some(v) => v,
None => {
// Memory usage exceeds the threshold, so put the request
// back into the queue.
self.put_back_compaction_request(compact_req).await;
return;
}
};

let compaction_notifier = compact_req.compaction_notifier;
let waiter_notifier = WaiterNotifier::new(compact_req.waiter);

self.do_table_compaction_task(
table_data,
compaction_task,
compaction_notifier,
waiter_notifier,
token,
)
.await;
}

async fn put_back_compaction_request(&self, req: TableCompactionRequest) {
if let Err(SendError(ScheduleTask::Request(TableCompactionRequest {
compaction_notifier,
waiter,
..
}))) = self.sender.send(ScheduleTask::Request(req)).await
{
let e = Arc::new(
flush_compaction::Other {
msg: "Failed to put back the compaction request for memory usage exceeds",
}
.build(),
);
compaction_notifier.notify_err(e.clone());

let waiter_notifier = WaiterNotifier::new(waiter);
let wait_err = WaitError::Compaction { source: e };
waiter_notifier.notify_wait_result(Err(wait_err));
}
}

async fn schedule(&mut self) {
self.purge_tables();
self.flush_tables().await;
@@ -570,6 +699,71 @@ fn new_picker_context(table_opts: &TableOptions) -> Option<PickerContext> {
mod tests {
use super::*;

#[test]
fn test_memory_usage_limit_apply() {
let limit = MemoryLimit::new(100);
let cases = vec![
// Each case is (apply_requests, expect_applied_results).
(vec![10, 20, 90, 30], vec![true, true, false, true]),
(vec![100, 10], vec![true, false]),
(vec![0, 90, 10], vec![true, true, true]),
];

for (apply_requests, expect_applied_results) in cases {
assert_eq!(limit.usage.load(Ordering::Relaxed), 0);

let mut applied_tokens = Vec::with_capacity(apply_requests.len());
for bytes in &apply_requests {
let token = limit.try_apply_token(*bytes);
applied_tokens.push(token);
}
assert_eq!(applied_tokens.len(), expect_applied_results.len());
assert_eq!(applied_tokens.len(), apply_requests.len());

for (token, (apply_bytes, applied)) in applied_tokens.into_iter().zip(
apply_requests
.into_iter()
.zip(expect_applied_results.into_iter()),
) {
if applied {
let token = token.unwrap();
assert_eq!(token.applied_usage, apply_bytes);
assert_eq!(
token.global_usage.load(Ordering::Relaxed),
limit.usage.load(Ordering::Relaxed),
);
}
}
}
}

#[test]
fn test_memory_usage_limit_release() {
let limit = MemoryLimit::new(100);

let cases = vec![
// Each case consists of a list of operations, each being (applied bytes, whether to
// keep the applied token), plus the expected final memory usage.
(vec![(10, false), (20, false)], 0),
(vec![(100, false), (10, true), (20, true), (30, true)], 60),
(vec![(0, false), (100, false), (20, true), (30, false)], 20),
];

for (ops, expect_memory_usage) in cases {
assert_eq!(limit.usage.load(Ordering::Relaxed), 0);

let mut tokens = Vec::new();
for (applied_bytes, keep_token) in ops {
let token = limit.try_apply_token(applied_bytes);
if keep_token {
tokens.push(token);
}
}

assert_eq!(limit.usage.load(Ordering::Relaxed), expect_memory_usage);
}
}

#[test]
fn test_request_queue() {
let mut q: RequestQueue<i32, String> = RequestQueue::default();