controller: fix limiter cannot work well in high concurrency scenario
Signed-off-by: nolouch <nolouch@gmail.com>
nolouch committed Jul 24, 2024
1 parent 9f5522e commit d3ab23b
Showing 2 changed files with 17 additions and 14 deletions.
8 changes: 6 additions & 2 deletions client/resource_group/controller/limiter.go
@@ -406,7 +406,9 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
 	}
 	// Update state
 	if ok {
-		lim.last = now
+		if lim.last.Before(now) {
+			lim.last = now
+		}
 		lim.tokens = tokens
 		lim.maybeNotify()
 	} else {
@@ -424,7 +426,9 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
 				zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
 				zap.String("name", lim.name))
 		}
-		lim.last = last
+		if lim.last.Before(now) {
+			lim.last = last
+		}
 		if lim.limit == 0 {
 			lim.notify()
 		} else if lim.remainingNotifyTimes > 0 {
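Both hunks above apply the same guard: lim.last only moves forward. Under high concurrency, reservations can reach the limiter with out-of-order now values, and the old unconditional lim.last = now let a stale timestamp rewind the bucket's clock, so the next refill counted the same interval twice and over-granted tokens. Below is a minimal standalone sketch of that effect; the bucket type and its fields are invented for illustration and are not the pd client's Limiter.

// Minimal sketch, not the pd client code: a hypothetical token bucket showing
// why the "last" timestamp must only move forward.
package main

import (
	"fmt"
	"time"
)

type bucket struct {
	last   time.Time // time of the most recent state update
	tokens float64
	rate   float64 // tokens granted per second
}

// advance refills tokens for the time elapsed since b.last.
func (b *bucket) advance(now time.Time) {
	if elapsed := now.Sub(b.last); elapsed > 0 {
		b.tokens += b.rate * elapsed.Seconds()
	}
}

// reserve consumes n tokens; monotonic toggles the guard added by this commit.
func (b *bucket) reserve(now time.Time, n float64, monotonic bool) {
	b.advance(now)
	b.tokens -= n
	if monotonic {
		if b.last.Before(now) { // only move the clock forward
			b.last = now
		}
	} else {
		b.last = now // pre-fix behavior: a stale now rewinds the clock
	}
}

func main() {
	start := time.Now()
	later := start.Add(time.Second)
	for _, monotonic := range []bool{false, true} {
		b := &bucket{last: start, rate: 100}
		b.reserve(later, 10, monotonic) // newer timestamp arrives first
		b.reserve(start, 10, monotonic) // stale timestamp arrives second
		b.advance(later)                // a later refill double-counts if last was rewound
		fmt.Printf("monotonic=%v tokens=%.0f\n", monotonic, b.tokens)
		// monotonic=false over-grants (180 tokens); monotonic=true stays at 80.
	}
}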
23 changes: 11 additions & 12 deletions client/resource_group/controller/limiter_test.go
@@ -220,28 +220,27 @@ func TestQPS(t *testing.T) {
 	cases := []struct {
 		concurrency int
 		reserveN    int64
-		RU_PER_SEC  int64
+		ruPerSec    int64
 	}{
-		{10000, 10, 400000},
+		{1000, 10, 400000},
 	}
 
 	for _, tc := range cases {
-		t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.RU_PER_SEC), func(t *testing.T) {
-			qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.RU_PER_SEC)
+		t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.ruPerSec), func(t *testing.T) {
+			qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.ruPerSec)
 			t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
-			re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)-ruSec), float64(1))
-			re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)/float64(tc.reserveN)-qps), float64(1))
+			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)-ruSec), float64(10)*float64(tc.reserveN))
+			re.LessOrEqual(math.Abs(float64(tc.ruPerSec)/float64(tc.reserveN)-qps), float64(10))
 		})
 	}
 }
 
-const testCaseRunTime = 3 * time.Second
+const testCaseRunTime = 4 * time.Second
 
-func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64, time.Duration) {
+func testQPSCase(concurrency int, reserveN int64, limit int64) (qps float64, ru float64, needWait time.Duration) {
 	nc := make(chan notifyMsg, 1)
 	lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc)
 	ctx, cancel := context.WithCancel(context.Background())
-	// defer cancel()
 
 	var wg sync.WaitGroup
 	var totalRequests int64
@@ -268,7 +267,7 @@ func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64
 			}
 		}()
 	}
-	qps := float64(0)
+	var vQPS atomic.Value
 	var wait time.Duration
 	ch := make(chan struct{})
 	go func() {
@@ -280,15 +279,15 @@ func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64
 				break
 			}
 			windowRequests = atomic.SwapInt64(&totalRequests, 0)
-			qps = float64(windowRequests)
+			vQPS.Store(float64(windowRequests))
 			r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
-			fmt.Printf("%s: QPS: %.2f, RU: %.2f, new request need wait %s\n", time.Now(), qps, qps*float64(reserveN), wait)
 			wait = r.Delay()
 			time.Sleep(1 * time.Second)
 		}
 	}()
 	<-ch
 	cancel()
 	wg.Wait()
+	qps = vQPS.Load().(float64)
 	return qps, qps * float64(reserveN), wait
 }
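The test side stops sharing a plain float64 between the sampling goroutine and the test body: each one-second sample is published through an atomic.Value (vQPS) and loaded once after the workers are stopped. A minimal standalone sketch of that publish/load pattern follows; the names are invented, and only the pattern mirrors vQPS in the diff.

// Minimal sketch of the atomic.Value publish/load pattern used by the updated test.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var sampled atomic.Value // always holds a float64
	sampled.Store(0.0)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 1; i <= 3; i++ {
			// Store may run concurrently with Load from other goroutines.
			sampled.Store(float64(i * 100))
			time.Sleep(10 * time.Millisecond)
		}
	}()

	<-done
	// Load returns an interface value; the type assertion recovers the float64,
	// mirroring qps = vQPS.Load().(float64) at the end of testQPSCase.
	qps := sampled.Load().(float64)
	fmt.Printf("last sample: %.0f\n", qps)
}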
