diff --git a/ext/ddtrace_profiling_native_extension/heap_recorder.c b/ext/ddtrace_profiling_native_extension/heap_recorder.c
index d3ac41a91a2..65cfc909801 100644
--- a/ext/ddtrace_profiling_native_extension/heap_recorder.c
+++ b/ext/ddtrace_profiling_native_extension/heap_recorder.c
@@ -6,6 +6,7 @@
 #include 
 
 #define MAX_FRAMES_LIMIT 10000
+#define MAX_QUEUE_LIMIT 10000
 
 // A compact representation of a stacktrace frame for a heap allocation.
 typedef struct {
@@ -99,6 +100,13 @@ typedef struct {
   live_object_data object_data;
 } partial_heap_recording;
 
+typedef struct {
+  // Has ownership of this; needs to clean it up if ownership is not transferred.
+  heap_record *heap_record;
+  long obj_id;
+  live_object_data object_data;
+} uncommitted_sample;
+
 struct heap_recorder {
   // Map[key: heap_record_key*, record: heap_record*]
   // NOTE: We always use heap_record_key.type == HEAP_STACK for storage but support lookups
@@ -114,6 +122,12 @@ struct heap_recorder {
   // Data for a heap recording that was started but not yet ended
   partial_heap_recording active_recording;
 
+  // Storage for samples that get queued when a sample is taken while records_mutex is locked.
+  // These get flushed back into the record tables by the next sample execution that can take
+  // a write lock on heap_records (or explicitly via ::heap_recorder_flush).
+  uncommitted_sample *queued_samples;
+  size_t queued_samples_len;
+
   // Reusable location array, implementing a flyweight pattern for things like iteration.
   ddog_prof_Location *reusable_locations;
 };
@@ -125,6 +139,8 @@ static int st_object_record_entry_free_if_invalid(st_data_t, st_data_t, st_data_t);
 static int st_object_records_iterate(st_data_t, st_data_t, st_data_t);
 static int update_object_record_entry(st_data_t*, st_data_t*, st_data_t, int);
 static void commit_allocation(heap_recorder*, heap_record*, long, live_object_data);
+static void flush_queue(heap_recorder*);
+static bool enqueue_allocation(heap_recorder*, heap_record*, long, live_object_data);
 
 // ==========================
 // Heap Recorder External API
 // ==========================
@@ -147,6 +163,8 @@ heap_recorder* heap_recorder_new(void) {
     // it as invalid/unset value.
     .object_data = {0},
   };
+  recorder->queued_samples = ruby_xcalloc(MAX_QUEUE_LIMIT, sizeof(uncommitted_sample));
+  recorder->queued_samples_len = 0;
 
   return recorder;
 }
@@ -164,6 +182,7 @@ void heap_recorder_free(heap_recorder *heap_recorder) {
 
   pthread_mutex_destroy(&heap_recorder->records_mutex);
 
+  ruby_xfree(heap_recorder->queued_samples);
   ruby_xfree(heap_recorder->reusable_locations);
 
   ruby_xfree(heap_recorder);
@@ -241,12 +260,18 @@ void end_heap_allocation_recording(struct heap_recorder *heap_recorder, ddog_prof_Slice_Location locations) {
   int error = pthread_mutex_trylock(&heap_recorder->records_mutex);
   if (error == EBUSY) {
     // We weren't able to get a lock
-    // TODO: Add some queuing system so we can do something other than drop this data.
-    cleanup_heap_record_if_unused(heap_recorder, heap_record);
+    bool enqueued = enqueue_allocation(heap_recorder, heap_record, obj_id, active_recording->object_data);
+    if (!enqueued) {
+      cleanup_heap_record_if_unused(heap_recorder, heap_record);
+    }
     return;
   }
   if (error) ENFORCE_SUCCESS_GVL(error);
 
+  // We were able to get a lock on heap_records, so let's flush any pending samples that might
+  // have been queued previously before adding this new one.
+  flush_queue(heap_recorder);
+  // And then commit the new allocation.
  commit_allocation(heap_recorder, heap_record, obj_id, active_recording->object_data);
@@ -258,7 +283,10 @@ void heap_recorder_flush(heap_recorder *heap_recorder) {
     return;
   }
 
+  ENFORCE_SUCCESS_GVL(pthread_mutex_lock(&heap_recorder->records_mutex));
+  flush_queue(heap_recorder);
   st_foreach(heap_recorder->object_records, st_object_record_entry_free_if_invalid, (st_data_t) heap_recorder);
+  ENFORCE_SUCCESS_GVL(pthread_mutex_unlock(&heap_recorder->records_mutex));
 }
 
 // Internal data we need while performing iteration over live objects.
@@ -290,6 +318,21 @@ void heap_recorder_for_each_live_object(
   ENFORCE_SUCCESS_HELPER(pthread_mutex_unlock(&heap_recorder->records_mutex), with_gvl);
 }
 
+void heap_recorder_testonly_lock(heap_recorder *heap_recorder) {
+  if (heap_recorder == NULL) {
+    rb_raise(rb_eRuntimeError, "Null heap recorder");
+  }
+  pthread_mutex_lock(&heap_recorder->records_mutex);
+}
+
+size_t heap_recorder_testonly_unlock(heap_recorder *heap_recorder) {
+  if (heap_recorder == NULL) {
+    rb_raise(rb_eRuntimeError, "Null heap recorder");
+  }
+  pthread_mutex_unlock(&heap_recorder->records_mutex);
+  return heap_recorder->queued_samples_len;
+}
+
 // ==========================
 // Heap Recorder Internal API
 // ==========================
@@ -459,6 +502,34 @@ static void cleanup_heap_record_if_unused(heap_recorder *heap_recorder, heap_record *heap_record) {
   heap_record_free(heap_record);
 }
 
+// WARN: Expects records_mutex to be held
+static void flush_queue(heap_recorder *heap_recorder) {
+  for (size_t i = 0; i < heap_recorder->queued_samples_len; i++) {
+    uncommitted_sample *queued_sample = &heap_recorder->queued_samples[i];
+    commit_allocation(heap_recorder, queued_sample->heap_record, queued_sample->obj_id, queued_sample->object_data);
+    *queued_sample = (uncommitted_sample) {0};
+  }
+  heap_recorder->queued_samples_len = 0;
+}
+
+// WARN: This can get called during Ruby GC. NO HEAP ALLOCATIONS OR EXCEPTIONS ARE ALLOWED.
+static bool enqueue_allocation(heap_recorder *heap_recorder, heap_record *heap_record, long obj_id, live_object_data object_data) {
+  if (heap_recorder->queued_samples_len >= MAX_QUEUE_LIMIT) {
+    // FIXME: If we're dropping a sample here, the accuracy of our heap profiles will be affected.
+    // Should we completely give up and stop sending heap profiles, or should we set a flag that we
+    // can then use to add a warning in the UI? At the very least we'd want telemetry here.
+    return false;
+  }
+
+  heap_recorder->queued_samples[heap_recorder->queued_samples_len] = (uncommitted_sample) {
+    .heap_record = heap_record,
+    .obj_id = obj_id,
+    .object_data = object_data,
+  };
+  heap_recorder->queued_samples_len++;
+  return true;
+}
+
 // ===============
 // Heap Record API
 // ===============
diff --git a/ext/ddtrace_profiling_native_extension/heap_recorder.h b/ext/ddtrace_profiling_native_extension/heap_recorder.h
index fed7c3fc30f..3d8c17c9772 100644
--- a/ext/ddtrace_profiling_native_extension/heap_recorder.h
+++ b/ext/ddtrace_profiling_native_extension/heap_recorder.h
@@ -91,3 +91,11 @@ void heap_recorder_for_each_live_object(
     bool (*for_each_callback)(heap_recorder_iteration_data data, void* extra_arg),
     void *for_each_callback_extra_arg,
     bool with_gvl);
+
+// v--- TEST-ONLY APIs ---v
+
+// Lock the heap recorder as if iterating through it
+void heap_recorder_testonly_lock(heap_recorder *heap_recorder);
+// Unlock the heap recorder as if iterating through it and return the number of
+// queued allocation samples
+size_t heap_recorder_testonly_unlock(heap_recorder *heap_recorder);
diff --git a/ext/ddtrace_profiling_native_extension/stack_recorder.c b/ext/ddtrace_profiling_native_extension/stack_recorder.c
index 4918f8caea8..b71f21d6f72 100644
--- a/ext/ddtrace_profiling_native_extension/stack_recorder.c
+++ b/ext/ddtrace_profiling_native_extension/stack_recorder.c
@@ -220,6 +220,8 @@ static void serializer_set_start_timestamp_for_next_profile(struct stack_recorder_state *state, ddog_Timespec start_time);
 static VALUE _native_record_endpoint(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance, VALUE local_root_span_id, VALUE endpoint);
 static void reset_profile(ddog_prof_Profile *profile, ddog_Timespec *start_time /* Can be null */);
 static VALUE _native_track_object(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance, VALUE new_obj, VALUE weight);
+static VALUE _native_start_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance);
+static VALUE _native_end_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance);
 
 void stack_recorder_init(VALUE profiling_module) {
@@ -245,6 +247,10 @@
   rb_define_singleton_method(testing_module, "_native_slot_two_mutex_locked?", _native_is_slot_two_mutex_locked, 1);
   rb_define_singleton_method(testing_module, "_native_record_endpoint", _native_record_endpoint, 3);
   rb_define_singleton_method(testing_module, "_native_track_object", _native_track_object, 3);
+  rb_define_singleton_method(testing_module, "_native_start_fake_slow_heap_serialization",
+    _native_start_fake_slow_heap_serialization, 1);
+  rb_define_singleton_method(testing_module, "_native_end_fake_slow_heap_serialization",
+    _native_end_fake_slow_heap_serialization, 1);
 
   ok_symbol = ID2SYM(rb_intern_const("ok"));
   error_symbol = ID2SYM(rb_intern_const("error"));
@@ -745,3 +751,23 @@ static void reset_profile(ddog_prof_Profile *profile, ddog_Timespec *start_time /* Can be null */) {
     rb_raise(rb_eRuntimeError, "Failed to reset profile: %"PRIsVALUE, get_error_details_and_drop(&reset_result.err));
   }
 }
+
+// This method exists only to enable testing Datadog::Profiling::StackRecorder behavior using RSpec.
+// It SHOULD NOT be used for other purposes.
+static VALUE _native_start_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance) {
+  struct stack_recorder_state *state;
+  TypedData_Get_Struct(recorder_instance, struct stack_recorder_state, &stack_recorder_typed_data, state);
+
+  heap_recorder_testonly_lock(state->heap_recorder);
+
+  return Qnil;
+}
+
+// This method exists only to enable testing Datadog::Profiling::StackRecorder behavior using RSpec.
+// It SHOULD NOT be used for other purposes.
+static VALUE _native_end_fake_slow_heap_serialization(DDTRACE_UNUSED VALUE _self, VALUE recorder_instance) {
+  struct stack_recorder_state *state;
+  TypedData_Get_Struct(recorder_instance, struct stack_recorder_state, &stack_recorder_typed_data, state);
+
+  return ULONG2NUM(heap_recorder_testonly_unlock(state->heap_recorder));
+}
diff --git a/spec/datadog/profiling/stack_recorder_spec.rb b/spec/datadog/profiling/stack_recorder_spec.rb
index f2918d7a179..a029abedffe 100644
--- a/spec/datadog/profiling/stack_recorder_spec.rb
+++ b/spec/datadog/profiling/stack_recorder_spec.rb
@@ -347,6 +347,13 @@ def sample_types_from(decoded_profile)
 
       let(:samples) { samples_from_pprof(encoded_pprof) }
 
+      def sample_allocation(obj, _unused = 0)
+        # Heap sampling currently requires this 2-step process to first pass data about the allocated object...
+        described_class::Testing._native_track_object(stack_recorder, obj, sample_rate)
+        Datadog::Profiling::Collectors::Stack::Testing
+          ._native_sample(Thread.current, stack_recorder, metric_values, labels, numeric_labels, 400, false)
+      end
+
       before do
         allocations = [a_string, an_array, 'a fearsome string', [-10..-1], a_hash, { 'z' => -1, 'y' => '-2', 'x' => false }]
 
         @num_allocations = 0
@@ -355,12 +362,9 @@ def sample_types_from(decoded_profile)
           described_class::Testing._native_track_object(stack_recorder, obj, sample_rate)
           # ...and then pass data about the allocation stacktrace (with 2 distinct stacktraces)
           if i.even?
-            Datadog::Profiling::Collectors::Stack::Testing
-              ._native_sample(Thread.current, stack_recorder, metric_values, labels, numeric_labels, 400, false)
+            sample_allocation(obj, 1)
           else
-            # 401 used instead of 400 here just to make the branches different and appease Rubocop
-            Datadog::Profiling::Collectors::Stack::Testing
-              ._native_sample(Thread.current, stack_recorder, metric_values, labels, numeric_labels, 401, false)
+            sample_allocation(obj, 2)
           end
           @num_allocations += 1
         end
@@ -419,6 +423,34 @@
          expect(summed_values).to eq(expected_summed_values)
        end
+
+        it "aren't lost when they happen concurrently with a long serialization" do
+          described_class::Testing._native_start_fake_slow_heap_serialization(stack_recorder)
+
+          test_num_allocated_object = 3
+          live_objects = Array.new(test_num_allocated_object)
+
+          test_num_allocated_object.times do |i|
+            live_objects[i] = "this is string number #{i}"
+            sample_allocation(live_objects[i])
+          end
+
+          allocation_line = __LINE__ - 3
+
+          queued_allocations = described_class::Testing._native_end_fake_slow_heap_serialization(stack_recorder)
+
+          expect(queued_allocations).to eq test_num_allocated_object
+
+          heap_samples_in_test_matcher = lambda { |sample|
+            (sample.values[:'heap-live-samples'] || 0) > 0 && sample.locations.any? do |location|
+              location.lineno == allocation_line && location.path == __FILE__
+            end
+          }
+
+          relevant_sample = heap_samples.find(&heap_samples_in_test_matcher)
+          expect(relevant_sample).not_to be nil
+          expect(relevant_sample.values[:'heap-live-samples']).to eq test_num_allocated_object * sample_rate
+        end
       end
     end