Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: Improve logging and add metrics to streams dropped by stream limit #2012

Merged
merged 2 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"context"
"github.com/grafana/loki/pkg/util/validation"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -129,13 +130,8 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {

var appendErr error
for _, s := range req.Streams {
labels, err := util.ToClientLabels(s.Labels)
if err != nil {
appendErr = err
continue
}

stream, err := i.getOrCreateStream(labels)
stream, err := i.getOrCreateStream(s)
if err != nil {
appendErr = err
continue
Expand All @@ -153,7 +149,11 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}

func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) {
func (i *instance) getOrCreateStream(pushReqStream *logproto.Stream) (*stream, error) {
labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

Expand All @@ -162,8 +162,14 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil
}

err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}

Expand Down
13 changes: 4 additions & 9 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/util"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"

Expand Down Expand Up @@ -124,15 +122,12 @@ func TestSyncPeriod(t *testing.T) {
result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)})
tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds())))
}

err = inst.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}})
require.NoError(t, err)

// let's verify results.
ls, err := util.ToClientLabels(lbls)
pr := &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}}
err = inst.Push(context.Background(), pr)
require.NoError(t, err)

s, err := inst.getOrCreateStream(ls)
// let's verify results
s, err := inst.getOrCreateStream(pr.Streams[0])
require.NoError(t, err)

// make sure each chunk spans max 'sync period' time
Expand Down
30 changes: 13 additions & 17 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
errMaxStreamsPerUserLimitExceeded = "per-user streams limit (local: %d global: %d actual local: %d) exceeded"
errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, global/ingesters: %d)"
)

// RingCount is the interface exposed by a ring implementation which allows
Expand Down Expand Up @@ -37,32 +37,28 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor
// AssertMaxStreamsPerUser checks the current number of streams given in input
// against the per-user limit and returns an error if the limit has been reached.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
actualLimit := l.maxStreamsPerUser(userID)
if streams < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalStreamsPerUser(userID)
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) maxStreamsPerUser(userID string) int {
// Start by setting the local limit either from override or default
localLimit := l.limits.MaxLocalStreamsPerUser(userID)

// We can assume that streams are evenly distributed across ingesters,
// so we convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))
adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)

// Set the calculated limit to the lesser of the local limit and the adjusted global limit, ignoring a limit that is zero (disabled)
calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)

// If both the local and global limits are disabled, we just
// use the largest int32 value (math.MaxInt32)
if localLimit == 0 {
localLimit = math.MaxInt32
if calculatedLimit == 0 {
calculatedLimit = math.MaxInt32
}

if streams < calculatedLimit {
return nil
}

return localLimit
return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
Expand Down
104 changes: 39 additions & 65 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,112 +11,86 @@ import (
"github.com/grafana/loki/pkg/util/validation"
)

func TestLimiter_maxStreamsPerUser(t *testing.T) {
func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
expected int
streams int
expected error
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
streams: 100,
expected: nil,
},
"current number of streams is below the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 299,
expected: nil,
},
"current number of streams is above the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 300,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 300, 300, 0, 1000, 300),
},
"both local and global limits are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
expected: math.MaxInt32,
streams: math.MaxInt32 - 1,
expected: nil,
},
"only local limit is enabled": {
maxLocalStreamsPerUser: 1000,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
expected: 1000,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 1000, 1000, 0, 0),
},
"only global limit is enabled with replication-factor=1": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 1,
ringIngesterCount: 10,
expected: 100,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 100, 0, 1000, 100),
},
"only global limit is enabled with replication-factor=3": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 300,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 0, 1000, 300),
},
"both local and global limits are set with local limit < global limit": {
maxLocalStreamsPerUser: 150,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 150,
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 150, 150, 1000, 300),
},
"both local and global limits are set with local limit > global limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 300,
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
// Mock the ring
ring := &ringCountMock{count: testData.ringIngesterCount}

// Mock limits
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser,
MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser,
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, testData.ringReplicationFactor)
actual := limiter.maxStreamsPerUser("test")

assert.Equal(t, testData.expected, actual)
})
}
}

func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
streams int
expected error
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
streams: 100,
expected: nil,
},
"current number of streams is below the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 299,
expected: nil,
},
"current number of streams is above the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 300,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, 0, 1000, 300),
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const (
RateLimited = "rate_limited"
// LineTooLong is a reason for discarding too long log lines.
LineTooLong = "line_too_long"
// StreamLimit is a reason for discarding lines when we can't create a new stream
// because the limit of active streams has been reached.
StreamLimit = "stream_limit"
)

// DiscardedBytes is a metric of the total discarded bytes, by reason.
Expand Down