Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PROF-8667] Heap Profiling - Part 3 - Frees and Queue #3283

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 104 additions & 11 deletions ext/ddtrace_profiling_native_extension/heap_recorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <errno.h>

#define MAX_FRAMES_LIMIT 10000
#define MAX_QUEUE_LIMIT 10000

// A compact representation of a stacktrace frame for a heap allocation.
typedef struct {
Expand Down Expand Up @@ -99,6 +100,15 @@ typedef struct {
live_object_data object_data;
} partial_heap_recording;

typedef struct {
// Has ownership of this, needs to clean-it-up if not transferred.
heap_stack *stack;
VALUE obj;
live_object_data object_data;
bool free;
bool skip;
} uncommitted_sample;

struct heap_recorder {
// Map[key: heap_record_key*, record: heap_record*]
// NOTE: We always use heap_record_key.type == HEAP_STACK for storage but support lookups
Expand All @@ -114,6 +124,12 @@ struct heap_recorder {
// Data for a heap recording that was started but not yet ended
partial_heap_recording active_recording;

// Storage for queued samples built while samples are being taken but records_mutex is locked.
// These will be flushed back to record tables on the next sample execution that can take
// a write lock on heap_records (or explicitly via ::heap_recorder_flush)
uncommitted_sample *queued_samples;
size_t queued_samples_len;

// Reusable location array, implementing a flyweight pattern for things like iteration.
ddog_prof_Location *reusable_locations;
};
Expand All @@ -124,6 +140,10 @@ static int update_object_record_entry(st_data_t*, st_data_t*, st_data_t, int);
static void commit_allocation_with_heap_stack(heap_recorder*, heap_stack*, VALUE, live_object_data);
static void commit_allocation_with_heap_record(heap_recorder*, heap_record*, VALUE, live_object_data);
static void commit_free(heap_recorder*, VALUE, object_record*);
static void flush_queue(heap_recorder*);
static void enqueue_sample(heap_recorder*, uncommitted_sample);
static void enqueue_allocation(heap_recorder*, heap_stack*, VALUE, live_object_data);
static void enqueue_free(heap_recorder*, VALUE);

// ==========================
// Heap Recorder External API
Expand All @@ -143,6 +163,8 @@ heap_recorder* heap_recorder_new(void) {
.obj = Qnil,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⬆️ I just noticed that for the recorder we're using xmalloc instead of xcalloc -- maybe worth using xcalloc? Just in case we forget to initialize something at some point and we get a more sane default.

.object_data = {0},
};
recorder->queued_samples = ruby_xcalloc(MAX_QUEUE_LIMIT, sizeof(uncommitted_sample));
recorder->queued_samples_len = 0;

return recorder;
}
Expand All @@ -157,6 +179,7 @@ void heap_recorder_free(struct heap_recorder* recorder) {
pthread_mutex_destroy(&recorder->records_mutex);

ruby_xfree(recorder->reusable_locations);
ruby_xfree(recorder->queued_samples);

ruby_xfree(recorder);
}
Expand Down Expand Up @@ -199,15 +222,23 @@ void end_heap_allocation_recording(struct heap_recorder *heap_recorder, ddog_pro
if (error) {
// unhappy and hopefully rare path
if (error == EBUSY) {
// We weren't able to get a lock
// TODO: Add some queuing system so we can do something other than drop this data.
// We weren't able to get a lock, enqueue it for later processing
// When enqueueing, always use a new stack, we can't guarantee a reference to a shared one
// won't be cleared in-between due to processing of a free.
// We could potentially improve this but this is not supposed to be the critical path anyway.
heap_stack *stack = heap_stack_new(locations);
enqueue_allocation(heap_recorder, stack, new_obj, active_recording->object_data);
} else {
// Something unexpected happened, lets error out
ENFORCE_SUCCESS_GVL(error)
}
return;
}
Comment on lines 222 to 236
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Maybe we could structure this as

if (error == EBUSY) {
  // ... stuff
  return;
}
if (error) ENFORCE_SUCCESS_GVL(error);

// Happy path goes here

to avoid the weird return that's hanging after the else but actually belongs to the if side only ;)


// We were able to get a lock to heap_records so lets flush any pending samples
// that might have been queued previously before adding this new one.
flush_queue(heap_recorder);

// And then commit the new allocation.
if (heap_record == NULL) {
// If we didn't find a matching heap record we'll need to create a new one so go with
Expand All @@ -223,10 +254,6 @@ void end_heap_allocation_recording(struct heap_recorder *heap_recorder, ddog_pro
ENFORCE_SUCCESS_GVL(pthread_mutex_unlock(&heap_recorder->records_mutex));
}

// TODO: Remove when things get implemented
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"

// WARN: This can get called during Ruby GC. NO HEAP ALLOCATIONS OR EXCEPTIONS ARE ALLOWED.
void record_heap_free(heap_recorder *heap_recorder, VALUE obj) {
object_record *object_record = NULL;
Expand All @@ -238,21 +265,36 @@ void record_heap_free(heap_recorder *heap_recorder, VALUE obj) {
st_lookup(heap_recorder->object_records, (st_data_t) obj, (st_data_t*) &object_record);

if (object_record == NULL) {
// we don't seem to be tracking this object on the table atm
// check if the allocation sample is in the queue
for (size_t i = 0; i < heap_recorder->queued_samples_len; i++) {
uncommitted_sample *queued_sample = &heap_recorder->queued_samples[i];
if (queued_sample->obj == obj && !queued_sample->skip) {
queued_sample->skip = true;
break;
}
}
Comment on lines 267 to +276
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's something we want/can do something at this point (or maybe this is another reason not to use the freeobj tracepoint), but I'm thinking this can have a potential downside in a case such as:

  • The queue has a bunch of stuff to do
  • Ruby starts a GC, so queue is not processed
  • Ruby releases A LOT of objects, with in the extreme case none of them actually being tracked by us
  • We need to go through the (potentially long) queue for every one of this objects
  • Only after the GC do we process the queue


// free of an untracked object, return early
return;
}

// If we got this far, we freed a tracked object so we need to update and remove records!
// TODO: Implement. We need some form of allocation-free queueing system to allow us to
// handle this free without doing mutations on our state while potentially inside a Ruby GC.
// However, there's a caveat: we're under tight constraints and may be running during a GC where we are forbidden
// to do any more allocations. In certain situations, even calling ruby_xfree on an object_record may trigger
// such allocations (https://github.com/ruby/ruby/blob/ffb1eb37e74334ae85d6bfee07d784a145e23dd8/gc.c#L12599).
// We also do not want to risk triggering reentrant free sampling. Therefore, we take the extremely cautious
// approach of enqueuing this free to be applied at next allocation recording or flush with no explicit heap
// allocations or frees, direct or otherwise, happening during the execution of this method.
enqueue_free(heap_recorder, obj);
}

void heap_recorder_flush(heap_recorder *heap_recorder) {
// TODO: Implement
ENFORCE_SUCCESS_GVL(pthread_mutex_lock(&heap_recorder->records_mutex));
flush_queue(heap_recorder);
ENFORCE_SUCCESS_GVL(pthread_mutex_unlock(&heap_recorder->records_mutex));
}

#pragma GCC diagnostic pop

// Internal data we need while performing iteration over live objects.
typedef struct {
// The callback we need to call for each object.
Expand Down Expand Up @@ -461,6 +503,57 @@ static void commit_free(heap_recorder *heap_recorder, VALUE obj, object_record *
object_record_free(object_record);
}

// WARN: Expects records_mutex to be held
static void flush_queue(heap_recorder *heap_recorder) {
for (size_t i = 0; i < heap_recorder->queued_samples_len; i++) {
uncommitted_sample *queued_sample = &heap_recorder->queued_samples[i];
if (!queued_sample->skip) {
if (queued_sample->free) {
commit_free(heap_recorder, queued_sample->obj, NULL);
} else {
commit_allocation_with_heap_stack(heap_recorder, queued_sample->stack, queued_sample->obj, queued_sample->object_data);
}
}

*queued_sample = (uncommitted_sample) {0};
}
heap_recorder->queued_samples_len = 0;
}

// WARN: This can get called during Ruby GC. NO HEAP ALLOCATIONS OR EXCEPTIONS ARE ALLOWED.
static void enqueue_sample(heap_recorder *heap_recorder, uncommitted_sample new_sample) {
if (heap_recorder->queued_samples_len >= MAX_QUEUE_LIMIT) {
// FIXME: If we're droppping a free sample here, the accuracy of our heap profiles will be affected.
// Should we completely give up and stop sending heap profiles or should we trigger a flag that we
// can then use to add a warning in the UI? At the very least we'd want telemetry here.
return;
}

heap_recorder->queued_samples[heap_recorder->queued_samples_len] = new_sample;
heap_recorder->queued_samples_len++;
}

static void enqueue_allocation(heap_recorder *heap_recorder, heap_stack *heap_stack, VALUE obj, live_object_data object_data) {
enqueue_sample(heap_recorder, (uncommitted_sample) {
.stack = heap_stack,
.obj = obj,
.object_data = object_data,
.free = false,
.skip = false,
});
}

// WARN: This can get called during Ruby GC. NO HEAP ALLOCATIONS OR EXCEPTIONS ARE ALLOWED.
static void enqueue_free(heap_recorder *heap_recorder, VALUE obj) {
enqueue_sample(heap_recorder, (uncommitted_sample) {
.stack = NULL,
.obj = obj,
.object_data = {0},
.free = true,
.skip = false,
});
}

// ===============
// Heap Record API
// ===============
Expand Down
1 change: 0 additions & 1 deletion spec/datadog/profiling/stack_recorder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ def sample_types_from(decoded_profile)
let(:heap_samples_enabled) { true }

it 'are included in the profile' do
pending 'free handling is not implemented yet'
# We sample from 2 distinct locations but heap samples don't have the same
# labels as the others so they get duped.
expect(samples.size).to eq(4)
Expand Down
Loading