Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't read Go maps anymore #1413

Merged
merged 15 commits into from
Nov 27, 2024
1,115 changes: 1,115 additions & 0 deletions .gitignore

Large diffs are not rendered by default.

106 changes: 72 additions & 34 deletions bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
#include "tracing.h"
#include "trace_util.h"
#include "go_offsets.h"
#include "go_traceparent.h"
#include "pin_internal.h"

char __license[] SEC("license") = "Dual MIT/GPL";

enum { W3C_KEY_LENGTH = 11, W3C_VAL_LENGTH = 55 };

// Temporary information about a function invocation. It stores the invocation time of a function
// as well as the value of registers at the invocation time. This way we can retrieve them at the
// return uprobes so we can know the values of the function arguments (which are passed as registers
Expand Down Expand Up @@ -78,8 +79,9 @@ struct {
} ongoing_grpc_operate_headers SEC(".maps");

typedef struct grpc_transports {
u8 type;
connection_info_t conn;
tp_info_t tp;
u8 type;
} grpc_transports_t;

// TODO: use go_addr_key_t as key
Expand All @@ -105,6 +107,30 @@ struct {
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_sql_queries SEC(".maps");

typedef struct grpc_header_field {
u8 *key_ptr;
u64 key_len;
u8 *val_ptr;
u64 val_len;
u64 sensitive;
} grpc_header_field_t;

// assumes s2 is all lowercase
static __always_inline int bpf_memicmp(const char *s1, const char *s2, s32 size) {
for (int i = 0; i < size; i++) {
if (s1[i] != s2[i] && s1[i] != (s2[i] - 32)) // compare with each uppercase character
{
return i + 1;
}
}

return 0;
}

static __always_inline u8 valid_trace(const unsigned char *trace_id) {
return *((u64 *)trace_id) != 0 || *((u64 *)(trace_id + 8)) != 0;
}

static __always_inline void go_addr_key_from_id(go_addr_key_t *current, void *addr) {
u64 pid_tid = bpf_get_current_pid_tgid();
u32 pid = pid_from_pid_tgid(pid_tid);
Expand Down Expand Up @@ -178,24 +204,14 @@ static __always_inline void tp_clone(tp_info_t *dest, tp_info_t *src) {
}

static __always_inline void
server_trace_parent(void *goroutine_addr, tp_info_t *tp, void *req_header) {
server_trace_parent(void *goroutine_addr, tp_info_t *tp, tp_info_t *found_tp) {
// May get overriden when decoding existing traceparent, but otherwise we set sample ON
tp->flags = 1;
// Get traceparent from the Request.Header
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);
if (traceparent_ptr != NULL) {
unsigned char buf[TP_MAX_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_dbg_printk("can't copy traceparent header");
urand_bytes(tp->trace_id, TRACE_ID_SIZE_BYTES);
*((u64 *)tp->parent_id) = 0;
} else {
bpf_dbg_printk("Decoding traceparent from headers %s", buf);
decode_go_traceparent(buf, tp->trace_id, tp->parent_id, &tp->flags);
}
if (found_tp) {
bpf_dbg_printk("Decoded from existing traceparent");
__builtin_memcpy(tp, found_tp, sizeof(tp_info_t));
} else {
connection_info_t *info = bpf_map_lookup_elem(&ongoing_server_connections, &g_key);
u8 found_info = 0;
Expand Down Expand Up @@ -242,29 +258,12 @@ server_trace_parent(void *goroutine_addr, tp_info_t *tp, void *req_header) {
bpf_dbg_printk("tp: %s", tp_buf);
}

static __always_inline u8 client_trace_parent(void *goroutine_addr,
tp_info_t *tp_i,
void *req_header) {
// Get traceparent from the Request.Header
static __always_inline u8 client_trace_parent(void *goroutine_addr, tp_info_t *tp_i) {
u8 found_trace_id = 0;

// May get overriden when decoding existing traceparent or finding a server span, but otherwise we set sample ON
tp_i->flags = 1;

if (req_header) {
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
if (traceparent_ptr != NULL) {
unsigned char buf[TP_MAX_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_dbg_printk("can't copy traceparent header");
} else {
found_trace_id = 1;
decode_go_traceparent(buf, tp_i->trace_id, tp_i->span_id, &tp_i->flags);
}
}
}

go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

Expand Down Expand Up @@ -401,4 +400,43 @@ static __always_inline void *unwrap_tls_conn_info(void *conn_ptr, void *tls_stat
return conn_ptr;
}

static __always_inline void process_meta_frame_headers(void *frame, tp_info_t *tp) {
if (!frame) {
return;
}

off_table_t *ot = get_offsets_table();

void *fields = 0;
u64 fields_off = go_offset_of(ot, (go_offset){.v = _meta_headers_frame_fields_ptr_pos});
bpf_probe_read(&fields, sizeof(fields), (void *)(frame + fields_off));
u64 fields_len = 0;
bpf_probe_read(&fields_len, sizeof(fields_len), (void *)(frame + fields_off + 8));
bpf_dbg_printk("fields ptr %llx, len %d", fields, fields_len);
if (fields && fields_len > 0) {
for (u8 i = 0; i < 16; i++) {
if (i >= fields_len) {
break;
}
void *field_ptr = fields + (i * sizeof(grpc_header_field_t));
//bpf_dbg_printk("field_ptr %llx", field_ptr);
grpc_header_field_t field = {};
bpf_probe_read(&field, sizeof(grpc_header_field_t), field_ptr);
//bpf_dbg_printk("grpc header %s:%s", field.key_ptr, field.val_ptr);
//bpf_dbg_printk("grpc sizes %d:%d", field.key_len, field.val_len);
if (field.key_len == W3C_KEY_LENGTH && field.val_len == W3C_VAL_LENGTH) {
u8 temp[W3C_VAL_LENGTH];

bpf_probe_read(&temp, W3C_KEY_LENGTH, field.key_ptr);
if (!bpf_memicmp((const char *)temp, "traceparent", W3C_KEY_LENGTH)) {
//bpf_dbg_printk("found grpc traceparent header");
bpf_probe_read(&temp, W3C_VAL_LENGTH, field.val_ptr);
decode_go_traceparent(temp, tp->trace_id, tp->parent_id, &tp->flags);
break;
}
}
}
}
}

#endif // GO_COMMON_H
53 changes: 38 additions & 15 deletions bpf/go_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "go_byte_arr.h"
#include "bpf_dbg.h"
#include "go_common.h"
#include "go_traceparent.h"
#include "hpack.h"
#include "ringbuf.h"

Expand Down Expand Up @@ -97,21 +96,29 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
};

if (stream_ptr) {
void *ctx_ptr = 0;
// Read the embedded context object ptr
bpf_probe_read(&ctx_ptr,
sizeof(ctx_ptr),
void *st_ptr = 0;
void *tp_ptr = 0;
// Read the embedded object ptr
bpf_probe_read(&st_ptr,
sizeof(st_ptr),
(void *)(stream_ptr +
go_offset_of(ot, (go_offset){.v = _grpc_stream_ctx_ptr_pos}) +
go_offset_of(ot, (go_offset){.v = _grpc_stream_st_ptr_pos}) +
sizeof(void *)));

if (ctx_ptr) {
server_trace_parent(
goroutine_addr,
&invocation.tp,
(void *)(ctx_ptr + go_offset_of(ot, (go_offset){.v = _value_context_val_ptr_pos}) +
sizeof(void *)));
bpf_dbg_printk("st_ptr %llx", st_ptr);
if (st_ptr) {
grpc_transports_t *t = bpf_map_lookup_elem(&ongoing_grpc_transports, &st_ptr);

bpf_dbg_printk("found t %llx", t);
if (t) {
bpf_dbg_printk("reading the traceparent from frame headers");
if (valid_trace(t->tp.trace_id)) {
tp_ptr = &t->tp;
}
}
}

server_trace_parent(goroutine_addr, &invocation.tp, tp_ptr);
}

if (bpf_map_update_elem(&ongoing_grpc_server_requests, &g_key, &invocation, BPF_ANY)) {
Expand All @@ -126,16 +133,32 @@ SEC("uprobe/http2Server_operateHeaders")
int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
void *tr = GO_PARAM1(ctx);
bpf_dbg_printk(
"=== uprobe/http2Server_operateHeaders tr %llx goroutine %lx === ", tr, goroutine_addr);
void *frame = GO_PARAM2(ctx);
off_table_t *ot = get_offsets_table();

u64 new_offset_version = go_offset_of(ot, (go_offset){.v = _operate_headers_new});

// After grpc version 1.60, they added extra context argument to the
// function call, which adds two extra arguments.
if (new_offset_version) {
frame = GO_PARAM4(ctx);
}

bpf_dbg_printk("=== uprobe/GRPC http2Server_operateHeaders tr %llx goroutine %lx, new %d === ",
tr,
goroutine_addr,
new_offset_version);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

grpc_transports_t t = {
.type = TRANSPORT_HTTP2,
.conn = {0},
.tp = {0},
};

process_meta_frame_headers(frame, &t.tp);

bpf_map_update_elem(&ongoing_grpc_operate_headers, &g_key, &tr, BPF_ANY);
bpf_map_update_elem(&ongoing_grpc_transports, &tr, &t, BPF_ANY);

Expand Down Expand Up @@ -337,7 +360,7 @@ static __always_inline void clientConnStart(
go_offset_of(ot, (go_offset){.v = _value_context_val_ptr_pos}) +
sizeof(void *)));

invocation.flags = client_trace_parent(goroutine_addr, &invocation.tp, val_ptr);
invocation.flags = client_trace_parent(goroutine_addr, &invocation.tp);
} else {
// it's OK sending empty tp for a client, the userspace id generator will make random trace_id, span_id
bpf_dbg_printk("No ctx_ptr %llx", ctx_ptr);
Expand Down
3 changes: 1 addition & 2 deletions bpf/go_kafka_go.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ int uprobe_writer_write_messages(struct pt_regs *ctx) {

tp_info_t tp = {};

// We don't look up in the headers, no http/grpc request, therefore 0 as last argument
client_trace_parent(goroutine_addr, &tp, 0);
client_trace_parent(goroutine_addr, &tp);
go_addr_key_t p_key = {};
go_addr_key_from_id(&p_key, w_ptr);

Expand Down
Loading
Loading