Skip to content

Commit

Permalink
[exporter/<logs>] Update timestamp processing logic (open-telemetry#9369
Browse files Browse the repository at this point in the history
)

* [exporter/<logs>] Update timestamp processing logic

* [exporter/<logs>] Update timestamp processing logic. Handle zero observed timestamp case

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
mar4uk and djaglowski committed May 10, 2022
1 parent 23d412b commit 1849cf8
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 33 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

- `pkg/translator/prometheusremotewrite`: Allow to disable sanitize metric labels (#8270)
- `basicauthextension`: Implement `configauth.ClientAuthenticator` so that the extension can also be used as HTTP client basic authenticator.(#8847)
- `azuremonitorexporter`, `lokiexporter`, `observiqexporter`: Update timestamp processing logic (#9130)
- `cumulativetodeltaprocessor`: add new include/exclude configuration options with regex support (#8952)
- `cmd/mdatagen`: Update generated functions to have simple parse function to handle string parsing consistently and limit code duplication across receivers (#7574)

- `attributesprocessor`: Support filter by severity (#9132)

### 🧰 Bug fixes 🧰
Expand Down
15 changes: 14 additions & 1 deletion exporter/azuremonitorexporter/log_to_envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)
Expand All @@ -43,7 +44,7 @@ type logPacker struct {
func (packer *logPacker) LogRecordToEnvelope(logRecord plog.LogRecord) *contracts.Envelope {
envelope := contracts.NewEnvelope()
envelope.Tags = make(map[string]string)
envelope.Time = toTime(logRecord.Timestamp()).Format(time.RFC3339Nano)
envelope.Time = toTime(timestampFromLogRecord(logRecord)).Format(time.RFC3339Nano)

data := contracts.NewData()

Expand Down Expand Up @@ -95,3 +96,15 @@ func newLogPacker(logger *zap.Logger) *logPacker {
}
return packer
}

func timestampFromLogRecord(lr plog.LogRecord) pcommon.Timestamp {
if lr.Timestamp() != 0 {
return lr.Timestamp()
}

if lr.ObservedTimestamp() != 0 {
return lr.ObservedTimestamp()
}

return pcommon.NewTimestampFromTime(timeNow())
}
70 changes: 48 additions & 22 deletions exporter/azuremonitorexporter/logexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,35 +36,61 @@ const (
)

var (
testLogs = []byte(`{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"dotnet"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1643240673066096200","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"}]}]}]}`)
testLogs = []byte(`{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"dotnet"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1643240673066096200","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"},{"timeUnixNano":"0","observedTimeUnixNano":"1643240673066096200","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"},{"timeUnixNano":"0","observedTimeUnixNano":"0","severityText":"Information","name":"FilterModule.Program","body":{"stringValue":"Message Body"},"flags":1,"traceId":"7b20d1349ef9b6d6f9d4d1d4a3ac2e82","spanId":"0c2ad924e1771630"}]}]}]}`)
)

// Tests proper wrapping of a log record to an envelope
func TestLogRecordToEnvelope(t *testing.T) {
logRecord := getTestLogRecord(t)
logPacker := getLogPacker()
envelope := logPacker.LogRecordToEnvelope(logRecord)
ts := time.Date(2021, 12, 11, 10, 9, 8, 1, time.UTC)
timeNow = func() time.Time {
return ts
}

require.NotNil(t, envelope)
assert.Equal(t, defaultEnvelopeName, envelope.Name)
assert.Equal(t, toTime(logRecord.Timestamp()).Format(time.RFC3339Nano), envelope.Time)
require.NotNil(t, envelope.Data)
envelopeData := envelope.Data.(*contracts.Data)
assert.Equal(t, defaultdBaseType, envelopeData.BaseType)
tests := []struct {
name string
logRecord plog.LogRecord
}{
{
name: "timestamp is correct",
logRecord: getTestLogRecord(t, 0),
},
{
name: "timestamp is empty",
logRecord: getTestLogRecord(t, 1),
},
{
name: "timestamp is empty and observed timestamp is empty",
logRecord: getTestLogRecord(t, 2),
},
}

require.NotNil(t, envelopeData.BaseData)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logRecord := tt.logRecord
logPacker := getLogPacker()
envelope := logPacker.LogRecordToEnvelope(logRecord)

messageData := envelopeData.BaseData.(*contracts.MessageData)
assert.Equal(t, messageData.Message, logRecord.Body().StringVal())
assert.Equal(t, messageData.SeverityLevel, contracts.Information)
require.NotNil(t, envelope)
assert.Equal(t, defaultEnvelopeName, envelope.Name)
assert.Equal(t, toTime(timestampFromLogRecord(logRecord)).Format(time.RFC3339Nano), envelope.Time)
require.NotNil(t, envelope.Data)
envelopeData := envelope.Data.(*contracts.Data)
assert.Equal(t, defaultdBaseType, envelopeData.BaseType)

hexTraceID := logRecord.TraceID().HexString()
assert.Equal(t, messageData.Properties[traceIDTag], hexTraceID)
assert.Equal(t, envelope.Tags[contracts.OperationId], hexTraceID)
require.NotNil(t, envelopeData.BaseData)

assert.Equal(t, messageData.Properties[spanIDTag], logRecord.SpanID().HexString())
assert.Equal(t, messageData.Properties[categoryNameTag], logRecord.Name())
messageData := envelopeData.BaseData.(*contracts.MessageData)
assert.Equal(t, messageData.Message, logRecord.Body().StringVal())
assert.Equal(t, messageData.SeverityLevel, contracts.Information)

hexTraceID := logRecord.TraceID().HexString()
assert.Equal(t, messageData.Properties[traceIDTag], hexTraceID)
assert.Equal(t, envelope.Tags[contracts.OperationId], hexTraceID)

assert.Equal(t, messageData.Properties[spanIDTag], logRecord.SpanID().HexString())
assert.Equal(t, messageData.Properties[categoryNameTag], logRecord.Name())
})
}
}

// Test conversion from logRecord.SeverityText() to contracts.SeverityLevel()
Expand All @@ -85,7 +111,7 @@ func TestExporterLogDataCallback(t *testing.T) {

assert.NoError(t, exporter.onLogData(context.Background(), logs))

mockTransportChannel.AssertNumberOfCalls(t, "Send", 1)
mockTransportChannel.AssertNumberOfCalls(t, "Send", 3)
}

func getLogsExporter(config *Config, transportChannel transportChannel) *logExporter {
Expand All @@ -107,13 +133,13 @@ func getTestLogs(tb testing.TB) plog.Logs {
return logs
}

func getTestLogRecord(tb testing.TB) plog.LogRecord {
func getTestLogRecord(tb testing.TB, index int) plog.LogRecord {
var logRecord plog.LogRecord
logs := getTestLogs(tb)
resourceLogs := logs.ResourceLogs()
scopeLogs := resourceLogs.At(0).ScopeLogs()
logRecords := scopeLogs.At(0).LogRecords()
logRecord = logRecords.At(0)
logRecord = logRecords.At(index)

return logRecord
}
2 changes: 2 additions & 0 deletions exporter/azuremonitorexporter/time_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ func formatDuration(d time.Duration) string {

return fmt.Sprintf("%02d.%02d:%02d:%02d.%06d", day, h, m, s, us)
}

var timeNow = time.Now
16 changes: 14 additions & 2 deletions exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Reso
b.WriteString(lr.Body().StringVal())

return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Timestamp: timestampFromLogRecord(lr),
Line: b.String(),
}, nil
}
Expand All @@ -330,7 +330,19 @@ func (l *lokiExporter) convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Reso
return nil, err
}
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Timestamp: timestampFromLogRecord(lr),
Line: line,
}, nil
}

func timestampFromLogRecord(lr plog.LogRecord) time.Time {
if lr.Timestamp() != 0 {
return time.Unix(0, int64(lr.Timestamp()))
}

if lr.ObservedTimestamp() != 0 {
return time.Unix(0, int64(lr.ObservedTimestamp()))
}

return time.Unix(0, int64(pcommon.NewTimestampFromTime(timeNow())))
}
43 changes: 43 additions & 0 deletions exporter/lokiexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,3 +687,46 @@ func TestConvertRecordAttributesToLabels(t *testing.T) {
})
}
}

func TestExporter_timestampFromLogRecord(t *testing.T) {
ts := time.Date(2021, 12, 11, 10, 9, 8, 1, time.UTC)
timeNow = func() time.Time {
return ts
}

tests := []struct {
name string
timestamp time.Time
observedTimestamp time.Time
expectedTimestamp time.Time
}{
{
name: "timestamp is correct",
timestamp: timeNow(),
expectedTimestamp: timeNow(),
},
{
name: "timestamp is empty",
observedTimestamp: timeNow(),
expectedTimestamp: timeNow(),
},
{
name: "timestamp is empty and observed timestamp is empty",
expectedTimestamp: timeNow(),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lr := plog.NewLogRecord()
if !tt.timestamp.IsZero() {
lr.SetTimestamp(pcommon.NewTimestampFromTime(tt.timestamp))
}
if !tt.observedTimestamp.IsZero() {
lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(tt.observedTimestamp))
}

assert.Equal(t, time.Unix(0, int64(pcommon.NewTimestampFromTime(tt.expectedTimestamp))), timestampFromLogRecord(lr))
})
}
}
19 changes: 19 additions & 0 deletions exporter/lokiexporter/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokiexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter"

import "time"

var timeNow = time.Now
11 changes: 8 additions & 3 deletions exporter/observiqexporter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,15 @@ func resourceAndInstrumentationLogToEntry(resMap map[string]interface{}, log plo
}

func timestampFromRecord(log plog.LogRecord) string {
if log.Timestamp() == 0 {
return timeNow().UTC().Format(timestampFieldOutputLayout)
if log.Timestamp() != 0 {
return log.Timestamp().AsTime().UTC().Format(timestampFieldOutputLayout)
}
return log.Timestamp().AsTime().UTC().Format(timestampFieldOutputLayout)

if log.ObservedTimestamp() != 0 {
return log.ObservedTimestamp().AsTime().UTC().Format(timestampFieldOutputLayout)
}

return timeNow().UTC().Format(timestampFieldOutputLayout)
}

func messageFromRecord(log plog.LogRecord) string {
Expand Down
20 changes: 20 additions & 0 deletions exporter/observiqexporter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,26 @@ func TestLogdataToObservIQFormat(t *testing.T) {
},
{
"No timestamp on record",
func() plog.LogRecord {
logRecord := plog.NewLogRecord()
logRecord.Body().SetStringVal("Message")
logRecord.SetObservedTimestamp(nanoTs)
return logRecord
},
pcommon.NewResource,
"agent",
"agentID",
observIQLogEntry{
Timestamp: stringTs,
Message: "Message",
Severity: "default",
Data: nil,
Agent: &observIQAgentInfo{Name: "agent", ID: "agentID", Version: "latest"},
},
false,
},
{
"No timestamp and no observed timestamp on record",
func() plog.LogRecord {
logRecord := plog.NewLogRecord()
logRecord.Body().SetStringVal("Message")
Expand Down
6 changes: 2 additions & 4 deletions internal/stanza/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,10 @@ func Convert(ent *entry.Entry) plog.Logs {

// convertInto converts entry.Entry into provided plog.LogRecord.
func convertInto(ent *entry.Entry, dest plog.LogRecord) {
t := ent.ObservedTimestamp
if !ent.Timestamp.IsZero() {
t = ent.Timestamp
dest.SetTimestamp(pcommon.NewTimestampFromTime(ent.Timestamp))
}
dest.SetTimestamp(pcommon.NewTimestampFromTime(t))

dest.SetObservedTimestamp(pcommon.NewTimestampFromTime(ent.ObservedTimestamp))
dest.SetSeverityNumber(sevMap[ent.Severity])
dest.SetSeverityText(sevTextMap[ent.Severity])

Expand Down

0 comments on commit 1849cf8

Please sign in to comment.