-
Notifications
You must be signed in to change notification settings - Fork 691
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ratelimit interceptor #181
Changes from 3 commits
2dbe578
5f7557b
6a919f1
4b75c00
02ac7e2
199f608
b708755
821b156
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
# grpc_ratelimit | ||
`import "github.com/grpc-ecosystem/go-grpc-middleware/ratelimit"` | ||
|
||
* [Overview](#pkg-overview) | ||
* [Imported Packages](#pkg-imports) | ||
* [Index](#pkg-index) | ||
* [Examples](#pkg-examples) | ||
|
||
## <a name="pkg-overview">Overview</a> | ||
`grpc_ratelimit` a generic server-side ratelimit middleware for gRPC. | ||
|
||
### Server Side Ratelimit Middleware | ||
It allows to use your own rate limiter (e.g. token bucket, leaky bucket, etc.) to do grpc rate limit. | ||
|
||
`ratelimit/tokenbucket`provides an implementation based on token bucket `github.com/juju/ratelimit`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. double space |
||
|
||
Please see examples for simple examples of use. | ||
|
||
#### Example: | ||
|
||
<details> | ||
<summary>Click to expand code.</summary> | ||
|
||
```go | ||
// Create unary/stream rateLimiters, based on token bucket here. | ||
// You can implement your own ratelimiter for the interface. | ||
unaryRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 10, 10) | ||
streamRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 5, 5) | ||
_ = grpc.NewServer( | ||
grpc_middleware.WithUnaryServerChain( | ||
grpc_ratelimit.UnaryServerInterceptor( | ||
grpc_ratelimit.WithLimiter(unaryRateLimiter), | ||
grpc_ratelimit.WithMaxWaitDuration(10*time.Second), | ||
), | ||
), | ||
grpc_middleware.WithStreamServerChain( | ||
grpc_ratelimit.StreamServerInterceptor( | ||
grpc_ratelimit.WithLimiter(streamRateLimiter), | ||
grpc_ratelimit.WithMaxWaitDuration(5*time.Second), | ||
), | ||
), | ||
) | ||
``` | ||
|
||
</details> | ||
|
||
## <a name="pkg-imports">Imported Packages</a> | ||
|
||
- [golang.org/x/net/context](https://godoc.org/golang.org/x/net/context) | ||
- [google.golang.org/grpc](https://godoc.org/google.golang.org/grpc) | ||
- [google.golang.org/grpc/codes](https://godoc.org/google.golang.org/grpc/codes) | ||
- [google.golang.org/grpc/status](https://godoc.org/google.golang.org/grpc/status) | ||
|
||
## <a name="pkg-index">Index</a> | ||
* [func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor](#StreamServerInterceptor) | ||
* [func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor](#UnaryServerInterceptor) | ||
* [type Limiter](#Limiter) | ||
* [type Option](#Option) | ||
* [func WithLimiter(l Limiter) Option](#WithLimiter) | ||
* [func WithMaxWaitDuration(maxWaitDuration time.Duration) Option](#WithMaxWaitDuration) | ||
|
||
#### <a name="pkg-examples">Examples</a> | ||
* [Package (Initialization)](#example__initialization) | ||
|
||
#### <a name="pkg-files">Package files</a> | ||
[doc.go](./doc.go) [options.go](./options.go) [ratelimit.go](./ratelimit.go) | ||
|
||
## <a name="StreamServerInterceptor">func</a> [StreamServerInterceptor](./ratelimit.go#L55) | ||
``` go | ||
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor | ||
``` | ||
StreamServerInterceptor returns a new stream server interceptors that performs request rate limit. | ||
|
||
## <a name="UnaryServerInterceptor">func</a> [UnaryServerInterceptor](./ratelimit.go#L41) | ||
``` go | ||
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor | ||
``` | ||
UnaryServerInterceptor returns a new unary server interceptors that performs request rate limit. | ||
|
||
## <a name="Limiter">type</a> [Limiter](./ratelimit.go#L15-L17) | ||
``` go | ||
type Limiter interface { | ||
WaitMaxDuration(time.Duration) bool | ||
} | ||
``` | ||
|
||
## <a name="Option">type</a> [Option](./options.go#L10) | ||
``` go | ||
type Option func(*rateLimiter) | ||
``` | ||
|
||
### <a name="WithLimiter">func</a> [WithLimiter](./options.go#L13) | ||
``` go | ||
func WithLimiter(l Limiter) Option | ||
``` | ||
WithLimiter customizes your limiter in the middleware | ||
|
||
### <a name="WithMaxWaitDuration">func</a> [WithMaxWaitDuration](./options.go#L23) | ||
``` go | ||
func WithMaxWaitDuration(maxWaitDuration time.Duration) Option | ||
``` | ||
WithMaxWaitDuration customizes maxWaitDuration in limiter's WaitMaxDuration action. | ||
|
||
- - - | ||
Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
DOC.md |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
// Copyright 2018 Zheng Dayu. All Rights Reserved. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please remove :) |
||
// See LICENSE for licensing terms. | ||
|
||
/* | ||
`grpc_ratelimit` a generic server-side ratelimit middleware for gRPC. | ||
|
||
Server Side Ratelimit Middleware | ||
|
||
It allows to use your own rate limiter (e.g. token bucket, leaky bucket, etc.) to do grpc rate limit. | ||
|
||
`ratelimit/tokenbucket`provides an implementation based on token bucket `github.com/juju/ratelimit`. | ||
|
||
Please see examples for simple examples of use. | ||
*/ | ||
package grpc_ratelimit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's not continue this anti-pattern |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
// Copyright 2018 Zheng Dayu. All Rights Reserved. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please remove |
||
// See LICENSE for licensing terms. | ||
|
||
package grpc_ratelimit_test | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/grpc-ecosystem/go-grpc-middleware" | ||
"github.com/grpc-ecosystem/go-grpc-middleware/ratelimit" | ||
"github.com/grpc-ecosystem/go-grpc-middleware/ratelimit/tokenbucket" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
// Simple example of server initialization code. | ||
func Example_initialization() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this just be |
||
// Create unary/stream rateLimiters, based on token bucket here. | ||
// You can implement your own ratelimiter for the interface. | ||
unaryRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 10, 10) | ||
streamRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 5, 5) | ||
_ = grpc.NewServer( | ||
grpc_middleware.WithUnaryServerChain( | ||
grpc_ratelimit.UnaryServerInterceptor( | ||
grpc_ratelimit.WithLimiter(unaryRateLimiter), | ||
grpc_ratelimit.WithMaxWaitDuration(10*time.Second), | ||
), | ||
), | ||
grpc_middleware.WithStreamServerChain( | ||
grpc_ratelimit.StreamServerInterceptor( | ||
grpc_ratelimit.WithLimiter(streamRateLimiter), | ||
grpc_ratelimit.WithMaxWaitDuration(5*time.Second), | ||
), | ||
), | ||
) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
// Copyright 2018 Zheng Dayu. All Rights Reserved. | ||
// See LICENSE for licensing terms. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may as well remove all of this and let people view the LICENSE file ... when i get time will clean the rest of the codebase |
||
|
||
package grpc_ratelimit | ||
|
||
import "time" | ||
|
||
const infinityDuration time.Duration = 0x7fffffffffffffff | ||
|
||
type Option func(*rateLimiter) | ||
|
||
// WithLimiter customizes your limiter in the middleware | ||
func WithLimiter(l Limiter) Option { | ||
return func(r *rateLimiter) { | ||
r.limiter = l | ||
if r.maxWaitDuration == 0 { | ||
r.maxWaitDuration = infinityDuration | ||
} | ||
} | ||
} | ||
|
||
// WithMaxWaitDuration customizes maxWaitDuration in limiter's WaitMaxDuration action. | ||
func WithMaxWaitDuration(maxWaitDuration time.Duration) Option { | ||
return func(r *rateLimiter) { | ||
r.maxWaitDuration = maxWaitDuration | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
// Copyright 2018 Zheng Dayu. All Rights Reserved. | ||
// See LICENSE for licensing terms. | ||
|
||
package grpc_ratelimit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as above |
||
|
||
import ( | ||
"time" | ||
|
||
"golang.org/x/net/context" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
type Limiter interface { | ||
WaitMaxDuration(time.Duration) bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This interface feels as though it is just for the specific implementation implemented in this PR. Should probably be something like |
||
} | ||
|
||
type rateLimiter struct { | ||
limiter Limiter | ||
maxWaitDuration time.Duration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will all rate limiters utilise this? |
||
} | ||
|
||
func (r *rateLimiter) Wait() bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. feels like this should be Limit and on the interface |
||
return r.limiter.WaitMaxDuration(r.maxWaitDuration) | ||
} | ||
|
||
type emptyLimiter struct{} | ||
|
||
func (e *emptyLimiter) WaitMaxDuration(time.Duration) bool { | ||
return true | ||
} | ||
|
||
func emptyRatelimiter() *rateLimiter { | ||
return &rateLimiter{ | ||
limiter: &emptyLimiter{}, | ||
} | ||
} | ||
|
||
// UnaryServerInterceptor returns a new unary server interceptors that performs request rate limit. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { | ||
ratelimiter := emptyRatelimiter() | ||
for _, opt := range opts { | ||
opt(ratelimiter) | ||
} | ||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { | ||
if ratelimiter.Wait() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets invert the if ...
|
||
return handler(ctx, req) | ||
} | ||
return nil, status.Errorf(codes.ResourceExhausted, "%s is rejected by grpc_ratelimit middleare, please retry later.", info.FullMethod) | ||
} | ||
} | ||
|
||
// StreamServerInterceptor returns a new stream server interceptors that performs request rate limit. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor { | ||
ratelimiter := emptyRatelimiter() | ||
for _, opt := range opts { | ||
opt(ratelimiter) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could get rid of the opts here ... from what i can see there is only a single opt ... the ratelimiter that the user is passing in.
|
||
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { | ||
if ratelimiter.Wait() { | ||
return handler(srv, stream) | ||
} | ||
return status.Errorf(codes.ResourceExhausted, "%s is rejected by grpc_ratelimit middleare, please retry later.", info.FullMethod) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again invert this logic to have the happy path non inverted |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package grpc_ratelimit | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/metadata" | ||
|
||
"github.com/grpc-ecosystem/go-grpc-middleware/ratelimit/tokenbucket" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
const errMsgFake = "fake error" | ||
|
||
func TestEmptyUnaryServerInterceptor(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
interceptor := UnaryServerInterceptor() | ||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { | ||
return nil, errors.New(errMsgFake) | ||
} | ||
var ctx context.Context | ||
var req interface{} | ||
var info *grpc.UnaryServerInfo | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for the onces that can define them in a var block
|
||
req2, err := interceptor(ctx, req, info, handler) | ||
assert.Nil(t, req2) | ||
assert.EqualError(t, err, errMsgFake) | ||
} | ||
|
||
func TestRateLimitUnaryServerInterceptor(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above |
||
unaryRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 1, 1) | ||
interceptor := UnaryServerInterceptor( | ||
WithLimiter(unaryRateLimiter), | ||
WithMaxWaitDuration(1*time.Millisecond), | ||
) | ||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { | ||
return nil, errors.New(errMsgFake) | ||
} | ||
var ctx context.Context | ||
var req interface{} | ||
var info *grpc.UnaryServerInfo | ||
req2, err := interceptor(ctx, req, info, handler) | ||
assert.Nil(t, req2) | ||
assert.EqualError(t, err, errMsgFake) | ||
} | ||
|
||
func TestRateLimitStreamServerInterceptor(t *testing.T) { | ||
unaryRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 1, 1) | ||
interceptor := StreamServerInterceptor( | ||
WithLimiter(unaryRateLimiter), | ||
WithMaxWaitDuration(1*time.Millisecond), | ||
) | ||
handler := func(srv interface{}, stream grpc.ServerStream) error { | ||
return errors.New(errMsgFake) | ||
} | ||
var srv interface{} | ||
var ss *mockServerStream | ||
var info *grpc.StreamServerInfo | ||
err := interceptor(srv, ss, info, handler) | ||
assert.EqualError(t, err, errMsgFake) | ||
} | ||
|
||
type mockServerStream struct{} | ||
|
||
func (mss *mockServerStream) SetHeader(metadata.MD) error { | ||
return nil | ||
} | ||
|
||
func (mss *mockServerStream) SendHeader(metadata.MD) error { | ||
return nil | ||
} | ||
|
||
func (mss *mockServerStream) SetTrailer(metadata.MD) {} | ||
|
||
func (mss *mockServerStream) Context() context.Context { | ||
return context.Background() | ||
} | ||
|
||
func (mss *mockServerStream) SendMsg(m interface{}) error { | ||
return nil | ||
} | ||
|
||
func (mss *mockServerStream) RecvMsg(m interface{}) error { | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
// Copyright 2018 Zheng Dayu. All Rights Reserved. | ||
// See LICENSE for licensing terms. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as above |
||
|
||
package tokenbucket | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/juju/ratelimit" | ||
) | ||
|
||
type tokenBucketLimiter struct { | ||
limiter *ratelimit.Bucket | ||
} | ||
|
||
// NewTokenBucketRateLimiter creates a tokenBucketLimiter. | ||
func NewTokenBucketRateLimiter(fillInterval time.Duration, capacity, quantum int64) *tokenBucketLimiter { | ||
return &tokenBucketLimiter{ | ||
limiter: ratelimit.NewBucketWithQuantum(fillInterval, capacity, quantum), | ||
} | ||
} | ||
|
||
// WaitMaxDuration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please fill out comment |
||
func (b *tokenBucketLimiter) WaitMaxDuration(maxWaitDuration time.Duration) bool { | ||
return b.limiter.WaitMaxDuration(1, maxWaitDuration) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package tokenbucket | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we are getting the end user to pass in the rate limiter we can remove the whole tokenbucket package from this codebase and not depoend on juju ourselves. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the package can be kept in order to give user an easy implementation of And also user can implement his own limiter if it is not enough. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can give an example without needing to import the dependency, the thinking behind passing in the rate limiter is so that we do not have to maintain the rate limiters but just wrap them to meet the Limiter interface that has been defined. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👌remove it from repo |
||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestNewTokenBucketRateLimiter(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Can we add tests for both the limited and not limited calls |
||
l := NewTokenBucketRateLimiter(1*time.Millisecond, 1, 1) | ||
ok := l.WaitMaxDuration(1 * time.Millisecond) | ||
assert.True(t, ok) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we tidy up this sentence?