diff --git a/cmd/query/app/querysvc/adjuster/adjuster.go b/cmd/query/app/querysvc/adjuster/adjuster.go index 748eee10b07..e6c4f4039fb 100644 --- a/cmd/query/app/querysvc/adjuster/adjuster.go +++ b/cmd/query/app/querysvc/adjuster/adjuster.go @@ -18,11 +18,11 @@ type Adjuster interface { } // Func is a type alias that wraps a function and makes an Adjuster from it. -type Func func(trace ptrace.Traces) (ptrace.Traces, error) +type Func func(traces ptrace.Traces) (ptrace.Traces, error) // Adjust implements Adjuster interface for the Func alias. -func (f Func) Adjust(trace ptrace.Traces) (ptrace.Traces, error) { - return f(trace) +func (f Func) Adjust(traces ptrace.Traces) (ptrace.Traces, error) { + return f(traces) } // Sequence creates an adjuster that combines a series of adjusters @@ -44,17 +44,17 @@ type sequence struct { failFast bool } -func (c sequence) Adjust(trace ptrace.Traces) (ptrace.Traces, error) { +func (c sequence) Adjust(traces ptrace.Traces) (ptrace.Traces, error) { var errs []error for _, adjuster := range c.adjusters { var err error - trace, err = adjuster.Adjust(trace) + traces, err = adjuster.Adjust(traces) if err != nil { if c.failFast { - return trace, err + return traces, err } errs = append(errs, err) } } - return trace, errors.Join(errs...) + return traces, errors.Join(errs...) } diff --git a/cmd/query/app/querysvc/adjuster/ipattribute.go b/cmd/query/app/querysvc/adjuster/ipattribute.go new file mode 100644 index 00000000000..0ddb9b45447 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/ipattribute.go @@ -0,0 +1,77 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "bytes" + "encoding/binary" + "strconv" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +var ipAttributesToCorrect = map[string]struct{}{ + "ip": {}, + "peer.ipv4": {}, +} + +// IPAttribute returns an adjuster that replaces numeric "ip" attributes, +// which usually contain IPv4 packed into uint32, with their string +// representation (e.g. "8.8.8.8""). +func IPAttribute() Adjuster { + return Func(func(traces ptrace.Traces) (ptrace.Traces, error) { + adjuster := ipAttributeAdjuster{} + resourceSpans := traces.ResourceSpans() + for i := 0; i < resourceSpans.Len(); i++ { + rs := resourceSpans.At(i) + adjuster.adjust(rs.Resource().Attributes()) + scopeSpans := rs.ScopeSpans() + for j := 0; j < scopeSpans.Len(); j++ { + ss := scopeSpans.At(j) + spans := ss.Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + adjuster.adjust(span.Attributes()) + } + } + } + return traces, nil + }) +} + +type ipAttributeAdjuster struct{} + +func (ipAttributeAdjuster) adjust(attributes pcommon.Map) { + adjusted := make(map[string]string) + attributes.Range(func(k string, v pcommon.Value) bool { + if _, ok := ipAttributesToCorrect[k]; !ok { + return true + } + var value uint32 + switch v.Type() { + case pcommon.ValueTypeInt: + //nolint: gosec // G115 + value = uint32(v.Int()) + case pcommon.ValueTypeDouble: + value = uint32(v.Double()) + default: + return true + } + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], value) + var sBuf bytes.Buffer + for i, b := range buf { + if i > 0 { + sBuf.WriteRune('.') + } + sBuf.WriteString(strconv.FormatUint(uint64(b), 10)) + } + adjusted[k] = sBuf.String() + return true + }) + for k, v := range adjusted { + attributes.PutStr(k, v) + } +} diff --git a/cmd/query/app/querysvc/adjuster/ipattribute_test.go b/cmd/query/app/querysvc/adjuster/ipattribute_test.go new file mode 100644 index 00000000000..cfd9ddeea26 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/ipattribute_test.go @@ -0,0 +1,81 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestIPAttributeAdjuster(t *testing.T) { + traces := ptrace.NewTraces() + resourceSpans := traces.ResourceSpans().AppendEmpty() + + resourceAttributes := resourceSpans.Resource().Attributes() + resourceAttributes.PutInt("a", 42) + resourceAttributes.PutInt("ip", 1<<24|2<<16|3<<8|4) + resourceAttributes.PutStr("peer.ipv4", "something") + + spans := resourceSpans.ScopeSpans().AppendEmpty().Spans() + + createSpan := func(attrs map[string]any) { + span := spans.AppendEmpty() + for key, value := range attrs { + switch v := value.(type) { + case int: + span.Attributes().PutInt(key, int64(v)) + case string: + span.Attributes().PutStr(key, v) + case float64: + span.Attributes().PutDouble(key, v) + } + } + } + + createSpan(map[string]any{ + "a": 42, + "ip": int(1<<25 | 2<<16 | 3<<8 | 4), + "peer.ipv4": "something else", + }) + + createSpan(map[string]any{ + "ip": float64(1<<26 | 2<<16 | 3<<8 | 4), + }) + + assertAttribute := func(attributes pcommon.Map, key string, expected any) { + val, ok := attributes.Get(key) + require.True(t, ok) + switch v := expected.(type) { + case int: + require.EqualValues(t, v, val.Int()) + case string: + require.EqualValues(t, v, val.Str()) + } + } + + trace, err := IPAttribute().Adjust(traces) + require.NoError(t, err) + + resourceSpan := trace.ResourceSpans().At(0) + assert.Equal(t, 3, resourceSpan.Resource().Attributes().Len()) + + assertAttribute(resourceSpan.Resource().Attributes(), "a", 42) + assertAttribute(resourceSpan.Resource().Attributes(), "ip", "1.2.3.4") + assertAttribute(resourceSpan.Resource().Attributes(), "peer.ipv4", "something") + + gotSpans := resourceSpan.ScopeSpans().At(0).Spans() + assert.Equal(t, 2, gotSpans.Len()) + + assert.Equal(t, 3, gotSpans.At(0).Attributes().Len()) + assertAttribute(gotSpans.At(0).Attributes(), "a", 42) + assertAttribute(gotSpans.At(0).Attributes(), "ip", "2.2.3.4") + assertAttribute(gotSpans.At(0).Attributes(), "peer.ipv4", "something else") + + assert.Equal(t, 1, gotSpans.At(1).Attributes().Len()) + assertAttribute(gotSpans.At(1).Attributes(), "ip", "4.2.3.4") +}