Skip to content

Commit

Permalink
xds/resolver: Add support for cluster specifier plugins (#4987)
Browse files Browse the repository at this point in the history
* xds/resolver: Add support for cluster specifier plugins
  • Loading branch information
zasweq authored Dec 6, 2021
1 parent 512e894 commit 3786ae1
Show file tree
Hide file tree
Showing 5 changed files with 433 additions and 39 deletions.
368 changes: 368 additions & 0 deletions xds/internal/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,368 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package resolver

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

func init() {
balancer.Register(cspB{})
}

type cspB struct{}

func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return nil
}

func (cspB) Name() string {
return "csp_experimental"
}

type cspConfig struct {
ArbitraryField string `json:"arbitrary_field"`
}

// TestXDSResolverClusterSpecifierPlugin tests that cluster specifier plugins
// produce the correct service config, and that the config selector routes to a
// cluster specifier plugin supported by this service config (i.e. prefixed with
// a cluster specifier plugin prefix).
func (s) TestXDSResolverClusterSpecifierPlugin(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)

waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}},
}, nil)

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}]
}
}
}}]}`

wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}

cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("received nil config selector")
}

res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}

cluster := clustermanager.GetPickedClusterForTesting(res.Context)
clusterWant := clusterSpecifierPluginPrefix + "cspA"
if cluster != clusterWant {
t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant)
}
}

// TestXDSResolverClusterSpecifierPluginConfigUpdate tests that cluster
// specifier plugins produce the correct service config, and that on an update
// to the CSP Configuration, the new config is accounted for in the output
// service config.
func (s) TestXDSResolverClusterSpecifierPluginConfigUpdate(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)

waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}},
}, nil)

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}]
}
}
}}]}`

wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}

xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "changed"}}}},
}, nil)

gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON = `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"changed"}}]
}
}
}}]}`

wantSCParsed = internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
}

// TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and
// their corresponding configurations remain in service config if RPCs are in
// flight.
func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingA"}}}},
}, nil)

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}]
}
}
}}]}`

wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}

cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("received nil config selector")
}

res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}

cluster := clustermanager.GetPickedClusterForTesting(res.Context)
clusterWant := clusterSpecifierPluginPrefix + "cspA"
if cluster != clusterWant {
t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant)
}
// delay res.OnCommitted()

// Perform TWO updates to ensure the old config selector does not hold a reference to cspA
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}},
},
},
// Top level csp config here - the value of cspB should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}},
}, nil)
tcc.stateCh.Receive(ctx) // Ignore the first update.

xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}},
},
},
// Top level csp config here - the value of cspB should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}},
}, nil)

gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON2 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}]
},
"cluster_specifier_plugin:cspB":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}]
}
}
}}]}`

wantSCParsed2 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON2)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config))
}

// Invoke OnCommitted; should lead to a service config update that deletes
// cspA.
res.OnCommitted()

xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}},
},
},
// Top level csp config here - the value of cspB should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}},
}, nil)
gotState, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState = gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON3 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspB":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}]
}
}
}}]}`

wantSCParsed3 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON3)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config))
}
}
Loading

0 comments on commit 3786ae1

Please sign in to comment.