Skip to content

Commit

Permalink
Apply changes from code review
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Dec 10, 2024
1 parent 30da06d commit 167f350
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
12 changes: 6 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

now := time.Now()
validationContext := d.validator.getValidationContextForTime(now, tenantID)
levelDetector := newFieldDetector(validationContext)
shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels()
shouldDiscoverGenericFields := levelDetector.shouldDiscoverGenericFields()
fieldDetector := newFieldDetector(validationContext)
shouldDiscoverLevels := fieldDetector.shouldDiscoverLogLevels()
shouldDiscoverGenericFields := fieldDetector.shouldDiscoverGenericFields()

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
Expand Down Expand Up @@ -536,14 +536,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}
if shouldDiscoverLevels {
logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry)
logLevel, ok := fieldDetector.extractLogLevel(lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel)
}
}
if shouldDiscoverGenericFields {
for field, hints := range levelDetector.validationContext.discoverGenericFields {
extracted, ok := levelDetector.extractGenericField(field, hints, lbs, structuredMetadata, entry)
for field, hints := range fieldDetector.validationContext.discoverGenericFields {
extracted, ok := fieldDetector.extractGenericField(field, hints, lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, extracted)
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/distributor/field_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,24 @@ func (l *FieldDetector) extractLogLevelFromLogLine(log string) string {

func (l *FieldDetector) getValueUsingLogfmtParser(line []byte, hints []string) []byte {
d := logfmt.NewDecoder(line)
// In order to have the same behaviour as the JSON field extraction,
// the full line needs to be parsed to extract all possible matching fields.
pos := len(hints) // the index of the hint that matches
var res []byte
for !d.EOL() && d.ScanKeyval() {
k := unsafe.String(unsafe.SliceData(d.Key()), len(d.Key()))
for _, hint := range hints {
if strings.EqualFold(k, hint) {
return d.Value()
for x, hint := range hints {
if strings.EqualFold(k, hint) && x < pos {
res, pos = d.Value(), x
// If there is only a single hint, or the matching hint is the first one,
// we can stop parsing the rest of the line and return early.
if x == 0 {
return res
}
}
}
}
return nil
return res
}

func (l *FieldDetector) getValueUsingJSONParser(log []byte, hints []string) []byte {
Expand Down
26 changes: 26 additions & 0 deletions pkg/distributor/field_detection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,19 @@ func Test_DetectGenericFields(t *testing.T) {
{Name: "org_id", Value: "fake"},
},
},
{
name: "logline (logfmt) matches multiple fields",
labels: labels.Labels{
{Name: "env", Value: "prod"},
},
entry: push.Entry{
Line: `msg="this log line matches" tenant_id="fake_a" org_id=fake_b duration=1h`,
StructuredMetadata: push.LabelsAdapter{},
},
expected: push.LabelsAdapter{
{Name: "org_id", Value: "fake_b"}, // first field from configuration that matches takes precedence
},
},
{
name: "logline (json) matches",
labels: labels.Labels{
Expand All @@ -551,6 +564,19 @@ func Test_DetectGenericFields(t *testing.T) {
{Name: "org_id", Value: "fake"},
},
},
{
name: "logline (json) matches multiple fields",
labels: labels.Labels{
{Name: "env", Value: "prod"},
},
entry: push.Entry{
Line: `{"msg": "this log line matches", "tenant_id": "fake_a", "org_id": "fake_b", "duration": "1s"}`,
StructuredMetadata: push.LabelsAdapter{},
},
expected: push.LabelsAdapter{
{Name: "org_id", Value: "fake_b"}, // first field from configuration that matches takes precedence
},
},
} {
t.Run(tc.name, func(t *testing.T) {
extracted := push.LabelsAdapter{}
Expand Down

0 comments on commit 167f350

Please sign in to comment.