From 264da3dcf6f78a813e9932481162e2182d1dff3c Mon Sep 17 00:00:00 2001 From: Ian Date: Fri, 18 Oct 2024 21:44:49 -0700 Subject: [PATCH] Update Otel to Support Rollups (#765) * Sending flow to logs in otel as an experiment * rollup support for otel * Fixing bug in how values are mapped to otel values * removing histo info from rollup because of cardinality --- pkg/formats/otel/otel.go | 118 ++++++++++++++++++++++--------- pkg/formats/otel/trap_handler.go | 4 +- 2 files changed, 85 insertions(+), 37 deletions(-) diff --git a/pkg/formats/otel/otel.go b/pkg/formats/otel/otel.go index 3ec42802..09f6694e 100644 --- a/pkg/formats/otel/otel.go +++ b/pkg/formats/otel/otel.go @@ -202,9 +202,80 @@ func (f *OtelFormat) From(raw *kt.Output) ([]map[string]interface{}, error) { } func (f *OtelFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error) { + res := f.toOtelDataRollup(rolls) + if len(res) == 0 { + return nil, nil + } + + f.mux.Lock() + defer f.mux.Unlock() + for _, m := range res { + if _, ok := f.vecs[m.Name]; !ok { + lm := m + cv, err := otelm.Float64ObservableGauge( + lm.Name, + metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error { + for _, mm := range f.getLatestInputs(lm.Name) { + o.Observe(mm.Value, metric.WithAttributeSet(mm.GetTagValues())) + } + return nil + }), + ) + if err != nil { + return nil, err + } + f.vecs[lm.Name] = cv + f.inputs[lm.Name] = make([]OtelData, 0) + } + // Save this for later, for the next time the async callback is run. + f.inputs[m.Name] = append(f.inputs[m.Name], m) + } + return nil, nil } +var ( + rollupMap = map[string]string{ + "src_geo": "src_country", + "l4_src_port": "src_port", + "dst_geo": "dst_country", + "l4_dst_port": "dst_port", + } +) + +func (f *OtelFormat) toOtelDataRollup(in []rollup.Rollup) []OtelData { + ms := make([]OtelData, 0, len(in)) + for _, roll := range in { + dims := roll.GetDims() + attr := map[string]interface{}{} + bad := false + for i, pt := range strings.Split(roll.Dimension, roll.KeyJoin) { + aname := dims[i] + if n, ok := rollupMap[aname]; ok { + aname = n + } + attr[aname] = pt + if pt == "0" || pt == "" || pt == "-" || pt == "--" { + bad = true + } + if aname == "src_port" || aname == "dst_port" { // Remap high ports down here. + port, _ := strconv.Atoi(pt) + if port > 32768 { + attr[aname] = "32768" + } + } + } + if !bad { + ms = append(ms, OtelData{ + Name: "kentik.rollup." + roll.Name, + Value: roll.Metric, + Tags: attr, + }) + } + } + return ms +} + func (f *OtelFormat) getLatestInputs(name string) []OtelData { f.mux.Lock() defer f.mux.Unlock() @@ -232,7 +303,7 @@ func (f *OtelFormat) toOtelMetric(in *kt.JCHF) []OtelData { return f.fromKtranslate(in) case kt.KENTIK_EVENT_SNMP_TRAP, kt.KENTIK_EVENT_EXT: // This is actually an event, send out as an event to sink directly. - err := f.trapLog.RecordTrap(in) + err := f.trapLog.RecordLog(in, "New Trap Event") if err != nil { f.Errorf("There was an error when sending an event: %v.", err) } @@ -374,42 +445,13 @@ func (f *OtelFormat) fromKSynth(in *kt.JCHF) []OtelData { return ms } +// There's too much data to send as metrics here. Send on as a log instead. func (f *OtelFormat) fromKflow(in *kt.JCHF) []OtelData { - // Map the basic strings into here. - attr := map[string]interface{}{} - metrics := map[string]kt.MetricInfo{"in_bytes": kt.MetricInfo{}, "out_bytes": kt.MetricInfo{}, "in_pkts": kt.MetricInfo{}, "out_pkts": kt.MetricInfo{}, "latency_ms": kt.MetricInfo{}} - f.mux.RLock() - util.SetAttr(attr, in, metrics, f.lastMetadata[in.DeviceName], false) - f.mux.RUnlock() - ms := map[string]int64{} - for m, _ := range metrics { - switch m { - case "in_bytes": - ms[m] = int64(in.InBytes * uint64(in.SampleRate)) - case "out_bytes": - ms[m] = int64(in.OutBytes * uint64(in.SampleRate)) - case "in_pkts": - ms[m] = int64(in.InPkts * uint64(in.SampleRate)) - case "out_pkts": - ms[m] = int64(in.OutPkts * uint64(in.SampleRate)) - case "latency_ms": - ms[m] = int64(in.CustomInt["appl_latency_ms"]) - } - } - - res := []OtelData{} - for k, v := range ms { - if v == 0 { // Drop zero valued metrics here. - continue - } - res = append(res, OtelData{ - Name: "kentik.flow." + k, - Value: float64(v), - Tags: attr, - }) + err := f.trapLog.RecordLog(in, "KFlow") + if err != nil { + f.Errorf("There was an error when sending an event: %v.", err) } - - return res + return nil } func (f *OtelFormat) fromSnmpDeviceMetric(in *kt.JCHF) []OtelData { @@ -619,8 +661,14 @@ func (d *OtelData) GetTagValues() attribute.Set { res = append(res, attribute.String(k, t)) case int64: res = append(res, attribute.Int64(k, t)) + case int32: + res = append(res, attribute.Int64(k, int64(t))) case float64: res = append(res, attribute.Float64(k, t)) + case uint32: + res = append(res, attribute.Int64(k, int64(t))) + case uint64: + res = append(res, attribute.Int64(k, int64(t))) default: } } diff --git a/pkg/formats/otel/trap_handler.go b/pkg/formats/otel/trap_handler.go index fdb0da00..5c536003 100644 --- a/pkg/formats/otel/trap_handler.go +++ b/pkg/formats/otel/trap_handler.go @@ -119,7 +119,7 @@ func (ol *OtelLogger) watchForClose(ctx context.Context, loggerProvider *sdk.Log } // For now, just log everything as json -func (ol *OtelLogger) RecordTrap(msg *kt.JCHF) error { +func (ol *OtelLogger) RecordLog(msg *kt.JCHF, logLine string) error { flat := msg.Flatten() strip(flat) @@ -127,7 +127,7 @@ func (ol *OtelLogger) RecordTrap(msg *kt.JCHF) error { for k, v := range flat { atrs = append(atrs, slog.Any(k, v)) } - slog.LogAttrs(ol.ctx, slog.LevelInfo, "New Trap Event", atrs...) + slog.LogAttrs(ol.ctx, slog.LevelInfo, logLine, atrs...) return nil }