diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 68fba68eb9647..f6cf1d22345f7 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo if !ok { sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp) - stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) + stream = newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) i.streamsByFP[fp] = stream i.streams[stream.labelsString] = stream i.streamsCreatedTotal.Inc() @@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp) - stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) + stream = newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) i.streams[pushReqStream.Labels] = stream i.streamsByFP[fp] = stream diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index ba044194124c5..28fdd42c5f0da 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -198,7 +198,7 @@ func Test_SeriesQuery(t *testing.T) { for _, testStream := range testStreams { stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord()) require.NoError(t, err) - chunk := newStream(cfg, newLocalStreamRateStrategy(limiter), "fake", 0, nil, true, NilMetrics).NewChunk() + chunk := newStream(cfg, limiter, "fake", 0, nil, true, NilMetrics).NewChunk() for _, entry := range testStream.Entries { err = chunk.Append(&entry) require.NoError(t, err) @@ -364,7 +364,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { lbs := makeRandomLabels() b.Run("addTailersToNewStream", func(b *testing.B) { for n := 0; n < b.N; n++ { - inst.addTailersToNewStream(newStream(nil, newLocalStreamRateStrategy(limiter), "fake", 0, lbs, true, NilMetrics)) + inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NilMetrics)) } }) } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 436025ab9b09f..fea7f856ae3ef 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -4,8 +4,8 @@ import ( "fmt" "math" "sync" + "time" - cortex_limiter "github.com/cortexproject/cortex/pkg/util/limiter" "golang.org/x/time/rate" "github.com/grafana/loki/pkg/validation" @@ -129,27 +129,53 @@ func (l *Limiter) minNonZero(first, second int) int { return first } -type localStrategy struct { - limiter *Limiter +type RateLimiterStrategy interface { + RateLimit(tenant string) validation.RateLimit } -func newLocalStreamRateStrategy(l *Limiter) cortex_limiter.RateLimiterStrategy { - return &localStrategy{ - limiter: l, +func (l *Limiter) RateLimit(tenant string) validation.RateLimit { + if l.disabled { + return validation.Unlimited } + + return l.limits.PerStreamRateLimit(tenant) } -func (s *localStrategy) Limit(userID string) float64 { - if s.limiter.disabled { - return float64(rate.Inf) +type StreamRateLimiter struct { + recheckPeriod time.Duration + recheckAt time.Time + strategy RateLimiterStrategy + tenant string + lim *rate.Limiter +} + +func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter { + rl := strategy.RateLimit(tenant) + return &StreamRateLimiter{ + recheckPeriod: recheckPeriod, + strategy: strategy, + tenant: tenant, + lim: rate.NewLimiter(rl.Limit, rl.Burst), } - return float64(s.limiter.limits.MaxLocalStreamRateBytes(userID)) } -func (s *localStrategy) Burst(userID string) int { - if s.limiter.disabled { - // Burst is ignored when limit = rate.Inf - return 0 +func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool { + now := time.Now() + if now.After(l.recheckAt) { + l.recheckAt = now.Add(l.recheckPeriod) + + oldLim := l.lim.Limit() + oldBurst := l.lim.Burst() + + next := l.strategy.RateLimit(l.tenant) + + if oldLim != next.Limit || oldBurst != next.Burst { + // Edge case: rate.Inf doesn't advance nicely when reconfigured. + // To simplify, we just create a new limiter after reconfiguration rather + // than alter the existing one. + l.lim = rate.NewLimiter(next.Limit, next.Burst) + } } - return s.limiter.limits.MaxLocalStreamBurstRateBytes(userID) + + return l.lim.AllowN(at, n) } diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 3be6c20cce3b1..b9646bb27d18f 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -4,9 +4,11 @@ import ( "fmt" "math" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "github.com/grafana/loki/pkg/validation" ) @@ -168,3 +170,44 @@ type ringCountMock struct { func (m *ringCountMock) HealthyInstancesCount() int { return m.count } + +// Assert some of the weirder (bug?) behavior of golang.org/x/time/rate +func TestGoLimiter(t *testing.T) { + for _, tc := range []struct { + desc string + lim *rate.Limiter + at time.Time + burst int + limit rate.Limit + allow int + exp bool + }{ + { + // I (owen-d) think this _should_ work and am supplying this test + // case by way of explanation for how the StreamRateLimiter + // works around the rate.Inf edge case. + desc: "changing inf limits unnecessarily cordons", + lim: rate.NewLimiter(rate.Inf, 0), + at: time.Now(), + burst: 2, + limit: 1, + allow: 1, + exp: false, + }, + { + desc: "non inf limit works", + lim: rate.NewLimiter(1, 2), + at: time.Now(), + burst: 2, + limit: 1, + allow: 1, + exp: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + tc.lim.SetBurstAt(tc.at, tc.burst) + tc.lim.SetLimitAt(tc.at, tc.limit) + require.Equal(t, tc.exp, tc.lim.AllowN(tc.at, tc.allow)) + }) + } +} diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 85af2a9704c73..dee33325229b5 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" @@ -66,7 +65,7 @@ type line struct { } type stream struct { - limiter *limiter.RateLimiter + limiter *StreamRateLimiter cfg *Config tenant string // Newest chunk at chunks[n-1]. @@ -116,9 +115,9 @@ type entryWithError struct { e error } -func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream { +func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream { return &stream{ - limiter: limiter.NewRateLimiter(limits, 10*time.Second), + limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second), cfg: cfg, fp: fp, labels: labels, @@ -240,7 +239,7 @@ func (s *stream) Push( } // Check if this this should be rate limited. now := time.Now() - if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) { + if !s.limiter.AllowN(now, len(entries[i].Line)) { failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit}) rateLimitedSamples++ rateLimitedBytes += len(entries[i].Line) diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 0d86b9b884f6b..a885227f466ed 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -53,7 +53,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { cfg.MaxReturnedErrors = tc.limit s := newStream( cfg, - newLocalStreamRateStrategy(limiter), + limiter, "fake", model.Fingerprint(0), labels.Labels{ @@ -98,7 +98,7 @@ func TestPushDeduplication(t *testing.T) { s := newStream( defaultConfig(), - newLocalStreamRateStrategy(limiter), + limiter, "fake", model.Fingerprint(0), labels.Labels{ @@ -127,7 +127,7 @@ func TestPushRejectOldCounter(t *testing.T) { s := newStream( defaultConfig(), - newLocalStreamRateStrategy(limiter), + limiter, "fake", model.Fingerprint(0), labels.Labels{ @@ -221,7 +221,7 @@ func TestUnorderedPush(t *testing.T) { s := newStream( &cfg, - newLocalStreamRateStrategy(limiter), + limiter, "fake", model.Fingerprint(0), labels.Labels{ @@ -319,7 +319,7 @@ func TestPushRateLimit(t *testing.T) { s := newStream( defaultConfig(), - newLocalStreamRateStrategy(limiter), + limiter, "fake", model.Fingerprint(0), labels.Labels{ @@ -360,7 +360,7 @@ func Benchmark_PushStream(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) - s := newStream(&Config{}, newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), ls, true, NilMetrics) + s := newStream(&Config{}, limiter, "fake", model.Fingerprint(0), ls, true, NilMetrics) t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}) require.NoError(b, err) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 0fdf09dd224f9..2a8443637948e 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "golang.org/x/time/rate" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/util/flagext" @@ -432,6 +433,14 @@ func (o *Overrides) DefaultLimits() *Limits { return o.defaultLimits } +func (o *Overrides) PerStreamRateLimit(userID string) RateLimit { + user := o.getOverridesForUser(userID) + return RateLimit{ + Limit: rate.Limit(float64(user.MaxLocalStreamRateBytes.Val())), + Burst: user.MaxLocalStreamBurstRateBytes.Val(), + } +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID) diff --git a/pkg/validation/rate.go b/pkg/validation/rate.go new file mode 100644 index 0000000000000..f8a485126b69f --- /dev/null +++ b/pkg/validation/rate.go @@ -0,0 +1,17 @@ +package validation + +import "golang.org/x/time/rate" + +// RateLimit is a colocated limit & burst config. It largely exists to +// eliminate accidental misconfigurations due to race conditions when +// requesting the limit & burst config sequentially, between which the +// Limits configuration may have updated. +type RateLimit struct { + Limit rate.Limit + Burst int +} + +var Unlimited = RateLimit{ + Limit: rate.Inf, + Burst: 0, +}