Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

test: Add test for global rate limiting with load balancing #207

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strings"
"testing"
"time"

guber "github.com/mailgun/gubernator/v2"
"github.com/mailgun/gubernator/v2/cluster"
Expand All @@ -34,6 +35,10 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
json "google.golang.org/protobuf/encoding/protojson"
)

Expand Down Expand Up @@ -859,6 +864,62 @@ func TestGlobalRateLimits(t *testing.T) {
})
}

func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a non-maintainer of this repo, it looks like a code smell to me that the easiest way to assert this behavior is through a functional test and not a unit test. 🤔

owner := cluster.PeerAt(2).GRPCAddress
peer := cluster.PeerAt(0).GRPCAddress
assert.NotEqual(t, owner, peer)

dialOpts := []grpc.DialOption{
grpc.WithResolvers(newStaticBuilder()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}

address := fmt.Sprintf("static:///%s,%s", owner, peer)
conn, err := grpc.DialContext(context.Background(), address, dialOpts...)
Comment on lines +878 to +879
Copy link
Contributor

@miparnisari miparnisari Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you clarify what this is doing? it's not clear to me. are you creating a grpc connection to the owner or the peer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is creating a static pool of servers to communicate with and creating the connection

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the connection is to which server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

between the owner and the peer

require.NoError(t, err)

client := guber.NewV1Client(conn)

sendHit := func(status guber.Status, assertion func(resp *guber.RateLimitResp), i int) string {
ctx, cancel := context.WithTimeout(context.Background(), clock.Hour*5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this timeout is waaaay too high. Github actions will not run for more than 10 minutes by default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to be clear, this test wasnt ready to be merged as is, hence why it was in draft :) - it was simply to reproduce the culprit behaviour that was reported in the bug.

defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_global",
UniqueKey: "account:12345",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 5,
Hits: 1,
Limit: 2,
},
},
})
require.NoError(t, err, i)
gotResp := resp.Responses[0]
assert.Equal(t, "", gotResp.GetError(), i)
assert.Equal(t, status, gotResp.GetStatus(), i)

if assertion != nil {
assertion(gotResp)
}

return gotResp.GetMetadata()["owner"]
}

// Send two hits that should be processed by the owner and the peer and deplete the limit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be processed by the owner

are you asserting this anywhere? I don't see it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no in this case im relying on the grpc load balancer we passed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we can and should check in order to harden the test

sendHit(guber.Status_UNDER_LIMIT, nil, 1)
sendHit(guber.Status_UNDER_LIMIT, nil, 2)
// sleep to ensure the async forward has occurred and state should be shared
time.Sleep(time.Second * 5)
Comment on lines +915 to +916
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think (not sure) you can control this via setting globalBatchTimeout = 1 and globalSyncWait = 1ms, so you don't need to wait so long here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, use github.com/mailgun/holster/v4/clock to freeze or advance system time during the test.


for i := 0; i < 10; i++ {
sendHit(guber.Status_OVER_LIMIT, nil, i+2)
}
}

func getMetricRequest(t testutil.TestingT, url string, name string) *model.Sample {
resp, err := http.Get(url)
require.NoError(t, err)
Expand Down Expand Up @@ -1273,3 +1334,46 @@ func getMetric(t testutil.TestingT, in io.Reader, name string) *model.Sample {
}
return nil
}

// staticBuilder implements the `resolver.Builder` interface.
type staticBuilder struct{}

func newStaticBuilder() resolver.Builder {
return &staticBuilder{}
}

func (sb *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
var resolverAddrs []resolver.Address
for _, address := range strings.Split(target.Endpoint(), ",") {
resolverAddrs = append(resolverAddrs, resolver.Address{
Addr: address,
ServerName: address,
})

}
r, err := newStaticResolver(cc, resolverAddrs)
if err != nil {
return nil, err
}
return r, nil
}

func (sb *staticBuilder) Scheme() string {
return "static"
}

type staticResolver struct {
cc resolver.ClientConn
}

func newStaticResolver(cc resolver.ClientConn, addresses []resolver.Address) (resolver.Resolver, error) {
err := cc.UpdateState(resolver.State{Addresses: addresses})
if err != nil {
return nil, err
}
return &staticResolver{cc: cc}, nil
}

func (sr *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (sr *staticResolver) Close() {}