Skip to content

Commit

Permalink
Update Otel to Support Rollups (#765)
Browse files Browse the repository at this point in the history
* Sending flow to logs in otel as an experiment

* rollup support for otel

* Fixing bug in how values are mapped to otel values

* removing histo info from rollup because of cardinality
  • Loading branch information
i3149 authored Oct 19, 2024
1 parent 248bfbe commit 264da3d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 37 deletions.
118 changes: 83 additions & 35 deletions pkg/formats/otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,80 @@ func (f *OtelFormat) From(raw *kt.Output) ([]map[string]interface{}, error) {
}

func (f *OtelFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error) {
res := f.toOtelDataRollup(rolls)
if len(res) == 0 {
return nil, nil
}

f.mux.Lock()
defer f.mux.Unlock()
for _, m := range res {
if _, ok := f.vecs[m.Name]; !ok {
lm := m
cv, err := otelm.Float64ObservableGauge(
lm.Name,
metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error {
for _, mm := range f.getLatestInputs(lm.Name) {
o.Observe(mm.Value, metric.WithAttributeSet(mm.GetTagValues()))
}
return nil
}),
)
if err != nil {
return nil, err
}
f.vecs[lm.Name] = cv
f.inputs[lm.Name] = make([]OtelData, 0)
}
// Save this for later, for the next time the async callback is run.
f.inputs[m.Name] = append(f.inputs[m.Name], m)
}

return nil, nil
}

var (
rollupMap = map[string]string{
"src_geo": "src_country",
"l4_src_port": "src_port",
"dst_geo": "dst_country",
"l4_dst_port": "dst_port",
}
)

func (f *OtelFormat) toOtelDataRollup(in []rollup.Rollup) []OtelData {
ms := make([]OtelData, 0, len(in))
for _, roll := range in {
dims := roll.GetDims()
attr := map[string]interface{}{}
bad := false
for i, pt := range strings.Split(roll.Dimension, roll.KeyJoin) {
aname := dims[i]
if n, ok := rollupMap[aname]; ok {
aname = n
}
attr[aname] = pt
if pt == "0" || pt == "" || pt == "-" || pt == "--" {
bad = true
}
if aname == "src_port" || aname == "dst_port" { // Remap high ports down here.
port, _ := strconv.Atoi(pt)
if port > 32768 {
attr[aname] = "32768"
}
}
}
if !bad {
ms = append(ms, OtelData{
Name: "kentik.rollup." + roll.Name,
Value: roll.Metric,
Tags: attr,
})
}
}
return ms
}

func (f *OtelFormat) getLatestInputs(name string) []OtelData {
f.mux.Lock()
defer f.mux.Unlock()
Expand Down Expand Up @@ -232,7 +303,7 @@ func (f *OtelFormat) toOtelMetric(in *kt.JCHF) []OtelData {
return f.fromKtranslate(in)
case kt.KENTIK_EVENT_SNMP_TRAP, kt.KENTIK_EVENT_EXT:
// This is actually an event, send out as an event to sink directly.
err := f.trapLog.RecordTrap(in)
err := f.trapLog.RecordLog(in, "New Trap Event")
if err != nil {
f.Errorf("There was an error when sending an event: %v.", err)
}
Expand Down Expand Up @@ -374,42 +445,13 @@ func (f *OtelFormat) fromKSynth(in *kt.JCHF) []OtelData {
return ms
}

// There's too much data to send as metrics here. Send on as a log instead.
func (f *OtelFormat) fromKflow(in *kt.JCHF) []OtelData {
// Map the basic strings into here.
attr := map[string]interface{}{}
metrics := map[string]kt.MetricInfo{"in_bytes": kt.MetricInfo{}, "out_bytes": kt.MetricInfo{}, "in_pkts": kt.MetricInfo{}, "out_pkts": kt.MetricInfo{}, "latency_ms": kt.MetricInfo{}}
f.mux.RLock()
util.SetAttr(attr, in, metrics, f.lastMetadata[in.DeviceName], false)
f.mux.RUnlock()
ms := map[string]int64{}
for m, _ := range metrics {
switch m {
case "in_bytes":
ms[m] = int64(in.InBytes * uint64(in.SampleRate))
case "out_bytes":
ms[m] = int64(in.OutBytes * uint64(in.SampleRate))
case "in_pkts":
ms[m] = int64(in.InPkts * uint64(in.SampleRate))
case "out_pkts":
ms[m] = int64(in.OutPkts * uint64(in.SampleRate))
case "latency_ms":
ms[m] = int64(in.CustomInt["appl_latency_ms"])
}
}

res := []OtelData{}
for k, v := range ms {
if v == 0 { // Drop zero valued metrics here.
continue
}
res = append(res, OtelData{
Name: "kentik.flow." + k,
Value: float64(v),
Tags: attr,
})
err := f.trapLog.RecordLog(in, "KFlow")
if err != nil {
f.Errorf("There was an error when sending an event: %v.", err)
}

return res
return nil
}

func (f *OtelFormat) fromSnmpDeviceMetric(in *kt.JCHF) []OtelData {
Expand Down Expand Up @@ -619,8 +661,14 @@ func (d *OtelData) GetTagValues() attribute.Set {
res = append(res, attribute.String(k, t))
case int64:
res = append(res, attribute.Int64(k, t))
case int32:
res = append(res, attribute.Int64(k, int64(t)))
case float64:
res = append(res, attribute.Float64(k, t))
case uint32:
res = append(res, attribute.Int64(k, int64(t)))
case uint64:
res = append(res, attribute.Int64(k, int64(t)))
default:
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/formats/otel/trap_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ func (ol *OtelLogger) watchForClose(ctx context.Context, loggerProvider *sdk.Log
}

// For now, just log everything as json
func (ol *OtelLogger) RecordTrap(msg *kt.JCHF) error {
func (ol *OtelLogger) RecordLog(msg *kt.JCHF, logLine string) error {
flat := msg.Flatten()
strip(flat)

atrs := make([]slog.Attr, 0)
for k, v := range flat {
atrs = append(atrs, slog.Any(k, v))
}
slog.LogAttrs(ol.ctx, slog.LevelInfo, "New Trap Event", atrs...)
slog.LogAttrs(ol.ctx, slog.LevelInfo, logLine, atrs...)

return nil
}
Expand Down

0 comments on commit 264da3d

Please sign in to comment.