From 89bc3f626bae2822ecd4b2631983fae1ab984a6d Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Sat, 1 Jun 2024 08:48:46 -0500 Subject: [PATCH] [chore] Adds internal pdatautil.FlattenLogs (#33322) Followup to #33311. Adds the ability to flatten logs. --- internal/pdatautil/logs.go | 23 +++++ internal/pdatautil/logs_test.go | 153 ++++++++++++++++++++++++++++---- 2 files changed, 158 insertions(+), 18 deletions(-) diff --git a/internal/pdatautil/logs.go b/internal/pdatautil/logs.go index b27557d09e55..344bc6c2a8a2 100644 --- a/internal/pdatautil/logs.go +++ b/internal/pdatautil/logs.go @@ -10,6 +10,29 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) +// FlattenResourceLogs moves each LogRecord onto a dedicated ResourceLogs and ScopeLogs. +// Modifications are made in place. Order of LogRecords is preserved. +func FlattenLogs(rls plog.ResourceLogsSlice) { + tmp := plog.NewResourceLogsSlice() + rls.MoveAndAppendTo(tmp) + for i := 0; i < tmp.Len(); i++ { + groupedResource := tmp.At(i) + for j := 0; j < groupedResource.ScopeLogs().Len(); j++ { + groupedScope := groupedResource.ScopeLogs().At(j) + for k := 0; k < groupedScope.LogRecords().Len(); k++ { + flatResource := rls.AppendEmpty() + groupedResource.Resource().Attributes().CopyTo(flatResource.Resource().Attributes()) + flatScope := flatResource.ScopeLogs().AppendEmpty() + flatScope.SetSchemaUrl(groupedScope.SchemaUrl()) + flatScope.Scope().SetName(groupedScope.Scope().Name()) + flatScope.Scope().SetVersion(groupedScope.Scope().Version()) + groupedScope.Scope().Attributes().CopyTo(flatScope.Scope().Attributes()) + groupedScope.LogRecords().At(k).CopyTo(flatScope.LogRecords().AppendEmpty()) + } + } + } +} + // GroupByResourceLogs groups ScopeLogs by Resource. Modifications are made in place. func GroupByResourceLogs(rls plog.ResourceLogsSlice) { // Hash each ResourceLogs based on identifying information. diff --git a/internal/pdatautil/logs_test.go b/internal/pdatautil/logs_test.go index e45632bdafc1..48329fac7876 100644 --- a/internal/pdatautil/logs_test.go +++ b/internal/pdatautil/logs_test.go @@ -14,6 +14,121 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" ) +func TestFlattenResourceLogs(t *testing.T) { + testCases := []struct { + name string + input []resourceLogs + expected []resourceLogs + }{ + { + name: "empty", + input: []resourceLogs{}, + expected: []resourceLogs{}, + }, + { + name: "single", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 111), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 111), + ), + }, + }, + { + name: "flatten_single_scope_in_single_resource", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, newScopeLogs(11, 101)), + newResourceLogs(1, newScopeLogs(11, 102)), + newResourceLogs(1, newScopeLogs(11, 103)), + }, + }, + { + name: "flatten_multiple_scopes_in_single_resource", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, newScopeLogs(11, 101)), + newResourceLogs(1, newScopeLogs(11, 102)), + newResourceLogs(1, newScopeLogs(11, 103)), + newResourceLogs(1, newScopeLogs(22, 201)), + newResourceLogs(1, newScopeLogs(22, 202)), + newResourceLogs(1, newScopeLogs(22, 203)), + }, + }, + { + name: "flatten_single_scope_in_multiple_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + ), + newResourceLogs(2, + newScopeLogs(11, 104, 105, 106), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, newScopeLogs(11, 101)), + newResourceLogs(1, newScopeLogs(11, 102)), + newResourceLogs(1, newScopeLogs(11, 103)), + newResourceLogs(2, newScopeLogs(11, 104)), + newResourceLogs(2, newScopeLogs(11, 105)), + newResourceLogs(2, newScopeLogs(11, 106)), + }, + }, + { + name: "flatten_multiple_scopes_in_multiple_resources", + input: []resourceLogs{ + newResourceLogs(1, + newScopeLogs(11, 101, 102, 103), + newScopeLogs(22, 201, 202, 203), + ), + newResourceLogs(2, + newScopeLogs(11, 104, 105, 106), + newScopeLogs(22, 204, 205, 206), + ), + }, + expected: []resourceLogs{ + newResourceLogs(1, newScopeLogs(11, 101)), + newResourceLogs(1, newScopeLogs(11, 102)), + newResourceLogs(1, newScopeLogs(11, 103)), + newResourceLogs(1, newScopeLogs(22, 201)), + newResourceLogs(1, newScopeLogs(22, 202)), + newResourceLogs(1, newScopeLogs(22, 203)), + newResourceLogs(2, newScopeLogs(11, 104)), + newResourceLogs(2, newScopeLogs(11, 105)), + newResourceLogs(2, newScopeLogs(11, 106)), + newResourceLogs(2, newScopeLogs(22, 204)), + newResourceLogs(2, newScopeLogs(22, 205)), + newResourceLogs(2, newScopeLogs(22, 206)), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := setupResourceLogsSlice(tc.input) + expected := setupResourceLogsSlice(tc.expected) + FlattenLogs(actual) + assert.Equal(t, expected.Len(), actual.Len()) + for i := 0; i < expected.Len(); i++ { + assert.NoError(t, plogtest.CompareResourceLogs(expected.At(i), actual.At(i))) + } + }) + } +} + func TestGroupByResourceLogs(t *testing.T) { testCases := []struct { name string @@ -296,15 +411,8 @@ func TestGroupByResourceLogs(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - actual := plog.NewResourceLogsSlice() - for _, r := range tc.input { - r.setup(actual.AppendEmpty()) - } - expected := plog.NewResourceLogsSlice() - for _, r := range tc.expected { - r.setup(expected.AppendEmpty()) - } - + actual := setupResourceLogsSlice(tc.input) + expected := setupResourceLogsSlice(tc.expected) GroupByResourceLogs(actual) assert.Equal(t, expected.Len(), actual.Len()) for i := 0; i < expected.Len(); i++ { @@ -372,15 +480,8 @@ func TestGroupByScopeLogs(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - actual := plog.NewScopeLogsSlice() - for _, s := range tc.input { - s.setup(actual.AppendEmpty()) - } - expected := plog.NewScopeLogsSlice() - for _, s := range tc.expected { - s.setup(expected.AppendEmpty()) - } - + actual := setupScopeLogsSlice(tc.input) + expected := setupScopeLogsSlice(tc.expected) GroupByScopeLogs(actual) assert.Equal(t, expected.Len(), actual.Len()) for i := 0; i < expected.Len(); i++ { @@ -449,6 +550,14 @@ func (r resourceLogs) setup(rl plog.ResourceLogs) { } } +func setupResourceLogsSlice(trls []resourceLogs) plog.ResourceLogsSlice { + rls := plog.NewResourceLogsSlice() + for _, trl := range trls { + trl.setup(rls.AppendEmpty()) + } + return rls +} + type scopeLogs struct { num int recordNums []int @@ -473,3 +582,11 @@ func (s scopeLogs) setup(sl plog.ScopeLogs) { lr.Body().SetInt(int64(n)) } } + +func setupScopeLogsSlice(tsls []scopeLogs) plog.ScopeLogsSlice { + sls := plog.NewScopeLogsSlice() + for _, tsl := range tsls { + tsl.setup(sls.AppendEmpty()) + } + return sls +}