[PROF-8667] Heap Profiling - Part 3 - Queue
AlexJF committed Dec 13, 2023
1 parent 051c803 commit b4d7f97
Showing 4 changed files with 144 additions and 7 deletions.
75 changes: 73 additions & 2 deletions ext/ddtrace_profiling_native_extension/heap_recorder.c
@@ -5,6 +5,7 @@
#include <errno.h>

#define MAX_FRAMES_LIMIT 10000
#define MAX_QUEUE_LIMIT 10000

// A compact representation of a stacktrace frame for a heap allocation.
typedef struct {
@@ -98,6 +99,13 @@ typedef struct {
live_object_data object_data;
} partial_heap_recording;

typedef struct {
// Has ownership of this; needs to clean it up if not transferred.
heap_record *heap_record;
long obj_id;
live_object_data object_data;
} uncommitted_sample;

struct heap_recorder {
// Map[key: heap_record_key*, record: heap_record*]
// NOTE: We always use heap_record_key.type == HEAP_STACK for storage but support lookups
@@ -113,6 +121,12 @@ struct heap_recorder {
// Data for a heap recording that was started but not yet ended
partial_heap_recording active_recording;

// Storage for samples that are taken while records_mutex is locked (e.g. during serialization)
// and thus can't be committed right away. These get flushed into the record tables on the next
// sample execution that manages to take the lock on heap_records (or explicitly via ::heap_recorder_flush).
uncommitted_sample *queued_samples;
size_t queued_samples_len;

// Reusable location array, implementing a flyweight pattern for things like iteration.
ddog_prof_Location *reusable_locations;
};
@@ -124,6 +138,8 @@ static int st_object_record_entry_free_if_invalid(st_data_t, st_data_t, st_data_
static int st_object_records_iterate(st_data_t, st_data_t, st_data_t);
static int update_object_record_entry(st_data_t*, st_data_t*, st_data_t, int);
static void commit_allocation(heap_recorder*, heap_record*, long, live_object_data);
static void flush_queue(heap_recorder*);
static bool enqueue_allocation(heap_recorder*, heap_record*, long, live_object_data);

// ==========================
// Heap Recorder External API
@@ -146,6 +162,8 @@ heap_recorder* heap_recorder_new(void) {
// it as invalid/unset value.
.object_data = {0},
};
recorder->queued_samples = ruby_xcalloc(MAX_QUEUE_LIMIT, sizeof(uncommitted_sample));
recorder->queued_samples_len = 0;

return recorder;
}
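
Preallocating the queue here (ruby_xcalloc with MAX_QUEUE_LIMIT entries) means its memory cost is fixed and paid once per recorder, and the enqueue path later never needs to allocate. A rough illustration of that fixed cost, using hypothetical stand-in struct definitions (the real live_object_data may contain more fields than shown here, so the printed numbers are only a ballpark):

#include <stdio.h>

// Hypothetical stand-ins, only to estimate sizes; not the real definitions.
typedef struct { unsigned int weight; } live_object_data_sketch;
typedef struct {
  void *heap_record;
  long obj_id;
  live_object_data_sketch object_data;
} uncommitted_sample_sketch;

int main(void) {
  // MAX_QUEUE_LIMIT is 10000; the printout gives a ballpark for the up-front cost,
  // which grows if live_object_data gains fields.
  printf("per-entry: %zu bytes, whole queue: ~%zu KB\n",
         sizeof(uncommitted_sample_sketch),
         (10000 * sizeof(uncommitted_sample_sketch)) / 1024);
  return 0;
}
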
@@ -163,6 +181,7 @@ void heap_recorder_free(heap_recorder *heap_recorder) {

pthread_mutex_destroy(&heap_recorder->records_mutex);

ruby_xfree(heap_recorder->queued_samples);
ruby_xfree(heap_recorder->reusable_locations);

ruby_xfree(heap_recorder);
@@ -240,12 +259,18 @@ void end_heap_allocation_recording(struct heap_recorder *heap_recorder, ddog_pro
int error = pthread_mutex_trylock(&heap_recorder->records_mutex);
if (error == EBUSY) {
// We weren't able to get a lock
// TODO: Add some queuing system so we can do something other than drop this data.
cleanup_heap_record_if_unused(heap_recorder, heap_record);
bool enqueued = enqueue_allocation(heap_recorder, heap_record, obj_id, active_recording->object_data);
if (!enqueued) {
cleanup_heap_record_if_unused(heap_recorder, heap_record);
}
return;
}
if (error) ENFORCE_SUCCESS_GVL(error);

// We were able to get the lock on heap_records, so let's flush any pending samples
// that might have been queued previously before committing this new one.
flush_queue(heap_recorder);

// And then commit the new allocation.
commit_allocation(heap_recorder, heap_record, obj_id, active_recording->object_data);
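
The control flow above is the heart of this change: the allocation path never blocks on records_mutex. If the mutex is busy (for example while the serializer is iterating through records), the sample gets pushed onto the preallocated queue; whenever the lock is obtained, previously queued samples are flushed before the new one is committed. A minimal self-contained sketch of that pattern, using simplified stand-in names and types (sample, recorder, record_sample, flush_queued, commit) rather than the recorder's real ones:

#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_QUEUE_CAPACITY 16  // the real queue uses MAX_QUEUE_LIMIT (10000) entries

typedef struct { long obj_id; } sample;              // stand-in for the queued payload

typedef struct {
  pthread_mutex_t records_mutex;
  sample queued[SKETCH_QUEUE_CAPACITY];
  size_t queued_len;
  size_t committed;                                  // stand-in for the record tables
} recorder;

// Stand-in for commit_allocation: in the real code this updates the record tables.
static void commit(recorder *r, sample s) {
  (void) s;
  r->committed++;
}

// Mirrors flush_queue: records_mutex must be held by the caller.
static void flush_queued(recorder *r) {
  for (size_t i = 0; i < r->queued_len; i++) commit(r, r->queued[i]);
  r->queued_len = 0;
}

// Mirrors the end-of-recording path: never blocks on the mutex and never allocates,
// so it is safe to run while the serializer holds the lock (or during GC).
static bool record_sample(recorder *r, sample s) {
  int error = pthread_mutex_trylock(&r->records_mutex);
  if (error == EBUSY) {
    if (r->queued_len >= SKETCH_QUEUE_CAPACITY) return false; // queue full: sample is dropped
    r->queued[r->queued_len++] = s;
    return true;
  }
  if (error != 0) return false;   // unexpected error; the real code raises instead

  flush_queued(r);                // drain anything queued while the lock was busy...
  commit(r, s);                   // ...then commit the new sample
  pthread_mutex_unlock(&r->records_mutex);
  return true;
}

A single mutex plus a bounded, preallocated queue keeps the hot path free of blocking and allocation, at the cost of dropping samples if the queue ever fills up.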

@@ -257,7 +282,10 @@ void heap_recorder_flush(heap_recorder *heap_recorder) {
return;
}

ENFORCE_SUCCESS_GVL(pthread_mutex_lock(&heap_recorder->records_mutex));
flush_queue(heap_recorder);
st_foreach(heap_recorder->object_records, st_object_record_entry_free_if_invalid, (st_data_t) heap_recorder);
ENFORCE_SUCCESS_GVL(pthread_mutex_unlock(&heap_recorder->records_mutex));
}

// Internal data we need while performing iteration over live objects.
@@ -289,6 +317,21 @@ void heap_recorder_for_each_live_object(
ENFORCE_SUCCESS_HELPER(pthread_mutex_unlock(&heap_recorder->records_mutex), with_gvl);
}

void heap_recorder_testonly_lock(heap_recorder *heap_recorder) {
if (heap_recorder == NULL) {
rb_raise(rb_eRuntimeError, "Null heap recorder");
}
pthread_mutex_lock(&heap_recorder->records_mutex);
}

size_t heap_recorder_testonly_unlock(heap_recorder *heap_recorder) {
if (heap_recorder == NULL) {
rb_raise(rb_eRuntimeError, "Null heap recorder");
}
pthread_mutex_unlock(&heap_recorder->records_mutex);
return heap_recorder->queued_samples_len;
}

// ==========================
// Heap Recorder Internal API
// ==========================
@@ -458,6 +501,34 @@ static void cleanup_heap_record_if_unused(heap_recorder *heap_recorder, heap_rec
heap_record_free(heap_record);
}

// WARN: Expects records_mutex to be held
static void flush_queue(heap_recorder *heap_recorder) {
for (size_t i = 0; i < heap_recorder->queued_samples_len; i++) {
uncommitted_sample *queued_sample = &heap_recorder->queued_samples[i];
commit_allocation(heap_recorder, queued_sample->heap_record, queued_sample->obj_id, queued_sample->object_data);
*queued_sample = (uncommitted_sample) {0};
}
heap_recorder->queued_samples_len = 0;
}

// WARN: This can get called during Ruby GC. NO HEAP ALLOCATIONS OR EXCEPTIONS ARE ALLOWED.
static bool enqueue_allocation(heap_recorder *heap_recorder, heap_record *heap_record, long obj_id, live_object_data object_data) {
if (heap_recorder->queued_samples_len >= MAX_QUEUE_LIMIT) {
// FIXME: If we're dropping a sample here, the accuracy of our heap profiles will be affected.
// Should we completely give up and stop sending heap profiles or should we trigger a flag that we
// can then use to add a warning in the UI? At the very least we'd want telemetry here.
return false;
}

heap_recorder->queued_samples[heap_recorder->queued_samples_len] = (uncommitted_sample) {
.heap_record = heap_record,
.obj_id = obj_id,
.object_data = object_data,
};
heap_recorder->queued_samples_len++;
return true;
}

// ===============
// Heap Record API
// ===============
8 changes: 8 additions & 0 deletions ext/ddtrace_profiling_native_extension/heap_recorder.h
@@ -91,3 +91,11 @@ void heap_recorder_for_each_live_object(
bool (*for_each_callback)(heap_recorder_iteration_data data, void* extra_arg),
void *for_each_callback_extra_arg,
bool with_gvl);

// v--- TEST-ONLY APIs ---v

// Lock the heap recorder as if iterating through it
void heap_recorder_testonly_lock(heap_recorder *heap_recorder);
// Unlock the heap recorder as if iterating through it and return the number of
// allocation samples currently queued
size_t heap_recorder_testonly_unlock(heap_recorder *heap_recorder);
26 changes: 26 additions & 0 deletions ext/ddtrace_profiling_native_extension/stack_recorder.c
@@ -220,6 +220,8 @@ static void serializer_set_start_timestamp_for_next_profile(struct stack_recorde
static VALUE _native_record_endpoint(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance, VALUE local_root_span_id, VALUE endpoint);
static void reset_profile(ddog_prof_Profile *profile, ddog_Timespec *start_time /* Can be null */);
static VALUE _native_track_object(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance, VALUE new_obj, VALUE weight);
static VALUE _native_start_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance);
static VALUE _native_end_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance);


void stack_recorder_init(VALUE profiling_module) {
@@ -245,6 +247,10 @@ void stack_recorder_init(VALUE profiling_module) {
rb_define_singleton_method(testing_module, "_native_slot_two_mutex_locked?", _native_is_slot_two_mutex_locked, 1);
rb_define_singleton_method(testing_module, "_native_record_endpoint", _native_record_endpoint, 3);
rb_define_singleton_method(testing_module, "_native_track_object", _native_track_object, 3);
rb_define_singleton_method(testing_module, "_native_start_fake_slow_heap_serialization",
_native_start_fake_slow_heap_serialization, 1);
rb_define_singleton_method(testing_module, "_native_end_fake_slow_heap_serialization",
_native_end_fake_slow_heap_serialization, 1);

ok_symbol = ID2SYM(rb_intern_const("ok"));
error_symbol = ID2SYM(rb_intern_const("error"));
@@ -745,3 +751,23 @@ static void reset_profile(ddog_prof_Profile *profile, ddog_Timespec *start_time
rb_raise(rb_eRuntimeError, "Failed to reset profile: %"PRIsVALUE, get_error_details_and_drop(&reset_result.err));
}
}

// This method exists only to enable testing Datadog::Profiling::StackRecorder behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_start_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance) {
struct stack_recorder_state *state;
TypedData_Get_Struct(recorder_instance, struct stack_recorder_state, &stack_recorder_typed_data, state);

heap_recorder_testonly_lock(state->heap_recorder);

return Qnil;
}

// This method exists only to enable testing Datadog::Profiling::StackRecorder behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_end_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance) {
struct stack_recorder_state *state;
TypedData_Get_Struct(recorder_instance, struct stack_recorder_state, &stack_recorder_typed_data, state);

return ULONG2NUM(heap_recorder_testonly_unlock(state->heap_recorder));
}
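
These two native helpers exist only so a spec can simulate a serialization that holds records_mutex for a long time while allocations keep being sampled. Continuing the stand-in recorder sketch from the heap_recorder.c section above (hypothetical names; the real spec drives this through the Ruby Testing methods shown below), the expected sequence is:

#include <assert.h>
#include <stdio.h>

// Reuses the `recorder`, `sample` and `record_sample` stand-ins from the earlier sketch.
int main(void) {
  recorder r = { .queued_len = 0, .committed = 0 };
  pthread_mutex_init(&r.records_mutex, NULL);

  pthread_mutex_lock(&r.records_mutex);              // "fake slow serialization" starts
  for (long id = 1; id <= 3; id++) {
    record_sample(&r, (sample) { .obj_id = id });    // mutex is busy -> samples get queued
  }
  size_t queued = r.queued_len;                      // what heap_recorder_testonly_unlock reports
  pthread_mutex_unlock(&r.records_mutex);            // "fake slow serialization" ends

  assert(queued == 3 && r.committed == 0);
  record_sample(&r, (sample) { .obj_id = 4 });       // the next sample also flushes the queue
  assert(r.queued_len == 0 && r.committed == 4);

  printf("queued %zu while locked, committed %zu after flush\n", queued, r.committed);
  pthread_mutex_destroy(&r.records_mutex);
  return 0;
}
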
42 changes: 37 additions & 5 deletions spec/datadog/profiling/stack_recorder_spec.rb
@@ -347,6 +347,13 @@ def sample_types_from(decoded_profile)

let(:samples) { samples_from_pprof(encoded_pprof) }

def sample_allocation(obj, _unused = 0)
# Heap sampling currently requires this 2-step process to first pass data about the allocated object...
described_class::Testing._native_track_object(stack_recorder, obj, sample_rate)
Datadog::Profiling::Collectors::Stack::Testing
._native_sample(Thread.current, stack_recorder, metric_values, labels, numeric_labels, 400, false)
end

before do
allocations = [a_string, an_array, 'a fearsome string', [-10..-1], a_hash, { 'z' => -1, 'y' => '-2', 'x' => false }]
@num_allocations = 0
@@ -355,12 +362,9 @@ def sample_types_from(decoded_profile)
described_class::Testing._native_track_object(stack_recorder, obj, sample_rate)
# ...and then pass data about the allocation stacktrace (with 2 distinct stacktraces)
if i.even?
Datadog::Profiling::Collectors::Stack::Testing
._native_sample(Thread.current, stack_recorder, metric_values, labels, numeric_labels, 400, false)
sample_allocation(obj, 1)
else
# 401 used instead of 400 here just to make the branches different and appease Rubocop
Datadog::Profiling::Collectors::Stack::Testing
._native_sample(Thread.current, stack_recorder, metric_values, labels, numeric_labels, 401, false)
sample_allocation(obj, 2)
end
@num_allocations += 1
end
@@ -419,6 +423,34 @@ def sample_types_from(decoded_profile)

expect(summed_values).to eq(expected_summed_values)
end

it "aren't lost when they happen concurrently with a long serialization" do
described_class::Testing._native_start_fake_slow_heap_serialization(stack_recorder)

test_num_allocated_object = 3
live_objects = Array.new(test_num_allocated_object)

test_num_allocated_object.times do |i|
live_objects[i] = "this is string number #{i}"
sample_allocation(live_objects[i])
end

allocation_line = __LINE__ - 3

queued_allocations = described_class::Testing._native_end_fake_slow_heap_serialization(stack_recorder)

expect(queued_allocations).to eq test_num_allocated_object

heap_samples_in_test_matcher = lambda { |sample|
(sample.values[:'heap-live-samples'] || 0) > 0 && sample.locations.any? do |location|
location.lineno == allocation_line && location.path == __FILE__
end
}

relevant_sample = heap_samples.find(&heap_samples_in_test_matcher)
expect(relevant_sample).not_to be nil
expect(relevant_sample.values[:'heap-live-samples']).to eq test_num_allocated_object * sample_rate
end
end
end
