Skip to content

Commit

Permalink
clusterresolver: comply with A37 for handling errors from discovery m…
Browse files Browse the repository at this point in the history
…echanisms (#6461)
  • Loading branch information
zasweq authored Jul 24, 2023
1 parent d7f45cd commit 2aa2615
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 37 deletions.
12 changes: 11 additions & 1 deletion xds/internal/balancer/clusterresolver/configbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,17 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint
})
}

priorities := groupLocalitiesByPriority(edsResp.Localities)
// Localities of length 0 is triggered by an NACK or resource-not-found
// error before update, or a empty localities list in a update. In either
// case want to create a priority, and send down empty address list, causing
// TF for that priority. "If any discovery mechanism instance experiences an
// error retrieving data, and it has not previously reported any results, it
// should report a result that is a single priority with no endpoints." -
// A37
priorities := [][]xdsresource.Locality{{}}
if len(edsResp.Localities) != 0 {
priorities = groupLocalitiesByPriority(edsResp.Localities)
}
retNames := g.generate(priorities)
retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
var retAddrs []resolver.Address
Expand Down
171 changes: 160 additions & 11 deletions xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package e2e_test

import (
"context"
"errors"
"fmt"
"sort"
"strings"
Expand All @@ -31,6 +32,7 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -596,8 +598,8 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
// DNS resolver yet. Once the DNS resolver pushes an update, the test verifies
// that we switch to the DNS cluster and can make a successful RPC. At this
// point when the DNS cluster returns an error, the test verifies that RPCs are
// still successful. This is the expected behavior because pick_first (the leaf
// policy) ignores resolver errors when it is not in TransientFailure.
// still successful. This is the expected behavior because the cluster resolver
// policy eats errors from DNS Resolver after it has returned an error.
func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
Expand All @@ -612,8 +614,8 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
addrs, _ := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
// cluster. Also configure an empty endpoints resource for the EDS cluster
// that contains no endpoints.
// cluster. Also configure an endpoints resource for the EDS cluster which
// triggers a NACK.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
Expand Down Expand Up @@ -698,13 +700,160 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
}
}

// TestAggregateCluster_BadEDS_GoodToBadDNS tests the case where the top-level
// cluster is an aggregate cluster that resolves to an EDS and LOGICAL_DNS
// cluster. The test first sends an EDS response which triggers an NACK. Once
// the DNS resolver pushes an update, the test verifies that we switch to the
// DNS cluster and can make a successful RPC.
func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, _ := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
// cluster. Also configure an empty endpoints resource for the EDS cluster
// that contains no endpoints.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{
{
LoadBalancingWeight: &wrapperspb.UInt32Value{
Value: 0, // causes an NACK
},
},
}
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{nackEndpointResource},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs})

// Ensure that RPCs start getting routed to the first backend since the
// child policy for a LOGICAL_DNS cluster is pick_first by default.
pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0])
}

// TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level
// cluster is an aggregate cluster that resolves to an LOGICAL_DNS and EDS
// cluster. When the DNS Resolver returns an error and EDS cluster returns a
// good update, this test verifies the cluster_resolver balancer correctly falls
// back from the LOGICAL_DNS cluster to the EDS cluster.
func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()

// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()

// Start two test backends.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)

// Configure an aggregate cluster pointing to an LOGICAL_DNS and EDS
// cluster. Also configure an endpoints resource for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}

// Push an error through the DNS resolver.
dnsR.ReportError(errors.New("some error"))

// RPCs should work, higher level DNS cluster errors so should fallback to
// EDS cluster.
client := testgrpc.NewTestServiceClient(cc)
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
}
}

// TestAggregateCluster_BadEDS_BadDNS tests the case where the top-level cluster
// is an aggregate cluster that resolves to an EDS and LOGICAL_DNS cluster. When
// the EDS request returns a resource that contains no endpoints, the test
// verifies that we switch to the DNS cluster. When the DNS cluster returns an
// error, the test verifies that RPCs fail with the error returned by the DNS
// resolver, and thus, ensures that pick_first (the leaf policy) does not ignore
// resolver errors.
// error, the test verifies that RPCs fail with the error triggered by the DNS
// Discovery Mechanism (from sending an empty address list down).
func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
Expand Down Expand Up @@ -769,14 +918,14 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
dnsErr := fmt.Errorf("DNS error")
dnsR.ReportError(dnsErr)

// Ensure that the error returned from the DNS resolver is reported to the
// caller of the RPC.
// Ensure that the error from the DNS Resolver leads to an empty address
// update for both priorities.
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable)
}
if err == nil || !strings.Contains(err.Error(), dnsErr.Error()) {
t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
if err == nil || !strings.Contains(err.Error(), "produced zero addresses") {
t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err)
}
}

Expand Down
25 changes: 12 additions & 13 deletions xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/priority"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
Expand Down Expand Up @@ -538,7 +537,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
}
defer cc.Close()
testClient := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
t.Fatal(err)
}

Expand All @@ -561,7 +560,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -921,7 +920,7 @@ func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) {
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1070,31 +1069,31 @@ func (s) TestEDS_ResourceNotFound(t *testing.T) {
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
t.Fatal(err)
}
}

// waitForAllPrioritiesRemovedError repeatedly makes RPCs using the
// TestServiceClient until they fail with an error which indicates that all
// priorities have been removed. A non-nil error is returned if the context
// expires before RPCs fail with the expected error.
func waitForAllPrioritiesRemovedError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
// TestServiceClient until they fail with an error which indicates that no
// resolver addresses have been produced. A non-nil error is returned if the
// context expires before RPCs fail with the expected error.
func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err == nil {
t.Log("EmptyCall() succeeded after EDS update with no localities")
t.Log("EmptyCall() succeeded after error in Discovery Mechanism")
continue
}
if code := status.Code(err); code != codes.Unavailable {
t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable)
continue
}
if !strings.Contains(err.Error(), priority.ErrAllPrioritiesRemoved.Error()) {
t.Logf("EmptyCall() = %v, want %v", err, priority.ErrAllPrioritiesRemoved)
if !strings.Contains(err.Error(), "produced zero addresses") {
t.Logf("EmptyCall() = %v, want %v", err, "produced zero addresses")
continue
}
return nil
}
return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and priority.ErrAllPrioritiesRemoved error")
return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and produced zero addresses")
}
9 changes: 0 additions & 9 deletions xds/internal/balancer/clusterresolver/resource_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type resourceUpdate struct {
// from underlying concrete resolvers.
type topLevelResolver interface {
onUpdate()
onError(error)
}

// endpointsResolver wraps the functionality to resolve a given resource name to
Expand Down Expand Up @@ -277,11 +276,3 @@ func (rr *resourceResolver) onUpdate() {
rr.generateLocked()
rr.mu.Unlock()
}

func (rr *resourceResolver) onError(err error) {
select {
case <-rr.updateChannel:
default:
}
rr.updateChannel <- &resourceUpdate{err: err}
}
28 changes: 25 additions & 3 deletions xds/internal/balancer/clusterresolver/resource_resolver_dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,21 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
}
u, err := url.Parse("dns:///" + target)
if err != nil {
topLevelResolver.onError(fmt.Errorf("failed to parse dns hostname %q in clusterresolver LB policy", target))
if ret.logger.V(2) {
ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate()
return ret
}

r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{})
if err != nil {
topLevelResolver.onError(fmt.Errorf("failed to build DNS resolver for target %q: %v", target, err))
if ret.logger.V(2) {
ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
}
ret.updateReceived = true
ret.topLevelResolver.onUpdate()
return ret
}
ret.dnsR = r
Expand Down Expand Up @@ -142,7 +150,21 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
dr.logger.Infof("DNS discovery mechanism for resource %q reported error: %v", dr.target, err)
}

dr.topLevelResolver.onError(err)
dr.mu.Lock()
// If a previous good update was received, suppress the error and continue
// using the previous update. If RPCs were succeeding prior to this, they
// will continue to do so. Also suppress errors if we previously received an
// error, since there will be no downstream effects of propagating this
// error.
if dr.updateReceived {
dr.mu.Unlock()
return
}
dr.addrs = nil
dr.updateReceived = true
dr.mu.Unlock()

dr.topLevelResolver.onUpdate()
}

func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {
Expand Down

0 comments on commit 2aa2615

Please sign in to comment.