Added max streams per user global limit #1493

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@

### Features

+* [1493](https://github.com/grafana/loki/pull/1493) **pracucci**: pkg/ingester: added a per-cluster limit on the maximum number of streams per user, configured via the `max_global_streams_per_user` config option.
* [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. If the positions yaml is corrupted, an empty positions config will be used and should later overwrite the malformed yaml.
* [1486](https://github.com/grafana/loki/pull/1486) **pracucci**: Added `global` ingestion rate limiter strategy support.

8 changes: 7 additions & 1 deletion docs/configuration/README.md
@@ -762,9 +762,15 @@ logs in Loki.
# Enforce every sample has a metric name.
[enforce_metric_name: <boolean> | default = true]

-# Maximum number of active streams per user.
+# Maximum number of active streams per user, per ingester. 0 to disable.
[max_streams_per_user: <int> | default = 10000]

+# Maximum number of active streams per user, across the cluster. 0 to disable.
+# When the global limit is enabled, each ingester is configured with a dynamic
+# local limit based on the replication factor and the current number of healthy
+# ingesters, and is kept updated whenever the number of ingesters changes.
+[max_global_streams_per_user: <int> | default = 0]

# Maximum number of chunks that can be fetched by a single query.
[max_chunks_per_query: <int> | default = 2000000]

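As a worked example of the dynamic local limit described above: with `max_global_streams_per_user: 10000`, 5 healthy ingesters, and a replication factor of 3, each ingester enforces a local limit of 10000 / 5 * 3 = 6000 active streams, per the conversion implemented in `pkg/ingester/limiter.go` below.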
9 changes: 6 additions & 3 deletions pkg/ingester/ingester.go
@@ -99,7 +99,7 @@ type Ingester struct {
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup

-limits *validation.Overrides
+limiter *Limiter
factory func() chunkenc.Chunk
}

@@ -126,7 +126,6 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
-limits: limits,
factory: func() chunkenc.Chunk {
return chunkenc.NewMemChunkSize(enc, cfg.BlockSize, cfg.TargetChunkSize)
},
Expand All @@ -145,6 +144,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid

i.lifecycler.Start()

+// Now that the lifecycler has been created, we can create the limiter
+// which depends on it.
+i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

i.done.Add(1)
go i.loop()

@@ -208,7 +211,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
-inst = newInstance(instanceID, i.factory, i.limits, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
+inst = newInstance(instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
i.instances[instanceID] = inst
}
return inst
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_test.go
@@ -189,7 +189,7 @@ func TestIngester(t *testing.T) {
func TestIngesterStreamLimitExceeded(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
-defaultLimits.MaxStreamsPerUser = 1
+defaultLimits.MaxLocalStreamsPerUser = 1
overrides, err := validation.NewOverrides(defaultLimits)

require.NoError(t, err)
13 changes: 7 additions & 6 deletions pkg/ingester/instance.go
@@ -24,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

const queryBatchSize = 128
@@ -66,15 +65,15 @@
tailers map[uint32]*tailer
tailerMtx sync.RWMutex

-limits *validation.Overrides
+limiter *Limiter
factory func() chunkenc.Chunk

// sync
syncPeriod time.Duration
syncMinUtil float64
}

-func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *validation.Overrides, syncPeriod time.Duration, syncMinUtil float64) *instance {
+func newInstance(instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
@@ -85,7 +84,7 @@ func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *valid

factory: factory,
tailers: map[uint32]*tailer{},
-limits: limits,
+limiter: limiter,

syncPeriod: syncPeriod,
syncMinUtil: syncMinUtil,
@@ -160,9 +159,11 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil
}

-if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) {
-return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
+err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
+if err != nil {
+return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}

sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.factory)
i.streams[fp] = stream
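With this change, a push that would create a stream past the limit is rejected with HTTP 429 (Too Many Requests), carrying the message defined in `limiter.go` below; with the default limits, that reads `per-user streams limit (local: 10000 global: 0 actual local: 10000) exceeded`.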
15 changes: 9 additions & 6 deletions pkg/ingester/instance_test.go
@@ -24,10 +24,11 @@ var defaultFactory = func() chunkenc.Chunk {
}

func TestLabelsCollisions(t *testing.T) {
-o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
+limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
+limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance("test", defaultFactory, o, 0, 0)
i := newInstance("test", defaultFactory, limiter, 0, 0)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -50,10 +51,11 @@
}

func TestConcurrentPushes(t *testing.T) {
-o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
+limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
+limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

inst := newInstance("test", defaultFactory, o, 0, 0)
inst := newInstance("test", defaultFactory, limiter, 0, 0)

const (
concurrent = 10
@@ -100,8 +102,9 @@
}

func TestSyncPeriod(t *testing.T) {
-o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
+limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
+limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

const (
syncPeriod = 1 * time.Minute
@@ -110,7 +113,7 @@
minUtil = 0.20
)

inst := newInstance("test", defaultFactory, o, syncPeriod, minUtil)
inst := newInstance("test", defaultFactory, limiter, syncPeriod, minUtil)
lbls := makeRandomLabels()

tt := time.Now()
94 changes: 94 additions & 0 deletions pkg/ingester/limiter.go
@@ -0,0 +1,94 @@
package ingester

import (
"fmt"
"math"

"github.com/grafana/loki/pkg/util/validation"
)

const (
errMaxStreamsPerUserLimitExceeded = "per-user streams limit (local: %d global: %d actual local: %d) exceeded"
)

// RingCount is the interface exposed by a ring implementation which allows
// counting its healthy members
type RingCount interface {
HealthyInstancesCount() int
}

// Limiter implements primitives to get the maximum number of streams
// an ingester can handle for a specific tenant
type Limiter struct {
limits *validation.Overrides
ring RingCount
replicationFactor int
}

// NewLimiter makes a new limiter
func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
}
}

// AssertMaxStreamsPerUser checks whether the given current number of streams
// has reached the per-user limit, returning an error if it has.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
actualLimit := l.maxStreamsPerUser(userID)
if streams < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalStreamsPerUser(userID)
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) maxStreamsPerUser(userID string) int {
localLimit := l.limits.MaxLocalStreamsPerUser(userID)

// We can assume that streams are evenly distributed across ingesters,
// so we convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))

// If both the local and global limits are disabled, we just
// use the largest int32 value
if localLimit == 0 {
localLimit = math.MaxInt32
}

return localLimit
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}

// Given we don't need a super accurate count (i.e. when the ingester
// topology changes) and we prefer to always err in favor of the tenant,
// we can use a per-ingester limit equal to:
// (global limit / number of ingesters) * replication factor
numIngesters := l.ring.HealthyInstancesCount()

// A zero count may happen because the number of ingesters is updated
// asynchronously; if it happens, we just temporarily ignore the global limit.
if numIngesters > 0 {
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
}

return 0
}

func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}

return first
}
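For illustration, here is a minimal sketch of exercising the limiter on its own, outside the ingester. The `ringCountMock` stub mirrors the one the updated tests pass to `NewLimiter`; the `MaxGlobalStreamsPerUser` field on `validation.Limits` is assumed here by analogy with the `MaxLocalStreamsPerUser` field visible in the test diffs.

```go
package ingester

import (
	"fmt"

	"github.com/grafana/loki/pkg/util/validation"
)

// ringCountMock satisfies RingCount with a fixed healthy-instance count,
// mirroring the stub the tests in this PR pass to NewLimiter.
type ringCountMock struct{ count int }

func (m *ringCountMock) HealthyInstancesCount() int { return m.count }

func ExampleLimiter() {
	// Assumed field name, parallel to MaxLocalStreamsPerUser:
	// allow 150 active streams per user across the whole cluster.
	limits, err := validation.NewOverrides(validation.Limits{MaxGlobalStreamsPerUser: 150})
	if err != nil {
		panic(err)
	}

	// With 10 healthy ingesters and a replication factor of 3, each
	// ingester enforces a local limit of 150 / 10 * 3 = 45 streams.
	limiter := NewLimiter(limits, &ringCountMock{count: 10}, 3)

	fmt.Println(limiter.AssertMaxStreamsPerUser("tenant-1", 44)) // below the limit: <nil>
	fmt.Println(limiter.AssertMaxStreamsPerUser("tenant-1", 45)) // at the limit: error returned
}
```

Note that the assertion treats reaching the limit as a failure (it only passes while `streams < actualLimit`), so callers check before creating a new stream, as `getOrCreateStream` does above.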