From 0586bfb5ed7d5dc4a8980fde3d1f3473447fdd1f Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 9 Jan 2020 14:19:30 +0100 Subject: [PATCH 1/2] Added max streams per user global limit Signed-off-by: Marco Pracucci --- docs/configuration/README.md | 8 +- pkg/ingester/ingester.go | 9 +- pkg/ingester/ingester_test.go | 2 +- pkg/ingester/instance.go | 13 +-- pkg/ingester/instance_test.go | 15 +-- pkg/ingester/limiter.go | 94 ++++++++++++++++ pkg/ingester/limiter_test.go | 195 ++++++++++++++++++++++++++++++++++ pkg/util/validation/limits.go | 19 +++- 8 files changed, 333 insertions(+), 22 deletions(-) create mode 100644 pkg/ingester/limiter.go create mode 100644 pkg/ingester/limiter_test.go diff --git a/docs/configuration/README.md b/docs/configuration/README.md index 035a5d203a49..f75010434a35 100644 --- a/docs/configuration/README.md +++ b/docs/configuration/README.md @@ -746,9 +746,15 @@ logs in Loki. # Enforce every sample has a metric name. [enforce_metric_name: | default = true] -# Maximum number of active streams per user. +# Maximum number of active streams per user, per ingester. 0 to disable. [max_streams_per_user: | default = 10000] +# Maximum number of active streams per user, across the cluster. 0 to disable. +# When the global limit is enabled, each ingester is configured with a dynamic +# local limit based on the replication factor and the current number of healthy +# ingesters, and is kept updated whenever the number of ingesters changes. +[max_global_streams_per_user: | default = 0] + # Maximum number of chunks that can be fetched by a single query. 
[max_chunks_per_query: | default = 2000000] diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ff3937843d6e..4ca6d5ce54a6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -99,7 +99,7 @@ type Ingester struct { flushQueues []*util.PriorityQueue flushQueuesDone sync.WaitGroup - limits *validation.Overrides + limiter *Limiter factory func() chunkenc.Chunk } @@ -126,7 +126,6 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid quit: make(chan struct{}), flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), quitting: make(chan struct{}), - limits: limits, factory: func() chunkenc.Chunk { return chunkenc.NewMemChunkSize(enc, cfg.BlockSize, cfg.TargetChunkSize) }, @@ -145,6 +144,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid i.lifecycler.Start() + // Now that the lifecycler has been created, we can create the limiter + // which depends on it. + i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor) + i.done.Add(1) go i.loop() @@ -208,7 +211,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance(instanceID, i.factory, i.limits, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization) + inst = newInstance(instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization) i.instances[instanceID] = inst } return inst diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 0dd1b7ee9037..1894e1103277 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -189,7 +189,7 @@ func TestIngester(t *testing.T) { func TestIngesterStreamLimitExceeded(t *testing.T) { ingesterConfig := defaultIngesterTestConfig(t) defaultLimits := defaultLimitsTestConfig() - defaultLimits.MaxStreamsPerUser = 1 + defaultLimits.MaxLocalStreamsPerUser = 1 overrides, err := 
validation.NewOverrides(defaultLimits) require.NoError(t, err) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 0e844ab66011..5e2bb7aa827a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -24,7 +24,6 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/util" - "github.com/grafana/loki/pkg/util/validation" ) const queryBatchSize = 128 @@ -66,7 +65,7 @@ type instance struct { tailers map[uint32]*tailer tailerMtx sync.RWMutex - limits *validation.Overrides + limiter *Limiter factory func() chunkenc.Chunk // sync @@ -74,7 +73,7 @@ type instance struct { syncMinUtil float64 } -func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *validation.Overrides, syncPeriod time.Duration, syncMinUtil float64) *instance { +func newInstance(instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance { i := &instance{ streams: map[model.Fingerprint]*stream{}, index: index.New(), @@ -85,7 +84,7 @@ func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *valid factory: factory, tailers: map[uint32]*tailer{}, - limits: limits, + limiter: limiter, syncPeriod: syncPeriod, syncMinUtil: syncMinUtil, @@ -160,9 +159,11 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err return stream, nil } - if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) { - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID)) + err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams)) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error()) } + sortedLabels := i.index.Add(labels, fp) stream = newStream(fp, sortedLabels, i.factory) i.streams[fp] = stream diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 
0ab5cca52511..9f1e6d50d03b 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -24,10 +24,11 @@ var defaultFactory = func() chunkenc.Chunk { } func TestLabelsCollisions(t *testing.T) { - o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000}) + limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}) require.NoError(t, err) + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance("test", defaultFactory, o, 0, 0) + i := newInstance("test", defaultFactory, limiter, 0, 0) // avoid entries from the future. tt := time.Now().Add(-5 * time.Minute) @@ -50,10 +51,11 @@ func TestLabelsCollisions(t *testing.T) { } func TestConcurrentPushes(t *testing.T) { - o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000}) + limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}) require.NoError(t, err) + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - inst := newInstance("test", defaultFactory, o, 0, 0) + inst := newInstance("test", defaultFactory, limiter, 0, 0) const ( concurrent = 10 @@ -100,8 +102,9 @@ func TestConcurrentPushes(t *testing.T) { } func TestSyncPeriod(t *testing.T) { - o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000}) + limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}) require.NoError(t, err) + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) const ( syncPeriod = 1 * time.Minute @@ -110,7 +113,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst := newInstance("test", defaultFactory, o, syncPeriod, minUtil) + inst := newInstance("test", defaultFactory, limiter, syncPeriod, minUtil) lbls := makeRandomLabels() tt := time.Now() diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go new file mode 100644 index 000000000000..5f1b52002754 --- /dev/null +++ b/pkg/ingester/limiter.go @@ -0,0 +1,94 @@ +package 
ingester + +import ( + "fmt" + "math" + + "github.com/grafana/loki/pkg/util/validation" +) + +const ( + errMaxStreamsPerUserLimitExceeded = "per-user streams limit (local: %d global: %d actual local: %d) exceeded" +) + +// RingCount is the interface exposed by a ring implementation which allows +// to count members +type RingCount interface { + HealthyInstancesCount() int +} + +// Limiter implements primitives to get the maximum number of streams +// an ingester can handle for a specific tenant +type Limiter struct { + limits *validation.Overrides + ring RingCount + replicationFactor int +} + +// NewLimiter makes a new limiter +func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter { + return &Limiter{ + limits: limits, + ring: ring, + replicationFactor: replicationFactor, + } +} + +// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current +// number of streams in input and returns an error if so. +func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { + actualLimit := l.maxStreamsPerUser(userID) + if streams < actualLimit { + return nil + } + + localLimit := l.limits.MaxLocalStreamsPerUser(userID) + globalLimit := l.limits.MaxGlobalStreamsPerUser(userID) + + return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, localLimit, globalLimit, actualLimit) +} + +func (l *Limiter) maxStreamsPerUser(userID string) int { + localLimit := l.limits.MaxLocalStreamsPerUser(userID) + + // We can assume that streams are evenly distributed across ingesters + // so we do convert the global limit into a local limit + globalLimit := l.limits.MaxGlobalStreamsPerUser(userID) + localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit)) + + // If both the local and global limits are disabled, we just + // use the largest int value + if localLimit == 0 { + localLimit = math.MaxInt32 + } + + return localLimit +} + +func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { + 
if globalLimit == 0 { + return 0 + } + + // Given we don't need a super accurate count (ie. when the ingesters + // topology changes) and we prefer to always be in favor of the tenant, + // we can use a per-ingester limit equal to: + // (global limit / number of ingesters) * replication factor + numIngesters := l.ring.HealthyInstancesCount() + + // May happen because the number of ingesters is asynchronously updated. + // If happens, we just temporarily ignore the global limit. + if numIngesters > 0 { + return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor)) + } + + return 0 +} + +func (l *Limiter) minNonZero(first, second int) int { + if first == 0 || (second != 0 && first > second) { + return second + } + + return first +} diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go new file mode 100644 index 000000000000..3042b1b58f1d --- /dev/null +++ b/pkg/ingester/limiter_test.go @@ -0,0 +1,195 @@ +package ingester + +import ( + "fmt" + "math" + "testing" + + "github.com/grafana/loki/pkg/util/validation" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLimiter_maxStreamsPerUser(t *testing.T) { + tests := map[string]struct { + maxLocalStreamsPerUser int + maxGlobalStreamsPerUser int + ringReplicationFactor int + ringIngesterCount int + expected int + }{ + "both local and global limits are disabled": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 0, + ringReplicationFactor: 1, + ringIngesterCount: 1, + expected: math.MaxInt32, + }, + "only local limit is enabled": { + maxLocalStreamsPerUser: 1000, + maxGlobalStreamsPerUser: 0, + ringReplicationFactor: 1, + ringIngesterCount: 1, + expected: 1000, + }, + "only global limit is enabled with replication-factor=1": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 1, + ringIngesterCount: 10, + expected: 100, + }, + "only global limit is enabled with replication-factor=3": { + 
maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + expected: 300, + }, + "both local and global limits are set with local limit < global limit": { + maxLocalStreamsPerUser: 150, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + expected: 150, + }, + "both local and global limits are set with local limit > global limit": { + maxLocalStreamsPerUser: 500, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + expected: 300, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + // Mock the ring + ring := &ringCountMock{count: testData.ringIngesterCount} + + // Mock limits + limits, err := validation.NewOverrides(validation.Limits{ + MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser, + MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser, + }) + require.NoError(t, err) + + limiter := NewLimiter(limits, ring, testData.ringReplicationFactor) + actual := limiter.maxStreamsPerUser("test") + + assert.Equal(t, testData.expected, actual) + }) + } +} + +func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { + tests := map[string]struct { + maxLocalStreamsPerUser int + maxGlobalStreamsPerUser int + ringReplicationFactor int + ringIngesterCount int + streams int + expected error + }{ + "both local and global limit are disabled": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 0, + ringReplicationFactor: 1, + ringIngesterCount: 1, + streams: 100, + expected: nil, + }, + "current number of streams is below the limit": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + streams: 299, + expected: nil, + }, + "current number of streams is above the limit": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + streams: 300, + expected: 
fmt.Errorf(errMaxStreamsPerUserLimitExceeded, 0, 1000, 300), + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + // Mock the ring + ring := &ringCountMock{count: testData.ringIngesterCount} + + // Mock limits + limits, err := validation.NewOverrides(validation.Limits{ + MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser, + MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser, + }) + require.NoError(t, err) + + limiter := NewLimiter(limits, ring, testData.ringReplicationFactor) + actual := limiter.AssertMaxStreamsPerUser("test", testData.streams) + + assert.Equal(t, testData.expected, actual) + }) + } +} + +func TestLimiter_minNonZero(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + first int + second int + expected int + }{ + "both zero": { + first: 0, + second: 0, + expected: 0, + }, + "first is zero": { + first: 0, + second: 1, + expected: 1, + }, + "second is zero": { + first: 1, + second: 0, + expected: 1, + }, + "both non zero, second > first": { + first: 1, + second: 2, + expected: 1, + }, + "both non zero, first > second": { + first: 2, + second: 1, + expected: 1, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + limiter := NewLimiter(nil, nil, 0) + assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second)) + }) + } +} + +type ringCountMock struct { + count int +} + +func (m *ringCountMock) HealthyInstancesCount() int { + return m.count +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 62dae688a524..a4ff7a899d84 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -25,7 +25,8 @@ type Limits struct { EnforceMetricName bool `yaml:"enforce_metric_name"` // Ingester enforced limits. 
- MaxStreamsPerUser int `yaml:"max_streams_per_user"` + MaxLocalStreamsPerUser int `yaml:"max_streams_per_user"` + MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user"` // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_chunks_per_query"` @@ -51,7 +52,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") - f.IntVar(&l.MaxStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user.") + f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user, per ingester. 0 to disable.") + f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 0, "Maximum number of active streams per user, across the cluster. 0 to disable.") f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.") @@ -160,9 +162,16 @@ func (o *Overrides) CreationGracePeriod(userID string) time.Duration { return o.overridesManager.GetLimits(userID).(*Limits).CreationGracePeriod } -// MaxStreamsPerUser returns the maximum number of streams a user is allowed to store. -func (o *Overrides) MaxStreamsPerUser(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxStreamsPerUser +// MaxLocalStreamsPerUser returns the maximum number of streams a user is allowed to store +// in a single ingester. 
+func (o *Overrides) MaxLocalStreamsPerUser(userID string) int { + return o.overridesManager.GetLimits(userID).(*Limits).MaxLocalStreamsPerUser +} + +// MaxGlobalStreamsPerUser returns the maximum number of streams a user is allowed to store +// across the cluster. +func (o *Overrides) MaxGlobalStreamsPerUser(userID string) int { + return o.overridesManager.GetLimits(userID).(*Limits).MaxGlobalStreamsPerUser } // MaxChunksPerQuery returns the maximum number of chunks allowed per query. From fe310598f4810cfaedda3f11f12d9acc395df171 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 9 Jan 2020 14:30:27 +0100 Subject: [PATCH 2/2] Updated changelog Signed-off-by: Marco Pracucci --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b6eb147280b..d8db8f37e767 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ ## master / unreleased +### Features + * [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. In the case the positions yaml is corrupted an empty positions config will be used and should later overwrite the malformed yaml. +* [1493](https://github.com/grafana/loki/pull/1493) **pracucci**: pkg/ingester: added a per-cluster limit on the maximum number of streams per-user, configured via the `max_global_streams_per_user` config option. # 1.2.0 (2019-12-09)