Skip to content

Commit

Permalink
fix(memory-manager): should not use prev_memory_stats.streaming_memor…
Browse files Browse the repository at this point in the history
…y_usage (#9384)
  • Loading branch information
fuyufjh authored Apr 24, 2023
1 parent 0fe6f59 commit c33e1f2
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 246 deletions.
7 changes: 0 additions & 7 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -778,13 +778,6 @@ template:
# Total available memory for the compute node in bytes
total-memory-bytes: 8589934592

# The policy for compute node memory control.
memory-control-policy: streaming-only

# The proportion of streaming memory to all available memory for computing. Only works when
# `memory_control_policy` is set to "streaming-batch".
streaming-memory-proportion: 0.7

# Parallelism of tasks per compute node
parallelism: 4

Expand Down
16 changes: 0 additions & 16 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
pub parallelism: usize,

/// The policy for compute node memory control. Valid values:
/// - streaming-only
/// - streaming-batch
#[clap(
long,
env = "RW_MEMORY_CONTROL_POLICY",
default_value = "streaming-only"
)]
pub memory_control_policy: String,

/// The proportion of streaming memory to all available memory for computing. Only works when
/// `memory_control_policy` is set to "streaming-batch". Ignored otherwise. See
/// [`FixedProportionPolicy`] for more details.
#[clap(long, env = "RW_STREAMING_MEMORY_PROPORTION", default_value_t = 0.7)]
pub streaming_memory_proportion: f64,

#[clap(flatten)]
override_config: OverrideConfigOpts,
}
Expand Down
33 changes: 11 additions & 22 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,37 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::task::LocalStreamManager;

use super::MemoryControlPolicy;
use super::MemoryControlRef;
use crate::memory_management::MemoryControlStats;

/// Compute node uses [`GlobalMemoryManager`] to limit the memory usage.
pub struct GlobalMemoryManager {
/// All cached data before the watermark should be evicted.
watermark_epoch: Arc<AtomicU64>,
/// Total memory that can be allocated by the compute node for computing tasks (stream & batch)
/// in bytes.
total_compute_memory_bytes: usize,
/// Barrier interval.
barrier_interval_ms: u32,
/// Loop interval of running control policy
interval_ms: u32,
metrics: Arc<StreamingMetrics>,
/// The memory control policy for computing tasks.
memory_control_policy: MemoryControlPolicy,
memory_control_policy: MemoryControlRef,
}

pub type GlobalMemoryManagerRef = Arc<GlobalMemoryManager>;

impl GlobalMemoryManager {
pub fn new(
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
interval_ms: u32,
metrics: Arc<StreamingMetrics>,
memory_control_policy: MemoryControlPolicy,
memory_control_policy: MemoryControlRef,
) -> Arc<Self> {
// Arbitrarily set a minimal barrier interval in case it is too small,
// especially when it's 0.
let barrier_interval_ms = std::cmp::max(barrier_interval_ms, 10);
let interval_ms = std::cmp::max(interval_ms, 10);

tracing::info!(
"memory control policy: {}",
memory_control_policy.describe(total_compute_memory_bytes)
);
tracing::info!("memory control policy: {:?}", &memory_control_policy);

Arc::new(Self {
watermark_epoch: Arc::new(0.into()),
total_compute_memory_bytes,
barrier_interval_ms,
interval_ms,
metrics,
memory_control_policy,
})
Expand All @@ -78,11 +70,9 @@ impl GlobalMemoryManager {
) {
// Keep same interval with the barrier interval
let mut tick_interval =
tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64));
tokio::time::interval(Duration::from_millis(self.interval_ms as u64));

let mut memory_control_stats = MemoryControlStats {
batch_memory_usage: 0,
streaming_memory_usage: 0,
jemalloc_allocated_mib: 0,
lru_watermark_step: 0,
lru_watermark_time_ms: Epoch::physical_now(),
Expand All @@ -94,8 +84,7 @@ impl GlobalMemoryManager {
tick_interval.tick().await;

memory_control_stats = self.memory_control_policy.apply(
self.total_compute_memory_bytes,
self.barrier_interval_ms,
self.interval_ms,
memory_control_stats,
batch_manager.clone(),
stream_manager.clone(),
Expand Down
53 changes: 11 additions & 42 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ use risingwave_common::config::{StorageConfig, StorageMemoryConfig};
use risingwave_common::error::Result;
use risingwave_stream::task::LocalStreamManager;

use crate::ComputeNodeOpts;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation
Expand All @@ -47,62 +45,37 @@ pub const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.5;
pub const STORAGE_FILE_CACHE_MEMORY_PROPORTION: f64 = 0.1;
pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 70;

/// `MemoryControlStats` contains the necessary information for memory control, including both batch
/// and streaming.
/// `MemoryControlStats` contains the state from previous control loop
#[derive(Default)]
pub struct MemoryControlStats {
pub batch_memory_usage: usize,
pub streaming_memory_usage: usize,
pub jemalloc_allocated_mib: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
}

pub type MemoryControlPolicy = Box<dyn MemoryControl>;
pub type MemoryControlRef = Box<dyn MemoryControl>;

pub trait MemoryControl: Send + Sync {
pub trait MemoryControl: Send + Sync + std::fmt::Debug {
fn apply(
&self,
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
interval_ms: u32,
prev_memory_stats: MemoryControlStats,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats;

fn describe(&self, total_compute_memory_bytes: usize) -> String;
}

#[cfg(target_os = "linux")]
pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
use anyhow::anyhow;

use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy};

let input_policy = &opts.memory_control_policy;
if input_policy == FixedProportionPolicy::CONFIG_STR {
Ok(Box::new(FixedProportionPolicy::new(
opts.streaming_memory_proportion,
)?))
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
Ok(Box::new(StreamingOnlyPolicy))
} else {
let valid_values = [
FixedProportionPolicy::CONFIG_STR,
StreamingOnlyPolicy::CONFIG_STR,
];
Err(anyhow!(format!(
"invalid memory control policy in configuration: {}, valid values: {:?}",
input_policy, valid_values,
))
.into())
}
pub fn build_memory_control_policy(total_memory_bytes: usize) -> Result<MemoryControlRef> {
use self::policy::JemallocMemoryControl;

Ok(Box::new(JemallocMemoryControl::new(total_memory_bytes)))
}

#[cfg(not(target_os = "linux"))]
pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
pub fn build_memory_control_policy(_total_memory_bytes: usize) -> Result<MemoryControlRef> {
// We disable memory control on operating systems other than Linux now because jemalloc
// stats do not work well.
tracing::warn!("memory control is only enabled on Linux now");
Expand All @@ -111,24 +84,20 @@ pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result<Memo

/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control
/// is disabled on non-Linux OS.
#[derive(Debug)]
pub struct DummyPolicy;

impl MemoryControl for DummyPolicy {
fn apply(
&self,
_total_compute_memory_bytes: usize,
_barrier_interval_ms: u32,
_interval_ms: u32,
_prev_memory_stats: MemoryControlStats,
_batch_manager: Arc<BatchManager>,
_stream_manager: Arc<LocalStreamManager>,
_watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats {
MemoryControlStats::default()
}

fn describe(&self, _total_compute_memory_bytes: usize) -> String {
"DummyPolicy".to_string()
}
}

/// Each compute node reserves some memory for stack and code segment of processes, allocation
Expand Down
Loading

0 comments on commit c33e1f2

Please sign in to comment.