Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(memory-manager): should not use prev_memory_stats.streaming_memory_usage #9384

Merged
merged 7 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -778,13 +778,6 @@ template:
# Total available memory for the compute node in bytes
total-memory-bytes: 8589934592

# The policy for compute node memory control.
memory-control-policy: streaming-only

# The proportion of streaming memory to all available memory for computing. Only works when
# `memory_control_policy` is set to "streaming-batch".
streaming-memory-proportion: 0.7

# Parallelism of tasks per compute node
parallelism: 4

Expand Down
16 changes: 0 additions & 16 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
pub parallelism: usize,

/// The policy for compute node memory control. Valid values:
/// - streaming-only
/// - streaming-batch
#[clap(
long,
env = "RW_MEMORY_CONTROL_POLICY",
default_value = "streaming-only"
)]
pub memory_control_policy: String,

/// The proportion of streaming memory to all available memory for computing. Only works when
/// `memory_control_policy` is set to "streaming-batch". Ignored otherwise. See
/// [`FixedProportionPolicy`] for more details.
#[clap(long, env = "RW_STREAMING_MEMORY_PROPORTION", default_value_t = 0.7)]
pub streaming_memory_proportion: f64,

#[clap(flatten)]
override_config: OverrideConfigOpts,
}
Expand Down
33 changes: 11 additions & 22 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,37 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::task::LocalStreamManager;

use super::MemoryControlPolicy;
use super::MemoryControlRef;
use crate::memory_management::MemoryControlStats;

/// Compute node uses [`GlobalMemoryManager`] to limit the memory usage.
pub struct GlobalMemoryManager {
/// All cached data before the watermark should be evicted.
watermark_epoch: Arc<AtomicU64>,
/// Total memory that can be allocated by the compute node for computing tasks (stream & batch)
/// in bytes.
total_compute_memory_bytes: usize,
/// Barrier interval.
barrier_interval_ms: u32,
/// Interval, in milliseconds, at which the memory control policy loop runs.
interval_ms: u32,
metrics: Arc<StreamingMetrics>,
/// The memory control policy for computing tasks.
memory_control_policy: MemoryControlPolicy,
memory_control_policy: MemoryControlRef,
}

pub type GlobalMemoryManagerRef = Arc<GlobalMemoryManager>;

impl GlobalMemoryManager {
pub fn new(
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
interval_ms: u32,
metrics: Arc<StreamingMetrics>,
memory_control_policy: MemoryControlPolicy,
memory_control_policy: MemoryControlRef,
) -> Arc<Self> {
// Arbitrarily set a minimal barrier interval in case it is too small,
// especially when it's 0.
let barrier_interval_ms = std::cmp::max(barrier_interval_ms, 10);
let interval_ms = std::cmp::max(interval_ms, 10);

tracing::info!(
"memory control policy: {}",
memory_control_policy.describe(total_compute_memory_bytes)
);
tracing::info!("memory control policy: {:?}", &memory_control_policy);

Arc::new(Self {
watermark_epoch: Arc::new(0.into()),
total_compute_memory_bytes,
barrier_interval_ms,
interval_ms,
metrics,
memory_control_policy,
})
Expand All @@ -78,11 +70,9 @@ impl GlobalMemoryManager {
) {
// Keep the same interval as the barrier interval
let mut tick_interval =
tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64));
tokio::time::interval(Duration::from_millis(self.interval_ms as u64));

let mut memory_control_stats = MemoryControlStats {
batch_memory_usage: 0,
streaming_memory_usage: 0,
jemalloc_allocated_mib: 0,
lru_watermark_step: 0,
lru_watermark_time_ms: Epoch::physical_now(),
Expand All @@ -94,8 +84,7 @@ impl GlobalMemoryManager {
tick_interval.tick().await;

memory_control_stats = self.memory_control_policy.apply(
self.total_compute_memory_bytes,
self.barrier_interval_ms,
self.interval_ms,
memory_control_stats,
batch_manager.clone(),
stream_manager.clone(),
Expand Down
53 changes: 11 additions & 42 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ use risingwave_common::config::{StorageConfig, StorageMemoryConfig};
use risingwave_common::error::Result;
use risingwave_stream::task::LocalStreamManager;

use crate::ComputeNodeOpts;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation
Expand All @@ -47,62 +45,37 @@ pub const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.5;
pub const STORAGE_FILE_CACHE_MEMORY_PROPORTION: f64 = 0.1;
pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 70;

/// `MemoryControlStats` contains the necessary information for memory control, including both batch
/// and streaming.
/// `MemoryControlStats` contains the state from the previous control loop.
#[derive(Default)]
pub struct MemoryControlStats {
pub batch_memory_usage: usize,
pub streaming_memory_usage: usize,
pub jemalloc_allocated_mib: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
}

pub type MemoryControlPolicy = Box<dyn MemoryControl>;
pub type MemoryControlRef = Box<dyn MemoryControl>;

pub trait MemoryControl: Send + Sync {
pub trait MemoryControl: Send + Sync + std::fmt::Debug {
fn apply(
&self,
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
interval_ms: u32,
prev_memory_stats: MemoryControlStats,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats;

fn describe(&self, total_compute_memory_bytes: usize) -> String;
}

#[cfg(target_os = "linux")]
pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
use anyhow::anyhow;

use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy};

let input_policy = &opts.memory_control_policy;
if input_policy == FixedProportionPolicy::CONFIG_STR {
Ok(Box::new(FixedProportionPolicy::new(
opts.streaming_memory_proportion,
)?))
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
Ok(Box::new(StreamingOnlyPolicy))
} else {
let valid_values = [
FixedProportionPolicy::CONFIG_STR,
StreamingOnlyPolicy::CONFIG_STR,
];
Err(anyhow!(format!(
"invalid memory control policy in configuration: {}, valid values: {:?}",
input_policy, valid_values,
))
.into())
}
pub fn build_memory_control_policy(total_memory_bytes: usize) -> Result<MemoryControlRef> {
use self::policy::JemallocMemoryControl;

Ok(Box::new(JemallocMemoryControl::new(total_memory_bytes)))
}

#[cfg(not(target_os = "linux"))]
pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
pub fn build_memory_control_policy(_total_memory_bytes: usize) -> Result<MemoryControlRef> {
// We disable memory control on operating systems other than Linux now because jemalloc
// stats do not work well.
tracing::warn!("memory control is only enabled on Linux now");
Expand All @@ -111,24 +84,20 @@ pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result<Memo

/// `DummyPolicy` is used for operating systems other than Linux. It does nothing as memory control
/// is disabled on non-Linux OS.
#[derive(Debug)]
pub struct DummyPolicy;

impl MemoryControl for DummyPolicy {
fn apply(
&self,
_total_compute_memory_bytes: usize,
_barrier_interval_ms: u32,
_interval_ms: u32,
_prev_memory_stats: MemoryControlStats,
_batch_manager: Arc<BatchManager>,
_stream_manager: Arc<LocalStreamManager>,
_watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats {
MemoryControlStats::default()
}

fn describe(&self, _total_compute_memory_bytes: usize) -> String {
"DummyPolicy".to_string()
}
}

/// Each compute node reserves some memory for stack and code segment of processes, allocation
Expand Down
Loading