Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor per stream rate limit #4213

Merged
merged 7 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
if !ok {

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
Expand Down Expand Up @@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
fp := i.getHashForLabels(labels)

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func Test_SeriesQuery(t *testing.T) {
for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord())
require.NoError(t, err)
chunk := newStream(cfg, newLocalStreamRateStrategy(limiter), "fake", 0, nil, true, NilMetrics).NewChunk()
chunk := newStream(cfg, limiter, "fake", 0, nil, true, NilMetrics).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
Expand Down Expand Up @@ -364,7 +364,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
lbs := makeRandomLabels()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, newLocalStreamRateStrategy(limiter), "fake", 0, lbs, true, NilMetrics))
inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NilMetrics))
}
})
}
Expand Down
63 changes: 48 additions & 15 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"
"math"
"sync"
"time"

cortex_limiter "github.com/cortexproject/cortex/pkg/util/limiter"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/validation"
Expand Down Expand Up @@ -129,27 +129,60 @@ func (l *Limiter) minNonZero(first, second int) int {
return first
}

type localStrategy struct {
limiter *Limiter
type RateLimiterStrategy interface {
RateLimit(tenant string) validation.RateLimit
}

func newLocalStreamRateStrategy(l *Limiter) cortex_limiter.RateLimiterStrategy {
return &localStrategy{
limiter: l,
func (l *Limiter) RateLimit(tenant string) validation.RateLimit {
if l.disabled {
return validation.Unlimited
}

return l.limits.PerStreamRateLimit(tenant)
}

type StreamRateLimiter struct {
recheckPeriod time.Duration
recheckAt time.Time
strategy RateLimiterStrategy
tenant string
lim *rate.Limiter
}

func (s *localStrategy) Limit(userID string) float64 {
if s.limiter.disabled {
return float64(rate.Inf)
func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter {
rl := strategy.RateLimit(tenant)
return &StreamRateLimiter{
recheckPeriod: recheckPeriod,
strategy: strategy,
tenant: tenant,
lim: rate.NewLimiter(rl.Limit, rl.Burst),
}
return float64(s.limiter.limits.MaxLocalStreamRateBytes(userID))
}

func (s *localStrategy) Burst(userID string) int {
if s.limiter.disabled {
// Burst is ignored when limit = rate.Inf
return 0
func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool {
now := time.Now()
if now.After(l.recheckAt) {
l.recheckAt = now.Add(l.recheckPeriod)

oldLim := l.lim.Limit()
oldBurst := l.lim.Burst()

next := l.strategy.RateLimit(l.tenant)

// Edge case: rate.Inf doesn't advance nicely when reconfigured.
// Instead of buffering it manually, we simply create a new limiter.
if oldLim == rate.Inf && next.Limit != oldLim {
l.lim = rate.NewLimiter(next.Limit, next.Burst)
} else {
if next.Burst != oldBurst {
l.lim.SetBurstAt(at, next.Burst)
}

if next.Limit != oldLim {
l.lim.SetLimitAt(at, next.Limit)
}
}
}
return s.limiter.limits.MaxLocalStreamBurstRateBytes(userID)

return l.lim.AllowN(at, n)
}
43 changes: 43 additions & 0 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"fmt"
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/validation"
)
Expand Down Expand Up @@ -168,3 +170,44 @@ type ringCountMock struct {
func (m *ringCountMock) HealthyInstancesCount() int {
return m.count
}

// Assert some of the weirder (bug?) behavior of golang.org/x/time/rate
func TestGoLimiter(t *testing.T) {
for _, tc := range []struct {
desc string
lim *rate.Limiter
at time.Time
burst int
limit rate.Limit
allow int
exp bool
}{
{
// I (owen-d) think this _should_ work and am supplying this test
// case by way of explanation for how the StreamRateLimiter
// works around the rate.Inf edge case.
desc: "changing inf limits unnecessarily cordons",
lim: rate.NewLimiter(rate.Inf, 0),
at: time.Now(),
burst: 2,
limit: 1,
allow: 1,
exp: false,
},
{
desc: "non inf limit works",
lim: rate.NewLimiter(1, 2),
at: time.Now(),
burst: 2,
limit: 1,
allow: 1,
exp: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
tc.lim.SetBurstAt(tc.at, tc.burst)
tc.lim.SetLimitAt(tc.at, tc.limit)
require.Equal(t, tc.exp, tc.lim.AllowN(tc.at, tc.allow))
})
}
}
9 changes: 4 additions & 5 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -66,7 +65,7 @@ type line struct {
}

type stream struct {
limiter *limiter.RateLimiter
limiter *StreamRateLimiter
cfg *Config
tenant string
// Newest chunk at chunks[n-1].
Expand Down Expand Up @@ -116,9 +115,9 @@ type entryWithError struct {
e error
}

func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
return &stream{
limiter: limiter.NewRateLimiter(limits, 10*time.Second),
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
cfg: cfg,
fp: fp,
labels: labels,
Expand Down Expand Up @@ -240,7 +239,7 @@ func (s *stream) Push(
}
// Check if this this should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) {
if !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit})
rateLimitedSamples++
rateLimitedBytes += len(entries[i].Line)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
cfg.MaxReturnedErrors = tc.limit
s := newStream(
cfg,
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestPushDeduplication(t *testing.T) {

s := newStream(
defaultConfig(),
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestPushRejectOldCounter(t *testing.T) {

s := newStream(
defaultConfig(),
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestUnorderedPush(t *testing.T) {

s := newStream(
&cfg,
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestPushRateLimit(t *testing.T) {

s := newStream(
defaultConfig(),
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -360,7 +360,7 @@ func Benchmark_PushStream(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

s := newStream(&Config{}, newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), ls, true, NilMetrics)
s := newStream(&Config{}, limiter, "fake", model.Fingerprint(0), ls, true, NilMetrics)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{})
require.NoError(b, err)

Expand Down
9 changes: 9 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util/flagext"
Expand Down Expand Up @@ -432,6 +433,14 @@ func (o *Overrides) DefaultLimits() *Limits {
return o.defaultLimits
}

func (o *Overrides) PerStreamRateLimit(userID string) RateLimit {
user := o.getOverridesForUser(userID)
return RateLimit{
Limit: rate.Limit(float64(user.MaxLocalStreamRateBytes.Val())),
Burst: user.MaxLocalStreamBurstRateBytes.Val(),
}
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits.TenantLimits(userID)
Expand Down
17 changes: 17 additions & 0 deletions pkg/validation/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package validation

import "golang.org/x/time/rate"

// RateLimit is a colocated limit & burst config. It largely exists to
// eliminate accidental misconfigurations due to race conditions when
// requesting the limit & burst config sequentially, between which the
// Limits configuration may have updated.
type RateLimit struct {
Limit rate.Limit
Burst int
}

var Unlimited = RateLimit{
Limit: rate.Inf,
Burst: 0,
}