From 7b53ce014b3d430e32631fa5e71ef0a10dabc0c4 Mon Sep 17 00:00:00 2001 From: Joshua Kaplan Date: Fri, 30 Jun 2023 11:07:48 -0400 Subject: [PATCH 1/4] elasticsearchexporter: handle ecs mode mapping Co-authored-by: Andrey Kaipov --- exporter/elasticsearchexporter/README.md | 4 +- exporter/elasticsearchexporter/attribute.go | 15 +++- exporter/elasticsearchexporter/config_test.go | 6 +- exporter/elasticsearchexporter/factory.go | 2 +- .../elasticsearchexporter/logs_exporter.go | 15 ++-- .../logs_exporter_test.go | 49 +++++++++++-- exporter/elasticsearchexporter/model.go | 69 ++++++++++++++++--- .../elasticsearchexporter/trace_exporter.go | 12 ++-- 8 files changed, 136 insertions(+), 36 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 1a5d2e0f02c8..b035dfbc5bb9 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -32,7 +32,7 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www [index](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html) or [datastream](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) name to publish events to. The default value is `logs-generic-default` -- `logs_dynamic_index` (optional): +- `logs_dynamic_index` (optional): takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix` resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute) - `enabled`(default=false): Enable/Disable dynamic index for log records @@ -57,7 +57,7 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www - `max_interval` (default=1m): Max waiting time if a HTTP request failed. - `mapping`: Events are encoded to JSON. The `mapping` allows users to configure additional mapping rules. - - `mode` (default=ecs): The fields naming mode. valid modes are: + - `mode` (default=none): The fields naming mode. valid modes are: - `none`: Use original fields and event structure from the OTLP event. - `ecs`: Try to map fields defined in the [OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions) diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 115cd1930f03..c2c187899b77 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -18,12 +18,21 @@ type attrGetter interface { Attributes() pcommon.Map } -// retrieve attribute out of resource and record (span or log, if not found in resource) -func getFromBothResourceAndAttribute(name string, resource attrGetter, record attrGetter) string { +// retrieve attribute out of resource, scope, and record (span or log, if not found in resource) +func getFromAttributes(name string, resource, scope, record attrGetter) string { var str string val, exist := resource.Attributes().Get(name) if !exist { - val, exist = record.Attributes().Get(name) + val, exist = scope.Attributes().Get(name) + if !exist { + val, exist = record.Attributes().Get(name) + if exist { + str = val.AsString() + } + } + if exist { + str = val.AsString() + } } if exist { str = val.AsString() diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index bfe9e638e8e6..31f4dbdaa1a7 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -63,7 +63,7 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, @@ -125,7 +125,7 @@ func TestLoadConfig(t *testing.T) { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, @@ -170,7 +170,7 @@ func TestLoadConfig(t *testing.T) { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 1f951700d51b..f638580a749f 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -51,7 +51,7 @@ func createDefaultConfig() component.Config { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, diff --git a/exporter/elasticsearchexporter/logs_exporter.go b/exporter/elasticsearchexporter/logs_exporter.go index 7511988bc2ba..e37d3cb0a079 100644 --- a/exporter/elasticsearchexporter/logs_exporter.go +++ b/exporter/elasticsearchexporter/logs_exporter.go @@ -51,7 +51,11 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte maxAttempts = cfg.Retry.MaxRequests } - model := &encodeModel{dedup: cfg.Mapping.Dedup, dedot: cfg.Mapping.Dedot} + model := &encodeModel{ + mode: cfg.Mapping.Mode, + dedup: cfg.Mapping.Dedup, + dedot: cfg.Mapping.Dedot, + } indexStr := cfg.LogsIndex if cfg.Index != "" { @@ -83,8 +87,9 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo resource := rl.Resource() ills := rl.ScopeLogs() for j := 0; j < ills.Len(); j++ { - scope := ills.At(j).Scope() - logs := ills.At(j).LogRecords() + ill := ills.At(j) + scope := ill.Scope() + logs := ill.LogRecords() for k := 0; k < logs.Len(); k++ { if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil { if cerr := ctx.Err(); cerr != nil { @@ -103,8 +108,8 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromBothResourceAndAttribute(indexPrefix, resource, record) - suffix := getFromBothResourceAndAttribute(indexSuffix, resource, record) + prefix := getFromAttributes(indexPrefix, resource, scope, record) + suffix := getFromAttributes(indexSuffix, resource, scope, record) fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index 48fa8a06cc0b..73846347adae 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -152,6 +152,7 @@ func TestExporter_PushEvent(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10178") } + t.Run("publish with success", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { @@ -166,6 +167,40 @@ func TestExporter_PushEvent(t *testing.T) { rec.WaitItems(2) }) + t.Run("publish with ecs encoding", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + expected := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","application":"myapp","attrKey1":"abc","attrKey2":"def","error":{"stack_trace":"no no no no"},"message":"hello world","service":{"name":"myservice"}}` + actual := fmt.Sprintf("%s", docs[0].Document) + assert.Equal(t, expected, actual) + + return itemsAllOK(docs) + }) + + testConfig := withTestExporterConfig(func(cfg *Config) { + cfg.Mapping.Mode = "ecs" + })(server.URL) + exporter := newTestExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig }) + mustSendLogsWithAttributes(t, exporter, + // resource attrs + map[string]string{ + "application": "myapp", + "service.name": "myservice", + }, + // record attrs + map[string]string{ + "attrKey1": "abc", + "attrKey2": "def", + "exception.stacktrace": "no no no no", + }, + // record body + "hello world", + ) + rec.WaitItems(1) + }) + t.Run("publish with dynamic index", func(t *testing.T) { rec := newBulkRecorder() @@ -205,6 +240,7 @@ func TestExporter_PushEvent(t *testing.T) { map[string]string{ indexPrefix: prefix, }, + "hello world", ) rec.WaitItems(1) @@ -401,14 +437,13 @@ func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string } // send trace with span & resource attributes -func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string) { +func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string, body string) { logs := newLogsWithAttributeAndResourceMap(attrMp, resMp) - resLogs := logs.ResourceLogs().At(0) - logRecords := resLogs.ScopeLogs().At(0).LogRecords().At(0) - - scopeLogs := resLogs.ScopeLogs().AppendEmpty() - scope := scopeLogs.Scope() + resSpans := logs.ResourceLogs().At(0) + scopeLog := resSpans.ScopeLogs().At(0) + logRecords := scopeLog.LogRecords().At(0) + logRecords.Body().SetStr(body) - err := exporter.pushLogRecord(context.TODO(), resLogs.Resource(), logRecords, scope) + err := exporter.pushLogRecord(context.TODO(), resSpans.Resource(), logRecords, scopeLog.Scope()) require.NoError(t, err) } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 74737b7d1b32..c014db4abadc 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -28,6 +28,7 @@ type mappingModel interface { // // See: https://github.com/open-telemetry/oteps/blob/master/text/logs/0097-log-data-model.md type encodeModel struct { + mode string dedup bool dedot bool } @@ -40,16 +41,64 @@ const ( func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) ([]byte, error) { var document objmodel.Document - document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. - document.AddTraceID("TraceId", record.TraceID()) - document.AddSpanID("SpanId", record.SpanID()) - document.AddInt("TraceFlags", int64(record.Flags())) - document.AddString("SeverityText", record.SeverityText()) - document.AddInt("SeverityNumber", int64(record.SeverityNumber())) - document.AddAttribute("Body", record.Body()) - document.AddAttributes("Attributes", record.Attributes()) - document.AddAttributes("Resource", resource.Attributes()) - document.AddAttributes("Scope", scopeToAttributes(scope)) + + switch m.mode { + case "ecs": + document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. + document.AddTraceID("trace.id", record.TraceID()) + document.AddSpanID("span.id", record.SpanID()) + + if f := record.Flags(); f != 0 { + document.AddInt("trace.flags", int64(record.Flags())) + } + + if n := record.SeverityNumber(); n != plog.SeverityNumberUnspecified { + document.AddInt("log.syslog.severity.code", int64(record.SeverityNumber())) + document.AddString("log.syslog.severity.name", record.SeverityText()) + } + + document.AddAttribute("message", record.Body()) + + fieldMapper := func(k string) string { + switch k { + case "exception.type": + return "error.type" + case "exception.message": + return "error.message" + case "exception.stacktrace": + return "error.stack_trace" + default: + return k + } + } + + resource.Attributes().Range(func(k string, v pcommon.Value) bool { + k = fieldMapper(k) + document.AddAttribute(k, v) + return true + }) + scope.Attributes().Range(func(k string, v pcommon.Value) bool { + k = fieldMapper(k) + document.AddAttribute(k, v) + return true + }) + record.Attributes().Range(func(k string, v pcommon.Value) bool { + k = fieldMapper(k) + document.AddAttribute(k, v) + return true + }) + default: + document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. + document.AddTraceID("TraceId", record.TraceID()) + document.AddSpanID("SpanId", record.SpanID()) + document.AddInt("TraceFlags", int64(record.Flags())) + document.AddString("SeverityText", record.SeverityText()) + document.AddInt("SeverityNumber", int64(record.SeverityNumber())) + document.AddAttribute("Body", record.Body()) + document.AddAttributes("Attributes", record.Attributes()) + document.AddAttributes("Resource", resource.Attributes()) + document.AddAttributes("Scope", scopeToAttributes(scope)) + } if m.dedup { document.Dedup() diff --git a/exporter/elasticsearchexporter/trace_exporter.go b/exporter/elasticsearchexporter/trace_exporter.go index ef421951ed4c..bdfb6ff4ebcc 100644 --- a/exporter/elasticsearchexporter/trace_exporter.go +++ b/exporter/elasticsearchexporter/trace_exporter.go @@ -76,10 +76,12 @@ func (e *elasticsearchTracesExporter) pushTraceData( resource := il.Resource() scopeSpans := il.ScopeSpans() for j := 0; j < scopeSpans.Len(); j++ { - scope := scopeSpans.At(j).Scope() - spans := scopeSpans.At(j).Spans() + scopeSpan := scopeSpans.At(j) + scope := scopeSpan.Scope() + spans := scopeSpan.Spans() for k := 0; k < spans.Len(); k++ { - if err := e.pushTraceRecord(ctx, resource, spans.At(k), scope); err != nil { + span := spans.At(k) + if err := e.pushTraceRecord(ctx, resource, span, scope); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -95,8 +97,8 @@ func (e *elasticsearchTracesExporter) pushTraceData( func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromBothResourceAndAttribute(indexPrefix, resource, span) - suffix := getFromBothResourceAndAttribute(indexSuffix, resource, span) + prefix := getFromAttributes(indexPrefix, resource, scope, span) + suffix := getFromAttributes(indexSuffix, resource, scope, span) fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } From 8c03dba531438d5e8d6bc646e662d34e8f6f1338 Mon Sep 17 00:00:00 2001 From: "Joshua.Kaplan" Date: Thu, 11 Jan 2024 14:35:11 -0700 Subject: [PATCH 2/4] Add changelogs --- .chloggen/feat_ecs-format.yaml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/feat_ecs-format.yaml diff --git a/.chloggen/feat_ecs-format.yaml b/.chloggen/feat_ecs-format.yaml new file mode 100644 index 000000000000..c6ac57feefa0 --- /dev/null +++ b/.chloggen/feat_ecs-format.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Initial pass in implementing the `ecs` mapping mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29742] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From b025b4d4a0351cccbf001b6648dabd67118a3deb Mon Sep 17 00:00:00 2001 From: "Joshua.Kaplan" Date: Thu, 11 Jan 2024 14:39:44 -0700 Subject: [PATCH 3/4] Fix test and lint error --- exporter/elasticsearchexporter/logs_exporter_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index 73846347adae..87eac68fa391 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -121,7 +121,7 @@ func TestExporter_New(t *testing.T) { cfg.Mapping.Dedot = false cfg.Mapping.Dedup = true }), - want: successWithInternalModel(&encodeModel{dedot: false, dedup: true}), + want: successWithInternalModel(&encodeModel{mode: "none", dedot: false, dedup: true}), }, } @@ -173,7 +173,7 @@ func TestExporter_PushEvent(t *testing.T) { rec.Record(docs) expected := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","application":"myapp","attrKey1":"abc","attrKey2":"def","error":{"stack_trace":"no no no no"},"message":"hello world","service":{"name":"myservice"}}` - actual := fmt.Sprintf("%s", docs[0].Document) + actual := string(docs[0].Document) assert.Equal(t, expected, actual) return itemsAllOK(docs) From eec68c04ed322bc0ce4e24c6a7fb5a44779153d3 Mon Sep 17 00:00:00 2001 From: "Joshua.Kaplan" Date: Thu, 11 Jan 2024 14:43:15 -0700 Subject: [PATCH 4/4] Fix test --- exporter/elasticsearchexporter/logs_exporter_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index 959ccc524a2b..9d6c70a8b12e 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -273,7 +273,7 @@ func TestExporter_PushEvent(t *testing.T) { defaultCfg = *cfg }) - mustSendLogsWithAttributes(t, exporter, nil, nil) + mustSendLogsWithAttributes(t, exporter, nil, nil, "") rec.WaitItems(1) }) @@ -317,6 +317,7 @@ func TestExporter_PushEvent(t *testing.T) { map[string]string{ indexPrefix: prefix, }, + "", ) rec.WaitItems(1) })