Skip to content

Commit 8135a2a

Browse files
committed
Add a test
1 parent ef7d373 commit 8135a2a

File tree

1 file changed

+208
-12
lines changed

1 file changed

+208
-12
lines changed

balancer/leastrequest/balancer_test.go

+208-12
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,9 @@ func (s) TestParseConfig(t *testing.T) {
117117
}
118118
}
119119

120-
// setupBackends spins up three test backends, each listening on a port on
121-
// localhost. The three backends always reply with an empty response with no
122-
// error, and for streaming receive until hitting an EOF error.
123-
func setupBackends(t *testing.T) []string {
124-
t.Helper()
125-
const numBackends = 3
126-
addresses := make([]string, numBackends)
127-
// Construct and start three working backends.
120+
func startBackends(t *testing.T, numBackends int) []*stubserver.StubServer {
121+
backends := make([]*stubserver.StubServer, 0, numBackends)
122+
// Construct and start working backends.
128123
for i := 0; i < numBackends; i++ {
129124
backend := &stubserver.StubServer{
130125
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
@@ -140,7 +135,21 @@ func setupBackends(t *testing.T) []string {
140135
}
141136
t.Logf("Started good TestService backend at: %q", backend.Address)
142137
t.Cleanup(func() { backend.Stop() })
143-
addresses[i] = backend.Address
138+
backends = append(backends, backend)
139+
}
140+
return backends
141+
}
142+
143+
// setupBackends spins up three test backends, each listening on a port on
144+
// localhost. The three backends always reply with an empty response with no
145+
// error, and for streaming receive until hitting an EOF error.
146+
func setupBackends(t *testing.T, numBackends int) []string {
147+
t.Helper()
148+
addresses := make([]string, numBackends)
149+
backends := startBackends(t, numBackends)
150+
// Construct and start working backends.
151+
for i := 0; i < numBackends; i++ {
152+
addresses[i] = backends[i].Address
144153
}
145154
return addresses
146155
}
@@ -205,7 +214,7 @@ func (s) TestLeastRequestE2E(t *testing.T) {
205214
index++
206215
return ret
207216
}
208-
addresses := setupBackends(t)
217+
addresses := setupBackends(t, 3)
209218

210219
mr := manual.NewBuilderWithScheme("lr-e2e")
211220
defer mr.Close()
@@ -321,7 +330,7 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) {
321330
index++
322331
return ret
323332
}
324-
addresses := setupBackends(t)
333+
addresses := setupBackends(t, 3)
325334

326335
mr := manual.NewBuilderWithScheme("lr-e2e")
327336
defer mr.Close()
@@ -462,7 +471,7 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) {
462471
// and makes 100 RPCs asynchronously. This makes sure no race conditions happen
463472
// in this scenario.
464473
func (s) TestConcurrentRPCs(t *testing.T) {
465-
addresses := setupBackends(t)
474+
addresses := setupBackends(t, 3)
466475

467476
mr := manual.NewBuilderWithScheme("lr-e2e")
468477
defer mr.Close()
@@ -508,5 +517,192 @@ func (s) TestConcurrentRPCs(t *testing.T) {
508517
}()
509518
}
510519
wg.Wait()
520+
}
521+
522+
// Test tests that the least request balancer persists RPC counts once it gets
523+
// new picker updates and backends within an endpoint go down. It first updates
524+
// the balancer with two endpoints having two addresses each. It verifies the
525+
// requests are round robined across the first address of each endpoint. It then
526+
// stops the active backend in endpoint[0]. It verified that the balancer starts
527+
// using the second address in endpoint[0]. The test then creates a bunch of
528+
// streams on two endpoints. Then, it updates the balancer with three endpoints,
529+
// including the two previous. Any created streams should then be started on the
530+
// new endpoint. The test shuts down the active backed in endpoint[1] and
531+
// endpoint[2]. The test verifies that new RPCs are round robined across the
532+
// active backends in endpoint[1] and endpoint[2].
533+
func (s) TestLeastRequestEndpoints_MultipleAddresses(t *testing.T) {
534+
defer func(u func() uint32) {
535+
randuint32 = u
536+
}(randuint32)
537+
var index int
538+
indexes := []uint32{
539+
0, 0, 1, 1,
540+
}
541+
randuint32 = func() uint32 {
542+
ret := indexes[index%len(indexes)]
543+
index++
544+
return ret
545+
}
546+
backends := startBackends(t, 6)
547+
mr := manual.NewBuilderWithScheme("lr-e2e")
548+
defer mr.Close()
549+
550+
// Configure least request as top level balancer of channel.
551+
lrscJSON := `
552+
{
553+
"loadBalancingConfig": [
554+
{
555+
"least_request_experimental": {
556+
"choiceCount": 2
557+
}
558+
}
559+
]
560+
}`
561+
endpoints := []resolver.Endpoint{
562+
{Addresses: []resolver.Address{{Addr: backends[0].Address}, {Addr: backends[1].Address}}},
563+
{Addresses: []resolver.Address{{Addr: backends[2].Address}, {Addr: backends[3].Address}}},
564+
{Addresses: []resolver.Address{{Addr: backends[4].Address}, {Addr: backends[5].Address}}},
565+
}
566+
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
567+
firstTwoEndpoints := []resolver.Endpoint{endpoints[0], endpoints[1]}
568+
mr.InitialState(resolver.State{
569+
Endpoints: firstTwoEndpoints,
570+
ServiceConfig: sc,
571+
})
572+
573+
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
574+
if err != nil {
575+
t.Fatalf("grpc.NewClient() failed: %v", err)
576+
}
577+
defer cc.Close()
578+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
579+
defer cancel()
580+
testServiceClient := testgrpc.NewTestServiceClient(cc)
581+
582+
// Wait for the two backends to round robin across. The happens because a
583+
// child pickfirst transitioning into READY causes a new picker update. Once
584+
// the picker update with the two backends is present, this test can start
585+
// to populate those backends with streams.
586+
wantAddrs := []resolver.Address{
587+
endpoints[0].Addresses[0],
588+
endpoints[1].Addresses[0],
589+
}
590+
if err := checkRoundRobinRPCs(ctx, testServiceClient, wantAddrs); err != nil {
591+
t.Fatalf("error in expected round robin: %v", err)
592+
}
593+
594+
// Shut down one of the addresses in endpoints[0], the child pickfirst
595+
// should fallback to the next address in endpoints[0].
596+
backends[0].Stop()
597+
wantAddrs = []resolver.Address{
598+
endpoints[0].Addresses[1],
599+
endpoints[1].Addresses[0],
600+
}
601+
if err := checkRoundRobinRPCs(ctx, testServiceClient, wantAddrs); err != nil {
602+
t.Fatalf("error in expected round robin: %v", err)
603+
}
604+
605+
// Start 50 streaming RPCs, and leave them unfinished for the duration of
606+
// the test. This will populate the first two endpoints with many active
607+
// RPCs.
608+
for i := 0; i < 50; i++ {
609+
_, err := testServiceClient.FullDuplexCall(ctx)
610+
if err != nil {
611+
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
612+
}
613+
}
511614

615+
// Update the least request balancer to choice count 3. Also update the
616+
// address list adding a third endpoint. Alongside the injected randomness,
617+
// this should trigger the least request balancer to search all created
618+
// endpoints. Thus, since endpoint 3 is the new endpoint and the first two
619+
// endpoint are populated with RPCs, once the picker update of all 3 READY
620+
// pickfirsts takes effect, all new streams should be started on endpoint 3.
621+
index = 0
622+
indexes = []uint32{
623+
0, 1, 2, 3, 4, 5,
624+
}
625+
lrscJSON = `
626+
{
627+
"loadBalancingConfig": [
628+
{
629+
"least_request_experimental": {
630+
"choiceCount": 3
631+
}
632+
}
633+
]
634+
}`
635+
sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
636+
mr.UpdateState(resolver.State{
637+
Endpoints: endpoints,
638+
ServiceConfig: sc,
639+
})
640+
newAddress := endpoints[2].Addresses[0]
641+
// Poll for only endpoint 3 to show up. This requires a polling loop because
642+
// picker update with all three endpoints doesn't take into effect
643+
// immediately, needs the third pickfirst to become READY.
644+
if err := checkRoundRobinRPCs(ctx, testServiceClient, []resolver.Address{newAddress}); err != nil {
645+
t.Fatalf("error in expected round robin: %v", err)
646+
}
647+
648+
// Start 25 rpcs, but don't finish them. They should all start on endpoint 3,
649+
// since the first two endpoints both have 25 RPCs (and randomness
650+
// injection/choiceCount causes all 3 to be compared every iteration).
651+
for i := 0; i < 25; i++ {
652+
stream, err := testServiceClient.FullDuplexCall(ctx)
653+
if err != nil {
654+
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
655+
}
656+
p, ok := peer.FromContext(stream.Context())
657+
if !ok {
658+
t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
659+
}
660+
if p.Addr.String() != newAddress.Addr {
661+
t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), newAddress)
662+
}
663+
}
664+
665+
// Now 25 RPC's are active on each endpoint, the next three RPC's should
666+
// round robin, since choiceCount is three and the injected random indexes
667+
// cause it to search all three endpoints for fewest outstanding requests on
668+
// each iteration.
669+
wantAddrCount := map[string]int{
670+
endpoints[0].Addresses[1].Addr: 1,
671+
endpoints[1].Addresses[0].Addr: 1,
672+
endpoints[2].Addresses[0].Addr: 1,
673+
}
674+
gotAddrCount := make(map[string]int)
675+
for i := 0; i < len(endpoints); i++ {
676+
stream, err := testServiceClient.FullDuplexCall(ctx)
677+
if err != nil {
678+
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
679+
}
680+
p, ok := peer.FromContext(stream.Context())
681+
if !ok {
682+
t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
683+
}
684+
if p.Addr != nil {
685+
gotAddrCount[p.Addr.String()]++
686+
}
687+
}
688+
if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
689+
t.Fatalf("addr count (-got:, +want): %v", diff)
690+
}
691+
692+
// Shutdown the active address for endpoint[1] and endpoint[2]. This should
693+
// result in their streams failing. Now the requests should roundrobin b/w
694+
// endpoint[1] and endpoint[2].
695+
backends[2].Stop()
696+
backends[4].Stop()
697+
index = 0
698+
indexes = []uint32{
699+
0, 1, 2, 2, 1, 0,
700+
}
701+
wantAddrs = []resolver.Address{
702+
endpoints[1].Addresses[1],
703+
endpoints[2].Addresses[1],
704+
}
705+
if err := checkRoundRobinRPCs(ctx, testServiceClient, wantAddrs); err != nil {
706+
t.Fatalf("error in expected round robin: %v", err)
707+
}
512708
}

0 commit comments

Comments
 (0)