Add MemTrackingMetrics to ease memory tracking for non-limited memory consumers #1691

Merged: 9 commits, Jan 29, 2022
28 changes: 18 additions & 10 deletions datafusion/src/execution/memory_manager.rs
@@ -248,7 +248,7 @@ pub struct MemoryManager {
requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
pool_size: usize,
requesters_total: Arc<Mutex<usize>>,
Contributor:

Maybe as a follow-on PR this could be changed to an AtomicUsize to avoid the mutex; I think the fetch-and-update code would then be nicer.

I think that would be a nice-to-have, not a requirement.
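A minimal, hypothetical sketch (illustrative names only, not the DataFusion API) of what the suggested switch to an atomic counter could look like:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative sketch of the suggestion: the Mutex-guarded counter becomes
// an AtomicUsize, and grow/shrink become single atomic operations.
struct Counters {
    requesters_total: AtomicUsize,
}

impl Counters {
    fn grow(&self, delta: usize) {
        self.requesters_total.fetch_add(delta, Ordering::SeqCst);
    }

    fn shrink(&self, delta: usize) {
        // Note: fetch_sub wraps on underflow, so a real implementation would
        // want a checked update (see the fetch_update discussion below).
        self.requesters_total.fetch_sub(delta, Ordering::SeqCst);
    }
}
```

As the thread below points out, this only works for a counter that is not coupled to the Condvar.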

Member Author:

Currently, requesters_total is used together with the Condvar below to stop late-arriving requesters from spilling too frequently (since the earlier consumers may already occupy much of the memory): a requester waits for notification while it holds less than 1/(2n) of the memory. Any suggestions on this?

The code here would be much simpler if Arc<Mutex<usize>> were replaced by AtomicUsize.

Member:

This lock only guarantees that the two operations, updating requesters_total and calling cv.notify_all, are performed atomically, but it looks like this doesn't really buy us anything? The waiter on self.cv can wake up and get preempted right away by other threads that might update requesters_total. I am curious, from your point of view, what benefit this critical region provides.

Member:

Never mind, I was wrong: since the Condvar re-acquires the lock on wake-up, a mutex is needed if we want to make sure the woken thread is not operating on a stale requesters_total value.
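To make the interplay concrete, here is a minimal sketch (hypothetical types and names, not the DataFusion code) of the classic Condvar pattern being discussed: the waiter re-checks the shared total under the re-acquired lock, which is why requesters_total stays behind a Mutex:

```rust
use std::sync::{Condvar, Mutex};

struct Pool {
    requesters_total: Mutex<usize>,
    cv: Condvar,
    pool_size: usize,
}

impl Pool {
    /// Block until `request` bytes can be granted without exceeding the pool.
    fn acquire(&self, request: usize) {
        let mut total = self.requesters_total.lock().unwrap();
        // `wait` atomically releases the lock and re-acquires it on wake-up,
        // so the re-checked `*total` is consistent with what notifiers wrote.
        while *total + request > self.pool_size {
            total = self.cv.wait(total).unwrap();
        }
        *total += request;
    }

    /// Return `delta` bytes and wake any waiters, both under the same lock.
    fn release(&self, delta: usize) {
        let mut total = self.requesters_total.lock().unwrap();
        *total = total.saturating_sub(delta);
        self.cv.notify_all();
    }
}
```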

- trackers_total: Arc<Mutex<usize>>,
+ trackers_total: AtomicUsize,
cv: Condvar,
}

@@ -270,25 +270,35 @@ impl MemoryManager {
requesters: Arc::new(Mutex::new(HashSet::new())),
pool_size,
requesters_total: Arc::new(Mutex::new(0)),
- trackers_total: Arc::new(Mutex::new(0)),
+ trackers_total: AtomicUsize::new(0),
cv: Condvar::new(),
})
}
}
}

fn get_tracker_total(&self) -> usize {
- *self.trackers_total.lock().unwrap()
+ self.trackers_total.load(Ordering::SeqCst)
}

pub(crate) fn grow_tracker_usage(&self, delta: usize) {
- *self.trackers_total.lock().unwrap() += delta;
+ self.trackers_total.fetch_add(delta, Ordering::SeqCst);
}

pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
- let mut total = self.trackers_total.lock().unwrap();
- assert!(*total >= delta);
- *total -= delta;
+ let update =
+     self.trackers_total
+         .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
Contributor:

Would this potentially end up in an infinite loop if x is zero, since the closure would always return None?

After seeing it written this way, I think I agree that the original implementation using a Mutex was better -- sorry about that @yjshen, I didn't realize the subtleties involved here.

Member Author:

I don't think fetch_update can loop infinitely here: when the closure returns None, it stops and returns Err with the observed value. A simple case, zero minus ten, results in an underflow rather than a loop:
https://play.rust-lang.org/?version=stable&mode=release&edition=2021

I agree the mutex version is easier to write and reason about. I can revert the last commit if preferred.
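For reference, a minimal standalone version of the underflow case (assumed values, mirroring the closure in this PR) showing that fetch_update stops and returns Err instead of retrying when the closure yields None:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let trackers_total = AtomicUsize::new(0);
    let delta = 10;

    // The closure returns None because 0 < 10; fetch_update then returns
    // Err(0) with the observed value instead of looping, so the caller can
    // surface a meaningful underflow panic.
    let result = trackers_total.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
        if x >= delta {
            Some(x - delta)
        } else {
            None
        }
    });

    assert_eq!(result, Err(0));
}
```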

Contributor:

I think this PR is ready to go -- we can always keep iterating in the next one. Sorry for the delay in merging, @yjshen.

+             if x >= delta {
+                 Some(x - delta)
+             } else {
+                 None
+             }
+         });
+ update.expect(&*format!(
+     "Tracker total memory shrink by {} underflow, current value is ",
+     delta
+ ));
}

fn get_requester_total(&self) -> usize {
@@ -360,9 +370,7 @@ impl MemoryManager {
*total -= mem_used;
}
}
- let mut total = self.trackers_total.lock().unwrap();
- assert!(*total >= mem_used);
- *total -= mem_used;
+ self.shrink_tracker_usage(mem_used);
self.cv.notify_all();
}
}