Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] add missing scope info in span/log attributes #27288

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
c9c714d
add scope
JaredTan95 Oct 1, 2023
141ddad
Merge branch 'main' into elasticsearch_exporter_add_scope
JaredTan95 Oct 7, 2023
675e3c0
Merge branch 'main' into elasticsearch_exporter_add_scope
JaredTan95 Oct 11, 2023
d72f20f
[exporter/parquet] remove Parquet exporter (#27285)
atoulme Oct 11, 2023
9688268
Add an optional argument to converters to support hashing (#27235)
rnishtala-sumo Oct 11, 2023
407900a
feat(syslogreceiver): validate protocol name (#27581)
sumo-drosiek Oct 11, 2023
064af9b
[chore] [coralogixexporter] Docs: Update docs to reflect new informat…
povilasv Oct 11, 2023
8fc10ab
[processor/tailsampling] [chore][docs]: Fix typo in processor's READM…
rpriyanshu9 Oct 11, 2023
23ed615
[receiver/podman] rename struct and function to keep expected receive…
sakulali Oct 11, 2023
9c0ddd4
[exporter/awscloudwatchlogsexporter] Improve performance of the awscl…
rapphil Oct 11, 2023
f28934a
[exporter/prometheusremotewrite] Make maxBatchByteSize configurable (…
yotamloe Oct 11, 2023
bf5ce0d
[chore] [exporter/awscloudwatchlogs] Remove duplicate consumer check …
bryan-aguilar Oct 11, 2023
73d3955
[chore][pkg/stanza] Unexport reader's file name field (#27434)
djaglowski Oct 11, 2023
c539979
[chore] update link to stack overflow (#27626)
Oct 11, 2023
1682365
[chore] bump go versions (#27631)
Oct 11, 2023
8276cc1
[k8sclusterreceiver] remove opencensus.resourcetype resource attribut…
povilasv Oct 11, 2023
179963c
[chore][receiver/k8sobjects] add tests for k8sobjectsreceiver (#27569)
jinja2 Oct 11, 2023
0eecc7d
[chore] Remove stale go mod replace for datadog-agent (#27622)
songy23 Oct 11, 2023
0981aa3
[chore] [exporter/signalfx] Remove redundant translation (#27634)
dmitryax Oct 11, 2023
049dadf
[chore] Fix typo in README (#27639)
crobert-1 Oct 12, 2023
99f3758
[Feature][exporter/azuredataexplorerexporter] Exporter helper support…
asaharn Oct 12, 2023
c2a5972
[k8sclusterreceiver] add k8s.pod.qos_class optional resource attribut…
povilasv Oct 12, 2023
687b4bf
feat: add honeycomb marker exporter structure (#27001)
fchikwekwe Oct 12, 2023
b9b3261
[chore] [receiver/mysqlreceiver] Update readme (#27656)
kuiperda Oct 12, 2023
a4f312d
[receiver/dockerstats] Disable deprecated cpu metric (#24183)
carlossscastro Oct 13, 2023
f146086
[processor/redactionprocessor] fix mask when multiple patterns exist …
ArchangelSDY Oct 13, 2023
3d04480
[chore] Bump grpc-go to fix CVE-2023-44487 (#27664)
pavolloffay Oct 13, 2023
ca28b62
[chore][pkg/stanza] Add module README (#27657)
djaglowski Oct 13, 2023
3ee13b6
[extension/storage/filestorage] Add bbolt FSync as a config option (#…
natbur Oct 13, 2023
9c9523d
[pkg/stanza] Cache event publishers: log warn once per provider (#27658)
pjanotti Oct 13, 2023
dad51d5
Make pjanotti owner of windowseventlogreceiver (#27667)
pjanotti Oct 13, 2023
e8113c2
[exporter/prometheusremotewrite] Fix exporter not respecting retrySet…
bryan-aguilar Oct 13, 2023
bb36f05
[processor/resourcedetection] fix parsing error for windows (#27678)
VihasMakwana Oct 16, 2023
c38a394
[chore]: Expand e2e testbed (#27251)
VihasMakwana Oct 16, 2023
fc2a3b3
feat: add integration test for syslog exporter (and receiver) (#27464)
sumo-drosiek Oct 16, 2023
09aa3f2
[receiver/file] Fix the stated supported telemetry types (#27684)
crobert-1 Oct 16, 2023
9cad35e
[chore] throw error if GITHUB_TOKEN env is not set (#27677)
VihasMakwana Oct 16, 2023
9b9e7b6
[chore] clean up koanf 1.5 dep (#27670)
Oct 16, 2023
a3df71d
Splunkent client refactor (#27205)
shalper2 Oct 16, 2023
10bba48
[exporter/signalfx] [exporter/splunkhec] Remove max_connections confi…
atoulme Oct 16, 2023
edd5d3a
[exporter/signalfx] expose build version in user agent string (#27612)
atoulme Oct 16, 2023
15bc860
[chore] Fix some wording in CONTRIBUTING.md (#27771)
crobert-1 Oct 16, 2023
04447dc
[chore][exporter/splunkhec] reuse metric buffer (#27776)
atoulme Oct 16, 2023
07985dd
[chore] dependabot updates Mon Oct 16 18:31:00 UTC 2023 (#27725)
opentelemetrybot Oct 16, 2023
11cd63b
[chore] dependabot updates Tue Oct 17 00:15:58 UTC 2023 (#27790)
opentelemetrybot Oct 17, 2023
f8ef31d
[exporter/datasetexporter]: Fix NPE exception when attribute contains…
martin-majlis-s1 Oct 17, 2023
2a0e096
[pkg/stanza] Fix issue where batching caused incorrect starting point…
djaglowski Oct 17, 2023
e609f9b
Add async config block & concurrent readers to UDP input operator (#2…
hovavza Oct 17, 2023
b9268b7
[k8sclusterreceiver] change resourcequota and clusterquota units (#27…
povilasv Oct 17, 2023
8560679
[chore] update otel-core dependency (#27772)
dmitryax Oct 17, 2023
745c9c6
[cmd/otelcontribcol] Simplify exporters lifecycle tests (#27816)
dmitryax Oct 17, 2023
de81b9b
[pkg/pdatatest] Ignore span timestamps (#27798)
sakulali Oct 17, 2023
936f6a6
[chore] [cmd/otelcontribcol] Simplify and fix receivers lifecycle tes…
dmitryax Oct 17, 2023
edc3d1b
[cmd/otelcontribcol] Simplify processors lifecycle tests (#27817)
dmitryax Oct 17, 2023
dc0c0cf
[chore][pkg/stanza] Use unscoped mock persister where possible (#27809)
djaglowski Oct 17, 2023
e205e9e
[chore] dependabot updates Tue Oct 17 21:10:42 UTC 2023 (#27819)
opentelemetrybot Oct 17, 2023
b3a3d11
[chore][pkg/stanza] Extract checkpointing logic into internal package…
djaglowski Oct 18, 2023
5d19a05
[receiver/kubeletstats] Add RBAC docs (#27655)
TylerHelmuth Oct 18, 2023
d42c158
add scope in log record.
JaredTan95 Oct 18, 2023
93ee1c7
Merge branch 'main' into elasticsearch_exporter_add_scope
JaredTan95 Oct 18, 2023
511c0f0
Merge branch 'main' into elasticsearch_exporter_add_scope
JaredTan95 Oct 19, 2023
19b3231
Merge branch 'main' into elasticsearch_exporter_add_scope
JaredTan95 Oct 20, 2023
af9c9c6
Merge branch 'main' into elasticsearch_exporter_add_scope
JaredTan95 Oct 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "add missing scope info in span attributes"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27282]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions cmd/otelcontribcol/exporters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestDefaultExporters(t *testing.T) {
cfg := expFactories["awscloudwatchlogs"].CreateDefaultConfig().(*awscloudwatchlogsexporter.Config)
cfg.Endpoint = "http://" + endpoint
cfg.Region = "local"

// disable queue/retry to validate passing the test data synchronously
cfg.QueueSettings.Enabled = false
cfg.RetrySettings.Enabled = false
Expand Down
7 changes: 4 additions & 3 deletions exporter/elasticsearchexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo
resource := rl.Resource()
ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
scope := ills.At(j).Scope()
logs := ills.At(j).LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k)); err != nil {
if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -99,7 +100,7 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo
return errors.Join(errs...)
}

func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord) error {
func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromBothResourceAndAttribute(indexPrefix, resource, record)
Expand All @@ -108,7 +109,7 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource
fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

document, err := e.model.encodeLog(resource, record)
document, err := e.model.encodeLog(resource, record, scope)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,12 @@ func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string
// send trace with span & resource attributes
func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string) {
logs := newLogsWithAttributeAndResourceMap(attrMp, resMp)
resSpans := logs.ResourceLogs().At(0)
logRecords := resSpans.ScopeLogs().At(0).LogRecords().At(0)
resLogs := logs.ResourceLogs().At(0)
logRecords := resLogs.ScopeLogs().At(0).LogRecords().At(0)

err := exporter.pushLogRecord(context.TODO(), resSpans.Resource(), logRecords)
scopeLogs := resLogs.ScopeLogs().AppendEmpty()
scope := scopeLogs.Scope()

err := exporter.pushLogRecord(context.TODO(), resLogs.Resource(), logRecords, scope)
require.NoError(t, err)
}
20 changes: 16 additions & 4 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

type mappingModel interface {
encodeLog(pcommon.Resource, plog.LogRecord) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span) ([]byte, error)
encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
}

// encodeModel tries to keep the event as close to the original open telemetry semantics as is.
Expand All @@ -38,7 +38,7 @@ const (
attributeField = "attribute"
)

func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord) ([]byte, error) {
func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document
document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTraceID("TraceId", record.TraceID())
Expand All @@ -49,6 +49,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord
document.AddAttribute("Body", record.Body())
document.AddAttributes("Attributes", record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))

if m.dedup {
document.Dedup()
Expand All @@ -61,7 +62,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord
return buf.Bytes(), err
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span) ([]byte, error) {
func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document
document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
Expand All @@ -76,6 +77,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span) ([
document.AddAttributes("Resource", resource.Attributes())
document.AddEvents("Events", span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))

if m.dedup {
document.Dedup()
Expand Down Expand Up @@ -107,3 +109,13 @@ func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string {
func durationAsMicroseconds(start, end time.Time) int64 {
return (end.UnixNano() - start.UnixNano()) / 1000
}

func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
attrs := pcommon.NewMap()
attrs.PutStr("name", scope.Name())
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
attrs.PutStr("version", scope.Version())
for k, v := range scope.Attributes().AsRaw() {
attrs.PutStr(k, v.(string))
}
return attrs
}
8 changes: 6 additions & 2 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
)

var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":0}`
var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":0}`

func TestEncodeSpan(t *testing.T) {
model := &encodeModel{dedup: true, dedot: false}
td := mockResourceSpans()
spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0))
spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope())
assert.NoError(t, err)
assert.Equal(t, expectedSpanBody, string(spanByte))
}
Expand All @@ -40,6 +40,10 @@ func mockResourceSpans() ptrace.Traces {
tEnd := time.Date(2023, 4, 19, 3, 4, 6, 6, time.UTC)

scopeSpans := resourceSpans.ScopeSpans().AppendEmpty()
scopeSpans.Scope().SetName("io.opentelemetry.rabbitmq-2.7")
scopeSpans.Scope().SetVersion("1.30.0-alpha")
scopeSpans.Scope().Attributes().PutStr("lib-foo", "lib-bar")

span := scopeSpans.Spans().AppendEmpty()
span.SetName("client span")
span.SetSpanID([8]byte{0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26})
Expand Down
7 changes: 4 additions & 3 deletions exporter/elasticsearchexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ func (e *elasticsearchTracesExporter) pushTraceData(
resource := il.Resource()
scopeSpans := il.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
scope := scopeSpans.At(j).Scope()
spans := scopeSpans.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
if err := e.pushTraceRecord(ctx, resource, spans.At(k)); err != nil {
if err := e.pushTraceRecord(ctx, resource, spans.At(k), scope); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -91,7 +92,7 @@ func (e *elasticsearchTracesExporter) pushTraceData(
return errors.Join(errs...)
}

func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span) error {
func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromBothResourceAndAttribute(indexPrefix, resource, span)
Expand All @@ -100,7 +101,7 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou
fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

document, err := e.model.encodeSpan(resource, span)
document, err := e.model.encodeSpan(resource, span, scope)
if err != nil {
return fmt.Errorf("Failed to encode trace record: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/traces_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ func mustSendTracesWithAttributes(t *testing.T, exporter *elasticsearchTracesExp
traces := newTracesWithAttributeAndResourceMap(attrMp, resMp)
resSpans := traces.ResourceSpans().At(0)
span := resSpans.ScopeSpans().At(0).Spans().At(0)
scope := resSpans.ScopeSpans().At(0).Scope()

err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span)
err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span, scope)
require.NoError(t, err)
}