Skip to content

Commit a3a8657

Browse files
authored
clusterimpl: update picker synchronously on config update (#7652)
1 parent 74738cf commit a3a8657

File tree

3 files changed

+204
-171
lines changed

3 files changed

+204
-171
lines changed

xds/internal/balancer/clusterimpl/balancer_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,74 @@ func (s) TestFailedToParseChildPolicyConfig(t *testing.T) {
943943
}
944944
}
945945

946+
// Test verify that the case picker is updated synchronously on receipt of
947+
// configuration update.
948+
func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
949+
// Override the pickerUpdateHook to be notified that picker was updated.
950+
pickerUpdated := make(chan struct{}, 1)
951+
origNewPickerUpdated := pickerUpdateHook
952+
pickerUpdateHook = func() {
953+
pickerUpdated <- struct{}{}
954+
}
955+
defer func() { pickerUpdateHook = origNewPickerUpdated }()
956+
957+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
958+
defer cancel()
959+
// Override the clientConnUpdateHook to ensure client conn was updated.
960+
clientConnUpdateDone := make(chan struct{}, 1)
961+
origClientConnUpdateHook := clientConnUpdateHook
962+
clientConnUpdateHook = func() {
963+
// Verify that picker was updated before the completion of
964+
// client conn update.
965+
select {
966+
case <-pickerUpdated:
967+
case <-ctx.Done():
968+
t.Fatal("Client conn update completed before picker update.")
969+
}
970+
clientConnUpdateDone <- struct{}{}
971+
}
972+
defer func() { clientConnUpdateHook = origClientConnUpdateHook }()
973+
974+
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
975+
xdsC := fakeclient.NewClient()
976+
977+
builder := balancer.Get(Name)
978+
cc := testutils.NewBalancerClientConn(t)
979+
b := builder.Build(cc, balancer.BuildOptions{})
980+
defer b.Close()
981+
982+
// Create a stub balancer which waits for the cluster_impl policy to be
983+
// closed before sending a picker update (upon receipt of a resolver
984+
// update).
985+
stub.Register(t.Name(), stub.BalancerFuncs{
986+
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
987+
bd.ClientConn.UpdateState(balancer.State{
988+
Picker: base.NewErrPicker(errors.New("dummy error picker")),
989+
})
990+
return nil
991+
},
992+
})
993+
994+
if err := b.UpdateClientConnState(balancer.ClientConnState{
995+
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
996+
BalancerConfig: &LBConfig{
997+
Cluster: testClusterName,
998+
EDSServiceName: testServiceName,
999+
ChildPolicy: &internalserviceconfig.BalancerConfig{
1000+
Name: t.Name(),
1001+
},
1002+
},
1003+
}); err != nil {
1004+
t.Fatalf("Unexpected error from UpdateClientConnState: %v", err)
1005+
}
1006+
1007+
select {
1008+
case <-clientConnUpdateDone:
1009+
case <-ctx.Done():
1010+
t.Fatal("Timed out waiting for client conn update to be completed.")
1011+
}
1012+
}
1013+
9461014
func assertString(f func() (string, error)) string {
9471015
s, err := f()
9481016
if err != nil {

0 commit comments

Comments
 (0)