Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Preserve attribute names and metric names on prefix conflict in OTel mapping mode #35651

Merged
merged 10 commits into from
Oct 17, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Preserve attribute names and metric names on prefix conflict in OTel mapping mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35651]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: For example, in OTel mapping mode, attributes "a" and "a.b" are now sent to Elasticsearch as is, instead of being renamed to "a.value" and "a.b".

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
127 changes: 127 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,35 @@ func TestExporterLogs(t *testing.T) {
assert.Equal(t, `{"some.scope.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

mustSendLogs(t, exporter, newLogsWithAttributes(map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}))

rec.WaitItems(1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})
}

func TestExporterMetrics(t *testing.T) {
Expand Down Expand Up @@ -1300,6 +1329,75 @@ func TestExporterMetrics(t *testing.T) {
assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("otel mode metric name conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

metrics := pmetric.NewMetrics()
resourceMetric := metrics.ResourceMetrics().AppendEmpty()
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()

fooBarMetric := scopeMetric.Metrics().AppendEmpty()
fooBarMetric.SetName("foo.bar")
fooBarMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)

fooMetric := scopeMetric.Metrics().AppendEmpty()
fooMetric.SetName("foo")
fooMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)

fooBarBazMetric := scopeMetric.Metrics().AppendEmpty()
fooBarBazMetric.SetName("foo.bar.baz")
fooBarBazMetric.SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(1)
expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

mustSendMetrics(t, exporter, newMetricsWithAttributes(map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}))

rec.WaitItems(1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("publish summary", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
Expand Down Expand Up @@ -1600,6 +1698,35 @@ func TestExporterTraces(t *testing.T) {
assert.Equal(t, `{"some.resource.attribute":["foo","bar"]}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
}
})

t.Run("otel mode attribute key prefix conflict", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

mustSendTraces(t, exporter, newTracesWithAttributes(map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}, map[string]any{
"a": "a",
"a.b": "a.b",
}))

rec.WaitItems(1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})
}

// TestExporterAuth verifies that the Elasticsearch exporter supports
Expand Down
32 changes: 17 additions & 15 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ func (doc *Document) sort() {
// The filtering only keeps the last value for a key.
//
// Dedup ensures that keys are sorted.
func (doc *Document) Dedup() {
func (doc *Document) Dedup(appendValueOnConflict bool) {
// 1. Always ensure the fields are sorted; Dedup support requires
// Fields to be sorted.
doc.sort()

// 2. rename fields if a primitive value is overwritten by an object.
// 2. if appendValueOnConflict is set, rename fields when a primitive value is overwritten by an object.
// For example the pair (path.x=1, path.x.a="test") becomes:
// (path.x.value=1, path.x.a="test").
//
Expand All @@ -227,16 +227,18 @@ func (doc *Document) Dedup() {
// field in favor of the `value` field in the document.
//
// This step removes potential conflicts when dedotting and serializing fields.
var renamed bool
for i := 0; i < len(doc.fields)-1; i++ {
key, nextKey := doc.fields[i].key, doc.fields[i+1].key
if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
renamed = true
doc.fields[i].key = key + ".value"
if appendValueOnConflict {
var renamed bool
for i := 0; i < len(doc.fields)-1; i++ {
key, nextKey := doc.fields[i].key, doc.fields[i+1].key
if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
renamed = true
doc.fields[i].key = key + ".value"
}
}
if renamed {
doc.sort()
}
}
if renamed {
doc.sort()
}

// 3. mark duplicates as 'ignore'
Expand All @@ -251,7 +253,7 @@ func (doc *Document) Dedup() {

// 4. fix objects that might be stored in arrays
for i := range doc.fields {
doc.fields[i].value.Dedup()
doc.fields[i].value.Dedup(appendValueOnConflict)
}
}

Expand Down Expand Up @@ -487,13 +489,13 @@ func (v *Value) sort() {
// Dedup recursively dedups keys in stored documents.
//
// NOTE: The value MUST be sorted.
func (v *Value) Dedup() {
func (v *Value) Dedup(appendValueOnConflict bool) {
switch v.kind {
case KindObject:
v.doc.Dedup()
v.doc.Dedup(appendValueOnConflict)
case KindArr:
for i := range v.arr {
v.arr[i].Dedup()
v.arr[i].Dedup(appendValueOnConflict)
}
}
}
Expand Down
40 changes: 29 additions & 11 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ func TestObjectModel_CreateMap(t *testing.T) {

func TestObjectModel_Dedup(t *testing.T) {
tests := map[string]struct {
build func() Document
want Document
build func() Document
appendValueOnConflict bool
want Document
}{
"no duplicates": {
build: func() (doc Document) {
doc.AddInt("a", 1)
doc.AddInt("c", 3)
return doc
},
want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}},
},
"duplicate keys": {
build: func() (doc Document) {
Expand All @@ -104,7 +106,8 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.AddInt("a", 2)
return doc
},
want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}},
},
"duplicate after flattening from map: namespace object at end": {
build: func() Document {
Expand All @@ -114,7 +117,8 @@ func TestObjectModel_Dedup(t *testing.T) {
am.PutEmptyMap("namespace").PutInt("a", 23)
return DocumentFromAttributes(am)
},
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}},
},
"duplicate after flattening from map: namespace object at beginning": {
build: func() Document {
Expand All @@ -124,7 +128,8 @@ func TestObjectModel_Dedup(t *testing.T) {
am.PutStr("toplevel", "test")
return DocumentFromAttributes(am)
},
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}},
},
"dedup in arrays": {
build: func() (doc Document) {
Expand All @@ -136,6 +141,7 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded}))
return doc
},
appendValueOnConflict: true,
want: Document{fields: []field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{fields: []field{
{"a", ignoreValue},
{"a", IntValue(2)},
Expand All @@ -148,7 +154,8 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.AddInt("namespace.a", 2)
return doc
},
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}},
},
"dedup removes primitive if value exists": {
build: func() (doc Document) {
Expand All @@ -157,14 +164,25 @@ func TestObjectModel_Dedup(t *testing.T) {
doc.AddInt("namespace.value", 3)
return doc
},
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}},
appendValueOnConflict: true,
want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}},
},
"dedup without append value on conflict": {
build: func() (doc Document) {
doc.AddInt("namespace", 1)
doc.AddInt("namespace.a", 2)
doc.AddInt("namespace.value", 3)
return doc
},
appendValueOnConflict: false,
want: Document{fields: []field{{"namespace", IntValue(1)}, {"namespace.a", IntValue(2)}, {"namespace.value", IntValue(3)}}},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
doc := test.build()
doc.Dedup()
doc.Dedup(test.appendValueOnConflict)
assert.Equal(t, test.want, doc)
})
}
Expand Down Expand Up @@ -282,7 +300,7 @@ func TestDocument_Serialize_Flat(t *testing.T) {
m := pcommon.NewMap()
assert.NoError(t, m.FromRaw(test.attrs))
doc := DocumentFromAttributes(m)
doc.Dedup()
doc.Dedup(true)
err := doc.Serialize(&buf, false, false)
require.NoError(t, err)

Expand Down Expand Up @@ -343,7 +361,7 @@ func TestDocument_Serialize_Dedot(t *testing.T) {
m := pcommon.NewMap()
assert.NoError(t, m.FromRaw(test.attrs))
doc := DocumentFromAttributes(m)
doc.Dedup()
doc.Dedup(true)
err := doc.Serialize(&buf, true, false)
require.NoError(t, err)

Expand Down
9 changes: 6 additions & 3 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
default:
document = m.encodeLogDefaultMode(resource, record, scope)
}
document.Dedup()
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)
carsonip marked this conversation as resolved.
Show resolved Hide resolved

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
Expand Down Expand Up @@ -267,7 +268,8 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
}

func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) {
document.Dedup()
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
Expand Down Expand Up @@ -646,7 +648,8 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL st
default:
document = m.encodeSpanDefaultMode(resource, span, scope)
}
document.Dedup()
// For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false
document.Dedup(m.mode != MappingOTel)
var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel)
return buf.Bytes(), err
Expand Down