Skip to content

Commit

Permalink
Refactor per stream rate limit (#4213)
Browse files Browse the repository at this point in the history
* uses new StreamRateLimiter

* sets burst before limiter, uses requested ts

* creates new non infinite rate limiters when configs change

* time/rate testware

* less indirection & protect rate limit access from race conditions

* removes old comment

* always recreate stream limiter after config change
  • Loading branch information
owen-d authored Aug 25, 2021
1 parent b72d8ab commit 668622c
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 30 deletions.
4 changes: 2 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
if !ok {

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
Expand Down Expand Up @@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
fp := i.getHashForLabels(labels)

sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, newLocalStreamRateStrategy(i.limiter), i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func Test_SeriesQuery(t *testing.T) {
for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord())
require.NoError(t, err)
chunk := newStream(cfg, newLocalStreamRateStrategy(limiter), "fake", 0, nil, true, NilMetrics).NewChunk()
chunk := newStream(cfg, limiter, "fake", 0, nil, true, NilMetrics).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
Expand Down Expand Up @@ -364,7 +364,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
lbs := makeRandomLabels()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, newLocalStreamRateStrategy(limiter), "fake", 0, lbs, true, NilMetrics))
inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NilMetrics))
}
})
}
Expand Down
56 changes: 41 additions & 15 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"
"math"
"sync"
"time"

cortex_limiter "github.com/cortexproject/cortex/pkg/util/limiter"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/validation"
Expand Down Expand Up @@ -129,27 +129,53 @@ func (l *Limiter) minNonZero(first, second int) int {
return first
}

type localStrategy struct {
limiter *Limiter
type RateLimiterStrategy interface {
RateLimit(tenant string) validation.RateLimit
}

func newLocalStreamRateStrategy(l *Limiter) cortex_limiter.RateLimiterStrategy {
return &localStrategy{
limiter: l,
func (l *Limiter) RateLimit(tenant string) validation.RateLimit {
if l.disabled {
return validation.Unlimited
}

return l.limits.PerStreamRateLimit(tenant)
}

func (s *localStrategy) Limit(userID string) float64 {
if s.limiter.disabled {
return float64(rate.Inf)
type StreamRateLimiter struct {
recheckPeriod time.Duration
recheckAt time.Time
strategy RateLimiterStrategy
tenant string
lim *rate.Limiter
}

func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter {
rl := strategy.RateLimit(tenant)
return &StreamRateLimiter{
recheckPeriod: recheckPeriod,
strategy: strategy,
tenant: tenant,
lim: rate.NewLimiter(rl.Limit, rl.Burst),
}
return float64(s.limiter.limits.MaxLocalStreamRateBytes(userID))
}

func (s *localStrategy) Burst(userID string) int {
if s.limiter.disabled {
// Burst is ignored when limit = rate.Inf
return 0
func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool {
now := time.Now()
if now.After(l.recheckAt) {
l.recheckAt = now.Add(l.recheckPeriod)

oldLim := l.lim.Limit()
oldBurst := l.lim.Burst()

next := l.strategy.RateLimit(l.tenant)

if oldLim != next.Limit || oldBurst != next.Burst {
// Edge case: rate.Inf doesn't advance nicely when reconfigured.
// To simplify, we just create a new limiter after reconfiguration rather
// than alter the existing one.
l.lim = rate.NewLimiter(next.Limit, next.Burst)
}
}
return s.limiter.limits.MaxLocalStreamBurstRateBytes(userID)

return l.lim.AllowN(at, n)
}
43 changes: 43 additions & 0 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"fmt"
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/validation"
)
Expand Down Expand Up @@ -168,3 +170,44 @@ type ringCountMock struct {
func (m *ringCountMock) HealthyInstancesCount() int {
return m.count
}

// Assert some of the weirder (bug?) behavior of golang.org/x/time/rate
func TestGoLimiter(t *testing.T) {
for _, tc := range []struct {
desc string
lim *rate.Limiter
at time.Time
burst int
limit rate.Limit
allow int
exp bool
}{
{
// I (owen-d) think this _should_ work and am supplying this test
// case by way of explanation for how the StreamRateLimiter
// works around the rate.Inf edge case.
desc: "changing inf limits unnecessarily cordons",
lim: rate.NewLimiter(rate.Inf, 0),
at: time.Now(),
burst: 2,
limit: 1,
allow: 1,
exp: false,
},
{
desc: "non inf limit works",
lim: rate.NewLimiter(1, 2),
at: time.Now(),
burst: 2,
limit: 1,
allow: 1,
exp: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
tc.lim.SetBurstAt(tc.at, tc.burst)
tc.lim.SetLimitAt(tc.at, tc.limit)
require.Equal(t, tc.exp, tc.lim.AllowN(tc.at, tc.allow))
})
}
}
9 changes: 4 additions & 5 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -66,7 +65,7 @@ type line struct {
}

type stream struct {
limiter *limiter.RateLimiter
limiter *StreamRateLimiter
cfg *Config
tenant string
// Newest chunk at chunks[n-1].
Expand Down Expand Up @@ -116,9 +115,9 @@ type entryWithError struct {
e error
}

func newStream(cfg *Config, limits limiter.RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
return &stream{
limiter: limiter.NewRateLimiter(limits, 10*time.Second),
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
cfg: cfg,
fp: fp,
labels: labels,
Expand Down Expand Up @@ -240,7 +239,7 @@ func (s *stream) Push(
}
// Check if this this should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, s.tenant, len(entries[i].Line)) {
if !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit})
rateLimitedSamples++
rateLimitedBytes += len(entries[i].Line)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
cfg.MaxReturnedErrors = tc.limit
s := newStream(
cfg,
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestPushDeduplication(t *testing.T) {

s := newStream(
defaultConfig(),
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestPushRejectOldCounter(t *testing.T) {

s := newStream(
defaultConfig(),
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestUnorderedPush(t *testing.T) {

s := newStream(
&cfg,
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestPushRateLimit(t *testing.T) {

s := newStream(
defaultConfig(),
newLocalStreamRateStrategy(limiter),
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
Expand Down Expand Up @@ -360,7 +360,7 @@ func Benchmark_PushStream(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

s := newStream(&Config{}, newLocalStreamRateStrategy(limiter), "fake", model.Fingerprint(0), ls, true, NilMetrics)
s := newStream(&Config{}, limiter, "fake", model.Fingerprint(0), ls, true, NilMetrics)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{})
require.NoError(b, err)

Expand Down
9 changes: 9 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util/flagext"
Expand Down Expand Up @@ -432,6 +433,14 @@ func (o *Overrides) DefaultLimits() *Limits {
return o.defaultLimits
}

func (o *Overrides) PerStreamRateLimit(userID string) RateLimit {
user := o.getOverridesForUser(userID)
return RateLimit{
Limit: rate.Limit(float64(user.MaxLocalStreamRateBytes.Val())),
Burst: user.MaxLocalStreamBurstRateBytes.Val(),
}
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits.TenantLimits(userID)
Expand Down
17 changes: 17 additions & 0 deletions pkg/validation/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package validation

import "golang.org/x/time/rate"

// RateLimit is a colocated limit & burst config. It largely exists to
// eliminate accidental misconfigurations due to race conditions when
// requesting the limit & burst config sequentially, between which the
// Limits configuration may have updated.
type RateLimit struct {
Limit rate.Limit
Burst int
}

var Unlimited = RateLimit{
Limit: rate.Inf,
Burst: 0,
}

0 comments on commit 668622c

Please sign in to comment.