diff --git a/Cargo.lock b/Cargo.lock index 456bffe3c347b..87894550f38db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5812,6 +5812,7 @@ dependencies = [ name = "risingwave_compute" version = "0.2.0-alpha" dependencies = [ + "anyhow", "async-trait", "async_stack_trace", "clap 3.2.23", diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index b9162b1156a4b..2da8cc81add14 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +anyhow = "1" async-trait = "0.1" async_stack_trace = { path = "../utils/async_stack_trace" } clap = { version = "3", features = ["derive"] } diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs index c853b27df5870..97d6be1c38dcd 100644 --- a/src/compute/src/memory_management/memory_manager.rs +++ b/src/compute/src/memory_management/memory_manager.rs @@ -16,22 +16,16 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use risingwave_batch::task::BatchManager; -#[cfg(target_os = "linux")] -use risingwave_common::util::epoch::Epoch; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::task::LocalStreamManager; +use super::policy::MemoryControlPolicy; + /// The minimal memory requirement of computing tasks in megabytes. pub const MIN_COMPUTE_MEMORY_MB: usize = 512; /// The memory reserved for system usage (stack and code segment of processes, allocation overhead, /// network buffer, etc.) in megabytes. pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512; -#[cfg(any())] -/// The proportion of stream memory to all available memory for computing. -const STREAM_MEMORY_PROPORTION: f64 = 0.7; -#[cfg(any())] -/// The proportion of batch memory to all available memory for computing. -const BATCH_MEMORY_PROPORTION: f64 = 1.0 - STREAM_MEMORY_PROPORTION; /// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit /// the memory usage. @@ -45,33 +39,34 @@ pub struct GlobalMemoryManager { /// Barrier interval. barrier_interval_ms: u32, metrics: Arc, + /// The memory control policy for computing tasks. + memory_control_policy: MemoryControlPolicy, } pub type GlobalMemoryManagerRef = Arc; impl GlobalMemoryManager { - #[cfg(target_os = "linux")] - #[cfg(any())] - const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8; - #[cfg(target_os = "linux")] - const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; - #[cfg(target_os = "linux")] - const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; - pub fn new( total_compute_memory_bytes: usize, barrier_interval_ms: u32, metrics: Arc, + memory_control_policy: MemoryControlPolicy, ) -> Arc { // Arbitrarily set a minimal barrier interval in case it is too small, // especially when it's 0. let barrier_interval_ms = std::cmp::max(barrier_interval_ms, 10); + tracing::debug!( + "memory control policy: {}", + memory_control_policy.describe(total_compute_memory_bytes) + ); + Arc::new(Self { watermark_epoch: Arc::new(0.into()), total_compute_memory_bytes, barrier_interval_ms, metrics, + memory_control_policy, }) } @@ -79,15 +74,6 @@ impl GlobalMemoryManager { self.watermark_epoch.clone() } - #[cfg(target_os = "linux")] - fn set_watermark_time_ms(&self, time_ms: u64) { - use std::sync::atomic::Ordering; - - let epoch = Epoch::from_physical_time(time_ms).0; - let watermark_epoch = self.watermark_epoch.as_ref(); - watermark_epoch.store(epoch, Ordering::Relaxed); - } - // FIXME: remove such limitation after #7180 /// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons. /// See the comments for the macro `enable_jemalloc_on_linux!()` @@ -106,145 +92,53 @@ impl GlobalMemoryManager { ) { use std::time::Duration; - use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats}; - - // Turn off memory control by default. - #[cfg(any())] - { - use pretty_bytes::converter::convert; - - let total_batch_memory_bytes = - self.total_compute_memory_bytes as f64 * BATCH_MEMORY_PROPORTION; - let batch_memory_threshold = - (total_batch_memory_bytes * Self::BATCH_KILL_QUERY_THRESHOLD) as usize; - let total_stream_memory_bytes = - self.total_compute_memory_bytes as f64 * STREAM_MEMORY_PROPORTION; - let stream_memory_threshold_graceful = - (total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_GRACEFUL) as usize; - let stream_memory_threshold_aggressive = - (total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE) as usize; - - tracing::debug!( - "Total memory for batch tasks: {}, total memory for streaming tasks: {}", - convert(total_batch_memory_bytes), - convert(total_stream_memory_bytes) - ); - } - - let stream_memory_threshold_graceful = (self.total_compute_memory_bytes as f64 - * Self::STREAM_EVICTION_THRESHOLD_GRACEFUL) - as usize; - let stream_memory_threshold_aggressive = (self.total_compute_memory_bytes as f64 - * Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE) - as usize; - - let mut watermark_time_ms = Epoch::physical_now(); - let mut last_stream_used_memory_bytes = 0; - let mut step = 0; + use risingwave_common::util::epoch::Epoch; - let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap(); - let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap(); - let mut last_jemalloc_allocated_mib = 0; + use crate::memory_management::policy::MemoryControlStats; let mut tick_interval = tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64)); + let mut memory_control_stats = MemoryControlStats { + batch_memory_usage: 0, + streaming_memory_usage: 0, + jemalloc_allocated_mib: 0, + lru_watermark_step: 0, + lru_watermark_time_ms: Epoch::physical_now(), + lru_physical_now_ms: Epoch::physical_now(), + }; loop { // Wait for a while to check if need eviction. tick_interval.tick().await; - if let Err(e) = jemalloc_epoch_mib.advance() { - tracing::warn!("Jemalloc epoch advance failed! {:?}", e); - } - - let jemalloc_allocated_mib = jemalloc_allocated_mib.read().unwrap_or_else(|e| { - tracing::warn!("Jemalloc read allocated failed! {:?}", e); - last_jemalloc_allocated_mib - }); - last_jemalloc_allocated_mib = jemalloc_allocated_mib; - - // ## Batch memory control - // - // When the batch memory usage exceeds the threshold, we choose the query that uses the - // most memory and kills it. - - let batch_used_memory_bytes = batch_manager.total_mem_usage(); - // We currently turn this off until batch memory control becomes stable. - #[cfg(any())] - if batch_used_memory_bytes > batch_memory_threshold { - batch_manager.kill_queries("excessive batch memory usage".to_string()); - } - - // ## Streaming memory control - // - // 1. When the streaming memory usage is below the graceful threshold, we do not evict - // any caches, and simply reset the step to 0. - // - // 2. When the memory usage is between the graceful and aggressive threshold: - // - If the last eviction memory usage decreases after last eviction, we set the - // eviction step to 1. - // - Otherwise, we set the step to last_step + 1. - // - // 3. When the memory usage exceeds the aggressive threshold: - // - If the memory usage decreases after the last eviction, we set the eviction step - // to last_step. - // - Otherwise, we set the step to last_step * 2. - - #[cfg(any())] - let cur_stream_used_memory_bytes = stream_manager.total_mem_usage(); - // We use the memory stats collected by jemalloc as available streaming memory for now. - let cur_stream_used_memory_bytes = jemalloc_allocated_mib; - let last_step = step; - step = if cur_stream_used_memory_bytes < stream_memory_threshold_graceful { - // Do not evict if the memory usage is lower than `mem_threshold_graceful` - 0 - } else if cur_stream_used_memory_bytes < stream_memory_threshold_aggressive { - // Gracefully evict - if last_stream_used_memory_bytes > cur_stream_used_memory_bytes { - 1 - } else { - step + 1 - } - } else if last_stream_used_memory_bytes < cur_stream_used_memory_bytes { - // Aggressively evict - if step == 0 { - 2 - } else { - step * 2 - } - } else { - step - }; - - last_stream_used_memory_bytes = cur_stream_used_memory_bytes; - - // if watermark_time_ms + self.barrier_interval_ms as u64 * step > now, we do not - // increase the step, and set the epoch to now time epoch. - let physical_now = Epoch::physical_now(); - if (physical_now - watermark_time_ms) / (self.barrier_interval_ms as u64) < step { - step = last_step; - watermark_time_ms = physical_now; - } else { - watermark_time_ms += self.barrier_interval_ms as u64 * step; - } + memory_control_stats = self.memory_control_policy.apply( + self.total_compute_memory_bytes, + self.barrier_interval_ms, + memory_control_stats, + batch_manager.clone(), + stream_manager.clone(), + self.watermark_epoch.clone(), + ); self.metrics .lru_current_watermark_time_ms - .set(watermark_time_ms as i64); - self.metrics.lru_physical_now_ms.set(physical_now as i64); - self.metrics.lru_watermark_step.set(step as i64); + .set(memory_control_stats.lru_watermark_time_ms as i64); + self.metrics + .lru_physical_now_ms + .set(memory_control_stats.lru_physical_now_ms as i64); + self.metrics + .lru_watermark_step + .set(memory_control_stats.lru_watermark_step as i64); self.metrics.lru_runtime_loop_count.inc(); self.metrics .jemalloc_allocated_bytes - .set(jemalloc_allocated_mib as i64); + .set(memory_control_stats.jemalloc_allocated_mib as i64); self.metrics .stream_total_mem_usage - .set(stream_manager.total_mem_usage() as i64); + .set(memory_control_stats.streaming_memory_usage as i64); self.metrics .batch_total_mem_usage - .set(batch_used_memory_bytes as i64); - - self.set_watermark_time_ms(watermark_time_ms); + .set(memory_control_stats.batch_memory_usage as i64); } } } diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index 242dc4e8e67e9..4fc5f94fbbfe5 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -13,3 +13,4 @@ // limitations under the License. pub mod memory_manager; +pub mod policy; diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs new file mode 100644 index 0000000000000..7f1a4013a4086 --- /dev/null +++ b/src/compute/src/memory_management/policy.rs @@ -0,0 +1,301 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use anyhow::anyhow; +use pretty_bytes::converter::convert; +use risingwave_batch::task::BatchManager; +use risingwave_common::error::Result; +use risingwave_common::util::epoch::Epoch; +use risingwave_stream::task::LocalStreamManager; + +/// `MemoryControlStats` contains the necessary information for memory control, including both batch +/// and streaming. +pub struct MemoryControlStats { + pub batch_memory_usage: usize, + pub streaming_memory_usage: usize, + pub jemalloc_allocated_mib: usize, + pub lru_watermark_step: u64, + pub lru_watermark_time_ms: u64, + pub lru_physical_now_ms: u64, +} + +pub type MemoryControlPolicy = Box; + +pub trait MemoryControl: Send + Sync { + fn apply( + &self, + total_compute_memory_bytes: usize, + barrier_interval_ms: u32, + prev_memory_stats: MemoryControlStats, + batch_manager: Arc, + stream_manager: Arc, + watermark_epoch: Arc, + ) -> MemoryControlStats; + + fn describe(&self, total_compute_memory_bytes: usize) -> String; +} + +/// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and +/// streaming to a fixed proportion. +pub struct FixedProportionPolicy { + /// The proportion of streaming memory to all available memory for computing. This should + /// always fall in (0, 1). The proportion of batch memory will be 1 + /// -`streaming_memory_proportion`. + streaming_memory_proportion: f64, +} + +impl FixedProportionPolicy { + const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8; + const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; + const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; + + pub fn new(streaming_memory_proportion: f64) -> Result { + if streaming_memory_proportion <= 0.0 || streaming_memory_proportion >= 1.0 { + return Err(anyhow!("streaming memory proportion should fall in (0, 1)").into()); + } + Ok(Self { + streaming_memory_proportion, + }) + } +} + +impl Default for FixedProportionPolicy { + fn default() -> Self { + Self { + // The default streaming memory proportion is 70%. That for batch is correspondingly + // 30%. + streaming_memory_proportion: 0.7, + } + } +} + +impl MemoryControl for FixedProportionPolicy { + fn apply( + &self, + total_compute_memory_bytes: usize, + barrier_interval_ms: u32, + prev_memory_stats: MemoryControlStats, + batch_manager: Arc, + stream_manager: Arc, + watermark_epoch: Arc, + ) -> MemoryControlStats { + let batch_memory_proportion = 1.0 - self.streaming_memory_proportion; + let total_batch_memory_bytes = total_compute_memory_bytes as f64 * batch_memory_proportion; + let batch_memory_threshold = + (total_batch_memory_bytes * Self::BATCH_KILL_QUERY_THRESHOLD) as usize; + let total_stream_memory_bytes = + total_compute_memory_bytes as f64 * self.streaming_memory_proportion; + let stream_memory_threshold_graceful = + (total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_GRACEFUL) as usize; + let stream_memory_threshold_aggressive = + (total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE) as usize; + + let jemalloc_allocated_mib = + advance_jemalloc_epoch(prev_memory_stats.jemalloc_allocated_mib); + + // Batch memory control + // + // When the batch memory usage exceeds the threshold, we choose the query that uses the + // most memory and kill it. + + let batch_used_memory_bytes = batch_manager.total_mem_usage(); + if batch_used_memory_bytes > batch_memory_threshold { + batch_manager.kill_queries("excessive batch memory usage".to_string()); + } + + // Streaming memory control + // + // We calculate the watermark of the LRU cache, which provides hints for streaming executors + // on cache eviction. + + let stream_used_memory_bytes = stream_manager.total_mem_usage(); + let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark( + stream_used_memory_bytes, + stream_memory_threshold_graceful, + stream_memory_threshold_aggressive, + barrier_interval_ms, + prev_memory_stats, + ); + set_lru_watermark_time_ms(watermark_epoch, lru_watermark_time_ms); + + MemoryControlStats { + batch_memory_usage: batch_used_memory_bytes, + streaming_memory_usage: stream_used_memory_bytes, + jemalloc_allocated_mib, + lru_watermark_step, + lru_watermark_time_ms, + lru_physical_now_ms: lru_physical_now, + } + } + + fn describe(&self, total_compute_memory_bytes: usize) -> String { + let total_stream_memory_bytes = + total_compute_memory_bytes as f64 * self.streaming_memory_proportion; + let total_batch_memory_bytes = + total_compute_memory_bytes as f64 * (1.0 - self.streaming_memory_proportion); + format!( + "FixedProportionPolicy: total available streaming memory is {}, total available batch memory is {}", + convert(total_stream_memory_bytes), + convert(total_batch_memory_bytes) + ) + } +} +/// `StreamingOnlyPolicy` only performs memory control on streaming tasks. It differs from +/// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics, +/// which actually contains system usage other than computing tasks. This is the default memory +/// control policy. +#[derive(Default)] +pub struct StreamingOnlyPolicy {} + +impl StreamingOnlyPolicy { + const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; + const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; +} + +impl MemoryControl for StreamingOnlyPolicy { + fn apply( + &self, + total_compute_memory_bytes: usize, + barrier_interval_ms: u32, + prev_memory_stats: MemoryControlStats, + batch_manager: Arc, + stream_manager: Arc, + watermark_epoch: Arc, + ) -> MemoryControlStats { + let jemalloc_allocated_mib = + advance_jemalloc_epoch(prev_memory_stats.jemalloc_allocated_mib); + let stream_memory_threshold_graceful = + (total_compute_memory_bytes as f64 * Self::STREAM_EVICTION_THRESHOLD_GRACEFUL) as usize; + let stream_memory_threshold_aggressive = (total_compute_memory_bytes as f64 + * Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE) + as usize; + + // Streaming memory control + // + // We calculate the watermark of the LRU cache, which provides hints for streaming executors + // on cache eviction. Here we do the calculation based on jemalloc statistics. + + let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark( + jemalloc_allocated_mib, + stream_memory_threshold_graceful, + stream_memory_threshold_aggressive, + barrier_interval_ms, + prev_memory_stats, + ); + set_lru_watermark_time_ms(watermark_epoch, lru_watermark_time_ms); + + MemoryControlStats { + batch_memory_usage: batch_manager.total_mem_usage(), + streaming_memory_usage: stream_manager.total_mem_usage(), + jemalloc_allocated_mib, + lru_watermark_step, + lru_watermark_time_ms, + lru_physical_now_ms: lru_physical_now, + } + } + + fn describe(&self, total_compute_memory_bytes: usize) -> String { + format!( + "StreamingOnlyPolicy: total available streaming memory is {}", + convert(total_compute_memory_bytes as f64) + ) + } +} + +fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize { + use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats}; + + let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap(); + let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap(); + + if let Err(e) = jemalloc_epoch_mib.advance() { + tracing::warn!("Jemalloc epoch advance failed! {:?}", e); + } + jemalloc_allocated_mib.read().unwrap_or_else(|e| { + tracing::warn!("Jemalloc read allocated failed! {:?}", e); + prev_jemalloc_allocated_mib + }) +} + +fn calculate_lru_watermark( + cur_stream_used_memory_bytes: usize, + stream_memory_threshold_graceful: usize, + stream_memory_threshold_aggressive: usize, + barrier_interval_ms: u32, + prev_memory_stats: MemoryControlStats, +) -> (u64, u64, u64) { + let mut watermark_time_ms = prev_memory_stats.lru_watermark_time_ms; + let last_step = prev_memory_stats.lru_watermark_step; + let last_stream_used_memory_bytes = prev_memory_stats.streaming_memory_usage; + + // The watermark calculation works in the following way: + // + // 1. When the streaming memory usage is below the graceful threshold, we do not evict + // any caches, and simply reset the step to 0. + // + // 2. When the memory usage is between the graceful and aggressive threshold: + // - If the last eviction memory usage decreases after last eviction, we set the eviction step + // to 1. + // - Otherwise, we set the step to last_step + 1. + // + // 3. When the memory usage exceeds the aggressive threshold: + // - If the memory usage decreases after the last eviction, we set the eviction step to + // last_step. + // - Otherwise, we set the step to last_step * 2. + + let mut step = if cur_stream_used_memory_bytes < stream_memory_threshold_graceful { + // Do not evict if the memory usage is lower than `stream_memory_threshold_graceful` + 0 + } else if cur_stream_used_memory_bytes < stream_memory_threshold_aggressive { + // Gracefully evict + if last_stream_used_memory_bytes > cur_stream_used_memory_bytes { + 1 + } else { + last_step + 1 + } + } else if last_stream_used_memory_bytes < cur_stream_used_memory_bytes { + // Aggressively evict + if last_step == 0 { + 2 + } else { + last_step * 2 + } + } else { + last_step + }; + + let physical_now = Epoch::physical_now(); + if (physical_now - prev_memory_stats.lru_watermark_time_ms) / (barrier_interval_ms as u64) + < step + { + // We do not increase the step and watermark here to prevent a too-advanced watermark. The + // original condition is `prev_watermark_time_ms + barrier_interval_ms * step > now`. + step = last_step; + watermark_time_ms = physical_now; + } else { + watermark_time_ms += barrier_interval_ms as u64 * step; + } + + (step, watermark_time_ms, physical_now) +} + +fn set_lru_watermark_time_ms(watermark_epoch: Arc, time_ms: u64) { + use std::sync::atomic::Ordering; + + let epoch = Epoch::from_physical_time(time_ms).0; + watermark_epoch.as_ref().store(epoch, Ordering::Relaxed); +} diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 3a0fc6415b9ac..435a678a5267d 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -59,6 +59,7 @@ use tokio::task::JoinHandle; use crate::memory_management::memory_manager::{ GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB, }; +use crate::memory_management::policy::StreamingOnlyPolicy; use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics; use crate::rpc::service::exchange_service::ExchangeServiceImpl; @@ -239,6 +240,7 @@ pub async fn compute_node_serve( compute_memory_bytes, system_params.barrier_interval_ms(), streaming_metrics.clone(), + Box::new(StreamingOnlyPolicy {}), ); // Run a background memory monitor tokio::spawn(mgr.clone().run(batch_mgr_clone, stream_mgr_clone));