[exporter/elasticsearch] Add OTel mapping mode for metrics (#34248)
**Description:**

Add OTel mapping mode for metrics.

OTel mapping mode is a new mapping mode described in
https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter#elasticsearch-document-mapping

**Link to tracking Issue:** #34248

**Testing:** Added exporter test.

**Documentation:**
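Mapping modes are selected via the exporter's `mapping::mode` configuration (see the linked README). As a minimal sketch of how the new mode is switched on in this PR's tests — a fragment in the style of the repository's test helper, where `t`, `server`, `newTestMetricsExporter`, and `Config` come from the surrounding test code:

```go
exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
	cfg.Mapping.Mode = "otel" // the new OTel mapping mode; "ecs" is an existing mode
})
```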
carsonip authored Aug 19, 2024
1 parent 2d4fe3d commit eb2c463
Showing 9 changed files with 347 additions and 35 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_otel-mode-metrics.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add OTel mapping mode for metrics

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34248]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
13 changes: 7 additions & 6 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ type bulkIndexer interface {

type bulkIndexerSession interface {
	// Add adds a document to the bulk indexing session.
-	Add(ctx context.Context, index string, document io.WriterTo) error
+	Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error

	// End must be called on the session object once it is no longer
	// needed, in order to release any associated resources.
@@ -108,8 +108,8 @@ type syncBulkIndexerSession struct {
}

// Add adds an item to the sync bulk indexer session.
-func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo) error {
-	return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document})
+func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
+	return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates})
}

// End is a no-op.

// End is a no-op.
@@ -243,10 +243,11 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
// Add adds an item to the async bulk indexer session.
//
// Adding an item after a call to Close() will panic.
-func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo) error {
+func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
	item := docappender.BulkIndexerItem{
-		Index: index,
-		Body:  document,
+		Index:            index,
+		Body:             document,
+		DynamicTemplates: dynamicTemplates,
	}
	select {
	case <-ctx.Done():
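A caller's-eye sketch of the extended session interface (a hypothetical helper in the same package, since `bulkIndexerSession` is unexported): passing `nil` preserves the old behavior — the log and trace paths below do exactly that — while the metrics path supplies one template per metric field. Index and template names are taken from this PR's tests.

```go
func addWithTemplates(ctx context.Context, session bulkIndexerSession) error {
	// strings.Reader implements io.WriterTo, matching the document parameter.
	doc := strings.NewReader(`{"metrics":{"metric.sum":1.5}}`)
	templates := map[string]string{"metrics.metric.sum": "gauge_double"}
	return session.Add(ctx, "metrics-generic.otel-default", doc, templates)
}
```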
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/bulkindexer_test.go
@@ -67,7 +67,7 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) {
	session, err := bulkIndexer.StartSession(context.Background())
	require.NoError(t, err)

-	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
+	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
	assert.NoError(t, bulkIndexer.Close(context.Background()))
	assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
}
@@ -106,7 +106,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
	session, err := bulkIndexer.StartSession(context.Background())
	require.NoError(t, err)

-	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
+	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
	// should flush
	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -164,7 +164,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
	session, err := bulkIndexer.StartSession(context.Background())
	require.NoError(t, err)

-	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
+	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
	// should flush
	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
9 changes: 5 additions & 4 deletions exporter/elasticsearchexporter/exporter.go
@@ -175,7 +175,7 @@ func (e *elasticsearchExporter) pushLogRecord(
	if err != nil {
		return fmt.Errorf("failed to encode log event: %w", err)
	}
-	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document))
+	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
}

func (e *elasticsearchExporter) pushMetricsData(
@@ -215,7 +215,8 @@ func (e *elasticsearchExporter) pushMetricsData(
		resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
	}

-	if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, scope, metric, dp, dpValue); err != nil {
+	if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource,
+		resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp, dpValue); err != nil {
		return err
	}
	return nil
@@ -290,7 +291,7 @@ func (e *elasticsearchExporter) pushMetricsData(
		errs = append(errs, err)
		continue
	}
-	if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes)); err != nil {
+	if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil {
		if cerr := ctx.Err(); cerr != nil {
			return cerr
		}
@@ -397,5 +398,5 @@ func (e *elasticsearchExporter) pushTraceRecord(
	if err != nil {
		return fmt.Errorf("failed to encode trace record: %w", err)
	}
-	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document))
+	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
}
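The `dynamic_templates` map forwarded here ends up in the bulk request's action metadata, telling Elasticsearch which dynamic template to apply to each field. Purely as an illustration of the wire format — the real exporter builds this via docappender's `BulkIndexerItem.DynamicTemplates`, and the values below are taken from the test expectations that follow:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative reconstruction of a bulk "create" action line.
	action := map[string]any{
		"create": map[string]any{
			"_index":            "metrics-generic.otel-default",
			"dynamic_templates": map[string]string{"metrics.metric.sum": "gauge_double"},
		},
	}
	b, _ := json.Marshal(action)
	fmt.Println(string(b))
	// {"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}
}
```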
75 changes: 74 additions & 1 deletion exporter/elasticsearchexporter/exporter_test.go
@@ -496,7 +496,9 @@ func TestExporterMetrics(t *testing.T) {
			return itemsAllOK(docs)
		})

-		exporter := newTestMetricsExporter(t, server.URL)
+		exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
+			cfg.Mapping.Mode = "ecs"
+		})
		dp := pmetric.NewNumberDataPoint()
		dp.SetDoubleValue(123.456)
		dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
@@ -519,6 +521,7 @@

		exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
			cfg.MetricsIndex = "metrics.index"
+			cfg.Mapping.Mode = "ecs"
		})
		metrics := newMetricsWithAttributeAndResourceMap(
			map[string]string{
@@ -549,6 +552,7 @@

		exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
			cfg.MetricsIndex = "metrics.index"
+			cfg.Mapping.Mode = "ecs"
		})
		metrics := newMetricsWithAttributeAndResourceMap(
			map[string]string{
@@ -767,6 +771,75 @@
assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("otel mode", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
scopeA := resourceMetrics.ScopeMetrics().AppendEmpty()
metricSlice := scopeA.Metrics()
fooMetric := metricSlice.AppendEmpty()
fooMetric.SetName("metric.foo")
fooDps := fooMetric.SetEmptyHistogram().DataPoints()
fooDp := fooDps.AppendEmpty()
fooDp.ExplicitBounds().FromRaw([]float64{1.0, 2.0, 3.0})
fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})
fooOtherDp := fooDps.AppendEmpty()
fooOtherDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
fooOtherDp.ExplicitBounds().FromRaw([]float64{4.0, 5.0, 6.0})
fooOtherDp.BucketCounts().FromRaw([]uint64{4, 5, 6, 7})

sumMetric := metricSlice.AppendEmpty()
sumMetric.SetName("metric.sum")
sumDps := sumMetric.SetEmptySum().DataPoints()
sumDp := sumDps.AppendEmpty()
sumDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
sumDp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0)))
sumDp.SetDoubleValue(1.5)

summaryMetric := metricSlice.AppendEmpty()
summaryMetric.SetName("metric.summary")
summaryDps := summaryMetric.SetEmptySummary().DataPoints()
summaryDp := summaryDps.AppendEmpty()
summaryDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3*3600, 0)))
summaryDp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3*3600, 0)))
summaryDp.SetCount(1)
summaryDp.SetSum(1.5)

mustSendMetrics(t, exporter, metrics)

rec.WaitItems(2)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T02:00:00.000000000Z"}`),
},
{
Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.summary":"summary_metrics"}}}`),
Document: []byte(`{"@timestamp":"1970-01-01T03:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T03:00:00.000000000Z"}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("publish summary", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
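The expected histogram documents in the new "otel mode" test encode each histogram data point as parallel `counts`/`values` arrays. The `values` appear to be representative bucket midpoints derived from the explicit bounds — e.g. bounds `[1,2,3]` with four buckets yield `[0.5,1.5,2.5,3]`. A sketch of that mapping, inferred from the test expectations rather than copied from the exporter source:

```go
// histogramValues derives a representative value per bucket from explicit
// bounds; assumes len(counts) == len(bounds)+1 and nonnegative bounds.
func histogramValues(bounds []float64, counts []uint64) []float64 {
	values := make([]float64, len(counts))
	for i := range counts {
		switch {
		case i == 0:
			values[i] = bounds[0] / 2 // (-inf, bounds[0]]: halfway up to the first bound
		case i == len(counts)-1:
			values[i] = bounds[len(bounds)-1] // (bounds[last], +inf): reuse the last bound
		default:
			values[i] = (bounds[i-1] + bounds[i]) / 2 // midpoint of a finite bucket
		}
	}
	return values
}
```

With `bounds = []float64{4, 5, 6}` and four buckets this produces `[2, 4.5, 5.5, 6]`, matching the second expected document above.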
32 changes: 29 additions & 3 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
@@ -34,6 +34,7 @@ package objmodel // import "github.com/open-telemetry/opentelemetry-collector-co
import (
	"encoding/hex"
	"io"
+	"maps"
	"math"
	"sort"
	"strings"
@@ -48,7 +49,8 @@ import (
// Document is an intermediate representation for converting open telemetry records with arbitrary attributes
// into a JSON document that can be processed by Elasticsearch.
type Document struct {
-	fields []field
+	fields           []field
+	dynamicTemplates map[string]string
}

type field struct {
@@ -81,6 +83,7 @@ const (
	KindObject
	KindTimestamp
	KindIgnore
+	KindUnflattenableObject // an object that should not be flattened at serialization time
)

const tsLayout = "2006-01-02T15:04:05.000000000Z"
@@ -105,13 +108,24 @@ func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document {

	fields := make([]field, 0, am.Len())
	fields = appendAttributeFields(fields, path, am)
-	return Document{fields}
+	return Document{fields: fields}
}

func (doc *Document) Clone() *Document {
	fields := make([]field, len(doc.fields))
	copy(fields, doc.fields)
-	return &Document{fields}
+	return &Document{fields: fields, dynamicTemplates: maps.Clone(doc.dynamicTemplates)}
}
+
+func (doc *Document) AddDynamicTemplate(path, template string) {
+	if doc.dynamicTemplates == nil {
+		doc.dynamicTemplates = make(map[string]string)
+	}
+	doc.dynamicTemplates[path] = template
+}
+
+func (doc *Document) DynamicTemplates() map[string]string {
+	return doc.dynamicTemplates
+}

// AddTimestamp adds a raw timestamp value to the Document.
@@ -293,6 +307,7 @@ func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
// for current use cases and the proper fix will be slightly too complex. YAGNI.
var otelPrefixSet = map[string]struct{}{
	"attributes.": {},
+	"metrics.":    {},
}

func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
@@ -422,6 +437,12 @@ func TimestampValue(ts time.Time) Value {
	return Value{kind: KindTimestamp, ts: ts}
}

+// UnflattenableObjectValue creates an unflattenable object from a map
+func UnflattenableObjectValue(m pcommon.Map) Value {
+	sub := DocumentFromAttributes(m)
+	return Value{kind: KindUnflattenableObject, doc: sub}
+}
+
// ValueFromAttribute converts an AttributeValue into a value.
func ValueFromAttribute(attr pcommon.Value) Value {
	switch attr.Type() {
@@ -506,6 +527,11 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error {
			return w.OnNil()
		}
		return v.doc.iterJSON(w, dedot, otel)
+	case KindUnflattenableObject:
+		if len(v.doc.fields) == 0 {
+			return w.OnNil()
+		}
+		return v.doc.iterJSON(w, true, otel)
	case KindArr:
		if err := w.OnArrayStart(-1, structform.AnyType); err != nil {
			return err
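A usage sketch for the APIs added in this file, using only names visible in the diff (note that `objmodel` lives under `internal/`, so this compiles only inside the repository):

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	attrs := pcommon.NewMap()
	attrs.PutStr("service.name", "my-service")

	doc := objmodel.DocumentFromAttributes(attrs) // existing flattening constructor
	doc.AddDynamicTemplate("metrics.metric.foo", "histogram")
	fmt.Println(doc.DynamicTemplates()) // map[metrics.metric.foo:histogram]

	// UnflattenableObjectValue wraps a map so it is serialized as a nested
	// object (KindUnflattenableObject) instead of being flattened.
	_ = objmodel.UnflattenableObjectValue(attrs)
}
```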
