Skip to content

Commit

Permalink
feat(loki): include structured_metadata size while asserting rate lim…
Browse files Browse the repository at this point in the history
…it (#14571)

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
  • Loading branch information
vlad-diachenko authored Oct 25, 2024
1 parent 7965722 commit a962edb
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 98 deletions.
3 changes: 2 additions & 1 deletion docs/sources/get-started/labels/structured-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ See the [Promtail: Structured metadata stage](https://grafana.com/docs/loki/<LOK
With Loki version 1.2.0, support for structured metadata has been added to the Logstash output plugin. For more information, see [logstash](https://grafana.com/docs/loki/<LOKI_VERSION>/send-data/logstash/).

{{% admonition type="warning" %}}
There are defaults for how much structured metadata can be attached per log line.
Structured metadata size is taken into account while asserting ingestion rate limiting.
Along with that, there are separate limits on how much structured metadata can be attached per log line.
```
# Maximum size accepted for structured metadata per log line.
# CLI flag: -limits.max-structured-metadata-size
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/operations/request-validation-rate-limits.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ It is recommended that Loki operators set up alerts or dashboards with these met

### Terminology

- **sample**: a log line
- **sample**: a log line with [structured metadata]({{< relref "../get-started/labels/structured-metadata" >}})
- **stream**: samples with a unique combination of labels
- **active stream**: streams that are present in the ingesters - these have recently received log lines within the `chunk_idle_period` period (default: 30m)

Expand Down
3 changes: 2 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3260,7 +3260,8 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -distributor.ingestion-rate-limit-strategy
[ingestion_rate_strategy: <string> | default = "global"]
# Per-user ingestion rate limit in sample size per second. Units in MB.
# Per-user ingestion rate limit in sample size per second. Sample size includes
# size of the logs line and the size of structured metadata labels. Units in MB.
# CLI flag: -distributor.ingestion-rate-limit-mb
[ingestion_rate_mb: <float> | default = 4]
Expand Down
14 changes: 4 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries)))
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(bytes))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(discardedBytes))
continue
}

Expand Down Expand Up @@ -527,7 +524,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

n++
validatedLineSize += len(entry.Line)
validatedLineSize += util.EntryTotalSize(&entry)
validatedLineCount++
pushSize += len(entry.Line)
}
Expand Down Expand Up @@ -706,10 +703,7 @@ func (d *Distributor) trackDiscardedData(
continue
}

discardedStreamBytes := 0
for _, e := range stream.Entries {
discardedStreamBytes += len(e.Line)
}
discardedStreamBytes := util.EntriesTotalSize(stream.Entries)

if d.usageTracker != nil {
d.usageTracker.DiscardedBytesAdd(ctx, tenantID, reason, lbs, float64(discardedStreamBytes))
Expand Down
51 changes: 27 additions & 24 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
Expand Down Expand Up @@ -54,7 +55,9 @@ var (
)

func TestDistributor(t *testing.T) {
ingestionRateLimit := 0.000096 // 100 Bytes/s limit
lineSize := 10
ingestionRateLimit := datasize.ByteSize(400)
ingestionRateLimitMB := ingestionRateLimit.MBytes() // 400 Bytes/s limit

for i, tc := range []struct {
lines int
Expand All @@ -72,7 +75,7 @@ func TestDistributor(t *testing.T) {
{
lines: 100,
streams: 1,
expectedErrors: []error{httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 100, 1000)},
expectedErrors: []error{httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", ingestionRateLimit, 100, 100*lineSize)},
},
{
lines: 100,
Expand Down Expand Up @@ -104,15 +107,15 @@ func TestDistributor(t *testing.T) {
t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRateMB = ingestionRateLimit
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.IngestionRateMB = ingestionRateLimitMB
limits.IngestionBurstSizeMB = ingestionRateLimitMB
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)

distributors, _ := prepare(t, 1, 5, limits, nil)

var request logproto.PushRequest
for i := 0; i < tc.streams; i++ {
req := makeWriteRequest(tc.lines, 10)
req := makeWriteRequest(tc.lines, lineSize)
request.Streams = append(request.Streams, req.Streams[0])
}

Expand Down Expand Up @@ -1178,37 +1181,37 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
"local strategy: limit should be set to each distributor": {
distributors: 2,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRateMB: 10 * (1.0 / float64(bytesInMB)),
ingestionBurstSizeMB: 10 * (1.0 / float64(bytesInMB)),
ingestionRateMB: datasize.ByteSize(100).MBytes(),
ingestionBurstSizeMB: datasize.ByteSize(100).MBytes(),
pushes: []testPush{
{bytes: 5, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 10, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 10, 1, 1)},
{bytes: 50, expectedError: nil},
{bytes: 60, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 60)},
{bytes: 50, expectedError: nil},
{bytes: 40, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 40)},
},
},
"global strategy: limit should be evenly shared across distributors": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRateMB: 10 * (1.0 / float64(bytesInMB)),
ingestionBurstSizeMB: 5 * (1.0 / float64(bytesInMB)),
ingestionRateMB: datasize.ByteSize(200).MBytes(),
ingestionBurstSizeMB: datasize.ByteSize(100).MBytes(),
pushes: []testPush{
{bytes: 3, expectedError: nil},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 3)},
{bytes: 2, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 1)},
{bytes: 60, expectedError: nil},
{bytes: 50, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 50)},
{bytes: 40, expectedError: nil},
{bytes: 30, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 100, 1, 30)},
},
},
"global strategy: burst should set to each distributor": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRateMB: 10 * (1.0 / float64(bytesInMB)),
ingestionBurstSizeMB: 20 * (1.0 / float64(bytesInMB)),
ingestionRateMB: datasize.ByteSize(100).MBytes(),
ingestionBurstSizeMB: datasize.ByteSize(200).MBytes(),
pushes: []testPush{
{bytes: 15, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 5, 1, 1)},
{bytes: 150, expectedError: nil},
{bytes: 60, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 50, 1, 60)},
{bytes: 50, expectedError: nil},
{bytes: 30, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, "test", 50, 1, 30)},
},
},
}
Expand All @@ -1227,8 +1230,8 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
assert.NoError(t, err)
assert.Equal(t, success, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
assert.Equal(t, push.expectedError, err)
Expand Down
43 changes: 19 additions & 24 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/validation"
)

Expand Down Expand Up @@ -82,25 +83,28 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry) error {
ts := entry.Timestamp.UnixNano()
validation.LineLengthHist.Observe(float64(len(entry.Line)))
structuredMetadataCount := len(entry.StructuredMetadata)
structuredMetadataSizeBytes := util.StructuredMetadataSize(entry.StructuredMetadata)
entrySize := float64(len(entry.Line) + structuredMetadataSizeBytes)

if vCtx.rejectOldSample && ts < vCtx.rejectOldSampleMaxAge {
// Makes time string on the error message formatted consistently.
formatedEntryTime := entry.Timestamp.Format(timeFormat)
formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize)
}
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}

if ts > vCtx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize)
}
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}
Expand All @@ -111,43 +115,37 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize)
}
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}

if len(entry.StructuredMetadata) > 0 {
if structuredMetadataCount > 0 {
if !vCtx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize)
}
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}

var structuredMetadataSizeBytes, structuredMetadataCount int
for _, metadata := range entry.StructuredMetadata {
structuredMetadataSizeBytes += len(metadata.Name) + len(metadata.Value)
structuredMetadataCount++
}

if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize)
}
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize)
}

if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Add(float64(len(entry.Line)))
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, float64(len(entry.Line)))
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize)
}
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, vCtx.maxStructuredMetadataCount)
}
Expand Down Expand Up @@ -207,10 +205,7 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (b
}

func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(len(stream.Entries)))
bytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes))
}
6 changes: 2 additions & 4 deletions pkg/ingester-rf1/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/validation"
Expand Down Expand Up @@ -269,10 +270,7 @@ func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logp
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
bytes := util.EntriesTotalSize(pushReqStream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
Expand Down
20 changes: 11 additions & 9 deletions pkg/ingester-rf1/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/flagext"
"github.com/grafana/loki/v3/pkg/validation"
)
Expand Down Expand Up @@ -245,15 +246,15 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
continue
}

lineBytes := len(entries[i].Line)
totalBytes += lineBytes
entryBytes := util.EntryTotalSize(&entries[i])
totalBytes += entryBytes

now := time.Now()
if !rateLimitWholeStream && !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(lineBytes)}})
if !rateLimitWholeStream && !s.limiter.AllowN(now, entryBytes) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(entryBytes)}})
s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
rateLimitedSamples++
rateLimitedBytes += lineBytes
rateLimitedBytes += entryBytes
continue
}

Expand All @@ -263,11 +264,11 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)})
s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
outOfOrderSamples++
outOfOrderBytes += lineBytes
outOfOrderBytes += entryBytes
continue
}

validBytes += lineBytes
validBytes += entryBytes

lastLine.ts = entries[i].Timestamp
lastLine.content = entries[i].Line
Expand All @@ -287,8 +288,9 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
rateLimitedSamples = len(toStore)
failedEntriesWithError = make([]entryWithError, 0, len(toStore))
for i := 0; i < len(toStore); i++ {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}})
rateLimitedBytes += len(toStore[i].Line)
entryTotalSize := util.EntryTotalSize(toStore[i])
failedEntriesWithError = append(failedEntriesWithError, entryWithError{toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(entryTotalSize)}})
rateLimitedBytes += entryTotalSize
}
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,7 @@ func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logp
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
bytes := util.EntriesTotalSize(pushReqStream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
Expand Down
Loading

0 comments on commit a962edb

Please sign in to comment.