diff --git a/risedev.yml b/risedev.yml index 08f70054fe626..4040b793bc6ca 100644 --- a/risedev.yml +++ b/risedev.yml @@ -778,13 +778,6 @@ template: # Total available memory for the compute node in bytes total-memory-bytes: 8589934592 - # The policy for compute node memory control. - memory-control-policy: streaming-only - - # The proportion of streaming memory to all available memory for computing. Only works when - # `memory_control_policy` is set to "streaming-batch". - streaming-memory-proportion: 0.7 - # Parallelism of tasks per compute node parallelism: 4 diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index a4b5b2d471ddb..75c06f9503afe 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -84,22 +84,6 @@ pub struct ComputeNodeOpts { #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())] pub parallelism: usize, - /// The policy for compute node memory control. Valid values: - /// - streaming-only - /// - streaming-batch - #[clap( - long, - env = "RW_MEMORY_CONTROL_POLICY", - default_value = "streaming-only" - )] - pub memory_control_policy: String, - - /// The proportion of streaming memory to all available memory for computing. Only works when - /// `memory_control_policy` is set to "streaming-batch". Ignored otherwise. See - /// [`FixedProportionPolicy`] for more details. - #[clap(long, env = "RW_STREAMING_MEMORY_PROPORTION", default_value_t = 0.7)] - pub streaming_memory_proportion: f64, - #[clap(flatten)] override_config: OverrideConfigOpts, } diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs index 82b482386d900..8c7507519f335 100644 --- a/src/compute/src/memory_management/memory_manager.rs +++ b/src/compute/src/memory_management/memory_manager.rs @@ -21,45 +21,37 @@ use risingwave_common::util::epoch::Epoch; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::task::LocalStreamManager; -use super::MemoryControlPolicy; +use super::MemoryControlRef; use crate::memory_management::MemoryControlStats; /// Compute node uses [`GlobalMemoryManager`] to limit the memory usage. pub struct GlobalMemoryManager { /// All cached data before the watermark should be evicted. watermark_epoch: Arc, - /// Total memory that can be allocated by the compute node for computing tasks (stream & batch) - /// in bytes. - total_compute_memory_bytes: usize, - /// Barrier interval. - barrier_interval_ms: u32, + /// Loop interval of running control policy + interval_ms: u32, metrics: Arc, /// The memory control policy for computing tasks. - memory_control_policy: MemoryControlPolicy, + memory_control_policy: MemoryControlRef, } pub type GlobalMemoryManagerRef = Arc; impl GlobalMemoryManager { pub fn new( - total_compute_memory_bytes: usize, - barrier_interval_ms: u32, + interval_ms: u32, metrics: Arc, - memory_control_policy: MemoryControlPolicy, + memory_control_policy: MemoryControlRef, ) -> Arc { // Arbitrarily set a minimal barrier interval in case it is too small, // especially when it's 0. - let barrier_interval_ms = std::cmp::max(barrier_interval_ms, 10); + let interval_ms = std::cmp::max(interval_ms, 10); - tracing::info!( - "memory control policy: {}", - memory_control_policy.describe(total_compute_memory_bytes) - ); + tracing::info!("memory control policy: {:?}", &memory_control_policy); Arc::new(Self { watermark_epoch: Arc::new(0.into()), - total_compute_memory_bytes, - barrier_interval_ms, + interval_ms, metrics, memory_control_policy, }) @@ -78,11 +70,9 @@ impl GlobalMemoryManager { ) { // Keep same interval with the barrier interval let mut tick_interval = - tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64)); + tokio::time::interval(Duration::from_millis(self.interval_ms as u64)); let mut memory_control_stats = MemoryControlStats { - batch_memory_usage: 0, - streaming_memory_usage: 0, jemalloc_allocated_mib: 0, lru_watermark_step: 0, lru_watermark_time_ms: Epoch::physical_now(), @@ -94,8 +84,7 @@ impl GlobalMemoryManager { tick_interval.tick().await; memory_control_stats = self.memory_control_policy.apply( - self.total_compute_memory_bytes, - self.barrier_interval_ms, + self.interval_ms, memory_control_stats, batch_manager.clone(), stream_manager.clone(), diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index c12b77b8561ee..4ea0c002e4fb7 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -27,8 +27,6 @@ use risingwave_common::config::{StorageConfig, StorageMemoryConfig}; use risingwave_common::error::Result; use risingwave_stream::task::LocalStreamManager; -use crate::ComputeNodeOpts; - /// The minimal memory requirement of computing tasks in megabytes. pub const MIN_COMPUTE_MEMORY_MB: usize = 512; /// The memory reserved for system usage (stack and code segment of processes, allocation @@ -47,62 +45,37 @@ pub const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.5; pub const STORAGE_FILE_CACHE_MEMORY_PROPORTION: f64 = 0.1; pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 70; -/// `MemoryControlStats` contains the necessary information for memory control, including both batch -/// and streaming. +/// `MemoryControlStats` contains the state from previous control loop #[derive(Default)] pub struct MemoryControlStats { - pub batch_memory_usage: usize, - pub streaming_memory_usage: usize, pub jemalloc_allocated_mib: usize, pub lru_watermark_step: u64, pub lru_watermark_time_ms: u64, pub lru_physical_now_ms: u64, } -pub type MemoryControlPolicy = Box; +pub type MemoryControlRef = Box; -pub trait MemoryControl: Send + Sync { +pub trait MemoryControl: Send + Sync + std::fmt::Debug { fn apply( &self, - total_compute_memory_bytes: usize, - barrier_interval_ms: u32, + interval_ms: u32, prev_memory_stats: MemoryControlStats, batch_manager: Arc, stream_manager: Arc, watermark_epoch: Arc, ) -> MemoryControlStats; - - fn describe(&self, total_compute_memory_bytes: usize) -> String; } #[cfg(target_os = "linux")] -pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { - use anyhow::anyhow; - - use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy}; - - let input_policy = &opts.memory_control_policy; - if input_policy == FixedProportionPolicy::CONFIG_STR { - Ok(Box::new(FixedProportionPolicy::new( - opts.streaming_memory_proportion, - )?)) - } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { - Ok(Box::new(StreamingOnlyPolicy)) - } else { - let valid_values = [ - FixedProportionPolicy::CONFIG_STR, - StreamingOnlyPolicy::CONFIG_STR, - ]; - Err(anyhow!(format!( - "invalid memory control policy in configuration: {}, valid values: {:?}", - input_policy, valid_values, - )) - .into()) - } +pub fn build_memory_control_policy(total_memory_bytes: usize) -> Result { + use self::policy::JemallocMemoryControl; + + Ok(Box::new(JemallocMemoryControl::new(total_memory_bytes))) } #[cfg(not(target_os = "linux"))] -pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result { +pub fn build_memory_control_policy(_total_memory_bytes: usize) -> Result { // We disable memory control on operating systems other than Linux now because jemalloc // stats do not work well. tracing::warn!("memory control is only enabled on Linux now"); @@ -111,13 +84,13 @@ pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result, _stream_manager: Arc, @@ -125,10 +98,6 @@ impl MemoryControl for DummyPolicy { ) -> MemoryControlStats { MemoryControlStats::default() } - - fn describe(&self, _total_compute_memory_bytes: usize) -> String { - "DummyPolicy".to_string() - } } /// Each compute node reserves some memory for stack and code segment of processes, allocation diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs index 3c90edfa74b44..adfd9fd34b2ff 100644 --- a/src/compute/src/memory_management/policy.rs +++ b/src/compute/src/memory_management/policy.rs @@ -15,140 +15,46 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; -use anyhow::anyhow; use risingwave_batch::task::BatchManager; -use risingwave_common::error::Result; use risingwave_common::util::epoch::Epoch; -use risingwave_common::util::pretty_bytes::convert; use risingwave_stream::task::LocalStreamManager; use super::{MemoryControl, MemoryControlStats}; -/// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and -/// streaming to a fixed proportion. -pub struct FixedProportionPolicy { - /// The proportion of streaming memory to all available memory for computing. This should - /// always fall in (0, 1). The proportion of batch memory will be 1 - /// -`streaming_memory_proportion`. - streaming_memory_proportion: f64, +/// `JemallocMemoryControl` is a memory control policy that uses jemalloc statistics to control. It +/// assumes that most memory is used by streaming engine and does memory control over LRU watermark +/// based on jemalloc statistics. +#[derive(Debug)] +pub struct JemallocMemoryControl { + threshold_graceful: usize, + threshold_aggressive: usize, } -impl FixedProportionPolicy { - const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8; - pub const CONFIG_STR: &str = "streaming-batch"; - const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; - const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; +impl JemallocMemoryControl { + const THRESHOLD_AGGRESSIVE: f64 = 0.9; + const THRESHOLD_GRACEFUL: f64 = 0.7; - pub fn new(streaming_memory_proportion: f64) -> Result { - if streaming_memory_proportion <= 0.0 || streaming_memory_proportion >= 1.0 { - return Err(anyhow!("streaming memory proportion should fall in (0, 1)").into()); + pub fn new(total_memory: usize) -> Self { + let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize; + let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize; + Self { + threshold_graceful, + threshold_aggressive, } - Ok(Self { - streaming_memory_proportion, - }) } } -impl MemoryControl for FixedProportionPolicy { +impl MemoryControl for JemallocMemoryControl { fn apply( &self, - total_compute_memory_bytes: usize, - barrier_interval_ms: u32, + interval_ms: u32, prev_memory_stats: MemoryControlStats, - batch_manager: Arc, - stream_manager: Arc, - watermark_epoch: Arc, - ) -> MemoryControlStats { - let batch_memory_proportion = 1.0 - self.streaming_memory_proportion; - let total_batch_memory_bytes = total_compute_memory_bytes as f64 * batch_memory_proportion; - let batch_memory_threshold = - (total_batch_memory_bytes * Self::BATCH_KILL_QUERY_THRESHOLD) as usize; - let total_stream_memory_bytes = - total_compute_memory_bytes as f64 * self.streaming_memory_proportion; - let stream_memory_threshold_graceful = - (total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_GRACEFUL) as usize; - let stream_memory_threshold_aggressive = - (total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE) as usize; - - let jemalloc_allocated_mib = - advance_jemalloc_epoch(prev_memory_stats.jemalloc_allocated_mib); - - // Batch memory control - // - // When the batch memory usage exceeds the threshold, we choose the query that uses the - // most memory and kill it. - - let batch_used_memory_bytes = batch_manager.total_mem_usage(); - if batch_used_memory_bytes > batch_memory_threshold { - batch_manager.kill_queries("excessive batch memory usage".to_string()); - } - - // Streaming memory control - // - // We calculate the watermark of the LRU cache, which provides hints for streaming executors - // on cache eviction. - - let stream_used_memory_bytes = stream_manager.total_mem_usage(); - let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark( - stream_used_memory_bytes, - stream_memory_threshold_graceful, - stream_memory_threshold_aggressive, - barrier_interval_ms, - prev_memory_stats, - ); - set_lru_watermark_time_ms(watermark_epoch, lru_watermark_time_ms); - - MemoryControlStats { - batch_memory_usage: batch_used_memory_bytes, - streaming_memory_usage: stream_used_memory_bytes, - jemalloc_allocated_mib, - lru_watermark_step, - lru_watermark_time_ms, - lru_physical_now_ms: lru_physical_now, - } - } - - fn describe(&self, total_compute_memory_bytes: usize) -> String { - let total_stream_memory_bytes = - total_compute_memory_bytes as f64 * self.streaming_memory_proportion; - let total_batch_memory_bytes = - total_compute_memory_bytes as f64 * (1.0 - self.streaming_memory_proportion); - format!( - "FixedProportionPolicy: total available streaming memory is {}, total available batch memory is {}", - convert(total_stream_memory_bytes), - convert(total_batch_memory_bytes) - ) - } -} -/// `StreamingOnlyPolicy` only performs memory control on streaming tasks. It differs from -/// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics, -/// which actually contains system usage other than computing tasks. This is the default memory -/// control policy. -pub struct StreamingOnlyPolicy; - -impl StreamingOnlyPolicy { - pub const CONFIG_STR: &str = "streaming-only"; - const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; - const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; -} - -impl MemoryControl for StreamingOnlyPolicy { - fn apply( - &self, - total_compute_memory_bytes: usize, - barrier_interval_ms: u32, - prev_memory_stats: MemoryControlStats, - batch_manager: Arc, - stream_manager: Arc, + _batch_manager: Arc, + _stream_manager: Arc, watermark_epoch: Arc, ) -> MemoryControlStats { let jemalloc_allocated_mib = advance_jemalloc_epoch(prev_memory_stats.jemalloc_allocated_mib); - let stream_memory_threshold_graceful = - (total_compute_memory_bytes as f64 * Self::STREAM_EVICTION_THRESHOLD_GRACEFUL) as usize; - let stream_memory_threshold_aggressive = (total_compute_memory_bytes as f64 - * Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE) - as usize; // Streaming memory control // @@ -157,29 +63,21 @@ impl MemoryControl for StreamingOnlyPolicy { let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark( jemalloc_allocated_mib, - stream_memory_threshold_graceful, - stream_memory_threshold_aggressive, - barrier_interval_ms, + self.threshold_graceful, + self.threshold_aggressive, + interval_ms, prev_memory_stats, ); + set_lru_watermark_time_ms(watermark_epoch, lru_watermark_time_ms); MemoryControlStats { - batch_memory_usage: batch_manager.total_mem_usage(), - streaming_memory_usage: stream_manager.total_mem_usage(), jemalloc_allocated_mib, lru_watermark_step, lru_watermark_time_ms, lru_physical_now_ms: lru_physical_now, } } - - fn describe(&self, total_compute_memory_bytes: usize) -> String { - format!( - "StreamingOnlyPolicy: total available streaming memory is {}", - convert(total_compute_memory_bytes as f64) - ) - } } fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize { @@ -198,15 +96,15 @@ fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize { } fn calculate_lru_watermark( - cur_stream_used_memory_bytes: usize, - stream_memory_threshold_graceful: usize, - stream_memory_threshold_aggressive: usize, - barrier_interval_ms: u32, + cur_used_memory_bytes: usize, + threshold_graceful: usize, + threshold_aggressive: usize, + interval_ms: u32, prev_memory_stats: MemoryControlStats, ) -> (u64, u64, u64) { let mut watermark_time_ms = prev_memory_stats.lru_watermark_time_ms; let last_step = prev_memory_stats.lru_watermark_step; - let last_stream_used_memory_bytes = prev_memory_stats.streaming_memory_usage; + let last_used_memory_bytes = prev_memory_stats.jemalloc_allocated_mib; // The watermark calculation works in the following way: // @@ -223,17 +121,17 @@ fn calculate_lru_watermark( // last_step. // - Otherwise, we set the step to last_step * 2. - let mut step = if cur_stream_used_memory_bytes < stream_memory_threshold_graceful { - // Do not evict if the memory usage is lower than `stream_memory_threshold_graceful` + let mut step = if cur_used_memory_bytes < threshold_graceful { + // Do not evict if the memory usage is lower than `threshold_graceful` 0 - } else if cur_stream_used_memory_bytes < stream_memory_threshold_aggressive { + } else if cur_used_memory_bytes < threshold_aggressive { // Gracefully evict - if last_stream_used_memory_bytes > cur_stream_used_memory_bytes { + if last_used_memory_bytes > cur_used_memory_bytes { 1 } else { last_step + 1 } - } else if last_stream_used_memory_bytes < cur_stream_used_memory_bytes { + } else if last_used_memory_bytes < cur_used_memory_bytes { // Aggressively evict if last_step == 0 { 2 @@ -245,15 +143,13 @@ fn calculate_lru_watermark( }; let physical_now = Epoch::physical_now(); - if (physical_now - prev_memory_stats.lru_watermark_time_ms) / (barrier_interval_ms as u64) - < step - { + if (physical_now - prev_memory_stats.lru_watermark_time_ms) / (interval_ms as u64) < step { // We do not increase the step and watermark here to prevent a too-advanced watermark. The - // original condition is `prev_watermark_time_ms + barrier_interval_ms * step > now`. + // original condition is `prev_watermark_time_ms + interval_ms * step > now`. step = last_step; watermark_time_ms = physical_now; } else { - watermark_time_ms += barrier_interval_ms as u64 * step; + watermark_time_ms += interval_ms as u64 * step; } (step, watermark_time_ms, physical_now) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 510c38e95fbbb..2bcc801253354 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -62,8 +62,7 @@ use tokio::task::JoinHandle; use crate::memory_management::memory_manager::GlobalMemoryManager; use crate::memory_management::{ - memory_control_policy_from_config, reserve_memory_bytes, storage_memory_config, - MIN_COMPUTE_MEMORY_MB, + build_memory_control_policy, reserve_memory_bytes, storage_memory_config, MIN_COMPUTE_MEMORY_MB, }; use crate::observer::observer_manager::ComputeObserverNode; use crate::rpc::service::config_service::ConfigServiceImpl; @@ -134,7 +133,15 @@ pub async fn compute_node_serve( reserved_memory_bytes, ); - let memory_control_policy = memory_control_policy_from_config(&opts).unwrap(); + // NOTE: Due to some limits, we use `compute_memory_bytes + storage_memory_bytes` as + // `total_compute_memory_bytes` for memory control. This is just a workaround for some + // memory control issues and should be modified as soon as we figure out a better solution. + // + // Related issues: + // - https://github.com/risingwavelabs/risingwave/issues/8696 + // - https://github.com/risingwavelabs/risingwave/issues/8822 + let total_memory_bytes = compute_memory_bytes + storage_memory_bytes; + let memory_control_policy = build_memory_control_policy(total_memory_bytes).unwrap(); let storage_opts = Arc::new(StorageOpts::from(( &config, @@ -276,15 +283,7 @@ pub async fn compute_node_serve( let batch_mgr_clone = batch_mgr.clone(); let stream_mgr_clone = stream_mgr.clone(); - // NOTE: Due to some limits, we use `compute_memory_bytes + storage_memory_bytes` as - // `total_compute_memory_bytes` for memory control. This is just a workaround for some - // memory control issues and should be modified as soon as we figure out a better solution. - // - // Related issues: - // - https://github.com/risingwavelabs/risingwave/issues/8696 - // - https://github.com/risingwavelabs/risingwave/issues/8822 let memory_mgr = GlobalMemoryManager::new( - compute_memory_bytes + storage_memory_bytes, system_params.barrier_interval_ms(), streaming_metrics.clone(), memory_control_policy, diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index bcb8c012b7e84..017b023fa476e 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -40,8 +40,6 @@ pub struct ComputeNodeConfig { pub connector_rpc_endpoint: String, pub total_memory_bytes: usize, - pub memory_control_policy: String, - pub streaming_memory_proportion: f64, pub parallelism: usize, } diff --git a/src/risedevtool/src/task/compute_node_service.rs b/src/risedevtool/src/task/compute_node_service.rs index 90232ea58575c..ecac39d704eb6 100644 --- a/src/risedevtool/src/task/compute_node_service.rs +++ b/src/risedevtool/src/task/compute_node_service.rs @@ -61,11 +61,7 @@ impl ComputeNodeService { .arg("--parallelism") .arg(&config.parallelism.to_string()) .arg("--total-memory-bytes") - .arg(&config.total_memory_bytes.to_string()) - .arg("--memory-control-policy") - .arg(&config.memory_control_policy) - .arg("--streaming-memory-proportion") - .arg(&config.streaming_memory_proportion.to_string()); + .arg(&config.total_memory_bytes.to_string()); let provide_jaeger = config.provide_jaeger.as_ref().unwrap(); match provide_jaeger.len() {