Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][adjuster] Implement ip attribute adjuster to operate on otlp data model #6355

Merged
merged 8 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cmd/query/app/querysvc/adjuster/adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ type Adjuster interface {
}

// Func is a type alias that wraps a function and makes an Adjuster from it.
type Func func(trace ptrace.Traces) (ptrace.Traces, error)
type Func func(traces ptrace.Traces) (ptrace.Traces, error)

// Adjust implements Adjuster interface for the Func alias.
func (f Func) Adjust(trace ptrace.Traces) (ptrace.Traces, error) {
return f(trace)
func (f Func) Adjust(traces ptrace.Traces) (ptrace.Traces, error) {
return f(traces)
}

// Sequence creates an adjuster that combines a series of adjusters
Expand All @@ -44,17 +44,17 @@ type sequence struct {
failFast bool
}

func (c sequence) Adjust(trace ptrace.Traces) (ptrace.Traces, error) {
func (c sequence) Adjust(traces ptrace.Traces) (ptrace.Traces, error) {
var errs []error
for _, adjuster := range c.adjusters {
var err error
trace, err = adjuster.Adjust(trace)
traces, err = adjuster.Adjust(traces)
if err != nil {
if c.failFast {
return trace, err
return traces, err
}
errs = append(errs, err)
}
}
return trace, errors.Join(errs...)
return traces, errors.Join(errs...)
}
72 changes: 72 additions & 0 deletions cmd/query/app/querysvc/adjuster/ipattribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package adjuster

import (
"bytes"
"encoding/binary"
"strconv"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var ipAttributesToCorrect = map[string]struct{}{
"ip": {},
"peer.ipv4": {},
}

// IPAttribute returns an adjuster that replaces numeric "ip" attributes,
// which usually contain IPv4 packed into uint32, with their string
// representation (e.g. "8.8.8.8"").
func IPAttribute() Adjuster {
adjustAttributes := func(attributes pcommon.Map) {
adjusted := make(map[string]string)

attributes.Range(func(k string, v pcommon.Value) bool {
if _, ok := ipAttributesToCorrect[k]; !ok {
return true
}
var value uint32
switch v.Type() {
case pcommon.ValueTypeInt:
//nolint: gosec // G115
value = uint32(v.Int())
case pcommon.ValueTypeDouble:
value = uint32(v.Double())
default:
return true
}
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], value)
var sBuf bytes.Buffer
for i, b := range buf {
if i > 0 {
sBuf.WriteRune('.')
}
sBuf.WriteString(strconv.FormatUint(uint64(b), 10))
}
adjusted[k] = sBuf.String()
return true
})

for k, v := range adjusted {
attributes.PutStr(k, v)
}
}

return Func(func(traces ptrace.Traces) (ptrace.Traces, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro The function in v1 looks like this

	return Func(func(trace *model.Trace) (*model.Trace, error) {
		for _, span := range trace.Spans {
			adjustTags(span.Tags)
			adjustTags(span.Process.Tags)
			model.KeyValues(span.Process.Tags).Sort()
		}
		return trace, nil
	})

Couple of questions:

  • Are process tags also just part of attributes in OTLP?
  • Do these attributes need to be sorted?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • process tags are translated into Resource.attributes
  • I don't think they need to be sorted here.

resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
adjustAttributes(span.Attributes())
}
}
}
return traces, nil
})
}
47 changes: 47 additions & 0 deletions cmd/query/app/querysvc/adjuster/ipattribute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package adjuster

import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestIPAttributeAdjuster(t *testing.T) {
traces := ptrace.NewTraces()
spans := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()

spanA := spans.AppendEmpty()
spanA.Attributes().PutInt("a", 42)
spanA.Attributes().PutStr("ip", "not integer")
spanA.Attributes().PutInt("ip", 1<<24|2<<16|3<<8|4)
spanA.Attributes().PutStr("peer.ipv4", "something")

spanB := spans.AppendEmpty()
spanB.Attributes().PutDouble("ip", 1<<25|2<<16|3<<8|4)

trace, err := IPAttribute().Adjust(traces)
require.NoError(t, err)

span := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
attributesA := span.At(0).Attributes()

val, ok := attributesA.Get("a")
require.True(t, ok)
require.EqualValues(t, 42, val.Int())

val, ok = attributesA.Get("ip")
require.True(t, ok)
require.EqualValues(t, "1.2.3.4", val.Str())

val, ok = attributesA.Get("peer.ipv4")
require.True(t, ok)
require.EqualValues(t, "something", val.Str())

attributesB := span.At(1).Attributes()

val, ok = attributesB.Get("ip")
require.True(t, ok)
require.EqualValues(t, "2.2.3.4", val.Str())

}
Loading