ringhash: port e2e tests from c-core (#7271)
atollena authored Jun 11, 2024
1 parent de51a63 commit 4dd7f55
Showing 5 changed files with 910 additions and 54 deletions.
56 changes: 56 additions & 0 deletions internal/testutils/blocking_context_dialer.go
@@ -0,0 +1,56 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package testutils

import (
"context"
"net"
)

// BlockingDialer is a dialer that waits for Resume() to be called before
// dialing.
type BlockingDialer struct {
dialer *net.Dialer
blockCh chan struct{}
}

// NewBlockingDialer returns a dialer that waits for Resume() to be called
// before dialing.
func NewBlockingDialer() *BlockingDialer {
return &BlockingDialer{
dialer: &net.Dialer{},
blockCh: make(chan struct{}),
}
}

// DialContext implements a context dialer for use with the
// grpc.WithContextDialer dial option. It blocks until Resume is called (or
// ctx expires) and then dials addr over TCP.
func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
select {
case <-d.blockCh:
case <-ctx.Done():
return nil, ctx.Err()
}
return d.dialer.DialContext(ctx, "tcp", addr)
}

// Resume unblocks the dialer. It panics if called more than once.
func (d *BlockingDialer) Resume() {
close(d.blockCh)
}
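
The new BlockingDialer lets a test hold every connection attempt until the test is ready. A minimal usage sketch, not part of this commit, assuming a hypothetical backend address and a test that wants to finish its setup before the channel connects:

    func TestWithBlockingDialer(t *testing.T) {
        d := testutils.NewBlockingDialer()
        cc, err := grpc.NewClient("localhost:50051", // hypothetical backend address
            grpc.WithTransportCredentials(insecure.NewCredentials()),
            grpc.WithContextDialer(d.DialContext)) // every dial blocks until Resume
        if err != nil {
            t.Fatalf("grpc.NewClient() failed: %v", err)
        }
        defer cc.Close()

        cc.Connect() // start a connection attempt; it parks inside the dialer
        // ... finish arranging server-side or xDS state here ...
        d.Resume() // unblock the dialer so the connection can complete
    }

(Imports assumed: "google.golang.org/grpc", "google.golang.org/grpc/credentials/insecure" and this testutils package; grpc.NewClient is available in the gRPC version this commit targets.)
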
14 changes: 11 additions & 3 deletions internal/testutils/xds/e2e/clientresources.go
@@ -675,6 +675,8 @@ type LocalityOptions struct {
Weight uint32
// Backends is a set of backends belonging to this locality.
Backends []BackendOptions
// Priority is the priority of the locality. Defaults to 0.
Priority uint32
}

// BackendOptions contains options to configure individual backends in a
@@ -686,6 +688,8 @@ type BackendOptions struct {
// Health status of the backend. Default is UNKNOWN which is treated the
// same as HEALTHY.
HealthStatus v3corepb.HealthStatus
// Weight sets the backend weight. Defaults to 1.
Weight uint32
}

// EndpointOptions contains options to configure an Endpoint (or
@@ -708,7 +712,7 @@ type EndpointOptions struct {
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var bOpts []BackendOptions
for _, p := range ports {
bOpts = append(bOpts, BackendOptions{Port: p})
bOpts = append(bOpts, BackendOptions{Port: p, Weight: 1})
}
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
@@ -729,6 +733,10 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
for i, locality := range opts.Localities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, b := range locality.Backends {
// Weight defaults to 1.
if b.Weight == 0 {
b.Weight = 1
}
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
@@ -740,7 +748,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
}},
}},
HealthStatus: b.HealthStatus,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: b.Weight},
})
}

@@ -752,7 +760,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.Weight},
Priority: 0,
Priority: locality.Priority,
})
}

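With these knobs, a ring hash test can build an endpoint resource with unequal backend weights and a fallback locality. A hedged sketch using the e2e helpers above; the cluster name, host, ports and the LocalityOptions.Name field are illustrative assumptions, not taken from this diff:

    // Hypothetical endpoint resource: two weighted backends at priority 0 and
    // a single-backend fallback locality at priority 1.
    endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
        ClusterName: "cluster-backends",
        Host:        "localhost",
        Localities: []e2e.LocalityOptions{
            {
                Name:   "locality-0",
                Weight: 1,
                Backends: []e2e.BackendOptions{
                    {Port: 8080, Weight: 1},
                    {Port: 8081, Weight: 4}, // gets a proportionally larger share of the ring
                },
            },
            {
                Name:     "locality-1",
                Weight:   1,
                Priority: 1, // only used when priority 0 becomes unhealthy
                Backends: []e2e.BackendOptions{{Port: 8082}}, // Weight defaults to 1
            },
        },
    })
    // The resource would then be installed on the test management server, e.g.
    // via e2e.UpdateOptions alongside matching listener, route and cluster
    // resources.
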
2 changes: 1 addition & 1 deletion xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
@@ -196,7 +196,7 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
// - the nodeID expected by the management server
// - the grpc channel to the test backend service
// - the manual resolver configured on the channel
// - the xDS cient used the grpc channel
// - the xDS client used the grpc channel
// - a channel on which requested cluster resource names are sent
// - a channel used to signal that previously requested cluster resources are
// no longer requested
@@ -79,26 +79,18 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus
// Returns the following:
// - a channel onto which the DNS target being resolved is written to by the
// mock DNS resolver
// - a channel to notify close of the DNS resolver
// - a channel to notify re-resolution requests to the DNS resolver
// - a manual resolver which is used to mock the actual DNS resolution
// - a cleanup function which re-registers the original DNS resolver
func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) {
func setupDNS(t *testing.T) (chan resolver.Target, *manual.Resolver) {
targetCh := make(chan resolver.Target, 1)
closeCh := make(chan struct{}, 1)
resolveNowCh := make(chan resolver.ResolveNowOptions, 1)

mr := manual.NewBuilderWithScheme("dns")
mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) {
targetCh <- target
}
mr.CloseCallback = func() { closeCh <- struct{}{} }
mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }
mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { targetCh <- target }

dnsResolverBuilder := resolver.Get("dns")
resolver.Register(mr)

return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
t.Cleanup(func() { resolver.Register(dnsResolverBuilder) })
return targetCh, mr
}
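
After this refactor a test only needs the target channel and the manual resolver; resolver re-registration is handled by t.Cleanup. A hedged sketch of the calling pattern, with an illustrative DNS host, port and address (the defaultTestTimeout constant and the imports of context, resolver and manual are assumed from the surrounding test file):

    dnsTargetCh, dnsR := setupDNS(t)

    // ... install xDS resources that include a LOGICAL_DNS cluster ...

    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    defer cancel()
    select {
    case target := <-dnsTargetCh:
        if got, want := target.Endpoint(), "dns.example.com:8080"; got != want { // illustrative host:port
            t.Fatalf("DNS resolution requested for %q, want %q", got, want)
        }
    case <-ctx.Done():
        t.Fatal("Timeout waiting for DNS resolution to start")
    }

    // Hand the DNS-backed cluster some addresses through the manual resolver.
    dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "localhost:8080"}}})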

// TestAggregateCluster_WithTwoEDSClusters tests the case where the top-level
@@ -471,8 +463,7 @@ func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) {
// cluster. The test verifies that RPCs fail until both clusters are resolved to
// endpoints, and RPCs are routed to the higher priority EDS cluster.
func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
dnsTargetCh, dnsR := setupDNS(t)

// Start an xDS management server that pushes the name of the requested EDS
// resource onto a channel.
@@ -661,8 +652,7 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
// still successful. This is the expected behavior because the cluster resolver
// policy eats errors from DNS Resolver after it has returned an error.
func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
dnsTargetCh, dnsR := setupDNS(t)

// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
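
The remaining changed file, not expanded in this view, carries the ported ring hash e2e tests themselves. Purely for orientation, a hedged sketch of how a test can point a cluster at the ring hash LB policy using the go-control-plane protos; the names and ring size are illustrative and not taken from this commit (v3clusterpb and wrapperspb follow the aliases used elsewhere in the e2e package):

    // Hypothetical cluster resource selecting RING_HASH with an explicit
    // minimum ring size.
    cluster := e2e.DefaultCluster("cluster-ring", "endpoints-ring", e2e.SecurityLevelNone)
    cluster.LbPolicy = v3clusterpb.Cluster_RING_HASH
    cluster.LbConfig = &v3clusterpb.Cluster_RingHashLbConfig_{
        RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
            MinimumRingSize: &wrapperspb.UInt64Value{Value: 1024},
        },
    }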
