Skip to content

Commit

Permalink
better per stream rate limits configuration options (#4236)
Browse files Browse the repository at this point in the history
* better per stream rate limits configuration options

* clean up test config

* pr feedback
  • Loading branch information
owen-d authored Sep 1, 2021
1 parent 0f0cb9f commit 20515a2
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 56 deletions.
11 changes: 11 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,17 @@ logs in Loki.
# CLI flag: -frontend.max-queriers-per-tenant
[max_queriers_per_tenant: <int> | default = 0]

# Maximum byte rate per second per stream,
# also expressible in human readable forms (1MB, 256KB, etc).
# CLI flag: -ingester.per-stream-rate-limit
[per_stream_rate_limit: <string|int> | default = "3MB"]

# Maximum burst bytes per stream,
# also expressible in human readable forms (1MB, 256KB, etc).
# This is how far above the rate limit a stream can "burst" before the stream is limited.
# CLI flag: -ingester.per-stream-rate-limit-burst
[per_stream_rate_limit_burst: <string|int> | default = "15MB"]

# Limit how far back in time series data and metadata can be queried, up until lookback duration ago.
# This limit is enforced in the query frontend, the querier and the ruler.
# If the requested time range is outside the allowed range, the request will not fail,
Expand Down
15 changes: 8 additions & 7 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,14 @@ var (

func Test_SeriesIterator(t *testing.T) {
var instances []*instance
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: 1000,
IngestionRateMB: 1e4,
IngestionBurstSizeMB: 1e4,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}, nil)

// NB (owen-d): Not sure why we have these overrides
l := defaultLimitsTestConfig()
l.MaxLocalStreamsPerUser = 1000
l.IngestionRateMB = 1e4
l.IngestionBurstSizeMB = 1e4

limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down
42 changes: 7 additions & 35 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ func defaultConfig() *Config {
var NilMetrics = newIngesterMetrics(nil)

func TestLabelsCollisions(t *testing.T) {
l := validation.Limits{
MaxLocalStreamsPerUser: 10000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand All @@ -69,12 +64,7 @@ func TestLabelsCollisions(t *testing.T) {
}

func TestConcurrentPushes(t *testing.T) {
l := validation.Limits{
MaxLocalStreamsPerUser: 100000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down Expand Up @@ -125,12 +115,7 @@ func TestConcurrentPushes(t *testing.T) {
}

func TestSyncPeriod(t *testing.T) {
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down Expand Up @@ -172,12 +157,7 @@ func TestSyncPeriod(t *testing.T) {
}

func Test_SeriesQuery(t *testing.T) {
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down Expand Up @@ -292,12 +272,7 @@ func makeRandomLabels() labels.Labels {
}

func Benchmark_PushInstance(b *testing.B) {
l := validation.Limits{
MaxLocalStreamsPerUser: 1000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
limits, err := validation.NewOverrides(l, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

Expand Down Expand Up @@ -337,11 +312,8 @@ func Benchmark_PushInstance(b *testing.B) {
}

func Benchmark_instance_addNewTailer(b *testing.B) {
l := validation.Limits{
MaxLocalStreamsPerUser: 100000,
MaxLocalStreamRateBytes: defaultLimitsTestConfig().MaxLocalStreamRateBytes,
MaxLocalStreamBurstRateBytes: defaultLimitsTestConfig().MaxLocalStreamBurstRateBytes,
}
l := defaultLimitsTestConfig()
l.MaxLocalStreamsPerUser = 100000
limits, err := validation.NewOverrides(l, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ func TestUnorderedPush(t *testing.T) {

func TestPushRateLimit(t *testing.T) {
l := validation.Limits{
MaxLocalStreamRateBytes: 10,
MaxLocalStreamBurstRateBytes: 10,
PerStreamRateLimit: 10,
PerStreamRateLimitBurst: 10,
}
limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
Expand Down
45 changes: 33 additions & 12 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Limits struct {
UnorderedWrites bool `yaml:"unordered_writes" json:"unordered_writes"`
MaxLocalStreamRateBytes flagext.ByteSize `yaml:"max_stream_rate_bytes" json:"max_stream_rate_bytes"`
MaxLocalStreamBurstRateBytes flagext.ByteSize `yaml:"max_stream_burst_rate_bytes" json:"max_stream_burst_rate_bytes"`
PerStreamRateLimit flagext.ByteSize `yaml:"per_stream_rate_limit" json:"per_stream_rate_limit"`
PerStreamRateLimitBurst flagext.ByteSize `yaml:"per_stream_rate_limit_burst" json:"per_stream_rate_limit_burst"`

// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"`
Expand Down Expand Up @@ -114,10 +116,17 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 0, "Maximum number of active streams per user, across the cluster. 0 to disable.")
f.BoolVar(&l.UnorderedWrites, "ingester.unordered-writes", false, "(Experimental) Allow out of order writes.")

// Deprecated
_ = l.MaxLocalStreamRateBytes.Set(strconv.Itoa(defaultPerStreamRateLimit))
f.Var(&l.MaxLocalStreamRateBytes, "ingester.max-stream-rate-bytes", "Maximum bytes per second rate per active stream.")
f.Var(&l.MaxLocalStreamRateBytes, "ingester.max-stream-rate-bytes", "Maximum bytes per second rate per active stream (deprecated in favor of ingester.per-stream-rate-limit).")
_ = l.MaxLocalStreamBurstRateBytes.Set(strconv.Itoa(defaultPerStreamBurstLimit))
f.Var(&l.MaxLocalStreamBurstRateBytes, "ingester.max-stream-burst-bytes", "Maximum burst bytes per second rate per active stream.")
f.Var(&l.MaxLocalStreamBurstRateBytes, "ingester.max-stream-burst-bytes", "Maximum burst bytes per second rate per active stream (deprecated in favor of ingester.per-stream-rate-limit-burst).")

_ = l.PerStreamRateLimit.Set(strconv.Itoa(defaultPerStreamRateLimit))
f.Var(&l.PerStreamRateLimit, "ingester.per-stream-rate-limit", "Maximum byte rate per second per stream, also expressible in human readable forms (1MB, 256KB, etc).")
_ = l.PerStreamRateLimitBurst.Set(strconv.Itoa(defaultPerStreamBurstLimit))
f.Var(&l.PerStreamRateLimitBurst, "ingester.per-stream-rate-limit-burst", "Maximum burst bytes per stream, also expressible in human readable forms (1MB, 256KB, etc).")

f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")

Expand Down Expand Up @@ -417,14 +426,6 @@ func (o *Overrides) UnorderedWrites(userID string) bool {
return o.getOverridesForUser(userID).UnorderedWrites
}

func (o *Overrides) MaxLocalStreamRateBytes(userID string) int {
return o.getOverridesForUser(userID).MaxLocalStreamRateBytes.Val()
}

func (o *Overrides) MaxLocalStreamBurstRateBytes(userID string) int {
return o.getOverridesForUser(userID).MaxLocalStreamBurstRateBytes.Val()
}

func (o *Overrides) ForEachTenantLimit(callback ForEachTenantLimitCallback) {
o.tenantLimits.ForEachTenantLimit(callback)
}
Expand All @@ -435,9 +436,20 @@ func (o *Overrides) DefaultLimits() *Limits {

func (o *Overrides) PerStreamRateLimit(userID string) RateLimit {
user := o.getOverridesForUser(userID)

return RateLimit{
Limit: rate.Limit(float64(user.MaxLocalStreamRateBytes.Val())),
Burst: user.MaxLocalStreamBurstRateBytes.Val(),
Limit: rate.Limit(float64(
firstNonDefault(
defaultPerStreamRateLimit,
user.PerStreamRateLimit.Val(),
user.MaxLocalStreamRateBytes.Val(),
),
)),
Burst: firstNonDefault(
defaultPerStreamBurstLimit,
user.PerStreamRateLimitBurst.Val(),
user.MaxLocalStreamBurstRateBytes.Val(),
),
}
}

Expand All @@ -450,3 +462,12 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits {
}
return o.defaultLimits
}

func firstNonDefault(def int, xs ...int) int {
for _, x := range xs {
if x != def {
return x
}
}
return def
}

0 comments on commit 20515a2

Please sign in to comment.