Skip to content

Commit

Permalink
Add support for "user labels" on profiles in new profiler
Browse files Browse the repository at this point in the history
This commit adds the concept of "user labels" on profiles. Currently,
when we generate a sample for a profile, we add a label in the generated
pprof for the thread name and the thread ID. The legacy old_stack.rb
collector _also_ adds labels for the active trace span ID, trace ID, and
trace resource name (via Datadog::Profiling::TraceIdentifiers), however
the new profiler does not currently set these.

The "user labels" concept extends this further by allowing applications
to set arbitrary label key/value pairs on the current thread, which will
then appear in any samples taken from that thread. This is done by
calling Datadog::Profiling::Collectors::CpuAndWallTime#set_user_label -
either with a key/value to set, or a nil value to unset.

The intended use for this is to allow application developers who have
obtained their pprof files (either by downloading from the Datadog UI,
or by setting a custom `profiling.exporter.transport` implementation) to
explore them using e.g. the https://github.com/google/pprof tool and
facet them by arbitrary dimensions. For example, I have implemented this
feature so we can see how the performance of certain methods varies
depending on which customer is calliung them.
  • Loading branch information
KJ Tsanaktsidis committed Oct 27, 2022
1 parent ba2a1d6 commit 192e5c8
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct per_thread_context {
thread_cpu_time_id thread_cpu_time_id;
long cpu_time_at_previous_sample_ns; // Can be INVALID_TIME until initialized or if getting it fails for another reason
long wall_time_at_previous_sample_ns; // Can be INVALID_TIME until initialized
VALUE user_labels;

struct {
// Both of these fields are set by on_gc_start and kept until sample_after_gc is called.
Expand Down Expand Up @@ -141,6 +142,7 @@ static long cpu_time_now_ns(struct per_thread_context *thread_context);
static long wall_time_now_ns(bool raise_on_failure);
static long thread_id_for(VALUE thread);
static VALUE _native_stats(VALUE self, VALUE collector_instance);
static VALUE _native_set_user_label(VALUE _self, VALUE collector_instance, VALUE thread, VALUE label_key, VALUE label_value);

void collectors_cpu_and_wall_time_init(VALUE profiling_module) {
VALUE collectors_module = rb_define_module_under(profiling_module, "Collectors");
Expand All @@ -160,6 +162,7 @@ void collectors_cpu_and_wall_time_init(VALUE profiling_module) {

rb_define_singleton_method(collectors_cpu_and_wall_time_class, "_native_initialize", _native_initialize, 3);
rb_define_singleton_method(collectors_cpu_and_wall_time_class, "_native_inspect", _native_inspect, 1);
rb_define_singleton_method(collectors_cpu_and_wall_time_class, "_native_set_user_label", _native_set_user_label, 4);
rb_define_singleton_method(testing_module, "_native_sample", _native_sample, 1);
rb_define_singleton_method(testing_module, "_native_on_gc_start", _native_on_gc_start, 1);
rb_define_singleton_method(testing_module, "_native_on_gc_finish", _native_on_gc_finish, 1);
Expand Down Expand Up @@ -210,9 +213,11 @@ static void cpu_and_wall_time_collector_typed_data_free(void *state_ptr) {
}

// Mark Ruby thread references we keep as keys in hash_map_per_thread_context
static int hash_map_per_thread_context_mark(st_data_t key_thread, DDTRACE_UNUSED st_data_t _value, DDTRACE_UNUSED st_data_t _argument) {
static int hash_map_per_thread_context_mark(st_data_t key_thread, st_data_t value, DDTRACE_UNUSED st_data_t _argument) {
VALUE thread = (VALUE) key_thread;
rb_gc_mark(thread);
struct per_thread_context *thread_context = (struct per_thread_context *) value;
rb_gc_mark(thread_context->user_labels);
return ST_CONTINUE;
}

Expand Down Expand Up @@ -279,6 +284,31 @@ static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_
return Qtrue;
}

struct insert_user_label_context {
int ix;
ddog_Label *labels;
VALUE *user_labels_to_pin;
};

static int insert_user_label(VALUE userkey, VALUE uservalue, VALUE ctx_raw) {
struct insert_user_label_context *ctx = (struct insert_user_label_context *) ctx_raw;

// This is only valid so long as the thread_context->user_labels is valid and UNCHANGED for the duration of the
// below call to sample_thread; char_slice_from_ruby_string takes pointers to the underlying RString, but the
// end of sample_thread below eventually calls into libdatadog which copies these pointers.
// Thankfully, we can guarantee that user_labels is unchanged and still valid for the duration because we freeze
// strings added to the array and keep a reference to them on the C stack (thus ensuring they are pinned if a GC
// occurrs).
ctx->labels[ctx->ix] = (ddog_Label) {
.key = char_slice_from_ruby_string(userkey),
.str = char_slice_from_ruby_string(uservalue)
};
ctx->user_labels_to_pin[ctx->ix * 2] = userkey;
ctx->user_labels_to_pin[ctx->ix * 2 + 1] = uservalue;
ctx->ix++;
return ST_CONTINUE;
}

// This function gets called from the Collectors::CpuAndWallTimeWorker to trigger the actual sampling.
//
// Assumption 1: This function is called in a thread that is holding the Global VM Lock. Caller is responsible for enforcing this.
Expand Down Expand Up @@ -504,17 +534,31 @@ static void trigger_sample_for_thread(
VALUE thread_name = thread_name_for(thread);
bool have_thread_name = thread_name != Qnil;

int label_count = 1 + (have_thread_name ? 1 : 0);
int user_label_count = rb_hash_size_num(thread_context->user_labels);
int label_count = 1 + (have_thread_name ? 1 : 0) + user_label_count;
ddog_Label labels[label_count];

// This VLA serves no purpose other than to make sure that all the user label strings we added
// get considered "on the C stack" and thus pinned if a GC happens while we still hold an internal
// pointer to the string data in a ddprof_ffi_Slice_label struct.
VALUE user_labels_to_pin[user_label_count * 2];

struct insert_user_label_context insert_ctx = {
.ix = 1,
.labels = labels,
.user_labels_to_pin = user_labels_to_pin
};

labels[0] = (ddog_Label) {.key = DDOG_CHARSLICE_C("thread id"), .str = thread_context->thread_id_char_slice};
if (have_thread_name) {
labels[1] = (ddog_Label) {
.key = DDOG_CHARSLICE_C("thread name"),
.str = char_slice_from_ruby_string(thread_name)
};
insert_ctx.ix++;
}

rb_hash_foreach(thread_context->user_labels, insert_user_label, (VALUE)&insert_ctx);
sample_thread(
thread,
state->sampling_buffer,
Expand All @@ -523,6 +567,9 @@ static void trigger_sample_for_thread(
(ddog_Slice_label) {.ptr = labels, .len = label_count},
type
);

RB_GC_GUARD(user_labels_to_pin[0]);
RB_GC_GUARD(user_labels_to_pin[user_label_count - 1]);
}

// This method exists only to enable testing Datadog::Profiling::Collectors::CpuAndWallTime behavior using RSpec.
Expand Down Expand Up @@ -572,6 +619,8 @@ static void initialize_context(VALUE thread, struct per_thread_context *thread_c
thread_context->gc_tracking.cpu_time_at_finish_ns = INVALID_TIME;
thread_context->gc_tracking.wall_time_at_start_ns = INVALID_TIME;
thread_context->gc_tracking.wall_time_at_finish_ns = INVALID_TIME;

thread_context->user_labels = rb_hash_new();
}

static VALUE _native_inspect(DDTRACE_UNUSED VALUE _self, VALUE collector_instance) {
Expand Down Expand Up @@ -743,3 +792,29 @@ static VALUE _native_stats(DDTRACE_UNUSED VALUE _self, VALUE collector_instance)

return stats_as_ruby_hash(state);
}

static VALUE _native_set_user_label(DDTRACE_UNUSED VALUE _self, VALUE collector_instance, VALUE thread, VALUE label_key, VALUE label_value) {
struct cpu_and_wall_time_collector_state *state;
TypedData_Get_Struct(collector_instance, struct cpu_and_wall_time_collector_state, &cpu_and_wall_time_collector_typed_data, state);
struct per_thread_context *thread_context = get_or_create_context_for(thread, state);
if (!RB_TYPE_P(label_key, T_STRING)) {
rb_raise(rb_eArgError, "label key was not a string");
}
if (RTEST(label_value)) {
if (!RB_TYPE_P(label_value, T_STRING)) {
rb_raise(rb_eArgError, "label value was not a string");
}
VALUE key_to_insert = label_key;
if (!RB_OBJ_FROZEN(key_to_insert)) {
key_to_insert = rb_str_freeze(rb_str_dup(label_key));
}
VALUE value_to_insert = label_value;
if (!RB_OBJ_FROZEN(value_to_insert)) {
value_to_insert = rb_str_freeze(rb_str_dup(label_value));
}
rb_hash_aset(thread_context->user_labels, key_to_insert, value_to_insert);
} else {
rb_hash_delete(thread_context->user_labels, label_key);
}
return Qnil;
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ static VALUE _native_is_running(DDTRACE_UNUSED VALUE self, VALUE instance);
static void testing_signal_handler(DDTRACE_UNUSED int _signal, DDTRACE_UNUSED siginfo_t *_info, DDTRACE_UNUSED void *_ucontext);
static VALUE _native_install_testing_signal_handler(DDTRACE_UNUSED VALUE self);
static VALUE _native_remove_testing_signal_handler(DDTRACE_UNUSED VALUE self);
static VALUE _native_cpu_and_wall_time_collector(VALUE self, VALUE instance);

// Global state -- be very careful when accessing or modifying it

Expand Down Expand Up @@ -122,6 +123,7 @@ void collectors_cpu_and_wall_time_worker_init(VALUE profiling_module) {
rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_initialize", _native_initialize, 2);
rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_sampling_loop", _native_sampling_loop, 1);
rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_stop", _native_stop, 1);
rb_define_singleton_method(collectors_cpu_and_wall_time_worker_class, "_native_cpu_and_wall_time_collector", _native_cpu_and_wall_time_collector, 1);
rb_define_singleton_method(testing_module, "_native_current_sigprof_signal_handler", _native_current_sigprof_signal_handler, 0);
rb_define_singleton_method(testing_module, "_native_is_running?", _native_is_running, 1);
rb_define_singleton_method(testing_module, "_native_install_testing_signal_handler", _native_install_testing_signal_handler, 0);
Expand Down Expand Up @@ -389,3 +391,9 @@ static VALUE _native_remove_testing_signal_handler(DDTRACE_UNUSED VALUE self) {
remove_sigprof_signal_handler();
return Qtrue;
}

static VALUE _native_cpu_and_wall_time_collector(DDTRACE_UNUSED VALUE self, VALUE instance) {
struct cpu_and_wall_time_worker_state *state;
TypedData_Get_Struct(instance, struct cpu_and_wall_time_worker_state, &cpu_and_wall_time_worker_typed_data, state);
return state->cpu_and_wall_time_collector_instance;
}
4 changes: 4 additions & 0 deletions lib/datadog/profiling/collectors/cpu_and_wall_time.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ def inspect
result[-1] = "#{self.class._native_inspect(self)}>"
result
end

def set_user_label(key, val)
self.class._native_set_user_label(self, Thread.current, key, val)
end
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/datadog/profiling/collectors/cpu_and_wall_time_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def stop(*_)
@failure_exception = nil
end
end

def cpu_and_wall_time_collector
self.class._native_cpu_and_wall_time_collector(self)
end
end
end
end
Expand Down

0 comments on commit 192e5c8

Please sign in to comment.