Collect live bytes during GC #768

Merged · 8 commits · Aug 8, 2023

Changes from 2 commits
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -125,6 +125,9 @@ work_packet_stats = []
# Count the malloc'd memory into the heap size
malloc_counted_size = []

# Count the size of all live objects in GC
count_live_bytes_in_gc = []

# Do not modify the following line - ci-common.sh matches it
# -- Mutally exclusive features --
# Only one feature from each group can be provided. Otherwise build will fail.
20 changes: 18 additions & 2 deletions src/memory_manager.rs
@@ -521,22 +521,38 @@ pub fn process_bulk(builder: &mut MMTKBuilder, options: &str) -> bool {
builder.set_options_bulk_by_str(options)
}

/// Return used memory in bytes.
/// Return used memory in bytes. MMTk accounts for memory in pages, so this method always returns a value at
/// page granularity.
///
/// Arguments:
/// * `mmtk`: A reference to an MMTk instance.
pub fn used_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
mmtk.plan.get_used_pages() << LOG_BYTES_IN_PAGE
}

/// Return free memory in bytes.
/// Return free memory in bytes. MMTk accounts for memory in pages, so this method always returns a value at
/// page granularity.
///
/// Arguments:
/// * `mmtk`: A reference to an MMTk instance.
pub fn free_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
mmtk.plan.get_free_pages() << LOG_BYTES_IN_PAGE
}

/// Return the size in bytes of all live objects in the last GC. MMTk usually accounts for memory in pages.
/// This method is special: during a GC, we measure the size of every live object and sum up the total bytes.
/// We provide it so users can compare the result with `used_bytes` (which does page accounting) and see how
/// fragmented the heap is.
/// The value returned by this method is only updated when tracing finishes in a GC. A recommended time to
/// call this method is at the end of a GC (e.g. when the runtime is about to resume threads).
#[cfg(feature = "count_live_bytes_in_gc")]
pub fn live_bytes_in_last_gc<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
mmtk.plan
.base()
.live_bytes_in_last_gc
.load(Ordering::SeqCst)
}
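A minimal usage sketch (hypothetical binding-side code; `log_fragmentation` is not part of MMTk) comparing this counter with `used_bytes` to estimate fragmentation, assuming the `count_live_bytes_in_gc` feature is enabled:

use mmtk::memory_manager;
use mmtk::vm::VMBinding;
use mmtk::MMTK;

// Hypothetical helper: call at the end of a GC, before resuming mutators.
fn log_fragmentation<VM: VMBinding>(mmtk: &MMTK<VM>) {
    let live = memory_manager::live_bytes_in_last_gc(mmtk);
    let used = memory_manager::used_bytes(mmtk);
    // Used pages that are not occupied by live objects suggest fragmentation.
    let frag = 100.0 * (1.0 - live as f64 / used as f64);
    println!("live = {} bytes, used = {} bytes, ~{:.1}% fragmentation", live, used, frag);
}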

/// Return the starting address of the heap. *Note that currently MMTk uses
/// a fixed address range as heap.*
pub fn starting_heap_address() -> Address {
5 changes: 5 additions & 0 deletions src/plan/global.rs
@@ -398,6 +398,9 @@ pub struct BasePlan<VM: VMBinding> {
/// A counter that keeps track of the number of bytes allocated by malloc
#[cfg(feature = "malloc_counted_size")]
malloc_bytes: AtomicUsize,
/// This stores the size in bytes of all live objects in the last GC. This counter is only updated in the GC release phase.
#[cfg(feature = "count_live_bytes_in_gc")]
pub live_bytes_in_last_gc: AtomicUsize,
/// Wrapper around analysis counters
#[cfg(feature = "analysis")]
pub analysis_manager: AnalysisManager<VM>,
@@ -552,6 +555,8 @@ impl<VM: VMBinding> BasePlan<VM> {
allocation_bytes: AtomicUsize::new(0),
#[cfg(feature = "malloc_counted_size")]
malloc_bytes: AtomicUsize::new(0),
#[cfg(feature = "count_live_bytes_in_gc")]
live_bytes_in_last_gc: AtomicUsize::new(0),
#[cfg(feature = "analysis")]
analysis_manager,
}
6 changes: 6 additions & 0 deletions src/plan/markcompact/gc_work.rs
@@ -43,6 +43,12 @@ impl<VM: VMBinding> GCWork<VM> for UpdateReferences<VM> {
#[cfg(feature = "extreme_assertions")]
mmtk.edge_logger.reset();

// We do two passes of transitive closure, so we clear the live bytes accumulated during the first pass.
#[cfg(feature = "count_live_bytes_in_gc")]
mmtk.scheduler
.worker_group
.get_and_clear_worker_live_bytes();

// TODO investigate why the following will create duplicate edges
// scheduler.work_buckets[WorkBucketStage::RefForwarding]
// .add(ScanStackRoots::<ForwardingProcessEdges<VM>>::new());
2 changes: 1 addition & 1 deletion src/plan/tracing.rs
@@ -74,7 +74,7 @@ impl ObjectQueue for VectorQueue<ObjectReference> {
/// A transitive closure visitor to collect all the edges of an object.
pub struct ObjectsClosure<'a, E: ProcessEdgesWork> {
buffer: VectorQueue<EdgeOf<E>>,
worker: &'a mut GCWorker<E::VM>,
pub(crate) worker: &'a mut GCWorker<E::VM>,
Collaborator: This no longer needs to be pub(crate).

Member Author: I think we still need this. When we call worker.shared.increase_live_bytes() in ScanObjectsWork::do_work_common(), we have already created an ObjectsClosure, which takes &mut GCWorker. We cannot use the worker directly; we have to reach it through ObjectsClosure.

Collaborator: Oh, sorry. I didn't notice that.
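A minimal sketch (hypothetical, simplified types) of the borrow constraint described in this thread: once the closure takes the `&mut GCWorker`, the caller can only reach the worker through the closure's field.

struct GCWorker {
    live_bytes: usize,
}

struct ObjectsClosure<'a> {
    worker: &'a mut GCWorker,
}

fn do_work_common(worker: &mut GCWorker) {
    let closure = ObjectsClosure { worker };
    // worker.live_bytes += 8;      // ERROR: `worker` was moved into `closure`
    closure.worker.live_bytes += 8; // OK: reach the worker through the closure
}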

}

impl<'a, E: ProcessEdgesWork> ObjectsClosure<'a, E> {
59 changes: 57 additions & 2 deletions src/scheduler/gc_work.rs
@@ -124,6 +124,18 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
let result = w.designated_work.push(Box::new(ReleaseCollector));
debug_assert!(result.is_ok());
}

#[cfg(feature = "count_live_bytes_in_gc")]
{
let live_bytes = mmtk
.scheduler
.worker_group
.get_and_clear_worker_live_bytes();
self.plan
.base()
.live_bytes_in_last_gc
.store(live_bytes, std::sync::atomic::Ordering::SeqCst);
}
}
}

@@ -232,6 +244,28 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
self.elapsed.as_millis()
);

#[cfg(feature = "count_live_bytes_in_gc")]
{
let live_bytes = mmtk
.plan
.base()
.live_bytes_in_last_gc
.load(std::sync::atomic::Ordering::SeqCst);
let used_bytes =
mmtk.plan.get_used_pages() << crate::util::constants::LOG_BYTES_IN_PAGE;
debug_assert!(
live_bytes <= used_bytes,
"Live bytes of all live objects ({} bytes) is larger than used pages ({} bytes), something is wrong.",
live_bytes, used_bytes
);
info!(
"Live objects = {} bytes ({:04.1}% of {} used pages)",
live_bytes,
live_bytes as f64 * 100.0 / used_bytes as f64,
mmtk.plan.get_used_pages()
);
}

// We assume this is the only running work packet that accesses plan at the point of execution
#[allow(clippy::cast_ref_to_mut)]
let plan_mut: &mut dyn Plan<VM = VM> = unsafe { &mut *(&*mmtk.plan as *const _ as *mut _) };
@@ -323,6 +357,16 @@ impl<E: ProcessEdgesWork> ObjectTracerContext<E::VM> for ProcessEdgesWorkTracerContext<E>
fn with_tracer<R, F>(&self, worker: &mut GCWorker<E::VM>, func: F) -> R
where
F: FnOnce(&mut Self::TracerType) -> R,
{
self.with_tracer_and_worker(worker, |tracer, _| func(tracer))
}
}

impl<E: ProcessEdgesWork> ProcessEdgesWorkTracerContext<E> {
// This also exposes the worker to the callback function. This is not a public method.
fn with_tracer_and_worker<R, F>(&self, worker: &mut GCWorker<E::VM>, func: F) -> R
where
F: FnOnce(&mut ProcessEdgesWorkTracer<E>, &mut GCWorker<E::VM>) -> R,
Collaborator: Strictly speaking, it is unsafe to give the user access to both the tracer and the worker. The tracer is a closure that calls trace_object underneath, and the worker is also one of the arguments of trace_object. I planned to forbid this by adding a lifetime annotation, but I couldn't do it without the generic associated types (GAT) feature before Rust 1.65. See:

/// FIXME: The current code works because of the unsafe method `ProcessEdgesWork::set_worker`.
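A hypothetical sketch (simplified, not MMTk's actual API) of the lifetime-based restriction mentioned above: with a generic associated type, the tracer's type can carry the lifetime of the worker borrow, so the callback cannot also be handed the worker.

struct GCWorker;

trait ObjectTracerContext {
    // Generic associated type (stable since Rust 1.65): the tracer type
    // borrows the worker for 'w.
    type TracerType<'w>;

    // The worker is exclusively borrowed for 'w by the tracer, so the
    // callback cannot receive a second `&mut GCWorker`.
    fn with_tracer<'w, R, F>(&self, worker: &'w mut GCWorker, func: F) -> R
    where
        F: FnOnce(&mut Self::TracerType<'w>) -> R;
}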

Collaborator: Another problem with this is that VM bindings can use ObjectTracerContext to get temporary access to trace_object during Scanning::process_weak_refs and Scanning::forward_weak_refs. Adding with_tracer_and_worker will allow ScanObjects to record object sizes, but not VM-specific weak reference processing.

Member Author: with_tracer takes a mutable reference to the worker, so I cannot use the worker any more in ScanObjects::do_work_common. We need a way to get the worker back from the tracer. Another option is to expose some methods from ObjectTracer so we can get the worker or directly increase the counter.

Member Author:
> Another problem with this is that VM bindings can use ObjectTracerContext to get temporary access to trace_object during Scanning::process_weak_refs and Scanning::forward_weak_refs. Adding with_tracer_and_worker will allow ScanObjects to record object sizes, but not VM-specific weak reference processing.

The remaining question is how to count live bytes for weak references. As weak reference processing happens entirely on the binding side, we may have to expose a method and let the binding call it when it scans weak refs.

Collaborator:
> The remaining question is how to count live bytes for weak references. As weak reference processing happens entirely on the binding side, we may have to expose a method and let the binding call it when it scans weak refs.

If we count the object size in ScanObjectsWork, it will not be a problem. The binding keeps objects alive during the weak reference processing stage using trace_object (via ObjectTracer::trace_object, which is implemented with ProcessEdgesWork::trace_object underneath and eventually reaches the space-specific trace_object). Any object visited by trace_object will eventually be visited by ScanObjectsWork.

{
let mmtk = worker.mmtk;

@@ -339,7 +383,7 @@
};

// The caller can use the tracer here.
let result = func(&mut tracer);
let result = func(&mut tracer, worker);

// Flush the queued nodes.
tracer.flush_if_not_empty();
@@ -826,6 +870,12 @@ pub trait ScanObjectsWork<VM: VMBinding>: GCWork<VM> + Sized {
// If an object supports edge-enqueuing, we enqueue its edges.
<VM as VMBinding>::VMScanning::scan_object(tls, object, &mut closure);
self.post_scan_object(object);

#[cfg(feature = "count_live_bytes_in_gc")]
closure
.worker
.shared
.increase_live_bytes(VM::VMObjectModel::get_current_size(object));
} else {
// If an object does not support edge-enqueuing, we have to use
// `Scanning::scan_object_and_trace_edges` and offload the job of updating the
@@ -845,7 +895,7 @@ pub trait ScanObjectsWork<VM: VMBinding>: GCWork<VM> + Sized {
phantom_data: PhantomData,
};

object_tracer_context.with_tracer(worker, |object_tracer| {
object_tracer_context.with_tracer_and_worker(worker, |object_tracer, _worker| {
// Scan objects and trace their edges at the same time.
for object in scan_later.iter().copied() {
trace!("Scan object (node) {}", object);
Expand All @@ -855,6 +905,11 @@ pub trait ScanObjectsWork<VM: VMBinding>: GCWork<VM> + Sized {
object_tracer,
);
self.post_scan_object(object);

#[cfg(feature = "count_live_bytes_in_gc")]
_worker
.shared
.increase_live_bytes(VM::VMObjectModel::get_current_size(object));
}
});
}
25 changes: 25 additions & 0 deletions src/scheduler/worker.rs
@@ -31,6 +31,11 @@ pub fn current_worker_ordinal() -> Option<ThreadId> {
pub struct GCWorkerShared<VM: VMBinding> {
/// Worker-local statistics data.
stat: AtomicRefCell<WorkerLocalStat<VM>>,
/// Accumulated bytes of live objects in this GC. Each worker increases this
/// counter as it scans objects. At the end of a GC, we collect the value from
/// each worker and reset the counter.
#[cfg(feature = "count_live_bytes_in_gc")]
live_bytes: AtomicUsize,
Member: Do we need atomic for this? It's worker-local right?

Member Author (@qinsoon, Mar 8, 2023):
> Do we need atomic for this? It's worker-local right?

It is in the shared part of a worker, so we cannot get a mutable reference to it. It needs to be in the shared part because we iterate through all workers and sum the counters up.

Member: Ah OK. I was just thinking that if we can make it cheap enough by not using atomics for worker-local stats, we could enable this feature by default.

Collaborator (@k-sareen, Mar 8, 2023): Why not make it worker-local and have each worker report its live bytes after its transitive closure? Then we don't have to explicitly iterate through all the workers; we just collate the numbers we get from the transitive closure.

Collaborator (@wks, Mar 8, 2023): It could be any stage after the transitive closure stages (including weak reference processing stages).

Collaborator: Maybe it's easier to defer the "live byte count" to each policy as opposed to each worker? But then it stops being worker-local, so it would need synchronization.

Member Author:
> Maybe it's easier to defer the "live byte count" to each policy as opposed to each worker? But then it stops being worker-local, so it would need synchronization.

Yeah. It needs at least the same level of synchronization as the current code, and the counting code would be scattered across the policies.

Member Author: We could use AtomicUsize::as_mut_ptr and do a non-atomic add with unsafe code when we do the counting. But I still question whether we want this enabled by default.
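A minimal sketch of that non-atomic alternative (hypothetical helper; `as_ptr` is the stable accessor for the atomic's underlying value), which is only sound while a single thread owns the counter:

use std::sync::atomic::AtomicUsize;

// Hypothetical helper: a plain, non-atomic increment through the atomic's
// underlying pointer.
fn increase_live_bytes_unsync(counter: &AtomicUsize, bytes: usize) {
    // SAFETY: sound only if no other thread accesses `counter` concurrently,
    // e.g. if each GC worker only ever touches its own counter here.
    unsafe {
        *counter.as_ptr() += bytes;
    }
}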

Collaborator: I personally don't have a strong opinion. Keeping track of fragmentation is definitely useful, though. If it's possible to do it cheaply for every GC, we may as well do it. If it's not possible, then I don't think we should spend more time on it.

Collaborator: @qinsoon Sorry about the typo. I meant ReleaseCollector. Yes, it will be a problem for MarkCompact. If we make the counter a private field of a Worker, it will require another rendezvous (designated work) between the marking phase and the forwarding phase. Given this situation, I don't mind if we use AtomicUsize for now.

/// A queue of GCWork that can only be processed by the owned thread.
///
/// Note: Currently, designated work cannot be added from the GC controller thread, or
@@ -45,10 +50,22 @@ impl<VM: VMBinding> GCWorkerShared<VM> {
pub fn new(stealer: Option<Stealer<Box<dyn GCWork<VM>>>>) -> Self {
Self {
stat: Default::default(),
#[cfg(feature = "count_live_bytes_in_gc")]
live_bytes: AtomicUsize::new(0),
designated_work: ArrayQueue::new(16),
stealer,
}
}

#[cfg(feature = "count_live_bytes_in_gc")]
pub(crate) fn increase_live_bytes(&self, bytes: usize) {
self.live_bytes.fetch_add(bytes, Ordering::Relaxed);
}

#[cfg(feature = "count_live_bytes_in_gc")]
pub(crate) fn get_and_clear_live_bytes(&self) -> usize {
self.live_bytes.swap(0, Ordering::SeqCst)
}
}

/// A GC worker. This part is privately owned by a worker thread.
@@ -285,6 +302,14 @@ impl<VM: VMBinding> WorkerGroup<VM> {
.iter()
.any(|w| !w.designated_work.is_empty())
}

#[cfg(feature = "count_live_bytes_in_gc")]
pub fn get_and_clear_worker_live_bytes(&self) -> usize {
self.workers_shared
.iter()
.map(|w| w.get_and_clear_live_bytes())
.sum()
}
}

/// This ensures the worker always decrements the parked worker count on all control flow paths.