Skip to content

Commit

Permalink
*: Add Limiter Config (#4839)
Browse files Browse the repository at this point in the history
ref #4666

Add Rate Limiter Config for server

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people authored May 31, 2022
1 parent b92303c commit 52dd587
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 36 deletions.
27 changes: 20 additions & 7 deletions pkg/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ import (
"golang.org/x/time/rate"
)

// DimensionConfig is the limit dimension config of one label
type DimensionConfig struct {
// qps conifg
QPS float64
QPSBurst int
// concurrency config
ConcurrencyLimit uint64
}

// Limiter is a controller for the request rate.
type Limiter struct {
qpsLimiter sync.Map
Expand All @@ -30,7 +39,9 @@ type Limiter struct {

// NewLimiter returns a global limiter which can be updated in the later.
func NewLimiter() *Limiter {
return &Limiter{labelAllowList: make(map[string]struct{})}
return &Limiter{
labelAllowList: make(map[string]struct{}),
}
}

// Allow is used to check whether it has enough token.
Expand Down Expand Up @@ -65,10 +76,12 @@ func (l *Limiter) Release(label string) {
}

// Update is used to update Ratelimiter with Options
func (l *Limiter) Update(label string, opts ...Option) {
func (l *Limiter) Update(label string, opts ...Option) UpdateStatus {
var status UpdateStatus
for _, opt := range opts {
opt(label, l)
status |= opt(label, l)
}
return status
}

// GetQPSLimiterStatus returns the status of a given label's QPS limiter.
Expand All @@ -80,8 +93,8 @@ func (l *Limiter) GetQPSLimiterStatus(label string) (limit rate.Limit, burst int
return 0, 0
}

// DeleteQPSLimiter deletes QPS limiter of given label
func (l *Limiter) DeleteQPSLimiter(label string) {
// QPSUnlimit deletes QPS limiter of the given label
func (l *Limiter) QPSUnlimit(label string) {
l.qpsLimiter.Delete(label)
}

Expand All @@ -94,8 +107,8 @@ func (l *Limiter) GetConcurrencyLimiterStatus(label string) (limit uint64, curre
return 0, 0
}

// DeleteConcurrencyLimiter deletes concurrency limiter of given label
func (l *Limiter) DeleteConcurrencyLimiter(label string) {
// ConcurrencyUnlimit deletes concurrency limiter of the given label
func (l *Limiter) ConcurrencyUnlimit(label string) {
l.concurrencyLimiter.Delete(label)
}

Expand Down
45 changes: 29 additions & 16 deletions pkg/ratelimit/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ func (s *testRatelimiterSuite) TestUpdateConcurrencyLimiter(c *C) {
limiter := NewLimiter()

label := "test"
for _, opt := range opts {
opt(label, limiter)
}
status := limiter.Update(label, opts...)
c.Assert(status&ConcurrencyChanged != 0, IsTrue)
var lock sync.Mutex
successCount, failedCount := 0, 0
var wg sync.WaitGroup
Expand All @@ -57,7 +56,11 @@ func (s *testRatelimiterSuite) TestUpdateConcurrencyLimiter(c *C) {
c.Assert(limit, Equals, uint64(10))
c.Assert(current, Equals, uint64(0))

limiter.Update(label, UpdateConcurrencyLimiter(5))
status = limiter.Update(label, UpdateConcurrencyLimiter(10))
c.Assert(status&ConcurrencyNoChange != 0, IsTrue)

status = limiter.Update(label, UpdateConcurrencyLimiter(5))
c.Assert(status&ConcurrencyChanged != 0, IsTrue)
failedCount = 0
successCount = 0
for i := 0; i < 15; i++ {
Expand All @@ -71,7 +74,8 @@ func (s *testRatelimiterSuite) TestUpdateConcurrencyLimiter(c *C) {
limiter.Release(label)
}

limiter.DeleteConcurrencyLimiter(label)
status = limiter.Update(label, UpdateConcurrencyLimiter(0))
c.Assert(status&ConcurrencyDeleted != 0, IsTrue)
failedCount = 0
successCount = 0
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -99,21 +103,21 @@ func (s *testRatelimiterSuite) TestBlockList(c *C) {
}
c.Assert(limiter.IsInAllowList(label), Equals, true)

UpdateQPSLimiter(rate.Every(time.Second), 1)(label, limiter)
status := UpdateQPSLimiter(float64(rate.Every(time.Second)), 1)(label, limiter)
c.Assert(status&InAllowList != 0, Equals, true)
for i := 0; i < 10; i++ {
c.Assert(limiter.Allow(label), Equals, true)
}
}

func (s *testRatelimiterSuite) TestUpdateQPSLimiter(c *C) {
c.Parallel()
opts := []Option{UpdateQPSLimiter(rate.Every(time.Second), 1)}
opts := []Option{UpdateQPSLimiter(float64(rate.Every(time.Second)), 1)}
limiter := NewLimiter()

label := "test"
for _, opt := range opts {
opt(label, limiter)
}
status := limiter.Update(label, opts...)
c.Assert(status&QPSChanged != 0, IsTrue)

var lock sync.Mutex
successCount, failedCount := 0, 0
Expand All @@ -130,7 +134,11 @@ func (s *testRatelimiterSuite) TestUpdateQPSLimiter(c *C) {
c.Assert(limit, Equals, rate.Limit(1))
c.Assert(burst, Equals, 1)

limiter.Update(label, UpdateQPSLimiter(5, 5))
status = limiter.Update(label, UpdateQPSLimiter(float64(rate.Every(time.Second)), 1))
c.Assert(status&QPSNoChange != 0, IsTrue)

status = limiter.Update(label, UpdateQPSLimiter(5, 5))
c.Assert(status&QPSChanged != 0, IsTrue)
limit, burst = limiter.GetQPSLimiterStatus(label)
c.Assert(limit, Equals, rate.Limit(5))
c.Assert(burst, Equals, 5)
Expand All @@ -144,7 +152,9 @@ func (s *testRatelimiterSuite) TestUpdateQPSLimiter(c *C) {
}
}
time.Sleep(time.Second)
limiter.DeleteQPSLimiter(label)

status = limiter.Update(label, UpdateQPSLimiter(0, 0))
c.Assert(status&QPSDeleted != 0, IsTrue)
for i := 0; i < 10; i++ {
c.Assert(limiter.Allow(label), Equals, true)
}
Expand All @@ -155,7 +165,7 @@ func (s *testRatelimiterSuite) TestUpdateQPSLimiter(c *C) {

func (s *testRatelimiterSuite) TestQPSLimiter(c *C) {
c.Parallel()
opts := []Option{UpdateQPSLimiter(rate.Every(3*time.Second), 100)}
opts := []Option{UpdateQPSLimiter(float64(rate.Every(3*time.Second)), 100)}
limiter := NewLimiter()

label := "test"
Expand Down Expand Up @@ -184,9 +194,12 @@ func (s *testRatelimiterSuite) TestQPSLimiter(c *C) {

func (s *testRatelimiterSuite) TestTwoLimiters(c *C) {
c.Parallel()
opts := []Option{UpdateQPSLimiter(100, 100),
UpdateConcurrencyLimiter(100),
cfg := &DimensionConfig{
QPS: 100,
QPSBurst: 100,
ConcurrencyLimit: 100,
}
opts := []Option{UpdateDimensionConfig(cfg)}
limiter := NewLimiter()

label := "test"
Expand Down Expand Up @@ -217,7 +230,7 @@ func (s *testRatelimiterSuite) TestTwoLimiters(c *C) {
for i := 0; i < 100; i++ {
limiter.Release(label)
}
limiter.Update(label, UpdateQPSLimiter(rate.Every(10*time.Second), 1))
limiter.Update(label, UpdateQPSLimiter(float64(rate.Every(10*time.Second)), 1))
wg.Add(100)
for i := 0; i < 100; i++ {
go CountRateLimiterHandleResult(limiter, label, &successCount, &failedCount, &lock, &wg)
Expand Down
88 changes: 75 additions & 13 deletions pkg/ratelimit/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,101 @@ package ratelimit

import "golang.org/x/time/rate"

// UpdateStatus is flags for updating limiter config.
type UpdateStatus uint32

// Flags for limiter.
const (
eps float64 = 1e-8
// QPSNoChange shows that limiter's config isn't changed.
QPSNoChange UpdateStatus = 1 << iota
// QPSChanged shows that limiter's config is changed and not deleted.
QPSChanged
// QPSDeleted shows that limiter's config is deleted.
QPSDeleted
// ConcurrencyNoChange shows that limiter's config isn't changed.
ConcurrencyNoChange
// ConcurrencyChanged shows that limiter's config is changed and not deleted.
ConcurrencyChanged
// ConcurrencyDeleted shows that limiter's config is deleted.
ConcurrencyDeleted
// InAllowList shows that limiter's config isn't changed because it is in in allow list.
InAllowList
)

// Option is used to create a limiter with the optional settings.
// these setting is used to add a kind of limiter for a service
type Option func(string, *Limiter)
type Option func(string, *Limiter) UpdateStatus

// AddLabelAllowList adds a label into allow list.
// It means the given label will not be limited
func AddLabelAllowList() Option {
return func(label string, l *Limiter) {
return func(label string, l *Limiter) UpdateStatus {
l.labelAllowList[label] = struct{}{}
return 0
}
}

func updateConcurrencyConfig(l *Limiter, label string, limit uint64) UpdateStatus {
oldConcurrencyLimit, _ := l.GetConcurrencyLimiterStatus(label)
if oldConcurrencyLimit == limit {
return ConcurrencyNoChange
}
if limit < 1 {
l.ConcurrencyUnlimit(label)
return ConcurrencyDeleted
}
if limiter, exist := l.concurrencyLimiter.LoadOrStore(label, newConcurrencyLimiter(limit)); exist {
limiter.(*concurrencyLimiter).setLimit(limit)
}
return ConcurrencyChanged
}

func updateQPSConfig(l *Limiter, label string, limit float64, burst int) UpdateStatus {
oldQPSLimit, oldBurst := l.GetQPSLimiterStatus(label)

if (float64(oldQPSLimit)-limit < eps && float64(oldQPSLimit)-limit > -eps) && oldBurst == burst {
return QPSNoChange
}
if limit <= eps || burst < 1 {
l.QPSUnlimit(label)
return QPSDeleted
}
if limiter, exist := l.qpsLimiter.LoadOrStore(label, NewRateLimiter(limit, burst)); exist {
limiter.(*RateLimiter).SetLimit(rate.Limit(limit))
limiter.(*RateLimiter).SetBurst(burst)
}
return QPSChanged
}

// UpdateConcurrencyLimiter creates a concurrency limiter for a given label if it doesn't exist.
func UpdateConcurrencyLimiter(limit uint64) Option {
return func(label string, l *Limiter) {
return func(label string, l *Limiter) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return
}
if limiter, exist := l.concurrencyLimiter.LoadOrStore(label, newConcurrencyLimiter(limit)); exist {
limiter.(*concurrencyLimiter).setLimit(limit)
return InAllowList
}
return updateConcurrencyConfig(l, label, limit)
}
}

// UpdateQPSLimiter creates a QPS limiter for a given label if it doesn't exist.
func UpdateQPSLimiter(limit rate.Limit, burst int) Option {
return func(label string, l *Limiter) {
func UpdateQPSLimiter(limit float64, burst int) Option {
return func(label string, l *Limiter) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return
return InAllowList
}
if limiter, exist := l.qpsLimiter.LoadOrStore(label, NewRateLimiter(float64(limit), burst)); exist {
limiter.(*RateLimiter).SetLimit(limit)
limiter.(*RateLimiter).SetBurst(burst)
return updateQPSConfig(l, label, limit, burst)
}
}

// UpdateDimensionConfig creates QPS limiter and concurrency limiter for a given label by config if it doesn't exist.
func UpdateDimensionConfig(cfg *DimensionConfig) Option {
return func(label string, l *Limiter) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return InAllowList
}
status := updateQPSConfig(l, label, cfg.QPS, cfg.QPSBurst)
status |= updateConcurrencyConfig(l, label, cfg.ConcurrencyLimit)
return status
}
}

0 comments on commit 52dd587

Please sign in to comment.