From 1849cf883173b57b6dcb65734524ef2f82e66fa7 Mon Sep 17 00:00:00 2001 From: Irina Date: Fri, 22 Apr 2022 13:22:39 +0100 Subject: [PATCH] [exporter/] Update timestamp processing logic (#9369) * [exporter/] Update timestamp processing logic * [exporter/] Update timestamp processing logic. Handle zero observed timestamp case Co-authored-by: Daniel Jaglowski --- CHANGELOG.md | 2 +- .../azuremonitorexporter/log_to_envelope.go | 15 +++- .../azuremonitorexporter/logexporter_test.go | 70 +++++++++++++------ exporter/azuremonitorexporter/time_utils.go | 2 + exporter/lokiexporter/exporter.go | 16 ++++- exporter/lokiexporter/exporter_test.go | 43 ++++++++++++ exporter/lokiexporter/time.go | 19 +++++ exporter/observiqexporter/converter.go | 11 ++- exporter/observiqexporter/converter_test.go | 20 ++++++ internal/stanza/converter.go | 6 +- 10 files changed, 171 insertions(+), 33 deletions(-) create mode 100644 exporter/lokiexporter/time.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 03bf802274aa..b3887d49833f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,9 +21,9 @@ - `pkg/translator/prometheusremotewrite`: Allow to disable sanitize metric labels (#8270) - `basicauthextension`: Implement `configauth.ClientAuthenticator` so that the extension can also be used as HTTP client basic authenticator.(#8847) +- `azuremonitorexporter`, `lokiexporter`, `observiqexporter`: Update timestamp processing logic (#9130) - `cumulativetodeltaprocessor`: add new include/exclude configuration options with regex support (#8952) - `cmd/mdatagen`: Update generated functions to have simple parse function to handle string parsing consistently and limit code duplication across receivers (#7574) - - `attributesprocessor`: Support filter by severity (#9132) ### 🧰 Bug fixes 🧰 diff --git a/exporter/azuremonitorexporter/log_to_envelope.go b/exporter/azuremonitorexporter/log_to_envelope.go index f1742bd9e15c..07c87713d377 100644 --- a/exporter/azuremonitorexporter/log_to_envelope.go +++ b/exporter/azuremonitorexporter/log_to_envelope.go @@ -18,6 +18,7 @@ import ( "time" "github.com/microsoft/ApplicationInsights-Go/appinsights/contracts" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" ) @@ -43,7 +44,7 @@ type logPacker struct { func (packer *logPacker) LogRecordToEnvelope(logRecord plog.LogRecord) *contracts.Envelope { envelope := contracts.NewEnvelope() envelope.Tags = make(map[string]string) - envelope.Time = toTime(logRecord.Timestamp()).Format(time.RFC3339Nano) + envelope.Time = toTime(timestampFromLogRecord(logRecord)).Format(time.RFC3339Nano) data := contracts.NewData() @@ -95,3 +96,15 @@ func newLogPacker(logger *zap.Logger) *logPacker { } return packer } + +func timestampFromLogRecord(lr plog.LogRecord) pcommon.Timestamp { + if lr.Timestamp() != 0 { + return lr.Timestamp() + } + + if lr.ObservedTimestamp() != 0 { + return lr.ObservedTimestamp() + } + + return pcommon.NewTimestampFromTime(timeNow()) +} diff --git a/exporter/azuremonitorexporter/logexporter_test.go b/exporter/azuremonitorexporter/logexporter_test.go index deb9137035cd..7b4676b46d6c 100644 --- a/exporter/azuremonitorexporter/logexporter_test.go +++ b/exporter/azuremonitorexporter/logexporter_test.go @@ -36,35 +36,61 @@ const ( ) var ( - testLogs = []byte(`{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"dotnet"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1643240673066096200","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"}]}]}]}`) + testLogs = []byte(`{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"dotnet"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1643240673066096200","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"},{"timeUnixNano":"0","observedTimeUnixNano":"1643240673066096200","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"},{"timeUnixNano":"0","observedTimeUnixNano":"0","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"}]}]}]}`) ) // Tests proper wrapping of a log record to an envelope func TestLogRecordToEnvelope(t *testing.T) { - logRecord := getTestLogRecord(t) - logPacker := getLogPacker() - envelope := logPacker.LogRecordToEnvelope(logRecord) + ts := time.Date(2021, 12, 11, 10, 9, 8, 1, time.UTC) + timeNow = func() time.Time { + return ts + } - require.NotNil(t, envelope) - assert.Equal(t, defaultEnvelopeName, envelope.Name) - assert.Equal(t, toTime(logRecord.Timestamp()).Format(time.RFC3339Nano), envelope.Time) - require.NotNil(t, envelope.Data) - envelopeData := envelope.Data.(*contracts.Data) - assert.Equal(t, defaultdBaseType, envelopeData.BaseType) + tests := []struct { + name string + logRecord plog.LogRecord + }{ + { + name: "timestamp is correct", + logRecord: getTestLogRecord(t, 0), + }, + { + name: "timestamp is empty", + logRecord: getTestLogRecord(t, 1), + }, + { + name: "timestamp is empty and observed timestamp is empty", + logRecord: getTestLogRecord(t, 2), + }, + } - require.NotNil(t, envelopeData.BaseData) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logRecord := tt.logRecord + logPacker := getLogPacker() + envelope := logPacker.LogRecordToEnvelope(logRecord) - messageData := envelopeData.BaseData.(*contracts.MessageData) - assert.Equal(t, messageData.Message, logRecord.Body().StringVal()) - assert.Equal(t, messageData.SeverityLevel, contracts.Information) + require.NotNil(t, envelope) + assert.Equal(t, defaultEnvelopeName, envelope.Name) + assert.Equal(t, toTime(timestampFromLogRecord(logRecord)).Format(time.RFC3339Nano), envelope.Time) + require.NotNil(t, envelope.Data) + envelopeData := envelope.Data.(*contracts.Data) + assert.Equal(t, defaultdBaseType, envelopeData.BaseType) - hexTraceID := logRecord.TraceID().HexString() - assert.Equal(t, messageData.Properties[traceIDTag], hexTraceID) - assert.Equal(t, envelope.Tags[contracts.OperationId], hexTraceID) + require.NotNil(t, envelopeData.BaseData) - assert.Equal(t, messageData.Properties[spanIDTag], logRecord.SpanID().HexString()) - assert.Equal(t, messageData.Properties[categoryNameTag], logRecord.Name()) + messageData := envelopeData.BaseData.(*contracts.MessageData) + assert.Equal(t, messageData.Message, logRecord.Body().StringVal()) + assert.Equal(t, messageData.SeverityLevel, contracts.Information) + hexTraceID := logRecord.TraceID().HexString() + assert.Equal(t, messageData.Properties[traceIDTag], hexTraceID) + assert.Equal(t, envelope.Tags[contracts.OperationId], hexTraceID) + + assert.Equal(t, messageData.Properties[spanIDTag], logRecord.SpanID().HexString()) + assert.Equal(t, messageData.Properties[categoryNameTag], logRecord.Name()) + }) + } } // Test conversion from logRecord.SeverityText() to contracts.SeverityLevel() @@ -85,7 +111,7 @@ func TestExporterLogDataCallback(t *testing.T) { assert.NoError(t, exporter.onLogData(context.Background(), logs)) - mockTransportChannel.AssertNumberOfCalls(t, "Send", 1) + mockTransportChannel.AssertNumberOfCalls(t, "Send", 3) } func getLogsExporter(config *Config, transportChannel transportChannel) *logExporter { @@ -107,13 +133,13 @@ func getTestLogs(tb testing.TB) plog.Logs { return logs } -func getTestLogRecord(tb testing.TB) plog.LogRecord { +func getTestLogRecord(tb testing.TB, index int) plog.LogRecord { var logRecord plog.LogRecord logs := getTestLogs(tb) resourceLogs := logs.ResourceLogs() scopeLogs := resourceLogs.At(0).ScopeLogs() logRecords := scopeLogs.At(0).LogRecords() - logRecord = logRecords.At(0) + logRecord = logRecords.At(index) return logRecord } diff --git a/exporter/azuremonitorexporter/time_utils.go b/exporter/azuremonitorexporter/time_utils.go index d2044af6bbb1..0940247c1b54 100644 --- a/exporter/azuremonitorexporter/time_utils.go +++ b/exporter/azuremonitorexporter/time_utils.go @@ -44,3 +44,5 @@ func formatDuration(d time.Duration) string { return fmt.Sprintf("%02d.%02d:%02d:%02d.%06d", day, h, m, s, us) } + +var timeNow = time.Now diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 349cde8bd1d4..a527ff442039 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -319,7 +319,7 @@ func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Reso b.WriteString(lr.Body().StringVal()) return &logproto.Entry{ - Timestamp: time.Unix(0, int64(lr.Timestamp())), + Timestamp: timestampFromLogRecord(lr), Line: b.String(), }, nil } @@ -330,7 +330,19 @@ func (l *lokiExporter) convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Reso return nil, err } return &logproto.Entry{ - Timestamp: time.Unix(0, int64(lr.Timestamp())), + Timestamp: timestampFromLogRecord(lr), Line: line, }, nil } + +func timestampFromLogRecord(lr plog.LogRecord) time.Time { + if lr.Timestamp() != 0 { + return time.Unix(0, int64(lr.Timestamp())) + } + + if lr.ObservedTimestamp() != 0 { + return time.Unix(0, int64(lr.ObservedTimestamp())) + } + + return time.Unix(0, int64(pcommon.NewTimestampFromTime(timeNow()))) +} diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index 8aa5933b261a..6a0ddd4f2c05 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -687,3 +687,46 @@ func TestConvertRecordAttributesToLabels(t *testing.T) { }) } } + +func TestExporter_timestampFromLogRecord(t *testing.T) { + ts := time.Date(2021, 12, 11, 10, 9, 8, 1, time.UTC) + timeNow = func() time.Time { + return ts + } + + tests := []struct { + name string + timestamp time.Time + observedTimestamp time.Time + expectedTimestamp time.Time + }{ + { + name: "timestamp is correct", + timestamp: timeNow(), + expectedTimestamp: timeNow(), + }, + { + name: "timestamp is empty", + observedTimestamp: timeNow(), + expectedTimestamp: timeNow(), + }, + { + name: "timestamp is empty and observed timestamp is empty", + expectedTimestamp: timeNow(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lr := plog.NewLogRecord() + if !tt.timestamp.IsZero() { + lr.SetTimestamp(pcommon.NewTimestampFromTime(tt.timestamp)) + } + if !tt.observedTimestamp.IsZero() { + lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(tt.observedTimestamp)) + } + + assert.Equal(t, time.Unix(0, int64(pcommon.NewTimestampFromTime(tt.expectedTimestamp))), timestampFromLogRecord(lr)) + }) + } +} diff --git a/exporter/lokiexporter/time.go b/exporter/lokiexporter/time.go new file mode 100644 index 000000000000..ebe9a08c9ef9 --- /dev/null +++ b/exporter/lokiexporter/time.go @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lokiexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter" + +import "time" + +var timeNow = time.Now diff --git a/exporter/observiqexporter/converter.go b/exporter/observiqexporter/converter.go index ab734bfa89e6..27dfed193eb6 100644 --- a/exporter/observiqexporter/converter.go +++ b/exporter/observiqexporter/converter.go @@ -125,10 +125,15 @@ func resourceAndInstrumentationLogToEntry(resMap map[string]interface{}, log plo } func timestampFromRecord(log plog.LogRecord) string { - if log.Timestamp() == 0 { - return timeNow().UTC().Format(timestampFieldOutputLayout) + if log.Timestamp() != 0 { + return log.Timestamp().AsTime().UTC().Format(timestampFieldOutputLayout) } - return log.Timestamp().AsTime().UTC().Format(timestampFieldOutputLayout) + + if log.ObservedTimestamp() != 0 { + return log.ObservedTimestamp().AsTime().UTC().Format(timestampFieldOutputLayout) + } + + return timeNow().UTC().Format(timestampFieldOutputLayout) } func messageFromRecord(log plog.LogRecord) string { diff --git a/exporter/observiqexporter/converter_test.go b/exporter/observiqexporter/converter_test.go index d7b029035570..868ce4d3f37b 100644 --- a/exporter/observiqexporter/converter_test.go +++ b/exporter/observiqexporter/converter_test.go @@ -297,6 +297,26 @@ func TestLogdataToObservIQFormat(t *testing.T) { }, { "No timestamp on record", + func() plog.LogRecord { + logRecord := plog.NewLogRecord() + logRecord.Body().SetStringVal("Message") + logRecord.SetObservedTimestamp(nanoTs) + return logRecord + }, + pcommon.NewResource, + "agent", + "agentID", + observIQLogEntry{ + Timestamp: stringTs, + Message: "Message", + Severity: "default", + Data: nil, + Agent: &observIQAgentInfo{Name: "agent", ID: "agentID", Version: "latest"}, + }, + false, + }, + { + "No timestamp and no observed timestamp on record", func() plog.LogRecord { logRecord := plog.NewLogRecord() logRecord.Body().SetStringVal("Message") diff --git a/internal/stanza/converter.go b/internal/stanza/converter.go index 521a4dffef98..817b66a21cda 100644 --- a/internal/stanza/converter.go +++ b/internal/stanza/converter.go @@ -333,12 +333,10 @@ func Convert(ent *entry.Entry) plog.Logs { // convertInto converts entry.Entry into provided plog.LogRecord. func convertInto(ent *entry.Entry, dest plog.LogRecord) { - t := ent.ObservedTimestamp if !ent.Timestamp.IsZero() { - t = ent.Timestamp + dest.SetTimestamp(pcommon.NewTimestampFromTime(ent.Timestamp)) } - dest.SetTimestamp(pcommon.NewTimestampFromTime(t)) - + dest.SetObservedTimestamp(pcommon.NewTimestampFromTime(ent.ObservedTimestamp)) dest.SetSeverityNumber(sevMap[ent.Severity]) dest.SetSeverityText(sevTextMap[ent.Severity])