Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ratelimit interceptor #181

Merged
merged 8 commits into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ratelimit/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// See LICENSE for licensing terms.

/*
`ratelimit` a generic server-side ratelimit middleware for gRPC.

Server Side Ratelimit Middleware

It allows to do grpc rate limit by your own rate limiter (e.g. token bucket, leaky bucket, etc.)

`ratelimit/tokenbucket`provides an implementation based on token bucket `github.com/juju/ratelimit`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update ... I dont think this is true anymore :)


Please see examples for simple examples of use.
*/
package ratelimit
33 changes: 33 additions & 0 deletions ratelimit/examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2018 Zheng Dayu. All Rights Reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove

// See LICENSE for licensing terms.

package ratelimit_test

import (
"time"

"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/ratelimit"
"github.com/grpc-ecosystem/go-grpc-middleware/ratelimit/tokenbucket"
"google.golang.org/grpc"
)

// Simple example of server initialization code.
func Example() {
// Create unary/stream rateLimiters, based on token bucket here.
// You can implement your own ratelimiter for the interface.
unaryRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 10, 10, 10*time.Second)
streamRateLimiter := tokenbucket.NewTokenBucketRateLimiter(1*time.Second, 5, 5, 5*time.Second)
_ = grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
ratelimit.UnaryServerInterceptor(
ratelimit.WithRateLimiter(unaryRateLimiter),
),
),
grpc_middleware.WithStreamServerChain(
ratelimit.StreamServerInterceptor(
ratelimit.WithRateLimiter(streamRateLimiter),
),
),
)
}
16 changes: 16 additions & 0 deletions ratelimit/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// See LICENSE for licensing terms.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may as well remove all of this and let people view the LICENSE file ... when i get time will clean the rest of the codebase


package ratelimit

import "time"

const infinityDuration time.Duration = 0x7fffffffffffffff

type Option func(*rateLimiter)

// WithRateLimiter customizes your limiter in the middleware
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be a full sentence ... how does it customize? It does give much info to the consumer.

func WithRateLimiter(l Limiter) Option {
return func(r *rateLimiter) {
r.limiter = l
}
}
63 changes: 63 additions & 0 deletions ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// See LICENSE for licensing terms.

package ratelimit

import (
"fmt"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Limiter interface {
Limit() bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation on the the type and func :)

}

type rateLimiter struct {
limiter Limiter
}

type emptyLimiter struct{}

func (e *emptyLimiter) Limit() bool {
return false
}

func emptyRatelimiter() *rateLimiter {
return &rateLimiter{
limiter: &emptyLimiter{},
}
}

// UnaryServerInterceptor returns a new unary server interceptors that performs request rate limit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limit > limiting

func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
ratelimiter := emptyRatelimiter()
for _, opt := range opts {
opt(ratelimiter)
}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if ratelimiter.limiter.Limit() {
return nil, status.Errorf(codes.ResourceExhausted, "%s is rejected by grpc_ratelimit middleare, please retry later.", info.FullMethod)
}
return handler(ctx, req)
}
}

// StreamServerInterceptor returns a new stream server interceptor that performs rate limiting on the request.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
ratelimiter := emptyRatelimiter()
fmt.Println(ratelimiter.limiter.Limit())
for _, opt := range opts {
opt(ratelimiter)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could get rid of the opts here ... from what i can see there is only a single opt ... the ratelimiter that the user is passing in.
Lets change it to be something like:

func UnaryServerInterceptor(limiter Limiter) grpc.UnaryServerInterceptor {

fmt.Println(ratelimiter.limiter.Limit())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove Println

return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
fmt.Println(ratelimiter.limiter.Limit())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

if ratelimiter.limiter.Limit() {
return status.Errorf(codes.ResourceExhausted, "%s is rejected by grpc_ratelimit middleare, please retry later.", info.FullMethod)
}
return handler(srv, stream)
}
}
106 changes: 106 additions & 0 deletions ratelimit/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package ratelimit

import (
"context"
"errors"
"testing"

"google.golang.org/grpc"

"github.com/stretchr/testify/assert"
)

const errMsgFake = "fake error"

func TestUnaryServerInterceptor_NoLimit(t *testing.T) {
interceptor := UnaryServerInterceptor()
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, errors.New(errMsgFake)
}
req, err := interceptor(nil, nil, nil, handler)
assert.Nil(t, req)
assert.EqualError(t, err, errMsgFake)
}

type mockPassLimiter struct{}

func (*mockPassLimiter) Limit() bool {
return false
}

func TestUnaryServerInterceptor_RateLimitPass(t *testing.T) {
unaryRateLimiter := &mockPassLimiter{}
interceptor := UnaryServerInterceptor(
WithRateLimiter(unaryRateLimiter),
)
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, errors.New(errMsgFake)
}
info := &grpc.UnaryServerInfo{
FullMethod: "FakeMethod",
}
req, err := interceptor(nil, nil, info, handler)
assert.Nil(t, req)
assert.EqualError(t, err, errMsgFake)
}

type mockFailLimiter struct{}

func (*mockFailLimiter) Limit() bool {
return true
}

func TestUnaryServerInterceptor_RateLimitFail(t *testing.T) {
unaryRateLimiter := &mockFailLimiter{}
interceptor := UnaryServerInterceptor(
WithRateLimiter(unaryRateLimiter),
)
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, errors.New(errMsgFake)
}
info := &grpc.UnaryServerInfo{
FullMethod: "FakeMethod",
}
req, err := interceptor(nil, nil, info, handler)
assert.Nil(t, req)
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = FakeMethod is rejected by grpc_ratelimit middleare, please retry later.")
}

func TestStreamServerInterceptor_NoLimit(t *testing.T) {
interceptor := StreamServerInterceptor()
handler := func(srv interface{}, stream grpc.ServerStream) error {
return errors.New(errMsgFake)
}
err := interceptor(nil, nil, nil, handler)
assert.EqualError(t, err, errMsgFake)
}

func TestStreamServerInterceptor_RateLimitPass(t *testing.T) {
streamRateLimiter := &mockPassLimiter{}
interceptor := StreamServerInterceptor(
WithRateLimiter(streamRateLimiter),
)
handler := func(srv interface{}, stream grpc.ServerStream) error {
return errors.New(errMsgFake)
}
info := &grpc.StreamServerInfo{
FullMethod: "FakeMethod",
}
err := interceptor(nil, nil, info, handler)
assert.EqualError(t, err, errMsgFake)
}

func TestStreamServerInterceptor_RateLimitFail(t *testing.T) {
streamRateLimiter := &mockFailLimiter{}
interceptor := StreamServerInterceptor(
WithRateLimiter(streamRateLimiter),
)
handler := func(srv interface{}, stream grpc.ServerStream) error {
return errors.New(errMsgFake)
}
info := &grpc.StreamServerInfo{
FullMethod: "FakeMethod",
}
err := interceptor(nil, nil, info, handler)
assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = FakeMethod is rejected by grpc_ratelimit middleare, please retry later.")
}
28 changes: 28 additions & 0 deletions ratelimit/tokenbucket/tokenbucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2018 Zheng Dayu. All Rights Reserved.
// See LICENSE for licensing terms.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above


package tokenbucket

import (
"time"

"github.com/juju/ratelimit"
)

type tokenBucketLimiter struct {
limiter *ratelimit.Bucket
maxWaitDuration time.Duration
}

// NewTokenBucketRateLimiter creates a tokenBucketLimiter.
func NewTokenBucketRateLimiter(fillInterval time.Duration, capacity, quantum int64, maxWaitDuration time.Duration) *tokenBucketLimiter {
return &tokenBucketLimiter{
limiter: ratelimit.NewBucketWithQuantum(fillInterval, capacity, quantum),
maxWaitDuration: maxWaitDuration,
}
}

// Limit takes 1 token from the bucket only if it needs to wait for no greater than maxWaitDuration
func (b *tokenBucketLimiter) Limit() bool {
return !b.limiter.WaitMaxDuration(1, b.maxWaitDuration)
}
22 changes: 22 additions & 0 deletions ratelimit/tokenbucket/tokenbucket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package tokenbucket
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are getting the end user to pass in the rate limiter we can remove the whole tokenbucket package from this codebase and not depoend on juju ourselves.
The user would then wrap it using the Limiter interface so that we allow them to use any rate limit algorithm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the package can be kept in order to give user an easy implementation of Limiter.

And also user can implement his own limiter if it is not enough.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can give an example without needing to import the dependency, the thinking behind passing in the rate limiter is so that we do not have to maintain the rate limiters but just wrap them to meet the Limiter interface that has been defined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌remove it from repo


import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTokenBucketRateLimiter_LimitPass(t *testing.T) {
l := NewTokenBucketRateLimiter(1*time.Millisecond, 1, 1, 1*time.Millisecond)
ok := l.Limit()
assert.False(t, ok)
}

func TestTokenBucketRateLimiter_LimitFail(t *testing.T) {
l := NewTokenBucketRateLimiter(10*time.Second, 1, 1, 1*time.Millisecond)
ok := l.Limit()
assert.False(t, ok)
ok2 := l.Limit()
assert.True(t, ok2)
}