Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce memory control policy abstraction #8253

Merged
merged 4 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
async-trait = "0.1"
async_stack_trace = { path = "../utils/async_stack_trace" }
clap = { version = "3", features = ["derive"] }
Expand Down
184 changes: 39 additions & 145 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,16 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_batch::task::BatchManager;
#[cfg(target_os = "linux")]
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::task::LocalStreamManager;

use super::policy::MemoryControlPolicy;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation overhead,
/// network buffer, etc.) in megabytes.
pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512;
#[cfg(any())]
/// The proportion of stream memory to all available memory for computing.
const STREAM_MEMORY_PROPORTION: f64 = 0.7;
#[cfg(any())]
/// The proportion of batch memory to all available memory for computing.
const BATCH_MEMORY_PROPORTION: f64 = 1.0 - STREAM_MEMORY_PROPORTION;

/// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit
/// the memory usage.
Expand All @@ -45,49 +39,41 @@ pub struct GlobalMemoryManager {
/// Barrier interval.
barrier_interval_ms: u32,
metrics: Arc<StreamingMetrics>,
/// The memory control policy for computing tasks.
memory_control_policy: MemoryControlPolicy,
}

pub type GlobalMemoryManagerRef = Arc<GlobalMemoryManager>;

impl GlobalMemoryManager {
#[cfg(target_os = "linux")]
#[cfg(any())]
const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8;
#[cfg(target_os = "linux")]
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
#[cfg(target_os = "linux")]
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;

pub fn new(
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
metrics: Arc<StreamingMetrics>,
memory_control_policy: MemoryControlPolicy,
) -> Arc<Self> {
// Arbitrarily set a minimal barrier interval in case it is too small,
// especially when it's 0.
let barrier_interval_ms = std::cmp::max(barrier_interval_ms, 10);

tracing::debug!(
"memory control policy: {}",
memory_control_policy.describe(total_compute_memory_bytes)
);

Arc::new(Self {
watermark_epoch: Arc::new(0.into()),
total_compute_memory_bytes,
barrier_interval_ms,
metrics,
memory_control_policy,
})
}

pub fn get_watermark_epoch(&self) -> Arc<AtomicU64> {
self.watermark_epoch.clone()
}

#[cfg(target_os = "linux")]
fn set_watermark_time_ms(&self, time_ms: u64) {
use std::sync::atomic::Ordering;

let epoch = Epoch::from_physical_time(time_ms).0;
let watermark_epoch = self.watermark_epoch.as_ref();
watermark_epoch.store(epoch, Ordering::Relaxed);
}

// FIXME: remove such limitation after #7180
/// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons.
/// See the comments for the macro `enable_jemalloc_on_linux!()`
Expand All @@ -106,145 +92,53 @@ impl GlobalMemoryManager {
) {
use std::time::Duration;

use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};

// Turn off memory control by default.
#[cfg(any())]
{
use pretty_bytes::converter::convert;

let total_batch_memory_bytes =
self.total_compute_memory_bytes as f64 * BATCH_MEMORY_PROPORTION;
let batch_memory_threshold =
(total_batch_memory_bytes * Self::BATCH_KILL_QUERY_THRESHOLD) as usize;
let total_stream_memory_bytes =
self.total_compute_memory_bytes as f64 * STREAM_MEMORY_PROPORTION;
let stream_memory_threshold_graceful =
(total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_GRACEFUL) as usize;
let stream_memory_threshold_aggressive =
(total_stream_memory_bytes * Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE) as usize;

tracing::debug!(
"Total memory for batch tasks: {}, total memory for streaming tasks: {}",
convert(total_batch_memory_bytes),
convert(total_stream_memory_bytes)
);
}

let stream_memory_threshold_graceful = (self.total_compute_memory_bytes as f64
* Self::STREAM_EVICTION_THRESHOLD_GRACEFUL)
as usize;
let stream_memory_threshold_aggressive = (self.total_compute_memory_bytes as f64
* Self::STREAM_EVICTION_THRESHOLD_AGGRESSIVE)
as usize;

let mut watermark_time_ms = Epoch::physical_now();
let mut last_stream_used_memory_bytes = 0;
let mut step = 0;
use risingwave_common::util::epoch::Epoch;

let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap();
let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap();
let mut last_jemalloc_allocated_mib = 0;
use crate::memory_management::policy::MemoryControlStats;

let mut tick_interval =
tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64));
let mut memory_control_stats = MemoryControlStats {
batch_memory_usage: 0,
streaming_memory_usage: 0,
jemalloc_allocated_mib: 0,
lru_watermark_step: 0,
lru_watermark_time_ms: Epoch::physical_now(),
lru_physical_now_ms: Epoch::physical_now(),
};

loop {
// Wait for a while to check if need eviction.
tick_interval.tick().await;

if let Err(e) = jemalloc_epoch_mib.advance() {
tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
}

let jemalloc_allocated_mib = jemalloc_allocated_mib.read().unwrap_or_else(|e| {
tracing::warn!("Jemalloc read allocated failed! {:?}", e);
last_jemalloc_allocated_mib
});
last_jemalloc_allocated_mib = jemalloc_allocated_mib;

// ## Batch memory control
//
// When the batch memory usage exceeds the threshold, we choose the query that uses the
// most memory and kills it.

let batch_used_memory_bytes = batch_manager.total_mem_usage();
// We currently turn this off until batch memory control becomes stable.
#[cfg(any())]
if batch_used_memory_bytes > batch_memory_threshold {
batch_manager.kill_queries("excessive batch memory usage".to_string());
}

// ## Streaming memory control
//
// 1. When the streaming memory usage is below the graceful threshold, we do not evict
// any caches, and simply reset the step to 0.
//
// 2. When the memory usage is between the graceful and aggressive threshold:
// - If the last eviction memory usage decreases after last eviction, we set the
// eviction step to 1.
// - Otherwise, we set the step to last_step + 1.
//
// 3. When the memory usage exceeds the aggressive threshold:
// - If the memory usage decreases after the last eviction, we set the eviction step
// to last_step.
// - Otherwise, we set the step to last_step * 2.

#[cfg(any())]
let cur_stream_used_memory_bytes = stream_manager.total_mem_usage();
// We use the memory stats collected by jemalloc as available streaming memory for now.
let cur_stream_used_memory_bytes = jemalloc_allocated_mib;
let last_step = step;
step = if cur_stream_used_memory_bytes < stream_memory_threshold_graceful {
// Do not evict if the memory usage is lower than `mem_threshold_graceful`
0
} else if cur_stream_used_memory_bytes < stream_memory_threshold_aggressive {
// Gracefully evict
if last_stream_used_memory_bytes > cur_stream_used_memory_bytes {
1
} else {
step + 1
}
} else if last_stream_used_memory_bytes < cur_stream_used_memory_bytes {
// Aggressively evict
if step == 0 {
2
} else {
step * 2
}
} else {
step
};

last_stream_used_memory_bytes = cur_stream_used_memory_bytes;

// if watermark_time_ms + self.barrier_interval_ms as u64 * step > now, we do not
// increase the step, and set the epoch to now time epoch.
let physical_now = Epoch::physical_now();
if (physical_now - watermark_time_ms) / (self.barrier_interval_ms as u64) < step {
step = last_step;
watermark_time_ms = physical_now;
} else {
watermark_time_ms += self.barrier_interval_ms as u64 * step;
}
memory_control_stats = self.memory_control_policy.apply(
self.total_compute_memory_bytes,
self.barrier_interval_ms,
memory_control_stats,
batch_manager.clone(),
stream_manager.clone(),
self.watermark_epoch.clone(),
);

self.metrics
.lru_current_watermark_time_ms
.set(watermark_time_ms as i64);
self.metrics.lru_physical_now_ms.set(physical_now as i64);
self.metrics.lru_watermark_step.set(step as i64);
.set(memory_control_stats.lru_watermark_time_ms as i64);
self.metrics
.lru_physical_now_ms
.set(memory_control_stats.lru_physical_now_ms as i64);
self.metrics
.lru_watermark_step
.set(memory_control_stats.lru_watermark_step as i64);
self.metrics.lru_runtime_loop_count.inc();
self.metrics
.jemalloc_allocated_bytes
.set(jemalloc_allocated_mib as i64);
.set(memory_control_stats.jemalloc_allocated_mib as i64);
self.metrics
.stream_total_mem_usage
.set(stream_manager.total_mem_usage() as i64);
.set(memory_control_stats.streaming_memory_usage as i64);
self.metrics
.batch_total_mem_usage
.set(batch_used_memory_bytes as i64);

self.set_watermark_time_ms(watermark_time_ms);
.set(memory_control_stats.batch_memory_usage as i64);
}
}
}
1 change: 1 addition & 0 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
// limitations under the License.

pub mod memory_manager;
pub mod policy;
Loading