Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support multi-zone ingesters when converting global to local limits for streams in limiter.go #13321

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,10 @@ func (r *readRingMock) ZonesCount() int {
return 1
}

func (r *readRingMock) HealthyInstancesInZoneCount() int {
return len(r.replicationSet.Instances)
}

func (r *readRingMock) Subring(_ uint32, _ int) ring.ReadRing {
return r
}
Expand Down
39 changes: 25 additions & 14 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
// to count members
type RingCount interface {
HealthyInstancesCount() int
HealthyInstancesInZoneCount() int
ZonesCount() int
}

type Limits interface {
Expand Down Expand Up @@ -84,7 +86,7 @@ func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLi
// We can assume that streams are evenly distributed across ingesters
// so we do convert the global limit into a local limit
globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID)
adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit)
adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit, localLimit)

// Set the calculated limit to the lesser of the local limit or the new calculated global limit
calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit)
Expand All @@ -105,24 +107,33 @@ func (l *Limiter) minNonZero(first, second int) int {
return first
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
func (l *Limiter) convertGlobalToLocalLimit(globalLimit, maxStreamsPerUser int) int {
JordanRushing marked this conversation as resolved.
Show resolved Hide resolved
JordanRushing marked this conversation as resolved.
Show resolved Hide resolved
if globalLimit == 0 {
return 0
}
// todo: change to healthyInstancesInZoneCount() once
// Given we don't need a super accurate count (ie. when the ingesters
// topology changes) and we prefer to always be in favor of the tenant,
// we can use a per-ingester limit equal to:
// (global limit / number of ingesters) * replication factor
numIngesters := l.ring.HealthyInstancesCount()

// May happen because the number of ingesters is asynchronously updated.
// If happens, we just temporarily ignore the global limit.
if numIngesters > 0 {
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))

zonesCount := l.ring.ZonesCount()

if zonesCount <= 1 {
numIngesters := l.ring.HealthyInstancesCount()
if numIngesters > 0 {
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
}
return 0
}

return 0
ingestersInZone := l.ring.HealthyInstancesInZoneCount()
if ingestersInZone == 0 {
return 0 // Avoid division by zero
}

newLimit := int(float64(globalLimit) * float64(l.replicationFactor) * float64(zonesCount) / float64(ingestersInZone))

if maxStreamsPerUser > 0 && (newLimit == 0 || maxStreamsPerUser < newLimit) {
JordanRushing marked this conversation as resolved.
Show resolved Hide resolved
return maxStreamsPerUser
}

return newLimit
}

type supplier[T any] func() T
Expand Down
8 changes: 8 additions & 0 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ func (m *ringCountMock) HealthyInstancesCount() int {
return m.count
}

func (m *ringCountMock) ZonesCount() int {
return 1
}

func (m *ringCountMock) HealthyInstancesInZoneCount() int {
return m.count
}

// Assert some of the weirder (bug?) behavior of golang.org/x/time/rate
func TestGoLimiter(t *testing.T) {
for _, tc := range []struct {
Expand Down
Loading