diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go index 4cee66aeb6e6..5496b99dd5c4 100644 --- a/internal/balancergroup/balancergroup.go +++ b/internal/balancergroup/balancergroup.go @@ -19,6 +19,7 @@ package balancergroup import ( + "encoding/json" "fmt" "sync" "time" @@ -29,6 +30,7 @@ import ( "google.golang.org/grpc/internal/cache" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" ) // subBalancerWrapper is used to keep the configurations that will be used to start @@ -148,20 +150,6 @@ func (sbc *subBalancerWrapper) resolverError(err error) { b.ResolverError(err) } -func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) { - sbc.builder = builder - b := sbc.balancer - // Even if you get an add and it persists builder but doesn't start - // balancer, this would leave graceful switch being nil, in which we are - // correctly overwriting with the recent builder here as well to use later. - // The graceful switch balancer's presence is an invariant of whether the - // balancer group is closed or not (if closed, nil, if started, present). - if sbc.balancer != nil { - sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name()) - b.SwitchTo(sbc.builder) - } -} - func (sbc *subBalancerWrapper) stopBalancer() { if sbc.balancer == nil { return @@ -170,7 +158,8 @@ func (sbc *subBalancerWrapper) stopBalancer() { sbc.balancer = nil } -// BalancerGroup takes a list of balancers, and make them into one balancer. +// BalancerGroup takes a list of balancers, each behind a gracefulswitch +// balancer, and make them into one balancer. // // Note that this struct doesn't implement balancer.Balancer, because it's not // intended to be used directly as a balancer. It's expected to be used as a @@ -377,25 +366,6 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { bg.AddWithClientConn(id, builder.Name(), bg.cc) } -// UpdateBuilder updates the builder for a current child, starting the Graceful -// Switch process for that child. -// -// TODO: update this API to take the name of the new builder instead. -func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) { - bg.outgoingMu.Lock() - // This does not deal with the balancer cache because this call should come - // after an Add call for a given child balancer. If the child is removed, - // the caller will call Add if the child balancer comes back which would - // then deal with the balancer cache. - sbc := bg.idToBalancerConfig[id] - if sbc == nil { - // simply ignore it if not present, don't error - return - } - sbc.gracefulSwitch(builder) - bg.outgoingMu.Unlock() -} - // Remove removes the balancer with id from the group. // // But doesn't close the balancer. The balancer is kept in a cache, and will be @@ -636,3 +606,14 @@ func (bg *BalancerGroup) ExitIdleOne(id string) { } bg.outgoingMu.Unlock() } + +// ParseConfig parses a child config list and returns a LB config for the +// gracefulswitch Balancer. +// +// cfg is expected to be a json.RawMessage containing a JSON array of LB policy +// names + configs as the format of the "loadBalancingConfig" field in +// ServiceConfig. It returns a type that should be passed to +// UpdateClientConnState in the BalancerConfig field. 
+func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return gracefulswitch.ParseConfig(cfg) +} diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index f1153e48ef7a..b182ec8489e2 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -18,6 +18,7 @@ package balancergroup import ( "context" + "encoding/json" "fmt" "testing" "time" @@ -609,9 +610,15 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { return bal.UpdateClientConnState(ccs) }, }) - builder := balancer.Get(childPolicyName) - bg.UpdateBuilder(testBalancerIDs[0], builder) - if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil { + cfgJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, t.Name())) + lbCfg, err := ParseConfig(cfgJSON) + if err != nil { + t.Fatalf("ParseConfig(%s) failed: %v", string(cfgJSON), err) + } + if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}, + BalancerConfig: lbCfg, + }); err != nil { t.Fatalf("error updating ClientConn state: %v", err) } diff --git a/xds/internal/balancer/clustermanager/balancerstateaggregator.go b/xds/internal/balancer/clustermanager/balancerstateaggregator.go index 4b971a3e241b..92c69f5e1fc2 100644 --- a/xds/internal/balancer/clustermanager/balancerstateaggregator.go +++ b/xds/internal/balancer/clustermanager/balancerstateaggregator.go @@ -45,17 +45,14 @@ func (s *subBalancerState) String() string { type balancerStateAggregator struct { cc balancer.ClientConn logger *grpclog.PrefixLogger + csEval *balancer.ConnectivityStateEvaluator mu sync.Mutex - // If started is false, no updates should be sent to the parent cc. A closed - // sub-balancer could still send pickers to this aggregator. This makes sure - // that no updates will be forwarded to parent when the whole balancer group - // and states aggregator is closed. - started bool - // All balancer IDs exist as keys in this map, even if balancer group is not - // started. - // - // If an ID is not in map, it's either removed or never added. + // This field is used to ensure that no updates are forwarded to the parent + // CC once the aggregator is closed. A closed sub-balancer could still send + // pickers to this aggregator. + closed bool + // Map from child policy name to last reported state. idToPickerState map[string]*subBalancerState // Set when UpdateState call propagation is paused. pauseUpdateState bool @@ -68,34 +65,24 @@ func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLo return &balancerStateAggregator{ cc: cc, logger: logger, + csEval: &balancer.ConnectivityStateEvaluator{}, idToPickerState: make(map[string]*subBalancerState), } } -// Start starts the aggregator. It can be called after Close to restart the -// aggretator. -func (bsa *balancerStateAggregator) start() { - bsa.mu.Lock() - defer bsa.mu.Unlock() - bsa.started = true -} - -// Close closes the aggregator. When the aggregator is closed, it won't call -// parent ClientConn to update balancer state. func (bsa *balancerStateAggregator) close() { bsa.mu.Lock() defer bsa.mu.Unlock() - bsa.started = false - bsa.clearStates() + bsa.closed = true } -// add adds a sub-balancer state with weight. It adds a place holder, and waits -// for the real sub-balancer to update state. 
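For readers of this patch, a minimal usage sketch of the new balancergroup.ParseConfig helper (not part of the diff). It assumes a caller inside the grpc module (the package is internal), an existing *BalancerGroup named bg with a child called "child-1", and an arbitrary round_robin child config; the shape mirrors the updated test above.

```go
// Sketch only; not part of this patch. Assumes a package inside the grpc
// module (balancergroup is internal); "child-1" and round_robin are
// illustrative.
package example

import (
	"encoding/json"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/balancergroup"
	"google.golang.org/grpc/resolver"
)

// switchChildToRoundRobin pushes a round_robin child config to an existing
// child of bg; the gracefulswitch balancer wrapping the child performs the
// actual policy switch if the policy type changed.
func switchChildToRoundRobin(bg *balancergroup.BalancerGroup, addrs []resolver.Address) error {
	// Same JSON shape as the "loadBalancingConfig" field of a service config:
	// a list of single-entry maps from policy name to policy config.
	cfgJSON := json.RawMessage(`[{"round_robin": {}}]`)
	lbCfg, err := balancergroup.ParseConfig(cfgJSON)
	if err != nil {
		return err
	}
	return bg.UpdateClientConnState("child-1", balancer.ClientConnState{
		ResolverState:  resolver.State{Addresses: addrs},
		BalancerConfig: lbCfg,
	})
}
```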
+// add adds a sub-balancer in CONNECTING state. // // This is called when there's a new child. func (bsa *balancerStateAggregator) add(id string) { bsa.mu.Lock() defer bsa.mu.Unlock() + bsa.idToPickerState[id] = &subBalancerState{ // Start everything in CONNECTING, so if one of the sub-balancers // reports TransientFailure, the RPCs will still wait for the other @@ -106,6 +93,8 @@ func (bsa *balancerStateAggregator) add(id string) { }, stateToAggregate: connectivity.Connecting, } + bsa.csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting) + bsa.buildAndUpdateLocked() } // remove removes the sub-balancer state. Future updates from this sub-balancer, @@ -118,9 +107,15 @@ func (bsa *balancerStateAggregator) remove(id string) { if _, ok := bsa.idToPickerState[id]; !ok { return } + // Setting the state of the deleted sub-balancer to Shutdown will get + // csEvltr to remove the previous state for any aggregated state + // evaluations. Transitions to and from connectivity.Shutdown are ignored + // by csEvltr. + bsa.csEval.RecordTransition(bsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown) // Remove id and picker from picker map. This also results in future updates // for this ID to be ignored. delete(bsa.idToPickerState, id) + bsa.buildAndUpdateLocked() } // pauseStateUpdates causes UpdateState calls to not propagate to the parent @@ -140,7 +135,7 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() { defer bsa.mu.Unlock() bsa.pauseUpdateState = false if bsa.needUpdateStateOnResume { - bsa.cc.UpdateState(bsa.build()) + bsa.cc.UpdateState(bsa.buildLocked()) } } @@ -149,6 +144,8 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() { // // It calls parent ClientConn's UpdateState with the new aggregated state. func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) { + bsa.logger.Infof("State update from sub-balancer %q: %+v", id, state) + bsa.mu.Lock() defer bsa.mu.Unlock() pickerSt, ok := bsa.idToPickerState[id] @@ -162,42 +159,17 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) // update the state, to prevent the aggregated state from being always // CONNECTING. Otherwise, stateToAggregate is the same as // state.ConnectivityState. + bsa.csEval.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState) pickerSt.stateToAggregate = state.ConnectivityState } pickerSt.state = state - - if !bsa.started { - return - } - if bsa.pauseUpdateState { - // If updates are paused, do not call UpdateState, but remember that we - // need to call it when they are resumed. - bsa.needUpdateStateOnResume = true - return - } - bsa.cc.UpdateState(bsa.build()) -} - -// clearState Reset everything to init state (Connecting) but keep the entry in -// map (to keep the weight). -// -// Caller must hold bsa.mu. -func (bsa *balancerStateAggregator) clearStates() { - for _, pState := range bsa.idToPickerState { - pState.state = balancer.State{ - ConnectivityState: connectivity.Connecting, - Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), - } - pState.stateToAggregate = connectivity.Connecting - } + bsa.buildAndUpdateLocked() } -// buildAndUpdate combines the sub-state from each sub-balancer into one state, -// and update it to parent ClientConn. -func (bsa *balancerStateAggregator) buildAndUpdate() { - bsa.mu.Lock() - defer bsa.mu.Unlock() - if !bsa.started { +// buildAndUpdateLocked combines the sub-state from each sub-balancer into one +// state, and sends a picker update to the parent ClientConn. 
+func (bsa *balancerStateAggregator) buildAndUpdateLocked() { + if bsa.closed { return } if bsa.pauseUpdateState { @@ -206,47 +178,11 @@ func (bsa *balancerStateAggregator) buildAndUpdate() { bsa.needUpdateStateOnResume = true return } - bsa.cc.UpdateState(bsa.build()) + bsa.cc.UpdateState(bsa.buildLocked()) } -// build combines sub-states into one. The picker will do a child pick. -// -// Caller must hold bsa.mu. -func (bsa *balancerStateAggregator) build() balancer.State { - // TODO: the majority of this function (and UpdateState) is exactly the same - // as weighted_target's state aggregator. Try to make a general utility - // function/struct to handle the logic. - // - // One option: make a SubBalancerState that handles Update(State), including - // handling the special connecting after ready, as in UpdateState(). Then a - // function to calculate the aggregated connectivity state as in this - // function. - // - // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated - // state. - var readyN, connectingN, idleN int - for _, ps := range bsa.idToPickerState { - switch ps.stateToAggregate { - case connectivity.Ready: - readyN++ - case connectivity.Connecting: - connectingN++ - case connectivity.Idle: - idleN++ - } - } - var aggregatedState connectivity.State - switch { - case readyN > 0: - aggregatedState = connectivity.Ready - case connectingN > 0: - aggregatedState = connectivity.Connecting - case idleN > 0: - aggregatedState = connectivity.Idle - default: - aggregatedState = connectivity.TransientFailure - } - +// buildLocked combines sub-states into one. +func (bsa *balancerStateAggregator) buildLocked() balancer.State { // The picker's return error might not be consistent with the // aggregatedState. Because for this LB policy, we want to always build // picker with all sub-pickers (not only ready sub-pickers), so even if the @@ -254,7 +190,7 @@ func (bsa *balancerStateAggregator) build() balancer.State { // or TransientFailure. bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState) return balancer.State{ - ConnectivityState: aggregatedState, + ConnectivityState: bsa.csEval.CurrentState(), Picker: newPickerGroup(bsa.idToPickerState), } } diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index db8332b90eac..e6d751ecbee4 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -25,6 +25,8 @@ import ( "time" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/balancergroup" internalgrpclog "google.golang.org/grpc/internal/grpclog" @@ -46,7 +48,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal b := &bal{} b.logger = prefixLogger(b) b.stateAggregator = newBalancerStateAggregator(cc, b.logger) - b.stateAggregator.start() b.bg = balancergroup.New(balancergroup.Options{ CC: cc, BuildOpts: opts, @@ -68,59 +69,101 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err } type bal struct { - logger *internalgrpclog.PrefixLogger - - // TODO: make this package not dependent on xds specific code. Same as for - // weighted target balancer. 
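To make the new aggregation logic concrete, here is a small, self-contained sketch (not part of the patch) of how balancer.ConnectivityStateEvaluator behaves; the transition sequence mirrors what the aggregator now records in add(), UpdateState() and remove().

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

func main() {
	csEval := &balancer.ConnectivityStateEvaluator{}

	// add(): a new child starts out in CONNECTING. Shutdown is not counted by
	// the evaluator, so using it as the "old" state only records the new state.
	csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	fmt.Println(csEval.CurrentState()) // CONNECTING

	// UpdateState(): the child reports READY, which has the highest precedence.
	csEval.RecordTransition(connectivity.Connecting, connectivity.Ready)
	fmt.Println(csEval.CurrentState()) // READY

	// remove(): transitioning the child to Shutdown drops its contribution.
	csEval.RecordTransition(connectivity.Ready, connectivity.Shutdown)
	fmt.Println(csEval.CurrentState()) // TRANSIENT_FAILURE (no children left)
}
```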
+ logger *internalgrpclog.PrefixLogger bg *balancergroup.BalancerGroup stateAggregator *balancerStateAggregator children map[string]childConfig } -func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { - update := false +func (b *bal) setErrorPickerForChild(childName string, err error) { + b.stateAggregator.UpdateState(childName, balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: base.NewErrPicker(err), + }) +} + +func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) error { + // TODO: Get rid of handling hierarchy in addresses. This LB policy never + // gets addresses from the resolver. addressesSplit := hierarchy.Group(s.ResolverState.Addresses) - // Remove sub-pickers and sub-balancers that are not in the new cluster list. + // Remove sub-balancers that are not in the new list from the aggregator and + // balancergroup. for name := range b.children { if _, ok := newConfig.Children[name]; !ok { b.stateAggregator.remove(name) b.bg.Remove(name) - update = true } } - // For sub-balancers in the new cluster list, - // - add to balancer group if it's new, - // - forward the address/balancer config update. - for name, newT := range newConfig.Children { - if _, ok := b.children[name]; !ok { - // If this is a new sub-balancer, add it to the picker map. - b.stateAggregator.add(name) - // Then add to the balancer group. - b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name)) + var retErr error + for childName, childCfg := range newConfig.Children { + lbCfg := childCfg.ChildPolicy.Config + if _, ok := b.children[childName]; !ok { + // Add new sub-balancers to the aggregator and balancergroup. + b.stateAggregator.add(childName) + b.bg.Add(childName, balancer.Get(childCfg.ChildPolicy.Name)) } else { - // Already present, check for type change and if so send down a new builder. - if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name { - b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name)) + // If the child policy type has changed for existing sub-balancers, + // parse the new config and send down the config update to the + // balancergroup, which will take care of gracefully switching the + // child over to the new policy. + // + // If we run into errors here, we need to ensure that RPCs to this + // child fail, while RPCs to other children with good configs + // continue to succeed. + newPolicyName, oldPolicyName := childCfg.ChildPolicy.Name, b.children[childName].ChildPolicy.Name + if newPolicyName != oldPolicyName { + var err error + var cfgJSON []byte + cfgJSON, err = childCfg.ChildPolicy.MarshalJSON() + if err != nil { + retErr = fmt.Errorf("failed to JSON marshal load balancing policy for child %q: %v", childName, err) + b.setErrorPickerForChild(childName, retErr) + continue + } + // This overwrites lbCfg to be in the format expected by the + // gracefulswitch balancer. So, when this config is pushed to + // the child (below), it will result in a graceful switch to the + // new child policy. + lbCfg, err = balancergroup.ParseConfig(cfgJSON) + if err != nil { + retErr = fmt.Errorf("failed to parse load balancing policy for child %q: %v", childName, err) + b.setErrorPickerForChild(childName, retErr) + continue + } } } - // TODO: handle error? How to aggregate errors and return? 
-        _ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
+
+        if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{
             ResolverState: resolver.State{
-                Addresses:     addressesSplit[name],
+                Addresses:     addressesSplit[childName],
                 ServiceConfig: s.ResolverState.ServiceConfig,
                 Attributes:    s.ResolverState.Attributes,
             },
-            BalancerConfig: newT.ChildPolicy.Config,
-        })
+            BalancerConfig: lbCfg,
+        }); err != nil {
+            retErr = fmt.Errorf("failed to push new configuration %v to child %q", childCfg.ChildPolicy.Config, childName)
+            b.setErrorPickerForChild(childName, retErr)
+        }
+
+        // Picker update is sent to the parent ClientConn only after the
+        // new child policy returns a picker. So, there is no need to
+        // set needUpdateStateOnResume to true here.
     }
     b.children = newConfig.Children
-    if update {
-        b.stateAggregator.buildAndUpdate()
-    }
+
+    // If multiple sub-balancers run into errors, we will return only the last
+    // one, which is still good enough, since the grpc channel will anyway
+    // return this error as balancer.ErrBadResolverState to the name resolver,
+    // resulting in re-resolution attempts.
+    return retErr
+
+    // Adding or removing a sub-balancer will result in the
+    // needUpdateStateOnResume bit being set to true, which results in a picker
+    // update once resumeStateUpdates() is called.
 }
 
 func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error {
@@ -128,12 +171,11 @@ func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error {
     if !ok {
         return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
     }
-    b.logger.Infof("update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState)
+    b.logger.Infof("Update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState)
 
     b.stateAggregator.pauseStateUpdates()
     defer b.stateAggregator.resumeStateUpdates()
-    b.updateChildren(s, newConfig)
-    return nil
+    return b.updateChildren(s, newConfig)
 }
 
 func (b *bal) ResolverError(err error) {
diff --git a/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go b/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go
new file mode 100644
index 000000000000..e9fd3c389fef
--- /dev/null
+++ b/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go
@@ -0,0 +1,327 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package e2e_test
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "testing"
+    "time"
+
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/balancer"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/connectivity"
+    "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/internal"
+    "google.golang.org/grpc/internal/balancer/stub"
+    "google.golang.org/grpc/internal/grpctest"
+    "google.golang.org/grpc/internal/stubserver"
+    "google.golang.org/grpc/internal/testutils"
+    "google.golang.org/grpc/internal/testutils/xds/e2e"
+    "google.golang.org/grpc/peer"
+    "google.golang.org/grpc/serviceconfig"
+    "google.golang.org/grpc/status"
+    "google.golang.org/protobuf/types/known/structpb"
+
+    v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
+    v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+    v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+    v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+    v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+    v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+    v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3"
+    testgrpc "google.golang.org/grpc/interop/grpc_testing"
+    testpb "google.golang.org/grpc/interop/grpc_testing"
+
+    _ "google.golang.org/grpc/xds" // Register the xDS name resolver and related LB policies.
+)
+
+type s struct {
+    grpctest.Tester
+}
+
+func Test(t *testing.T) {
+    grpctest.RunSubTests(t, s{})
+}
+
+const (
+    defaultTestTimeout      = 10 * time.Second
+    defaultTestShortTimeout = 10 * time.Millisecond
+)
+
+func makeEmptyCallRPCAndVerifyPeer(ctx context.Context, client testgrpc.TestServiceClient, wantPeer string) error {
+    peer := &peer.Peer{}
+    if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
+        return fmt.Errorf("EmptyCall() failed: %v", err)
+    }
+    if gotPeer := peer.Addr.String(); gotPeer != wantPeer {
+        return fmt.Errorf("EmptyCall() routed to %q, want to be routed to: %q", gotPeer, wantPeer)
+    }
+    return nil
+}
+
+func makeUnaryCallRPCAndVerifyPeer(ctx context.Context, client testgrpc.TestServiceClient, wantPeer string) error {
+    peer := &peer.Peer{}
+    if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.Peer(peer)); err != nil {
+        return fmt.Errorf("UnaryCall() failed: %v", err)
+    }
+    if gotPeer := peer.Addr.String(); gotPeer != wantPeer {
+        return fmt.Errorf("UnaryCall() routed to %q, want to be routed to: %q", gotPeer, wantPeer)
+    }
+    return nil
+}
+
+func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
+    // Spin up an xDS management server.
+    mgmtServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
+    defer cleanup()
+
+    // Configure client side xDS resources on the management server.
+    const (
+        serviceName     = "my-service-client-side-xds"
+        routeConfigName = "route-" + serviceName
+        clusterName1    = "cluster1-" + serviceName
+        clusterName2    = "cluster2-" + serviceName
+        endpointsName1  = "endpoints1-" + serviceName
+        endpointsName2  = "endpoints2-" + serviceName
+        endpointsName3  = "endpoints3-" + serviceName
+    )
+    // A single Listener resource pointing to the following Route
+    // configuration:
+    //  - "/grpc.testing.TestService/EmptyCall" --> cluster1
+    //  - "/grpc.testing.TestService/UnaryCall" --> cluster2
+    listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}
+    routes := []*v3routepb.RouteConfiguration{{
+        Name: routeConfigName,
+        VirtualHosts: []*v3routepb.VirtualHost{{
+            Domains: []string{serviceName},
+            Routes: []*v3routepb.Route{
+                {
+                    Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}},
+                    Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+                        ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName1},
+                    }},
+                },
+                {
+                    Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/UnaryCall"}},
+                    Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+                        ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName2},
+                    }},
+                },
+            },
+        }},
+    }}
+    // Two cluster resources corresponding to the ones mentioned in the above
+    // route configuration resource. These are configured with round_robin as
+    // their endpoint picking policy.
+    clusters := []*v3clusterpb.Cluster{
+        e2e.DefaultCluster(clusterName1, endpointsName1, e2e.SecurityLevelNone),
+        e2e.DefaultCluster(clusterName2, endpointsName2, e2e.SecurityLevelNone),
+    }
+    // Spin up two test backends, one for each cluster below.
+    server1 := stubserver.StartTestService(t, nil)
+    defer server1.Stop()
+    server2 := stubserver.StartTestService(t, nil)
+    defer server2.Stop()
+    // Two endpoints resources, each with one backend from above.
+    endpoints := []*v3endpointpb.ClusterLoadAssignment{
+        e2e.DefaultEndpoint(endpointsName1, "localhost", []uint32{testutils.ParsePort(t, server1.Address)}),
+        e2e.DefaultEndpoint(endpointsName2, "localhost", []uint32{testutils.ParsePort(t, server2.Address)}),
+    }
+    resources := e2e.UpdateOptions{
+        NodeID:         nodeID,
+        Listeners:      listeners,
+        Routes:         routes,
+        Clusters:       clusters,
+        Endpoints:      endpoints,
+        SkipValidation: true,
+    }
+    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+    defer cancel()
+    if err := mgmtServer.Update(ctx, resources); err != nil {
+        t.Fatal(err)
+    }
+
+    // Create a ClientConn.
+    cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
+    if err != nil {
+        t.Fatalf("failed to dial local test server: %v", err)
+    }
+    defer cc.Close()
+
+    // Make an EmptyCall RPC and verify that it is routed to cluster1.
+    client := testgrpc.NewTestServiceClient(cc)
+    if err := makeEmptyCallRPCAndVerifyPeer(ctx, client, server1.Address); err != nil {
+        t.Fatal(err)
+    }
+
+    // Make a UnaryCall RPC and verify that it is routed to cluster2.
+    if err := makeUnaryCallRPCAndVerifyPeer(ctx, client, server2.Address); err != nil {
+        t.Fatal(err)
+    }
+
+    // Create a wrapped pickfirst LB policy. When the endpoint picking policy on
+    // the cluster resource is changed to pickfirst, this will allow us to
+    // verify that load balancing configuration is pushed to it.
+    pfBuilder := balancer.Get(grpc.PickFirstBalancerName)
+    internal.BalancerUnregister(pfBuilder.Name())
+
+    lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1)
+    stub.Register(pfBuilder.Name(), stub.BalancerFuncs{
+        ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+            return pfBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
+        },
+        Init: func(bd *stub.BalancerData) {
+            bd.Data = pfBuilder.Build(bd.ClientConn, bd.BuildOptions)
+        },
+        UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+            select {
+            case lbCfgCh <- ccs.BalancerConfig:
+            default:
+            }
+            bal := bd.Data.(balancer.Balancer)
+            return bal.UpdateClientConnState(ccs)
+        },
+        Close: func(bd *stub.BalancerData) {
+            bal := bd.Data.(balancer.Balancer)
+            bal.Close()
+        },
+    })
+
+    // Send a config update that changes the child policy configuration for one
+    // of the clusters to pickfirst. The endpoints resource is also changed here
+    // to ensure that we can verify that the new child policy is used to route
+    // RPCs to the new backend.
+    cluster2 := &v3clusterpb.Cluster{
+        Name:                 clusterName2,
+        ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
+        EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
+            EdsConfig: &v3corepb.ConfigSource{
+                ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
+                    Ads: &v3corepb.AggregatedConfigSource{},
+                },
+            },
+            ServiceName: endpointsName3,
+        },
+        LoadBalancingPolicy: &v3clusterpb.LoadBalancingPolicy{
+            Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
+                {
+                    TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
+                        TypedConfig: testutils.MarshalAny(t, &v3pickfirstpb.PickFirst{
+                            ShuffleAddressList: true,
+                        }),
+                    },
+                },
+            },
+        },
+    }
+    server3 := stubserver.StartTestService(t, nil)
+    defer server3.Stop()
+    endpoints3 := e2e.DefaultEndpoint(endpointsName3, "localhost", []uint32{testutils.ParsePort(t, server3.Address)})
+    resources.Clusters[1] = cluster2
+    resources.Endpoints = append(resources.Endpoints, endpoints3)
+    if err := mgmtServer.Update(ctx, resources); err != nil {
+        t.Fatal(err)
+    }
+
+    select {
+    case <-ctx.Done():
+        t.Fatalf("Timeout when waiting for configuration to be pushed to the new pickfirst child policy")
+    case <-lbCfgCh:
+    }
+
+    // Ensure RPCs are still succeeding.
+
+    // Make an EmptyCall RPC and verify that it is routed to cluster1.
+    if err := makeEmptyCallRPCAndVerifyPeer(ctx, client, server1.Address); err != nil {
+        t.Fatal(err)
+    }
+
+    // Make a UnaryCall RPC and verify that it is routed to cluster2, and the
+    // new endpoints resource.
+    for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
+        if err := makeUnaryCallRPCAndVerifyPeer(ctx, client, server3.Address); err == nil {
+            break
+        }
+        t.Log(err)
+    }
+    if ctx.Err() != nil {
+        t.Fatal("Timeout when waiting for RPCs to cluster2 to be routed to the new endpoints resource")
+    }
+
+    // Send a config update that changes the child policy configuration for one
+    // of the clusters to an unsupported LB policy. This should result in
+    // failure of RPCs to that cluster.
+    cluster2 = &v3clusterpb.Cluster{
+        Name:                 clusterName2,
+        ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
+        EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
+            EdsConfig: &v3corepb.ConfigSource{
+                ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
+                    Ads: &v3corepb.AggregatedConfigSource{},
+                },
+            },
+            ServiceName: endpointsName3,
+        },
+        LoadBalancingPolicy: &v3clusterpb.LoadBalancingPolicy{
+            Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
+                {
+                    TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
+                        // This type is not registered in the gRPC LB policy registry.
+                        TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
+                            TypeUrl: "type.googleapis.com/myorg.ThisTypeDoesNotExist",
+                            Value:   &structpb.Struct{},
+                        }),
+                    },
+                },
+            },
+        },
+    }
+    resources.Clusters[1] = cluster2
+    if err := mgmtServer.Update(ctx, resources); err != nil {
+        t.Fatal(err)
+    }
+
+    // At this point, RPCs to cluster1 should continue to succeed, while RPCs to
+    // cluster2 should start to fail.
+
+    // Make an EmptyCall RPC and verify that it is routed to cluster1.
+    if err := makeEmptyCallRPCAndVerifyPeer(ctx, client, server1.Address); err != nil {
+        t.Fatal(err)
+    }
+
+    // Make a UnaryCall RPC and verify that it starts to fail.
+    for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
+        _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{})
+        got := status.Code(err)
+        if got == codes.Unavailable {
+            break
+        }
+        t.Logf("UnaryCall() returned code: %v, want %v", got, codes.Unavailable)
+    }
+    if ctx.Err() != nil {
+        t.Fatal("Timeout when waiting for RPCs to cluster2 to start failing")
+    }
+
+    // Channel should still be READY.
+    if got, want := cc.GetState(), connectivity.Ready; got != want {
+        t.Fatalf("grpc.ClientConn in state %v, want %v", got, want)
+    }
+}
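As a closing illustration of the clustermanager.go changes, the sketch below shows the kind of configuration the cluster manager LB policy parses and what a child policy type change looks like at that level. The child names, the direct use of endpoint-picking policies as children, and the exact JSON field spellings are illustrative assumptions, not taken from this patch.

```go
// Documentation-only sketch; not part of this patch. Field spellings
// ("children", "childPolicy") and child names are assumptions for
// illustration.
package clustermanagerexample

const (
	// A configuration with two children, both using round_robin.
	before = `{
  "children": {
    "cluster:cluster1": {"childPolicy": [{"round_robin": {}}]},
    "cluster:cluster2": {"childPolicy": [{"round_robin": {}}]}
  }
}`

	// The same children, but cluster:cluster2 now asks for pick_first. With the
	// new updateChildren logic, the balancer re-marshals that child's
	// childPolicy, runs it through balancergroup.ParseConfig, and hands the
	// resulting gracefulswitch config to the existing child, which switches
	// policies without tearing down the other child. A child whose config
	// fails to marshal or parse gets an error picker via
	// setErrorPickerForChild, while the remaining children keep working.
	after = `{
  "children": {
    "cluster:cluster1": {"childPolicy": [{"round_robin": {}}]},
    "cluster:cluster2": {"childPolicy": [{"pick_first": {"shuffleAddressList": true}}]}
  }
}`
)
```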