Collect live bytes during GC #768

Merged: 8 commits, Aug 8, 2023
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -136,6 +136,9 @@ work_packet_stats = []
# Count the malloc'd memory into the heap size
malloc_counted_size = []

# Count the size of all live objects in GC
count_live_bytes_in_gc = []

# Do not modify the following line - ci-common.sh matches it
# -- Mutally exclusive features --
# Only one feature from each group can be provided. Otherwise build will fail.
20 changes: 18 additions & 2 deletions src/memory_manager.rs
@@ -534,22 +534,38 @@ pub fn process_bulk(builder: &mut MMTKBuilder, options: &str) -> bool {
builder.set_options_bulk_by_str(options)
}

/// Return used memory in bytes.
/// Return used memory in bytes. MMTk accounts for memory in pages, so this method always returns a value at
/// page granularity.
///
/// Arguments:
/// * `mmtk`: A reference to an MMTk instance.
pub fn used_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
mmtk.plan.get_used_pages() << LOG_BYTES_IN_PAGE
}

/// Return free memory in bytes.
/// Return free memory in bytes. MMTk accounts for memory in pages, so this method always returns a value at
/// page granularity.
///
/// Arguments:
/// * `mmtk`: A reference to an MMTk instance.
pub fn free_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
mmtk.plan.get_free_pages() << LOG_BYTES_IN_PAGE
}

/// Return the size in bytes of all the live objects found in the last GC. MMTk usually accounts for memory in pages.
/// This method instead counts the size of every live object traced in a GC and sums up the total bytes.
/// It is provided so users can compare the result with `used_bytes` (which does page accounting) and tell
/// how fragmented the heap is.
/// The returned value is only updated when tracing finishes in a GC. A recommended time to call this method
/// is at the end of a GC (e.g. when the runtime is about to resume threads).
#[cfg(feature = "count_live_bytes_in_gc")]
pub fn live_bytes_in_last_gc<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
mmtk.plan
.base()
.live_bytes_in_last_gc
.load(Ordering::SeqCst)
}

/// Return the starting address of the heap. *Note that currently MMTk uses
/// a fixed address range as heap.*
pub fn starting_heap_address() -> Address {
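For reference, a minimal sketch of how a binding might use the two accounting methods together at the end of a GC to gauge fragmentation. The `report_fragmentation` helper is hypothetical, and it assumes the binding forwards a `count_live_bytes_in_gc` feature of its own to mmtk-core:

use mmtk::memory_manager;
use mmtk::vm::VMBinding;
use mmtk::MMTK;

/// Hypothetical helper a binding could call once a GC has finished,
/// e.g. right before mutator threads are resumed.
#[cfg(feature = "count_live_bytes_in_gc")]
fn report_fragmentation<VM: VMBinding>(mmtk: &MMTK<VM>) {
    // Exact size of all live objects found in the last GC.
    let live = memory_manager::live_bytes_in_last_gc(mmtk);
    // Page-granularity accounting; always a multiple of the page size.
    let used = memory_manager::used_bytes(mmtk);
    // The gap between the two is space lost to partially used pages.
    println!(
        "live = {} bytes, used = {} bytes ({:.1}% of used memory is live data)",
        live,
        used,
        live as f64 * 100.0 / used as f64
    );
}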
5 changes: 5 additions & 0 deletions src/plan/global.rs
@@ -417,6 +417,9 @@ pub struct BasePlan<VM: VMBinding> {
/// A counter that keeps track of the number of bytes allocated by malloc
#[cfg(feature = "malloc_counted_size")]
malloc_bytes: AtomicUsize,
/// This stores the size in bytes of all the live objects in the last GC. This counter is only updated in the GC release phase.
#[cfg(feature = "count_live_bytes_in_gc")]
pub live_bytes_in_last_gc: AtomicUsize,
/// Wrapper around analysis counters
#[cfg(feature = "analysis")]
pub analysis_manager: AnalysisManager<VM>,
@@ -547,6 +550,8 @@ impl<VM: VMBinding> BasePlan<VM> {
allocation_bytes: AtomicUsize::new(0),
#[cfg(feature = "malloc_counted_size")]
malloc_bytes: AtomicUsize::new(0),
#[cfg(feature = "count_live_bytes_in_gc")]
live_bytes_in_last_gc: AtomicUsize::new(0),
#[cfg(feature = "analysis")]
analysis_manager,
}
6 changes: 6 additions & 0 deletions src/plan/markcompact/gc_work.rs
@@ -43,6 +43,12 @@ impl<VM: VMBinding> GCWork<VM> for UpdateReferences<VM> {
#[cfg(feature = "extreme_assertions")]
mmtk.edge_logger.reset();

// We do two passes of transitive closures. We clear the live bytes from the first pass.
#[cfg(feature = "count_live_bytes_in_gc")]
mmtk.scheduler
.worker_group
.get_and_clear_worker_live_bytes();

// TODO investigate why the following will create duplicate edges
// scheduler.work_buckets[WorkBucketStage::RefForwarding]
// .add(ScanStackRoots::<ForwardingProcessEdges<VM>>::new());
2 changes: 1 addition & 1 deletion src/plan/tracing.rs
@@ -74,7 +74,7 @@ impl ObjectQueue for VectorQueue<ObjectReference> {
/// A transitive closure visitor to collect all the edges of an object.
pub struct ObjectsClosure<'a, E: ProcessEdgesWork> {
buffer: VectorQueue<EdgeOf<E>>,
worker: &'a mut GCWorker<E::VM>,
pub(crate) worker: &'a mut GCWorker<E::VM>,
Collaborator:
This no longer needs to be pub(crate).

Member Author:
I think we still need this. When we call worker.shared.increase_live_bytes() in ScanObjectsWork::do_work_common(), we have already created the ObjectsClosure, which takes &mut GCWorker. We can't use worker directly; we have to access it through the ObjectsClosure.

Collaborator:
Oh, sorry. I didn't notice that.

}

impl<'a, E: ProcessEdgesWork> ObjectsClosure<'a, E> {
41 changes: 41 additions & 0 deletions src/scheduler/gc_work.rs
@@ -129,6 +129,18 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
let result = w.designated_work.push(Box::new(ReleaseCollector));
debug_assert!(result.is_ok());
}

#[cfg(feature = "count_live_bytes_in_gc")]
{
let live_bytes = mmtk
.scheduler
.worker_group
.get_and_clear_worker_live_bytes();
self.plan
.base()
.live_bytes_in_last_gc
.store(live_bytes, std::sync::atomic::Ordering::SeqCst);
}
}
}

@@ -227,6 +239,28 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
self.elapsed.as_millis()
);

#[cfg(feature = "count_live_bytes_in_gc")]
{
let live_bytes = mmtk
.plan
.base()
.live_bytes_in_last_gc
.load(std::sync::atomic::Ordering::SeqCst);
let used_bytes =
mmtk.plan.get_used_pages() << crate::util::constants::LOG_BYTES_IN_PAGE;
debug_assert!(
live_bytes <= used_bytes,
"Live bytes of all live objects ({} bytes) is larger than used pages ({} bytes), something is wrong.",
live_bytes, used_bytes
);
info!(
"Live objects = {} bytes ({:04.1}% of {} used pages)",
live_bytes,
live_bytes as f64 * 100.0 / used_bytes as f64,
mmtk.plan.get_used_pages()
);
}

// We assume this is the only running work packet that accesses plan at the point of execution
#[allow(clippy::cast_ref_to_mut)]
let plan_mut: &mut dyn Plan<VM = VM> = unsafe { &mut *(&*mmtk.plan as *const _ as *mut _) };
@@ -815,6 +849,13 @@ pub trait ScanObjectsWork<VM: VMBinding>: GCWork<VM> + Sized {
{
let mut closure = ObjectsClosure::<Self::E>::new(worker);
for object in objects_to_scan.iter().copied() {
// For any object we need to scan, we count its live bytes
#[cfg(feature = "count_live_bytes_in_gc")]
closure
.worker
.shared
.increase_live_bytes(VM::VMObjectModel::get_current_size(object));

if <VM as VMBinding>::VMScanning::support_edge_enqueuing(tls, object) {
trace!("Scan object (edge) {}", object);
// If an object supports edge-enqueuing, we enqueue its edges.
27 changes: 27 additions & 0 deletions src/scheduler/worker.rs
@@ -9,6 +9,8 @@ use atomic::Atomic;
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
use crossbeam::deque::{self, Stealer};
use crossbeam::queue::ArrayQueue;
#[cfg(feature = "count_live_bytes_in_gc")]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};

@@ -30,6 +32,11 @@ pub fn current_worker_ordinal() -> Option<ThreadId> {
pub struct GCWorkerShared<VM: VMBinding> {
/// Worker-local statistics data.
stat: AtomicRefCell<WorkerLocalStat<VM>>,
/// Accumulated bytes for live objects in this GC. When each worker scans
/// objects, we increase the live bytes. We get this value from each worker
/// at the end of a GC, and reset this counter.
#[cfg(feature = "count_live_bytes_in_gc")]
live_bytes: AtomicUsize,
Member:
Do we need atomic for this? It's worker-local right?

Member Author (@qinsoon, Mar 8, 2023):
> Do we need atomic for this? It's worker-local right?

It is in the shared part of a worker, so we cannot get a mutable reference to it. It needs to be in the shared part because we iterate through all workers and sum the counters up.

Member:
Ah OK. I was just thinking that if we could make it cheap enough by not using atomics for worker-local stats, we could enable this feature by default.

Collaborator (@k-sareen, Mar 8, 2023):
Why not make it worker-local and have each worker report its live bytes after its transitive closure? Then we don't have to explicitly iterate through all the workers; we just collate the numbers we get from the transitive closure.

Collaborator (@wks, Mar 8, 2023):
It could be any stage after the transitive closure stages (including the weak reference processing stages).

Collaborator:
Maybe it's easier to defer the "live byte count" to each policy as opposed to each worker? But then it stops being worker-local, so it would need synchronization.

Member Author:
> Maybe it's easier to defer the "live byte count" to each policy as opposed to each worker? But then it stops being worker-local, so it would need synchronization.

Yeah. It needs at least the same level of synchronization as the current code, and the counting code would be scattered across the policies.

Member Author:
We could use AtomicUsize::as_mut_ptr and do a non-atomic add with unsafe code when we do the counting. But I still question whether we want to make this enabled by default.

Collaborator:
I personally don't have a strong opinion. Keeping track of fragmentation is definitely useful, though. If it's possible to do it cheaply for every GC, we may as well do it; if it's not, I don't think we should spend more time on it.

Collaborator:
@qinsoon Sorry about the typo. I meant ReleaseCollector. Yes, it will be a problem for MarkCompact. If we make the counter a private field of a Worker, it will require another rendezvous (designated work) between the marking phase and the forwarding phase. Given this situation, I don't mind if we use AtomicUsize for now.
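For illustration only, a minimal sketch of the non-atomic alternative floated above (not what this PR does): the increment goes through the atomic's raw pointer while the owning worker is the sole writer, and the counter is still drained atomically at a synchronization point. It assumes the stabilized AtomicUsize::as_ptr (formerly as_mut_ptr), and the WorkerSharedSketch name is made up:

use std::sync::atomic::{AtomicUsize, Ordering};

struct WorkerSharedSketch {
    live_bytes: AtomicUsize,
}

impl WorkerSharedSketch {
    /// Non-atomic increment through the atomic's raw pointer.
    /// Sound only while the owning worker is the sole writer.
    fn increase_live_bytes_unsync(&self, bytes: usize) {
        unsafe { *self.live_bytes.as_ptr() += bytes };
    }

    /// Reads and clears atomically at a synchronization point (e.g. Release).
    fn get_and_clear_live_bytes(&self) -> usize {
        self.live_bytes.swap(0, Ordering::SeqCst)
    }
}

fn main() {
    let shared = WorkerSharedSketch { live_bytes: AtomicUsize::new(0) };
    shared.increase_live_bytes_unsync(64);
    assert_eq!(shared.get_and_clear_live_bytes(), 64);
}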

/// A queue of GCWork that can only be processed by the owned thread.
///
/// Note: Currently, designated work cannot be added from the GC controller thread, or
@@ -44,10 +51,22 @@ impl<VM: VMBinding> GCWorkerShared<VM> {
pub fn new(stealer: Option<Stealer<Box<dyn GCWork<VM>>>>) -> Self {
Self {
stat: Default::default(),
#[cfg(feature = "count_live_bytes_in_gc")]
live_bytes: AtomicUsize::new(0),
designated_work: ArrayQueue::new(16),
stealer,
}
}

#[cfg(feature = "count_live_bytes_in_gc")]
pub(crate) fn increase_live_bytes(&self, bytes: usize) {
self.live_bytes.fetch_add(bytes, Ordering::Relaxed);
}

#[cfg(feature = "count_live_bytes_in_gc")]
pub(crate) fn get_and_clear_live_bytes(&self) -> usize {
self.live_bytes.swap(0, Ordering::SeqCst)
}
}

/// Used to synchronize mutually exclusive operations between workers and controller,
@@ -427,4 +446,12 @@ impl<VM: VMBinding> WorkerGroup<VM> {
.iter()
.any(|w| !w.designated_work.is_empty())
}

#[cfg(feature = "count_live_bytes_in_gc")]
pub fn get_and_clear_worker_live_bytes(&self) -> usize {
self.workers_shared
.iter()
.map(|w| w.get_and_clear_live_bytes())
.sum()
}
}