From f1bc87bf8b709d85ae8a02e8fac1cf524ff75fe5 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 16 Jan 2023 02:24:36 +0800 Subject: [PATCH 1/7] add limtier Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/limiter.go | 441 +++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 pkg/mcs/resource_manager/client/limiter.go diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go new file mode 100644 index 00000000000..8a05d599b29 --- /dev/null +++ b/pkg/mcs/resource_manager/client/limiter.go @@ -0,0 +1,441 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "fmt" + "math" + "sync" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +const maxRequestTokens = 1e8 + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + mu sync.Mutex + limit Limit + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time + notifyThreshold float64 + lowTokensNotifyChan chan struct{} + isLowProcess bool +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter { + lim := &Limiter{ + limit: r, + last: time.Now(), + tokens: tokens, + lowTokensNotifyChan: lowTokensNotifyChan, + } + log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim))) + return lim +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size. +// Usage example: +// +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(ctx context.Context, now time.Time, n int) *Reservation { + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return &Reservation{ + ok: false, + } + default: + } + // Determine wait limit + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + r := lim.reserveN(now, n, waitLimit) + return &r +} + +// SetupNotificationAt enables the notification at the given threshold. +func (lim *Limiter) SetupNotificationAt(now time.Time, threshold float64) { + lim.advance(now) + lim.notifyThreshold = threshold +} + +// notify tries to send a non-blocking notification on notifyCh and disables +// further notifications (until the next Reconfigure or StartNotification). +func (lim *Limiter) notify() { + if lim.isLowProcess { + return + } + lim.notifyThreshold = 0 + lim.isLowProcess = true + select { + case lim.lowTokensNotifyChan <- struct{}{}: + default: + } +} + +// maybeNotify checks if it's time to send the notification and if so, performs +// the notification. +func (lim *Limiter) maybeNotify(now time.Time) { + if lim.IsLowTokens() { + lim.notify() + } +} + +func (lim *Limiter) IsLowTokens() bool { + if lim.isLowProcess || (lim.notifyThreshold > 0 && lim.tokens < lim.notifyThreshold) { + return true + } + return false +} + +// RemoveTokens decreases the amount of tokens currently available. +func (lim *Limiter) RemoveTokens(now time.Time, amount float64) { + lim.mu.Lock() + defer lim.mu.Unlock() + now, _, tokens := lim.advance(now) + lim.last = now + lim.tokens = tokens - amount + lim.maybeNotify(now) +} + +type tokenBucketReconfigureArgs struct { + NewTokens float64 + + NewRate float64 + + NotifyThreshold float64 +} + +func (lim *Limiter) Reconfigure(now time.Time, args tokenBucketReconfigureArgs) { + lim.mu.Lock() + defer lim.mu.Unlock() + log.Debug("[resource group controllor] before reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold)) + now, _, tokens := lim.advance(now) + lim.last = now + lim.tokens = tokens + args.NewTokens + lim.limit = Limit(args.NewRate) + lim.notifyThreshold = args.NotifyThreshold + lim.isLowProcess = false + lim.maybeNotify(now) + log.Debug("[resource group controllor] after reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold)) +} + +// AvailableTokens decreases the amount of tokens currently available. +func (lim *Limiter) AvailableTokens(now time.Time) float64 { + lim.mu.Lock() + defer lim.mu.Unlock() + _, _, tokens := lim.advance(now) + return tokens +} + +// reserveN is a helper method for ReserveN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + defer lim.mu.Unlock() + + if lim.limit == Inf { + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } else if lim.limit == 0 { + var ok bool + if lim.tokens >= float64(n) { + ok = true + lim.tokens -= float64(n) + } + lim.maybeNotify(now) + return Reservation{ + ok: ok, + lim: lim, + tokens: int(lim.tokens), + timeToAct: now, + } + } else if n > maxRequestTokens { + return Reservation{ + ok: false, + lim: lim, + tokens: int(lim.tokens), + timeToAct: now, + } + } + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + lim.maybeNotify(now) + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= maxRequestTokens && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +// advance requires that lim.mu is held. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Calculate the new number of tokens, due to time that passed. + elapsed := now.Sub(last) + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + if limit <= 0 { + return InfDuration + } + seconds := tokens / float64(limit) + return time.Duration(float64(time.Second) * seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + if limit <= 0 { + return 0 + } + return d.Seconds() * float64(limit) +} + +// WaitReservations is used to process a series of reservations +// so that all limiter tokens are returned if one reservation fails +func WaitReservations(now time.Time, ctx context.Context, reservations []*Reservation) error { + if len(reservations) == 0 { + return nil + } + cancel := func() { + for _, res := range reservations { + res.CancelAt(now) + } + } + longestDelayDuration := time.Duration(0) + for _, res := range reservations { + if !res.ok { + cancel() + return fmt.Errorf("[resource group controller] limiter has no enough token") + } + delay := res.DelayFrom(now) + if delay > longestDelayDuration { + longestDelayDuration = delay + } + } + if longestDelayDuration <= 0 { + return nil + } + if longestDelayDuration > 500*time.Millisecond { + log.Warn("[resource group controllor] limiter needs wait ", zap.Time("now", now), zap.Duration("delay", longestDelayDuration)) + } + t := time.NewTimer(longestDelayDuration) + defer t.Stop() + + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + cancel() + return ctx.Err() + } +} From d74e60046da39ce0b42392bbb7234ad74b3a72f4 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 16 Jan 2023 02:25:15 +0800 Subject: [PATCH 2/7] address comment Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/limiter.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go index 8a05d599b29..b79b4d401c6 100644 --- a/pkg/mcs/resource_manager/client/limiter.go +++ b/pkg/mcs/resource_manager/client/limiter.go @@ -238,7 +238,7 @@ func (lim *Limiter) notify() { // maybeNotify checks if it's time to send the notification and if so, performs // the notification. -func (lim *Limiter) maybeNotify(now time.Time) { +func (lim *Limiter) maybeNotify() { if lim.IsLowTokens() { lim.notify() } @@ -258,7 +258,7 @@ func (lim *Limiter) RemoveTokens(now time.Time, amount float64) { now, _, tokens := lim.advance(now) lim.last = now lim.tokens = tokens - amount - lim.maybeNotify(now) + lim.maybeNotify() } type tokenBucketReconfigureArgs struct { @@ -279,7 +279,7 @@ func (lim *Limiter) Reconfigure(now time.Time, args tokenBucketReconfigureArgs) lim.limit = Limit(args.NewRate) lim.notifyThreshold = args.NotifyThreshold lim.isLowProcess = false - lim.maybeNotify(now) + lim.maybeNotify() log.Debug("[resource group controllor] after reconfigure", zap.Float64("NewTokens", lim.tokens), zap.Float64("NewRate", float64(lim.limit)), zap.Float64("NotifyThreshold", args.NotifyThreshold)) } @@ -311,7 +311,7 @@ func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duratio ok = true lim.tokens -= float64(n) } - lim.maybeNotify(now) + lim.maybeNotify() return Reservation{ ok: ok, lim: lim, @@ -330,7 +330,7 @@ func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duratio // Calculate the remaining number of tokens resulting from the request. tokens -= float64(n) - lim.maybeNotify(now) + lim.maybeNotify() // Calculate the wait duration var waitDuration time.Duration if tokens < 0 { From d68fd7f5e1e78511eb4715531ab0b1412de12e55 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 16 Jan 2023 17:31:19 +0800 Subject: [PATCH 3/7] remove useless code Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/limiter.go | 72 +++++----------------- 1 file changed, 16 insertions(+), 56 deletions(-) diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go index b79b4d401c6..d629185e702 100644 --- a/pkg/mcs/resource_manager/client/limiter.go +++ b/pkg/mcs/resource_manager/client/limiter.go @@ -58,26 +58,16 @@ const maxRequestTokens = 1e8 // The zero value is a valid Limiter, but it will reject all events. // Use NewLimiter to create non-zero Limiters. // -// Limiter has three main methods, Allow, Reserve, and Wait. -// Most callers should use Wait. -// -// Each of the three methods consumes a single token. -// They differ in their behavior when no token is available. -// If no token is available, Allow returns false. +// Limiter has one main methods Reserve. // If no token is available, Reserve returns a reservation for a future token -// and the amount of time the caller must wait before using it. -// If no token is available, Wait blocks until one can be obtained +// and the amount of time the caller must wait before using it, // or its associated context.Context is canceled. -// -// The methods AllowN, ReserveN, and WaitN consume n tokens. type Limiter struct { mu sync.Mutex limit Limit tokens float64 // last is the last time the limiter's tokens field was updated - last time.Time - // lastEvent is the latest time of a rate-limited event (past or future) - lastEvent time.Time + last time.Time notifyThreshold float64 lowTokensNotifyChan chan struct{} isLowProcess bool @@ -108,7 +98,7 @@ func NewLimiter(r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Lim type Reservation struct { ok bool lim *Limiter - tokens int + tokens float64 timeToAct time.Time // This is the Limit at reservation time, it can change later. limit Limit @@ -145,8 +135,7 @@ func (r *Reservation) DelayFrom(now time.Time) time.Duration { } // CancelAt indicates that the reservation holder will not perform the reserved action -// and reverses the effects of this Reservation on the rate limit as much as possible, -// considering that other reservations may have already been made. +// and reverses tokens which be refilled into limiter. func (r *Reservation) CancelAt(now time.Time) { if !r.ok { return @@ -158,36 +147,22 @@ func (r *Reservation) CancelAt(now time.Time) { if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { return } - - // calculate tokens to restore - // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved - // after r was obtained. These tokens should not be restored. - restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) - if restoreTokens <= 0 { - return - } // advance time to now now, _, tokens := r.lim.advance(now) // calculate new number of tokens - tokens += restoreTokens + tokens += float64(r.tokens) // update state r.lim.last = now r.lim.tokens = tokens - if r.timeToAct == r.lim.lastEvent { - prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) - if !prevEvent.Before(now) { - r.lim.lastEvent = prevEvent - } - } } -// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// Reserve returns a Reservation that indicates how long the caller must wait before n events happen. // The Limiter takes this Reservation into account when allowing future events. -// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size. +// The returned Reservation’s OK() method returns false if waitting duration exceeds deadline. // Usage example: // -// r := lim.ReserveN(time.Now(), 1) +// r := lim.Reserve(time.Now(), 1) // if !r.OK() { // // Not allowed to act! Did you remember to set lim.burst to be > 0 ? // return @@ -196,9 +171,7 @@ func (r *Reservation) CancelAt(now time.Time) { // Act() // // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. -// If you need to respect a deadline or cancel the delay, use Wait instead. -// To drop or skip events exceeding rate limit, use Allow instead. -func (lim *Limiter) ReserveN(ctx context.Context, now time.Time, n int) *Reservation { +func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Reservation { // Check if ctx is already cancelled select { case <-ctx.Done(): @@ -244,6 +217,7 @@ func (lim *Limiter) maybeNotify() { } } +// IsLowTokens returns whether the limiter is in low tokens func (lim *Limiter) IsLowTokens() bool { if lim.isLowProcess || (lim.notifyThreshold > 0 && lim.tokens < lim.notifyThreshold) { return true @@ -269,6 +243,7 @@ type tokenBucketReconfigureArgs struct { NotifyThreshold float64 } +// Reconfigure modifies all setting for limiter func (lim *Limiter) Reconfigure(now time.Time, args tokenBucketReconfigureArgs) { lim.mu.Lock() defer lim.mu.Unlock() @@ -291,10 +266,10 @@ func (lim *Limiter) AvailableTokens(now time.Time) float64 { return tokens } -// reserveN is a helper method for ReserveN. +// reserveN is a helper method for Reserve. // maxFutureReserve specifies the maximum reservation wait duration allowed. // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. -func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { +func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() defer lim.mu.Unlock() @@ -305,24 +280,11 @@ func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duratio tokens: n, timeToAct: now, } - } else if lim.limit == 0 { - var ok bool - if lim.tokens >= float64(n) { - ok = true - lim.tokens -= float64(n) - } - lim.maybeNotify() - return Reservation{ - ok: ok, - lim: lim, - tokens: int(lim.tokens), - timeToAct: now, - } } else if n > maxRequestTokens { return Reservation{ ok: false, lim: lim, - tokens: int(lim.tokens), + tokens: lim.tokens, timeToAct: now, } } @@ -354,11 +316,9 @@ func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duratio if ok { lim.last = now lim.tokens = tokens - lim.lastEvent = r.timeToAct } else { lim.last = last } - return r } @@ -399,7 +359,7 @@ func (limit Limit) tokensFromDuration(d time.Duration) float64 { // WaitReservations is used to process a series of reservations // so that all limiter tokens are returned if one reservation fails -func WaitReservations(now time.Time, ctx context.Context, reservations []*Reservation) error { +func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) error { if len(reservations) == 0 { return nil } From ac83f2cd9cb8500a6883bdb1b13478621d615e1d Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 17 Jan 2023 16:18:29 +0800 Subject: [PATCH 4/7] address comment Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/limiter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go index d629185e702..369177fcfa9 100644 --- a/pkg/mcs/resource_manager/client/limiter.go +++ b/pkg/mcs/resource_manager/client/limiter.go @@ -150,7 +150,7 @@ func (r *Reservation) CancelAt(now time.Time) { // advance time to now now, _, tokens := r.lim.advance(now) // calculate new number of tokens - tokens += float64(r.tokens) + tokens += r.tokens // update state r.lim.last = now @@ -291,7 +291,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur now, last, tokens := lim.advance(now) // Calculate the remaining number of tokens resulting from the request. - tokens -= float64(n) + tokens -= n lim.maybeNotify() // Calculate the wait duration var waitDuration time.Duration From 57d526fcede4df6e0bb36d8d42d3c3fa4eded098 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 17 Jan 2023 18:20:59 +0800 Subject: [PATCH 5/7] address comment and add test Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/limiter.go | 37 +++++++++++----------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go index 369177fcfa9..512a8ddfe11 100644 --- a/pkg/mcs/resource_manager/client/limiter.go +++ b/pkg/mcs/resource_manager/client/limiter.go @@ -45,8 +45,6 @@ func Every(interval time.Duration) Limit { return 1 / Limit(interval.Seconds()) } -const maxRequestTokens = 1e8 - // A Limiter controls how frequently events are allowed to happen. // It implements a "token bucket" of size b, initially full and refilled // at rate r tokens per second. @@ -63,14 +61,18 @@ const maxRequestTokens = 1e8 // and the amount of time the caller must wait before using it, // or its associated context.Context is canceled. type Limiter struct { - mu sync.Mutex - limit Limit - tokens float64 + mu sync.Mutex + limit Limit + tokens float64 + maxRequestTokens float64 // last is the last time the limiter's tokens field was updated last time.Time notifyThreshold float64 lowTokensNotifyChan chan struct{} - isLowProcess bool + // To prevent too many chan sent, the notifyThreshold is set to 0 after notify. + // So the notifyThreshold cannot show whether the limiter is in the low token state, + // isLowProcess is used to check it. + isLowProcess bool } // Limit returns the maximum overall event rate. @@ -82,11 +84,12 @@ func (lim *Limiter) Limit() Limit { // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiter(r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter { +func NewLimiter(r Limit, tokens, maxRequestTokens float64, lowTokensNotifyChan chan struct{}) *Limiter { lim := &Limiter{ limit: r, last: time.Now(), tokens: tokens, + maxRequestTokens: maxRequestTokens, lowTokensNotifyChan: lowTokensNotifyChan, } log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim))) @@ -144,7 +147,7 @@ func (r *Reservation) CancelAt(now time.Time) { r.lim.mu.Lock() defer r.lim.mu.Unlock() - if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + if r.lim.limit == Inf || r.tokens == 0 { return } // advance time to now @@ -189,8 +192,8 @@ func (lim *Limiter) Reserve(ctx context.Context, now time.Time, n float64) *Rese return &r } -// SetupNotificationAt enables the notification at the given threshold. -func (lim *Limiter) SetupNotificationAt(now time.Time, threshold float64) { +// SetupNotificationThreshold enables the notification at the given threshold. +func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) { lim.advance(now) lim.notifyThreshold = threshold } @@ -268,7 +271,7 @@ func (lim *Limiter) AvailableTokens(now time.Time) float64 { // reserveN is a helper method for Reserve. // maxFutureReserve specifies the maximum reservation wait duration allowed. -// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +// reserveN returns Reservation, not *Reservation. func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() defer lim.mu.Unlock() @@ -280,19 +283,16 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur tokens: n, timeToAct: now, } - } else if n > maxRequestTokens { + } else if n > lim.maxRequestTokens { return Reservation{ - ok: false, - lim: lim, - tokens: lim.tokens, - timeToAct: now, + ok: false, + lim: lim, } } now, last, tokens := lim.advance(now) // Calculate the remaining number of tokens resulting from the request. tokens -= n - lim.maybeNotify() // Calculate the wait duration var waitDuration time.Duration if tokens < 0 { @@ -300,7 +300,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur } // Decide result - ok := n <= maxRequestTokens && waitDuration <= maxFutureReserve + ok := n <= lim.maxRequestTokens && waitDuration <= maxFutureReserve // Prepare reservation r := Reservation{ @@ -316,6 +316,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur if ok { lim.last = now lim.tokens = tokens + lim.maybeNotify() } else { lim.last = last } From 0c46eafcb86fe02ff74f6a0aea96472e2fc07f65 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 17 Jan 2023 18:21:28 +0800 Subject: [PATCH 6/7] address comment and add test Signed-off-by: Cabinfever_B --- .../resource_manager/client/limiter_test.go | 170 ++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 pkg/mcs/resource_manager/client/limiter_test.go diff --git a/pkg/mcs/resource_manager/client/limiter_test.go b/pkg/mcs/resource_manager/client/limiter_test.go new file mode 100644 index 00000000000..11e3e1eb2a2 --- /dev/null +++ b/pkg/mcs/resource_manager/client/limiter_test.go @@ -0,0 +1,170 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "math" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const ( + d = 1 * time.Second +) + +var ( + t0 = time.Now() + t1 = t0.Add(time.Duration(1) * d) + t2 = t0.Add(time.Duration(2) * d) + t3 = t0.Add(time.Duration(3) * d) + t4 = t0.Add(time.Duration(4) * d) + t5 = t0.Add(time.Duration(5) * d) + t6 = t0.Add(time.Duration(6) * d) + t7 = t0.Add(time.Duration(7) * d) + t8 = t0.Add(time.Duration(8) * d) +) + +type request struct { + t time.Time + n float64 + act time.Time + ok bool +} + +// dFromDuration converts a duration to the nearest multiple of the global constant d. +func dFromDuration(dur time.Duration) int { + // Add d/2 to dur so that integer division will round to + // the nearest multiple instead of truncating. + // (We don't care about small inaccuracies.) + return int((dur + (d / 2)) / d) +} + +// dSince returns multiples of d since t0 +func dSince(t time.Time) int { + return dFromDuration(t.Sub(t0)) +} + +func runReserveMax(t *testing.T, lim *Limiter, req request) *Reservation { + return runReserve(t, lim, req, InfDuration) +} + +func runReserve(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation { + t.Helper() + r := lim.reserveN(req.t, req.n, maxReserve) + if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok { + t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)", + dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok) + } + return &r +} + +func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected float64) { + re.LessOrEqual(math.Abs(expected-lim.AvailableTokens(t)), 1e-2) +} + +func TestSimpleReserve(t *testing.T) { + lim := NewLimiter(1, 2, 1000, make(chan struct{}, 1)) + + runReserveMax(t, lim, request{t0, 3, t1, true}) + runReserveMax(t, lim, request{t0, 3, t4, true}) + runReserveMax(t, lim, request{t3, 2, t6, true}) + + runReserve(t, lim, request{t3, 2, t7, false}, time.Second*4) + runReserveMax(t, lim, request{t5, 2000, t6, false}) + + runReserve(t, lim, request{t3, 2, t8, true}, time.Second*8) +} + +func TestReconfig(t *testing.T) { + re := require.New(t) + lim := NewLimiter(1, 2, 1000, make(chan struct{}, 1)) + + runReserveMax(t, lim, request{t0, 4, t2, true}) + args := tokenBucketReconfigureArgs{ + NewTokens: 6., + NewRate: 2, + } + lim.Reconfigure(t1, args) + checkTokens(re, lim, t1, 5) + checkTokens(re, lim, t2, 7) +} + +func TestNotify(t *testing.T) { + nc := make(chan struct{}, 1) + lim := NewLimiter(1, 0, 1000, nc) + + args := tokenBucketReconfigureArgs{ + NewTokens: 1000., + NewRate: 2, + NotifyThreshold: 400, + } + lim.Reconfigure(t1, args) + runReserveMax(t, lim, request{t2, 1000, t2, true}) + select { + case <-nc: + default: + t.Errorf("no notify") + } +} + +func TestCancel(t *testing.T) { + ctx := context.Background() + ctx1, cancel1 := context.WithDeadline(ctx, t2) + re := require.New(t) + nc := make(chan struct{}, 1) + lim1 := NewLimiter(1, 10, 100, nc) + lim2 := NewLimiter(1, 0, 100, nc) + + r1 := runReserveMax(t, lim1, request{t0, 5, t0, true}) + checkTokens(re, lim1, t0, 5) + r1.CancelAt(t1) + checkTokens(re, lim1, t1, 11) + + r1 = lim1.Reserve(ctx, t1, 5) + r2 := lim2.Reserve(ctx1, t1, 5) + checkTokens(re, lim1, t2, 7) + checkTokens(re, lim2, t2, 2) + err := WaitReservations(ctx, t2, []*Reservation{r1, r2}) + re.Error(err) + checkTokens(re, lim1, t3, 13) + checkTokens(re, lim2, t3, 3) + cancel1() + + ctx2, cancel2 := context.WithCancel(ctx) + r1 = lim1.Reserve(ctx, t3, 5) + r2 = lim2.Reserve(ctx2, t3, 5) + checkTokens(re, lim1, t3, 8) + checkTokens(re, lim2, t3, -2) + var wg sync.WaitGroup + wg.Add(1) + go func() { + err := WaitReservations(ctx2, t3, []*Reservation{r1, r2}) + re.Error(err) + wg.Done() + }() + time.Sleep(1 * time.Second) + cancel2() + wg.Wait() + checkTokens(re, lim1, t5, 15) + checkTokens(re, lim2, t5, 5) +} From 122c52b4dded73807bc6282034cf693cbbe97a9f Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 17 Jan 2023 18:40:10 +0800 Subject: [PATCH 7/7] fix test Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/client/limiter.go | 4 ++-- pkg/mcs/resource_manager/client/limiter_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go index 512a8ddfe11..76825a2e5e5 100644 --- a/pkg/mcs/resource_manager/client/limiter.go +++ b/pkg/mcs/resource_manager/client/limiter.go @@ -84,10 +84,10 @@ func (lim *Limiter) Limit() Limit { // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiter(r Limit, tokens, maxRequestTokens float64, lowTokensNotifyChan chan struct{}) *Limiter { +func NewLimiter(now time.Time, r Limit, tokens, maxRequestTokens float64, lowTokensNotifyChan chan struct{}) *Limiter { lim := &Limiter{ limit: r, - last: time.Now(), + last: now, tokens: tokens, maxRequestTokens: maxRequestTokens, lowTokensNotifyChan: lowTokensNotifyChan, diff --git a/pkg/mcs/resource_manager/client/limiter_test.go b/pkg/mcs/resource_manager/client/limiter_test.go index 11e3e1eb2a2..0a0ae48d7c6 100644 --- a/pkg/mcs/resource_manager/client/limiter_test.go +++ b/pkg/mcs/resource_manager/client/limiter_test.go @@ -79,11 +79,11 @@ func runReserve(t *testing.T, lim *Limiter, req request, maxReserve time.Duratio } func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected float64) { - re.LessOrEqual(math.Abs(expected-lim.AvailableTokens(t)), 1e-2) + re.LessOrEqual(math.Abs(expected-lim.AvailableTokens(t)), 1e-7) } func TestSimpleReserve(t *testing.T) { - lim := NewLimiter(1, 2, 1000, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1)) runReserveMax(t, lim, request{t0, 3, t1, true}) runReserveMax(t, lim, request{t0, 3, t4, true}) @@ -97,7 +97,7 @@ func TestSimpleReserve(t *testing.T) { func TestReconfig(t *testing.T) { re := require.New(t) - lim := NewLimiter(1, 2, 1000, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 2, 1000, make(chan struct{}, 1)) runReserveMax(t, lim, request{t0, 4, t2, true}) args := tokenBucketReconfigureArgs{ @@ -111,7 +111,7 @@ func TestReconfig(t *testing.T) { func TestNotify(t *testing.T) { nc := make(chan struct{}, 1) - lim := NewLimiter(1, 0, 1000, nc) + lim := NewLimiter(t0, 1, 0, 1000, nc) args := tokenBucketReconfigureArgs{ NewTokens: 1000., @@ -132,8 +132,8 @@ func TestCancel(t *testing.T) { ctx1, cancel1 := context.WithDeadline(ctx, t2) re := require.New(t) nc := make(chan struct{}, 1) - lim1 := NewLimiter(1, 10, 100, nc) - lim2 := NewLimiter(1, 0, 100, nc) + lim1 := NewLimiter(t0, 1, 10, 100, nc) + lim2 := NewLimiter(t0, 1, 0, 100, nc) r1 := runReserveMax(t, lim1, request{t0, 5, t0, true}) checkTokens(re, lim1, t0, 5)