From 08ebd15cd9efa3c035ddfe3c50273011f9e13941 Mon Sep 17 00:00:00 2001 From: Brad Town Date: Mon, 1 Jul 2024 16:53:41 -0700 Subject: [PATCH 1/9] xds: Use the connected address for locality (#7357) --- balancer/balancer.go | 15 +++++ balancer_wrapper.go | 11 +++- clientconn.go | 59 ++++++++++++------- internal/internal.go | 7 +++ .../balancer/clusterimpl/clusterimpl.go | 20 ++++++- .../clusterimpl/tests/balancer_test.go | 11 +--- 6 files changed, 90 insertions(+), 33 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index f391744f7299..3fc19e15d9cd 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -72,8 +72,20 @@ func unregisterForTesting(name string) { delete(m, name) } +// getConnectedAddress returns the connected address for a SubConnState. +func getConnectedAddress(scs SubConnState) (resolver.Address, bool) { + return scs.connectedAddress, scs.ConnectivityState == connectivity.Ready +} + +// setConnectedAddress sets the connected address for a SubConnState. +func setConnectedAddress(scs *SubConnState, addr resolver.Address) { + scs.connectedAddress = addr +} + func init() { internal.BalancerUnregister = unregisterForTesting + internal.GetConnectedAddress = getConnectedAddress + internal.SetConnectedAddress = setConnectedAddress } // Get returns the resolver builder registered with the given name. @@ -410,6 +422,9 @@ type SubConnState struct { // ConnectionError is set if the ConnectivityState is TransientFailure, // describing the reason the SubConn failed. Otherwise, it is nil. ConnectionError error + // connectedAddr contains the connected address when ConnectivityState is Ready. Otherwise, it is + // indeterminate. + connectedAddress resolver.Address } // ClientConnState describes the state of a ClientConn relevant to the diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 4161fdf47a8b..39577d5c8210 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + grpcinternal "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" @@ -252,7 +253,7 @@ type acBalancerWrapper struct { // updateState is invoked by grpc to push a subConn state update to the // underlying balancer. -func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) { +func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) { acbw.ccb.serializer.Schedule(func(ctx context.Context) { if ctx.Err() != nil || acbw.ccb.balancer == nil { return @@ -260,7 +261,13 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) { // Even though it is optional for balancers, gracefulswitch ensures // opts.StateListener is set, so this cannot ever be nil. // TODO: delete this comment when UpdateSubConnState is removed. - acbw.stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) + scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err} + if s == connectivity.Ready { + if SetConnectedAddress, ok := grpcinternal.SetConnectedAddress.(func(state *balancer.SubConnState, addr resolver.Address)); ok { + SetConnectedAddress(&scs, curAddr) + } + } + acbw.stateListener(scs) }) } diff --git a/clientconn.go b/clientconn.go index 423be7b43b00..a218f4b4085b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -53,7 +53,9 @@ import ( const ( // minimum time to give a connection to complete - minConnectTimeout = 20 * time.Second + minConnectTimeout = 20 * time.Second + withBalancerAttributes = true + withoutBalancerAttributes = false ) var ( @@ -812,16 +814,26 @@ func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) { cc.csMgr.updateState(connectivity.TransientFailure) } -// Makes a copy of the input addresses slice and clears out the balancer -// attributes field. Addresses are passed during subconn creation and address -// update operations. In both cases, we will clear the balancer attributes by -// calling this function, and therefore we will be able to use the Equal method -// provided by the resolver.Address type for comparison. -func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address { +// addressWithoutBalancerAttributes returns a copy of the input address with +// the BalancerAttributes field cleared. +func addressWithoutBalancerAttributes(a resolver.Address) resolver.Address { + a.BalancerAttributes = nil + return a +} + +// Makes a copy of the input addresses slice and optionally clears out the +// balancer attributes field. Addresses are passed during subconn creation and +// address update operations. In both cases, we may clear the balancer +// attributes by calling this function, which would therefore allow us to use +// the Equal method provided by the resolver.Address type for comparison. +func copyAddresses(in []resolver.Address, includeBalancerAttributes bool) []resolver.Address { out := make([]resolver.Address, len(in)) for i := range in { - out[i] = in[i] - out[i].BalancerAttributes = nil + if includeBalancerAttributes { + out[i] = in[i] + } else { + out[i] = addressWithoutBalancerAttributes(in[i]) + } } return out } @@ -837,7 +849,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer. ac := &addrConn{ state: connectivity.Idle, cc: cc, - addrs: copyAddressesWithoutBalancerAttributes(addrs), + addrs: copyAddresses(addrs, withBalancerAttributes), scopts: opts, dopts: cc.dopts, channelz: channelz.RegisterSubChannel(cc.channelz, ""), @@ -924,12 +936,18 @@ func (ac *addrConn) connect() error { return nil } -func equalAddresses(a, b []resolver.Address) bool { +func equalAddressIgnoreBalancerAttributes(a, b resolver.Address) bool { + return a.Addr == b.Addr && a.ServerName == b.ServerName && + a.Attributes.Equal(b.Attributes) && + a.Metadata == b.Metadata +} + +func equalAddressesIgnoreBalancerAttributes(a, b []resolver.Address) bool { if len(a) != len(b) { return false } for i, v := range a { - if !v.Equal(b[i]) { + if !equalAddressIgnoreBalancerAttributes(v, b[i]) { return false } } @@ -939,15 +957,15 @@ func equalAddresses(a, b []resolver.Address) bool { // updateAddrs updates ac.addrs with the new addresses list and handles active // connections or connection attempts. func (ac *addrConn) updateAddrs(addrs []resolver.Address) { - addrs = copyAddressesWithoutBalancerAttributes(addrs) + addrs = copyAddresses(addrs, withBalancerAttributes) limit := len(addrs) if limit > 5 { limit = 5 } - channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit]) + channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), copyAddresses(addrs[:limit], withoutBalancerAttributes)) ac.mu.Lock() - if equalAddresses(ac.addrs, addrs) { + if equalAddressesIgnoreBalancerAttributes(ac.addrs, addrs) { ac.mu.Unlock() return } @@ -966,7 +984,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) { // Try to find the connected address. for _, a := range addrs { a.ServerName = ac.cc.getServerName(a) - if a.Equal(ac.curAddr) { + if equalAddressIgnoreBalancerAttributes(a, ac.curAddr) { // We are connected to a valid address, so do nothing but // update the addresses. ac.mu.Unlock() @@ -1214,7 +1232,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) } else { channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr) } - ac.acbw.updateState(s, lastErr) + ac.acbw.updateState(s, ac.curAddr, lastErr) } // adjustParams updates parameters used to create transports upon @@ -1347,6 +1365,7 @@ func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, c // new transport. func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { addr.ServerName = ac.cc.getServerName(addr) + addrWithoutBalancerAttributes := addressWithoutBalancerAttributes(addr) hctx, hcancel := context.WithCancel(ctx) onClose := func(r transport.GoAwayReason) { @@ -1381,14 +1400,14 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, defer cancel() copts.ChannelzParent = ac.channelz - newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose) + newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addrWithoutBalancerAttributes, copts, onClose) if err != nil { if logger.V(2) { - logger.Infof("Creating new client transport to %q: %v", addr, err) + logger.Infof("Creating new client transport to %q: %v", addrWithoutBalancerAttributes, err) } // newTr is either nil, or closed. hcancel() - channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err) + channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addrWithoutBalancerAttributes, err) return err } diff --git a/internal/internal.go b/internal/internal.go index 5d6653986923..a0166184e0ac 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -208,6 +208,13 @@ var ( // ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n // is the number of elements. swap swaps the elements with indexes i and j. ShuffleAddressListForTesting any // func(n int, swap func(i, j int)) + + // GetConnectedAddress returns the connected address for a SubConnState and + // whether the address is valid based on the state. + GetConnectedAddress any // func (scs SubConnState) (resolver.Address, bool) + + // SetConnectedAddress sets the connected address for a SubConnState. + SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address) ) // HealthChecker defines the signature of the client-side LB channel health diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 164f3099d280..b3238c380ce4 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + grpcinternal "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/grpclog" @@ -366,14 +367,27 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer lID = xdsinternal.GetLocalityID(newAddrs[i]) } var sc balancer.SubConn + ret := &scWrapper{} oldListener := opts.StateListener - opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) } + opts.StateListener = func(state balancer.SubConnState) { + b.updateSubConnState(sc, state, oldListener) + // Read connected address and call updateLocalityID() based on the connected address's locality. + // https://github.com/grpc/grpc-go/issues/7339 + if GetConnectedAddress, ok := grpcinternal.GetConnectedAddress.(func(state balancer.SubConnState) (resolver.Address, bool)); ok { + if addr, ok := GetConnectedAddress(state); ok { + // TODO: Why is lID empty when running the test? The locality info is being lost somehow. + lID := xdsinternal.GetLocalityID(addr) + if !lID.Equal(xdsinternal.LocalityID{}) { + ret.updateLocalityID(lID) + } + } + } + } sc, err := b.ClientConn.NewSubConn(newAddrs, opts) if err != nil { return nil, err } - // Wrap this SubConn in a wrapper, and add it to the map. - ret := &scWrapper{SubConn: sc} + ret.SubConn = sc ret.updateLocalityID(lID) return ret, nil } diff --git a/xds/internal/balancer/clusterimpl/tests/balancer_test.go b/xds/internal/balancer/clusterimpl/tests/balancer_test.go index 4a5c13b8e6b4..d2a6b6d7f757 100644 --- a/xds/internal/balancer/clusterimpl/tests/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/tests/balancer_test.go @@ -310,14 +310,9 @@ func (s) TestLoadReportingPickFirstMultiLocality(t *testing.T) { } mgmtServer.LRSServer.LRSResponseChan <- &resp - // Wait for load to be reported for locality of server 2. - // We (incorrectly) wait for load report for region-2 because presently - // pickfirst always reports load for the locality of the last address in the - // subconn. This will be fixed by ensuring there is only one address per - // subconn. - // TODO(#7339): Change region to region-1 once fixed. - if err := waitForSuccessfulLoadReport(ctx, mgmtServer.LRSServer, "region-2"); err != nil { - t.Fatalf("region-2 did not receive load due to error: %v", err) + // Wait for load to be reported for locality of server 1. + if err := waitForSuccessfulLoadReport(ctx, mgmtServer.LRSServer, "region-1"); err != nil { + t.Fatalf("Server 1 did not receive load due to error: %v", err) } // Stop server 1 and send one more rpc. Now the request should go to server 2. From 32d869b88f6c794a173da7207666907e11426474 Mon Sep 17 00:00:00 2001 From: Brad Town Date: Tue, 2 Jul 2024 13:10:30 -0700 Subject: [PATCH 2/9] Address review comments #1 --- balancer/balancer.go | 13 ++++-- balancer_wrapper.go | 6 +-- clientconn.go | 44 +++++++------------ .../balancer/clusterimpl/clusterimpl.go | 22 +++++----- xds/internal/internal.go | 5 +++ 5 files changed, 46 insertions(+), 44 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 3fc19e15d9cd..d9c66dce3655 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -72,7 +72,8 @@ func unregisterForTesting(name string) { delete(m, name) } -// getConnectedAddress returns the connected address for a SubConnState. +// getConnectedAddress returns the connected address for a SubConnState and +// whether or not it is valid. func getConnectedAddress(scs SubConnState) (resolver.Address, bool) { return scs.connectedAddress, scs.ConnectivityState == connectivity.Ready } @@ -422,11 +423,17 @@ type SubConnState struct { // ConnectionError is set if the ConnectivityState is TransientFailure, // describing the reason the SubConn failed. Otherwise, it is nil. ConnectionError error - // connectedAddr contains the connected address when ConnectivityState is Ready. Otherwise, it is - // indeterminate. + // connectedAddr contains the connected address when ConnectivityState is + // Ready. Otherwise, it is indeterminate. connectedAddress resolver.Address } +func (lhs SubConnState) Equal(rhs SubConnState) bool { + return lhs.ConnectivityState == rhs.ConnectivityState && + lhs.ConnectionError == rhs.ConnectionError && + lhs.connectedAddress.Addr == rhs.connectedAddress.Addr +} + // ClientConnState describes the state of a ClientConn relevant to the // balancer. type ClientConnState struct { diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 39577d5c8210..88d2f6e301bd 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -25,7 +25,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" - grpcinternal "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" @@ -263,8 +263,8 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve // TODO: delete this comment when UpdateSubConnState is removed. scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err} if s == connectivity.Ready { - if SetConnectedAddress, ok := grpcinternal.SetConnectedAddress.(func(state *balancer.SubConnState, addr resolver.Address)); ok { - SetConnectedAddress(&scs, curAddr) + if sca, ok := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)); ok { + sca(&scs, curAddr) } } acbw.stateListener(scs) diff --git a/clientconn.go b/clientconn.go index a218f4b4085b..94157b6e5870 100644 --- a/clientconn.go +++ b/clientconn.go @@ -53,9 +53,7 @@ import ( const ( // minimum time to give a connection to complete - minConnectTimeout = 20 * time.Second - withBalancerAttributes = true - withoutBalancerAttributes = false + minConnectTimeout = 20 * time.Second ) var ( @@ -821,20 +819,11 @@ func addressWithoutBalancerAttributes(a resolver.Address) resolver.Address { return a } -// Makes a copy of the input addresses slice and optionally clears out the -// balancer attributes field. Addresses are passed during subconn creation and -// address update operations. In both cases, we may clear the balancer -// attributes by calling this function, which would therefore allow us to use -// the Equal method provided by the resolver.Address type for comparison. -func copyAddresses(in []resolver.Address, includeBalancerAttributes bool) []resolver.Address { +// Makes a copy of the input addresses slice. Addresses are passed during +// subconn creation and address update operations. +func copyAddresses(in []resolver.Address) []resolver.Address { out := make([]resolver.Address, len(in)) - for i := range in { - if includeBalancerAttributes { - out[i] = in[i] - } else { - out[i] = addressWithoutBalancerAttributes(in[i]) - } - } + copy(out, in) return out } @@ -849,7 +838,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer. ac := &addrConn{ state: connectivity.Idle, cc: cc, - addrs: copyAddresses(addrs, withBalancerAttributes), + addrs: copyAddresses(addrs), scopts: opts, dopts: cc.dopts, channelz: channelz.RegisterSubChannel(cc.channelz, ""), @@ -936,18 +925,18 @@ func (ac *addrConn) connect() error { return nil } -func equalAddressIgnoreBalancerAttributes(a, b resolver.Address) bool { +func equalAddress(a, b resolver.Address) bool { return a.Addr == b.Addr && a.ServerName == b.ServerName && a.Attributes.Equal(b.Attributes) && a.Metadata == b.Metadata } -func equalAddressesIgnoreBalancerAttributes(a, b []resolver.Address) bool { +func equalAddresses(a, b []resolver.Address) bool { if len(a) != len(b) { return false } for i, v := range a { - if !equalAddressIgnoreBalancerAttributes(v, b[i]) { + if !equalAddress(v, b[i]) { return false } } @@ -957,15 +946,15 @@ func equalAddressesIgnoreBalancerAttributes(a, b []resolver.Address) bool { // updateAddrs updates ac.addrs with the new addresses list and handles active // connections or connection attempts. func (ac *addrConn) updateAddrs(addrs []resolver.Address) { - addrs = copyAddresses(addrs, withBalancerAttributes) + addrs = copyAddresses(addrs) limit := len(addrs) if limit > 5 { limit = 5 } - channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), copyAddresses(addrs[:limit], withoutBalancerAttributes)) + channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit]) ac.mu.Lock() - if equalAddressesIgnoreBalancerAttributes(ac.addrs, addrs) { + if equalAddresses(ac.addrs, addrs) { ac.mu.Unlock() return } @@ -984,7 +973,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) { // Try to find the connected address. for _, a := range addrs { a.ServerName = ac.cc.getServerName(a) - if equalAddressIgnoreBalancerAttributes(a, ac.curAddr) { + if equalAddress(a, ac.curAddr) { // We are connected to a valid address, so do nothing but // update the addresses. ac.mu.Unlock() @@ -1365,7 +1354,6 @@ func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, c // new transport. func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { addr.ServerName = ac.cc.getServerName(addr) - addrWithoutBalancerAttributes := addressWithoutBalancerAttributes(addr) hctx, hcancel := context.WithCancel(ctx) onClose := func(r transport.GoAwayReason) { @@ -1400,14 +1388,14 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, defer cancel() copts.ChannelzParent = ac.channelz - newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addrWithoutBalancerAttributes, copts, onClose) + newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose) if err != nil { if logger.V(2) { - logger.Infof("Creating new client transport to %q: %v", addrWithoutBalancerAttributes, err) + logger.Infof("Creating new client transport to %q: %v", addr, err) } // newTr is either nil, or closed. hcancel() - channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addrWithoutBalancerAttributes, err) + channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err) return err } diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index b3238c380ce4..8ea754dbafea 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -31,7 +31,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" - grpcinternal "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/grpclog" @@ -361,24 +361,26 @@ func (scw *scWrapper) localityID() xdsinternal.LocalityID { func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { clusterName := b.getClusterName() newAddrs := make([]resolver.Address, len(addrs)) + // TODO: Don't set the locality here. Let the StateListener handle it all. var lID xdsinternal.LocalityID for i, addr := range addrs { newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName) lID = xdsinternal.GetLocalityID(newAddrs[i]) } var sc balancer.SubConn - ret := &scWrapper{} + scw := &scWrapper{} oldListener := opts.StateListener opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) // Read connected address and call updateLocalityID() based on the connected address's locality. // https://github.com/grpc/grpc-go/issues/7339 - if GetConnectedAddress, ok := grpcinternal.GetConnectedAddress.(func(state balancer.SubConnState) (resolver.Address, bool)); ok { - if addr, ok := GetConnectedAddress(state); ok { - // TODO: Why is lID empty when running the test? The locality info is being lost somehow. + if gca, ok := internal.GetConnectedAddress.(func(balancer.SubConnState) (resolver.Address, bool)); ok { + if addr, ok := gca(state); ok { lID := xdsinternal.GetLocalityID(addr) - if !lID.Equal(xdsinternal.LocalityID{}) { - ret.updateLocalityID(lID) + if !lID.Empty() { + scw.updateLocalityID(lID) + } else if b.logger.V(2) { + b.logger.Infof("Locality ID for %v unexpectedly empty", addr) } } } @@ -387,9 +389,9 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer if err != nil { return nil, err } - ret.SubConn = sc - ret.updateLocalityID(lID) - return ret, nil + scw.SubConn = sc + scw.updateLocalityID(lID) + return scw, nil } func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) { diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 7091990500f9..1df16e31a84d 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -55,6 +55,11 @@ func (l LocalityID) Equal(o any) bool { return l.Region == ol.Region && l.Zone == ol.Zone && l.SubZone == ol.SubZone } +// Empty returns whether or not the locality ID is empty. +func (l LocalityID) Empty() bool { + return len(l.Region) == 0 && len(l.Zone) == 0 && len(l.SubZone) == 0 +} + // LocalityIDFromString converts a json representation of locality, into a // LocalityID struct. func LocalityIDFromString(s string) (ret LocalityID, _ error) { From 50700344105983baaa9bc8827614b44598c718d3 Mon Sep 17 00:00:00 2001 From: Brad Town Date: Tue, 2 Jul 2024 13:16:10 -0700 Subject: [PATCH 3/9] Remove unused function --- clientconn.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/clientconn.go b/clientconn.go index 94157b6e5870..dd421d802d67 100644 --- a/clientconn.go +++ b/clientconn.go @@ -812,13 +812,6 @@ func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) { cc.csMgr.updateState(connectivity.TransientFailure) } -// addressWithoutBalancerAttributes returns a copy of the input address with -// the BalancerAttributes field cleared. -func addressWithoutBalancerAttributes(a resolver.Address) resolver.Address { - a.BalancerAttributes = nil - return a -} - // Makes a copy of the input addresses slice. Addresses are passed during // subconn creation and address update operations. func copyAddresses(in []resolver.Address) []resolver.Address { From 8709e7846e8f1df49d34ad50b0b698cae2cd7aba Mon Sep 17 00:00:00 2001 From: Brad Town Date: Tue, 2 Jul 2024 13:25:35 -0700 Subject: [PATCH 4/9] Reflow comment. --- xds/internal/balancer/clusterimpl/clusterimpl.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 8ea754dbafea..0ceb1bc8a899 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -372,8 +372,8 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer oldListener := opts.StateListener opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) - // Read connected address and call updateLocalityID() based on the connected address's locality. - // https://github.com/grpc/grpc-go/issues/7339 + // Read connected address and call updateLocalityID() based on the connected + // address's locality. https://github.com/grpc/grpc-go/issues/7339 if gca, ok := internal.GetConnectedAddress.(func(balancer.SubConnState) (resolver.Address, bool)); ok { if addr, ok := gca(state); ok { lID := xdsinternal.GetLocalityID(addr) From 18bce3623cd9b3daae6a1533cbe1dcadc5525670 Mon Sep 17 00:00:00 2001 From: Brad Town Date: Tue, 2 Jul 2024 16:05:07 -0700 Subject: [PATCH 5/9] Remove setting locality in `NewSubConn`. --- internal/testutils/balancer.go | 16 ++++++++++++++++ .../balancer/clusterimpl/balancer_test.go | 2 +- xds/internal/balancer/clusterimpl/clusterimpl.go | 4 ---- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index c65be16be4b6..e55e79b416e5 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/resolver" ) @@ -76,6 +77,21 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) { } } +// UpdateStateAndConnectedAddress saves the connected address to state if the +// connectivity state is Ready and pushes the state to the listener if one is +// registered. +func (tsc *TestSubConn) UpdateStateAndConnectedAddress(state balancer.SubConnState, addr resolver.Address) { + <-tsc.connectCalled.Done() + if state.ConnectivityState == connectivity.Ready { + sca := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) + sca(&state, addr) + } + if tsc.stateListener != nil { + tsc.stateListener(state) + return + } +} + // Shutdown pushes the SubConn to the ShutdownSubConn channel in the parent // TestClientConn. func (tsc *TestSubConn) Shutdown() { diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 76c96decfd7d..2d579ae97773 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -637,7 +637,7 @@ func (s) TestLoadReporting(t *testing.T) { t.Fatal(err.Error()) } - sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc1.UpdateStateAndConnectedAddress(balancer.SubConnState{ConnectivityState: connectivity.Ready}, addrs[0]) // Test pick with one backend. const successCount = 5 const errorCount = 5 diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 0ceb1bc8a899..3bd52e6f5fb8 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -361,11 +361,8 @@ func (scw *scWrapper) localityID() xdsinternal.LocalityID { func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { clusterName := b.getClusterName() newAddrs := make([]resolver.Address, len(addrs)) - // TODO: Don't set the locality here. Let the StateListener handle it all. - var lID xdsinternal.LocalityID for i, addr := range addrs { newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName) - lID = xdsinternal.GetLocalityID(newAddrs[i]) } var sc balancer.SubConn scw := &scWrapper{} @@ -390,7 +387,6 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer return nil, err } scw.SubConn = sc - scw.updateLocalityID(lID) return scw, nil } From 69acfcfacb9c31e5582b2723734fa3f39be29633 Mon Sep 17 00:00:00 2001 From: Brad Town Date: Mon, 8 Jul 2024 12:50:15 -0700 Subject: [PATCH 6/9] Address review comments (2/x). --- balancer/balancer.go | 9 +++++---- clientconn.go | 15 ++++++--------- internal/internal.go | 7 ++++--- xds/internal/balancer/clusterimpl/clusterimpl.go | 4 ++-- xds/internal/internal.go | 2 +- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index d9c66dce3655..ba72d173405f 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -72,9 +72,10 @@ func unregisterForTesting(name string) { delete(m, name) } -// getConnectedAddress returns the connected address for a SubConnState and -// whether or not it is valid. -func getConnectedAddress(scs SubConnState) (resolver.Address, bool) { +// connectedAddress returns the connected address for a SubConnState. +// The second return value is set to to false if the state is not READY, and the +// first return value is meaningless in this case. +func connectedAddress(scs SubConnState) (resolver.Address, bool) { return scs.connectedAddress, scs.ConnectivityState == connectivity.Ready } @@ -85,7 +86,7 @@ func setConnectedAddress(scs *SubConnState, addr resolver.Address) { func init() { internal.BalancerUnregister = unregisterForTesting - internal.GetConnectedAddress = getConnectedAddress + internal.ConnectedAddress = connectedAddress internal.SetConnectedAddress = setConnectedAddress } diff --git a/clientconn.go b/clientconn.go index dd421d802d67..4602623097f8 100644 --- a/clientconn.go +++ b/clientconn.go @@ -24,6 +24,7 @@ import ( "fmt" "math" "net/url" + "slices" "strings" "sync" "sync/atomic" @@ -918,6 +919,10 @@ func (ac *addrConn) connect() error { return nil } +// equalAddress returns true is a and b are considered equal. +// This is different from the Equal method on the resolver.Address type which +// considers all fields to determine equality. Here, we only consider fields +// that are meaningful to the subConn. func equalAddress(a, b resolver.Address) bool { return a.Addr == b.Addr && a.ServerName == b.ServerName && a.Attributes.Equal(b.Attributes) && @@ -925,15 +930,7 @@ func equalAddress(a, b resolver.Address) bool { } func equalAddresses(a, b []resolver.Address) bool { - if len(a) != len(b) { - return false - } - for i, v := range a { - if !equalAddress(v, b[i]) { - return false - } - } - return true + return slices.EqualFunc(a, b, func(a, b resolver.Address) bool { return equalAddress(a, b) }) } // updateAddrs updates ac.addrs with the new addresses list and handles active diff --git a/internal/internal.go b/internal/internal.go index a0166184e0ac..702b06df2080 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -209,9 +209,10 @@ var ( // is the number of elements. swap swaps the elements with indexes i and j. ShuffleAddressListForTesting any // func(n int, swap func(i, j int)) - // GetConnectedAddress returns the connected address for a SubConnState and - // whether the address is valid based on the state. - GetConnectedAddress any // func (scs SubConnState) (resolver.Address, bool) + // ConnectedAddress returns the connected address for a SubConnState. +// The second return value is set to to false if the state is not READY, and the +// first return value is meaningless in this case. + ConnectedAddress any // func (scs SubConnState) (resolver.Address, bool) // SetConnectedAddress sets the connected address for a SubConnState. SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address) diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 3bd52e6f5fb8..275d05bc33f4 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -371,13 +371,13 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer b.updateSubConnState(sc, state, oldListener) // Read connected address and call updateLocalityID() based on the connected // address's locality. https://github.com/grpc/grpc-go/issues/7339 - if gca, ok := internal.GetConnectedAddress.(func(balancer.SubConnState) (resolver.Address, bool)); ok { + if gca, ok := internal.ConnectedAddress.(func(balancer.SubConnState) (resolver.Address, bool)); ok { if addr, ok := gca(state); ok { lID := xdsinternal.GetLocalityID(addr) if !lID.Empty() { scw.updateLocalityID(lID) } else if b.logger.V(2) { - b.logger.Infof("Locality ID for %v unexpectedly empty", addr) + b.logger.Infof("Locality ID for %s unexpectedly empty", addr) } } } diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 1df16e31a84d..1d8a6b03f1b3 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -57,7 +57,7 @@ func (l LocalityID) Equal(o any) bool { // Empty returns whether or not the locality ID is empty. func (l LocalityID) Empty() bool { - return len(l.Region) == 0 && len(l.Zone) == 0 && len(l.SubZone) == 0 + return l.Region == "" && l.Zone == "" && l.SubZone == "" } // LocalityIDFromString converts a json representation of locality, into a From 66478fb81b25b638506fc78948ce4ba219b972f3 Mon Sep 17 00:00:00 2001 From: Brad Town Date: Mon, 8 Jul 2024 12:57:57 -0700 Subject: [PATCH 7/9] Address review comments (3/x). --- internal/testutils/balancer.go | 16 ---------------- .../balancer/clusterimpl/balancer_test.go | 6 +++++- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index e55e79b416e5..c65be16be4b6 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -26,7 +26,6 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/resolver" ) @@ -77,21 +76,6 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) { } } -// UpdateStateAndConnectedAddress saves the connected address to state if the -// connectivity state is Ready and pushes the state to the listener if one is -// registered. -func (tsc *TestSubConn) UpdateStateAndConnectedAddress(state balancer.SubConnState, addr resolver.Address) { - <-tsc.connectCalled.Done() - if state.ConnectivityState == connectivity.Ready { - sca := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) - sca(&state, addr) - } - if tsc.stateListener != nil { - tsc.stateListener(state) - return - } -} - // Shutdown pushes the SubConn to the ShutdownSubConn channel in the parent // TestClientConn. func (tsc *TestSubConn) Shutdown() { diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 2d579ae97773..5a4bb0f270b2 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/grpctest" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" @@ -637,7 +638,10 @@ func (s) TestLoadReporting(t *testing.T) { t.Fatal(err.Error()) } - sc1.UpdateStateAndConnectedAddress(balancer.SubConnState{ConnectivityState: connectivity.Ready}, addrs[0]) + scs := balancer.SubConnState{ConnectivityState: connectivity.Ready} + sca := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) + sca(&scs, addrs[0]) + sc1.UpdateState(scs) // Test pick with one backend. const successCount = 5 const errorCount = 5 From c5a4adea45910a3702673e006bb8e99826553385 Mon Sep 17 00:00:00 2001 From: Brad Town Date: Mon, 8 Jul 2024 13:22:35 -0700 Subject: [PATCH 8/9] Address review comments (4/x). --- balancer/balancer.go | 2 +- internal/internal.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index ba72d173405f..5698a1d49939 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -432,7 +432,7 @@ type SubConnState struct { func (lhs SubConnState) Equal(rhs SubConnState) bool { return lhs.ConnectivityState == rhs.ConnectivityState && lhs.ConnectionError == rhs.ConnectionError && - lhs.connectedAddress.Addr == rhs.connectedAddress.Addr + lhs.connectedAddress.Equal(rhs.connectedAddress) } // ClientConnState describes the state of a ClientConn relevant to the diff --git a/internal/internal.go b/internal/internal.go index 702b06df2080..4cdf2ae53b75 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -209,9 +209,9 @@ var ( // is the number of elements. swap swaps the elements with indexes i and j. ShuffleAddressListForTesting any // func(n int, swap func(i, j int)) - // ConnectedAddress returns the connected address for a SubConnState. -// The second return value is set to to false if the state is not READY, and the -// first return value is meaningless in this case. + // ConnectedAddress returns the connected address for a SubConnState. The + // second return value is set to to false if the state is not READY, and the + // first return value is meaningless in this case. ConnectedAddress any // func (scs SubConnState) (resolver.Address, bool) // SetConnectedAddress sets the connected address for a SubConnState. From 64ae66243e9b8157b28b0d44b47474baad9308ab Mon Sep 17 00:00:00 2001 From: Brad Town Date: Wed, 10 Jul 2024 10:34:38 -0700 Subject: [PATCH 9/9] Address review comments (5/x). --- balancer/balancer.go | 15 ++++---------- balancer_wrapper.go | 6 +++--- clientconn.go | 12 +++++------ internal/internal.go | 5 ++--- .../balancer/clusterimpl/clusterimpl.go | 20 +++++++++++-------- .../outlierdetection/balancer_test.go | 2 +- 6 files changed, 28 insertions(+), 32 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 5698a1d49939..04e47a1254ba 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -72,11 +72,10 @@ func unregisterForTesting(name string) { delete(m, name) } -// connectedAddress returns the connected address for a SubConnState. -// The second return value is set to to false if the state is not READY, and the -// first return value is meaningless in this case. -func connectedAddress(scs SubConnState) (resolver.Address, bool) { - return scs.connectedAddress, scs.ConnectivityState == connectivity.Ready +// connectedAddress returns the connected address for a SubConnState. The +// address is only valid if the state is READY. +func connectedAddress(scs SubConnState) resolver.Address { + return scs.connectedAddress } // setConnectedAddress sets the connected address for a SubConnState. @@ -429,12 +428,6 @@ type SubConnState struct { connectedAddress resolver.Address } -func (lhs SubConnState) Equal(rhs SubConnState) bool { - return lhs.ConnectivityState == rhs.ConnectivityState && - lhs.ConnectionError == rhs.ConnectionError && - lhs.connectedAddress.Equal(rhs.connectedAddress) -} - // ClientConnState describes the state of a ClientConn relevant to the // balancer. type ClientConnState struct { diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 88d2f6e301bd..554fd3c64afe 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -32,6 +32,8 @@ import ( "google.golang.org/grpc/resolver" ) +var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) + // ccBalancerWrapper sits between the ClientConn and the Balancer. // // ccBalancerWrapper implements methods corresponding to the ones on the @@ -263,9 +265,7 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve // TODO: delete this comment when UpdateSubConnState is removed. scs := balancer.SubConnState{ConnectivityState: s, ConnectionError: err} if s == connectivity.Ready { - if sca, ok := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)); ok { - sca(&scs, curAddr) - } + setConnectedAddress(&scs, curAddr) } acbw.stateListener(scs) }) diff --git a/clientconn.go b/clientconn.go index 4602623097f8..663902ae8308 100644 --- a/clientconn.go +++ b/clientconn.go @@ -919,18 +919,18 @@ func (ac *addrConn) connect() error { return nil } -// equalAddress returns true is a and b are considered equal. +// equalAddressIgnoringBalAttributes returns true is a and b are considered equal. // This is different from the Equal method on the resolver.Address type which // considers all fields to determine equality. Here, we only consider fields // that are meaningful to the subConn. -func equalAddress(a, b resolver.Address) bool { +func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool { return a.Addr == b.Addr && a.ServerName == b.ServerName && a.Attributes.Equal(b.Attributes) && a.Metadata == b.Metadata } -func equalAddresses(a, b []resolver.Address) bool { - return slices.EqualFunc(a, b, func(a, b resolver.Address) bool { return equalAddress(a, b) }) +func equalAddressesIgnoringBalAttributes(a, b []resolver.Address) bool { + return slices.EqualFunc(a, b, func(a, b resolver.Address) bool { return equalAddressIgnoringBalAttributes(&a, &b) }) } // updateAddrs updates ac.addrs with the new addresses list and handles active @@ -944,7 +944,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) { channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit]) ac.mu.Lock() - if equalAddresses(ac.addrs, addrs) { + if equalAddressesIgnoringBalAttributes(ac.addrs, addrs) { ac.mu.Unlock() return } @@ -963,7 +963,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) { // Try to find the connected address. for _, a := range addrs { a.ServerName = ac.cc.getServerName(a) - if equalAddress(a, ac.curAddr) { + if equalAddressIgnoringBalAttributes(&a, &ac.curAddr) { // We are connected to a valid address, so do nothing but // update the addresses. ac.mu.Unlock() diff --git a/internal/internal.go b/internal/internal.go index 4cdf2ae53b75..46ed257685bf 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -210,9 +210,8 @@ var ( ShuffleAddressListForTesting any // func(n int, swap func(i, j int)) // ConnectedAddress returns the connected address for a SubConnState. The - // second return value is set to to false if the state is not READY, and the - // first return value is meaningless in this case. - ConnectedAddress any // func (scs SubConnState) (resolver.Address, bool) + // address is only valid if the state is READY. + ConnectedAddress any // func (scs SubConnState) resolver.Address // SetConnectedAddress sets the connected address for a SubConnState. SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address) diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 275d05bc33f4..9058f0d01fc8 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -53,6 +53,8 @@ const ( defaultRequestCountMax = 1024 ) +var connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address) + func init() { balancer.Register(bb{}) } @@ -369,18 +371,20 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer oldListener := opts.StateListener opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) + if state.ConnectivityState != connectivity.Ready { + return + } // Read connected address and call updateLocalityID() based on the connected // address's locality. https://github.com/grpc/grpc-go/issues/7339 - if gca, ok := internal.ConnectedAddress.(func(balancer.SubConnState) (resolver.Address, bool)); ok { - if addr, ok := gca(state); ok { - lID := xdsinternal.GetLocalityID(addr) - if !lID.Empty() { - scw.updateLocalityID(lID) - } else if b.logger.V(2) { - b.logger.Infof("Locality ID for %s unexpectedly empty", addr) - } + addr := connectedAddress(state) + lID := xdsinternal.GetLocalityID(addr) + if lID.Empty() { + if b.logger.V(2) { + b.logger.Infof("Locality ID for %s unexpectedly empty", addr) } + return } + scw.updateLocalityID(lID) } sc, err := b.ClientConn.NewSubConn(newAddrs, opts) if err != nil { diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index 54eefaa34c1a..39bd51aa6567 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -852,7 +852,7 @@ func (s) TestUpdateAddresses(t *testing.T) { } func scwsEqual(gotSCWS subConnWithState, wantSCWS subConnWithState) error { - if gotSCWS.sc != wantSCWS.sc || !cmp.Equal(gotSCWS.state, wantSCWS.state, cmp.AllowUnexported(subConnWrapper{}, addressInfo{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { + if gotSCWS.sc != wantSCWS.sc || !cmp.Equal(gotSCWS.state, wantSCWS.state, cmp.AllowUnexported(subConnWrapper{}, addressInfo{}, balancer.SubConnState{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCWS, wantSCWS) } return nil