
Add MemTrackingMetrics to ease memory tracking for non-limited memory consumers #1691

Merged 9 commits on Jan 29, 2022
134 changes: 70 additions & 64 deletions datafusion/src/execution/memory_manager.rs
@@ -19,12 +19,12 @@

use crate::error::{DataFusionError, Result};
use async_trait::async_trait;
use hashbrown::HashMap;
use hashbrown::HashSet;
use log::debug;
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::sync::{Arc, Condvar, Mutex};

static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);

@@ -245,10 +245,10 @@ The memory management architecture is the following:
/// Manage memory usage during physical plan execution
#[derive(Debug)]
pub struct MemoryManager {
requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
pool_size: usize,
requesters_total: Arc<Mutex<usize>>,
Contributor:
Maybe as a follow-on PR this could be changed to an AtomicUsize to avoid the mutex; I think the fetch-and-update code would be nicer.

I think that would be a nice-to-have -- not required.

Member Author:
Currently, requesters_total is combined with the Condvar below to stop late-arriving requesters from spilling too frequently (since earlier consumers may already occupy much of the memory). They wait for a notification while holding less than 1/(2n) of the memory. Any suggestions on this?

The code here would be much simpler if the Arc<Mutex<usize>> were replaced by an AtomicUsize.

Member:
this lock only guarantees that the two operations, updating requesters_total and calling cv.notify_all, are performed atomically, but it looks like this doesn't really buy us anything? The waiter on self.cv can wake up and get preempted right away by other threads that might update requesters_total. I am curious what benefit this critical region provides, from your point of view.

Member:
never mind, I was wrong: since the cv will reacquire the lock on wake-up, a mutex is needed to make sure the woken thread will not operate on a different requesters_total value.
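As a minimal, self-contained sketch of the point above (plain std, not the DataFusion code): the waiter re-checks the shared counter under the same mutex that backs the condvar, and `wait` re-acquires that lock before returning, so an update plus `notify_all` performed under the lock is never observed half-done.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// `total` plays the role of `requesters_total`: the same Mutex guards the
// counter and backs the Condvar, so a woken waiter always re-reads a
// consistent value after `wait` re-acquires the lock.
fn wait_until_at_least(state: &Arc<(Mutex<usize>, Condvar)>, target: usize) -> usize {
    let (lock, cv) = &**state;
    let mut total = lock.lock().unwrap();
    while *total < target {
        // wait() releases the lock, then re-acquires it before returning,
        // which also handles spurious wakeups via the surrounding loop.
        total = cv.wait(total).unwrap();
    }
    *total
}

fn grow_and_notify(state: &Arc<(Mutex<usize>, Condvar)>, delta: usize) {
    let (lock, cv) = &**state;
    let mut total = lock.lock().unwrap();
    *total += delta; // update and notify under the same lock
    cv.notify_all();
}

fn main() {
    let state = Arc::new((Mutex::new(0usize), Condvar::new()));
    let s = Arc::clone(&state);
    let waiter = thread::spawn(move || wait_until_at_least(&s, 10));
    grow_and_notify(&state, 10);
    assert_eq!(waiter.join().unwrap(), 10);
}
```

If the waiter only checked the counter outside the lock, the update could slip in between the check and the wait; holding the mutex across both closes that window.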

trackers_total: AtomicUsize,
cv: Condvar,
}

@@ -267,41 +267,47 @@ impl MemoryManager {
);

Arc::new(Self {
requesters: Arc::new(Mutex::new(HashMap::new())),
trackers: Arc::new(Mutex::new(HashMap::new())),
requesters: Arc::new(Mutex::new(HashSet::new())),
pool_size,
requesters_total: Arc::new(Mutex::new(0)),
trackers_total: AtomicUsize::new(0),
cv: Condvar::new(),
})
}
}
}

fn get_tracker_total(&self) -> usize {
let trackers = self.trackers.lock().unwrap();
if trackers.len() > 0 {
trackers.values().fold(0usize, |acc, y| match y.upgrade() {
None => acc,
Some(t) => acc + t.mem_used(),
})
} else {
0
}
self.trackers_total.load(Ordering::SeqCst)
}

/// Register a new memory consumer for memory usage tracking
pub(crate) fn register_consumer(&self, consumer: &Arc<dyn MemoryConsumer>) {
let id = consumer.id().clone();
match consumer.type_() {
ConsumerType::Requesting => {
let mut requesters = self.requesters.lock().unwrap();
requesters.insert(id, Arc::downgrade(consumer));
}
ConsumerType::Tracking => {
let mut trackers = self.trackers.lock().unwrap();
trackers.insert(id, Arc::downgrade(consumer));
}
}
pub(crate) fn grow_tracker_usage(&self, delta: usize) {
self.trackers_total.fetch_add(delta, Ordering::SeqCst);
}

pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
let update =
self.trackers_total
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
Contributor:
Would this potentially end up in an infinite loop if x is zero, since the closure would always return None?

After seeing it this way, I think I agree that the original implementation using a Mutex was better -- sorry about that @yjshen, I didn't realize the subtleties involved here.

Member Author:
I don't think fetch_update can loop infinitely here: when the closure returns None, it bails out immediately with Err. A simple case, zero minus ten, results in underflow:
https://play.rust-lang.org/?version=stable&mode=release&edition=2021

I agree the mutex version is easier to write and reason about. I can revert the last commit if preferred.
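To make the behavior discussed above concrete, here is a small standalone check (plain std, not the PR's code): `fetch_update` retries only when the atomic was modified concurrently between the load and the compare-exchange; if the closure returns None it stops immediately and reports the current value in Err, so underflow cannot cause an infinite loop.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let total = AtomicUsize::new(0);
    // Try to shrink by 10 when the counter is 0: the closure returns None,
    // so fetch_update stops right away with Err(current_value).
    let result = total.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
        if x >= 10 { Some(x - 10) } else { None }
    });
    assert_eq!(result, Err(0)); // no infinite loop, counter unchanged
    assert_eq!(total.load(Ordering::SeqCst), 0);

    // With enough headroom the update succeeds and Ok holds the old value.
    total.store(25, Ordering::SeqCst);
    let result = total.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
        if x >= 10 { Some(x - 10) } else { None }
    });
    assert_eq!(result, Ok(25));
    assert_eq!(total.load(Ordering::SeqCst), 15);
}
```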

Contributor:
This PR is ready to go I think -- we can always keep iterating in the next one. Sorry for the delay in merging @yjshen

if x >= delta {
Some(x - delta)
} else {
None
}
});
update.expect(&*format!(
"Tracker total memory shrink by {} underflow",
delta
));
}

fn get_requester_total(&self) -> usize {
*self.requesters_total.lock().unwrap()
}

/// Register a new memory requester
pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
self.requesters.lock().unwrap().insert(requester_id.clone());
}

fn max_mem_for_requesters(&self) -> usize {
Expand All @@ -317,7 +323,6 @@ impl MemoryManager {

let granted;
loop {
let remaining = rqt_max - *rqt_current_used;
let max_per_rqt = rqt_max / num_rqt;
let min_per_rqt = max_per_rqt / 2;

@@ -326,6 +331,7 @@
break;
}

let remaining = rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
if remaining >= required {
granted = true;
*rqt_current_used += required;
@@ -347,46 +353,37 @@

fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
let mut requesters_total = self.requesters_total.lock().unwrap();
assert!(*requesters_total >= freed);
*requesters_total -= freed;
*requesters_total += acquired;
self.cv.notify_all()
}

/// Drop a memory consumer from memory usage tracking
pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId) {
/// Drop a memory consumer and reclaim the memory
pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
// find in requesters first
{
let mut requesters = self.requesters.lock().unwrap();
if requesters.remove(id).is_some() {
return;
if requesters.remove(id) {
let mut total = self.requesters_total.lock().unwrap();
assert!(*total >= mem_used);
*total -= mem_used;
}
}
let mut trackers = self.trackers.lock().unwrap();
trackers.remove(id);
self.shrink_tracker_usage(mem_used);
self.cv.notify_all();
}
}

impl Display for MemoryManager {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let requesters =
self.requesters
.lock()
.unwrap()
.values()
.fold(vec![], |mut acc, consumer| match consumer.upgrade() {
None => acc,
Some(c) => {
acc.push(format!("{}", c));
acc
}
});
let tracker_mem = self.get_tracker_total();
write!(f,
"MemoryManager usage statistics: total {}, tracker used {}, total {} requesters detail: \n {},",
human_readable_size(self.pool_size),
human_readable_size(tracker_mem),
&requesters.len(),
requesters.join("\n"))
"MemoryManager usage statistics: total {}, trackers used {}, total {} requesters used: {}",
human_readable_size(self.pool_size),
human_readable_size(self.get_tracker_total()),
self.requesters.lock().unwrap().len(),
human_readable_size(self.get_requester_total()),
)
}
}

@@ -418,6 +415,8 @@ mod tests {
use super::*;
use crate::error::Result;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::execution::MemoryConsumer;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use async_trait::async_trait;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -487,6 +486,7 @@ mod tests {

impl DummyTracker {
fn new(partition: usize, runtime: Arc<RuntimeEnv>, mem_used: usize) -> Self {
runtime.grow_tracker_usage(mem_used);
Self {
id: MemoryConsumerId::new(partition),
runtime,
@@ -528,23 +528,29 @@ mod tests {
.with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap());
let runtime = Arc::new(RuntimeEnv::new(config).unwrap());

let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
runtime.register_consumer(&(tracker1.clone() as Arc<dyn MemoryConsumer>));
DummyTracker::new(0, runtime.clone(), 5);
assert_eq!(runtime.memory_manager.get_tracker_total(), 5);

let tracker2 = Arc::new(DummyTracker::new(0, runtime.clone(), 10));
runtime.register_consumer(&(tracker2.clone() as Arc<dyn MemoryConsumer>));
let tracker1 = DummyTracker::new(0, runtime.clone(), 10);
assert_eq!(runtime.memory_manager.get_tracker_total(), 15);

let tracker3 = Arc::new(DummyTracker::new(0, runtime.clone(), 15));
runtime.register_consumer(&(tracker3.clone() as Arc<dyn MemoryConsumer>));
DummyTracker::new(0, runtime.clone(), 15);
assert_eq!(runtime.memory_manager.get_tracker_total(), 30);

runtime.drop_consumer(tracker2.id());
runtime.drop_consumer(tracker1.id(), tracker1.mem_used);
assert_eq!(runtime.memory_manager.get_tracker_total(), 20);

// MemTrackingMetrics as an easy way to track memory
let ms = ExecutionPlanMetricsSet::new();
let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone());
tracking_metric.init_mem_used(15);
Contributor:
👌 --very nice

assert_eq!(runtime.memory_manager.get_tracker_total(), 35);

drop(tracking_metric);
assert_eq!(runtime.memory_manager.get_tracker_total(), 20);

let requester1 = Arc::new(DummyRequester::new(0, runtime.clone()));
runtime.register_consumer(&(requester1.clone() as Arc<dyn MemoryConsumer>));
let requester1 = DummyRequester::new(0, runtime.clone());
runtime.register_requester(requester1.id());

// first requester entered, should be able to use any of the remaining 80
requester1.do_with_mem(40).await.unwrap();
@@ -553,8 +559,8 @@
assert_eq!(requester1.mem_used(), 50);
assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 50);

let requester2 = Arc::new(DummyRequester::new(0, runtime.clone()));
runtime.register_consumer(&(requester2.clone() as Arc<dyn MemoryConsumer>));
let requester2 = DummyRequester::new(0, runtime.clone());
runtime.register_requester(requester2.id());

requester2.do_with_mem(20).await.unwrap();
requester2.do_with_mem(30).await.unwrap();
24 changes: 16 additions & 8 deletions datafusion/src/execution/runtime_env.rs
@@ -22,9 +22,7 @@ use crate::{
error::Result,
execution::{
disk_manager::{DiskManager, DiskManagerConfig},
memory_manager::{
MemoryConsumer, MemoryConsumerId, MemoryManager, MemoryManagerConfig,
},
memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
},
};

@@ -71,13 +69,23 @@ impl RuntimeEnv {
}

/// Register the consumer to get it tracked
pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {
self.memory_manager.register_consumer(memory_consumer);
pub fn register_requester(&self, id: &MemoryConsumerId) {
self.memory_manager.register_requester(id);
}

/// Drop the consumer from get tracked
pub fn drop_consumer(&self, id: &MemoryConsumerId) {
self.memory_manager.drop_consumer(id)
/// Drop the consumer from tracking and reclaim its memory
pub fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
self.memory_manager.drop_consumer(id, mem_used)
}

/// Grow tracker memory usage by `delta`
pub fn grow_tracker_usage(&self, delta: usize) {
self.memory_manager.grow_tracker_usage(delta)
}

/// Shrink tracker memory usage by `delta`
pub fn shrink_tracker_usage(&self, delta: usize) {
self.memory_manager.shrink_tracker_usage(delta)
}
}
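A rough, self-contained mock of the counter these two methods ultimately delegate to (a hypothetical `TrackerTotal` type, not the real MemoryManager): grow is a plain `fetch_add`, while shrink uses `checked_sub` inside `fetch_update` so an underflowing request is rejected instead of wrapping around.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the trackers_total bookkeeping in this PR.
struct TrackerTotal(AtomicUsize);

impl TrackerTotal {
    fn grow(&self, delta: usize) {
        self.0.fetch_add(delta, Ordering::SeqCst);
    }

    // Returns false (and leaves the counter unchanged) on underflow.
    fn shrink(&self, delta: usize) -> bool {
        self.0
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| x.checked_sub(delta))
            .is_ok()
    }

    fn get(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
}

fn main() {
    let total = TrackerTotal(AtomicUsize::new(0));
    total.grow(5);
    total.grow(10);
    assert_eq!(total.get(), 15);
    assert!(total.shrink(10));
    assert_eq!(total.get(), 5);
    assert!(!total.shrink(10)); // would underflow: rejected
    assert_eq!(total.get(), 5);
}
```

Callers are expected to pair each grow with a matching shrink of the same size when the tracked memory is released, as the drop path in the diff above does.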

12 changes: 7 additions & 5 deletions datafusion/src/physical_plan/common.rs
@@ -20,7 +20,7 @@
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
@@ -43,21 +43,23 @@ pub struct SizedRecordBatchStream {
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
index: usize,
baseline_metrics: BaselineMetrics,
metrics: MemTrackingMetrics,
}

impl SizedRecordBatchStream {
/// Create a new RecordBatchIterator
pub fn new(
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
baseline_metrics: BaselineMetrics,
metrics: MemTrackingMetrics,
) -> Self {
let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>();
metrics.init_mem_used(size);
SizedRecordBatchStream {
schema,
index: 0,
batches,
baseline_metrics,
metrics,
}
}
}
@@ -75,7 +77,7 @@ impl Stream for SizedRecordBatchStream {
} else {
None
});
self.baseline_metrics.record_poll(poll)
self.metrics.record_poll(poll)
}
}

6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/explain.rs
@@ -32,7 +32,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc

use super::SendableRecordBatchStream;
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use async_trait::async_trait;

/// Explain execution plan operator. This operator contains the string
@@ -148,12 +148,12 @@ impl ExecutionPlan for ExplainExec {
)?;

let metrics = ExecutionPlanMetricsSet::new();
let baseline_metrics = BaselineMetrics::new(&metrics, partition);
let tracking_metrics = MemTrackingMetrics::new(&metrics, partition);

Ok(Box::pin(SizedRecordBatchStream::new(
self.schema.clone(),
vec![Arc::new(record_batch)],
baseline_metrics,
tracking_metrics,
)))
}
