diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index 6e50ab56d86d..ed241124219e 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -415,7 +415,7 @@ func (p *picker) inc() uint32 { } func (p *picker) regenerateScheduler() { - s := p.newScheduler() + s := p.newScheduler(true) atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s)) } @@ -558,14 +558,17 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect w.SubConn.Connect() case connectivity.Ready: // If we transition back to READY state, reset nonEmptySince so that we - // apply the blackout period after we start receiving load data. Note - // that we cannot guarantee that we will never receive lingering - // callbacks for backend metric reports from the previous connection - // after the new connection has been established, but they should be - // masked by new backend metric reports from the new connection by the - // time the blackout period ends. + // apply the blackout period after we start receiving load data. Also + // reset lastUpdated to trigger endpoint weight not yet usable in the + // case endpoint gets asked what weight it is before receiving a new + // load report. Note that we cannot guarantee that we will never receive + // lingering callbacks for backend metric reports from the previous + // connection after the new connection has been established, but they + // should be masked by new backend metric reports from the new + // connection by the time the blackout period ends. w.mu.Lock() w.nonEmptySince = time.Time{} + w.lastUpdated = time.Time{} w.mu.Unlock() case connectivity.Shutdown: if w.stopORCAListener != nil { @@ -592,7 +595,7 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect // account the parameters. 
Returns 0 for blacked out or expired data, which // will cause the backend weight to be treated as the mean of the weights of the // other backends. If forScheduler is set to true, this function will emit -// metrics through the mtrics registry. +// metrics through the metrics registry. func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration, recordMetrics bool) (weight float64) { w.mu.Lock() defer w.mu.Unlock() @@ -603,6 +606,13 @@ func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackout }() } + // The SubConn has not received a load report (i.e. just turned READY with + // no load report). + if w.lastUpdated == (time.Time{}) { + endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality) + return 0 + } + // If the most recent update was longer ago than the expiration period, // reset nonEmptySince so that we apply the blackout period again if we // start getting data again in the future, and return 0. 
diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index 6567cdfb3d93..6ffddc0a7739 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils/roundrobin" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/orca" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -81,6 +82,14 @@ var ( WeightUpdatePeriod: stringp(".050s"), ErrorUtilizationPenalty: float64p(0), } + testMetricsConfig = iwrr.LBConfig{ + EnableOOBLoadReport: boolp(false), + OOBReportingPeriod: stringp("0.005s"), + BlackoutPeriod: stringp("0s"), + WeightExpirationPeriod: stringp("60s"), + WeightUpdatePeriod: stringp(".050s"), + ErrorUtilizationPenalty: float64p(0), + } ) type testServer struct { @@ -196,6 +205,43 @@ func (s) TestBalancer_OneAddress(t *testing.T) { } } +// TestWRRMetricsBasic tests metrics emitted from the WRR balancer. It +// configures a weighted round robin balancer as the top level balancer of a +// ClientConn, and configures a fake stats handler on the ClientConn to receive +// metrics. It verifies stats emitted from the Weighted Round Robin Balancer on +// balancer startup case which triggers the first picker and scheduler update +// before any load reports are received. +// +// Note that in this test and others, metrics emission assertions are a snapshot +// of the most recently emitted metrics. This is due to the nondeterminism of +// scheduler updates with respect to test bodies, so the assertions made are +// from the most recently synced state of the system (picker/scheduler) from the +// test body. 
+func (s) TestWRRMetricsBasic(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + srv := startServer(t, reportCall) + sc := svcConfig(t, testMetricsConfig) + + mr := stats.NewTestMetricsRecorder(t) + if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { + t.Fatalf("Error starting client: %v", err) + } + srv.callMetrics.SetQPS(float64(1)) + + if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("Error from EmptyCall: %v", err) + } + + mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn. + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted). + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) + // Unusable, so no endpoint weight. Due to only one SubConn, this will never + // update the weight. Thus, this will stay 0. + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) +} + // Tests two addresses with ORCA reporting disabled (should fall back to pure // RR). func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) { diff --git a/balancer/weightedroundrobin/metrics_test.go b/balancer/weightedroundrobin/metrics_test.go new file mode 100644 index 000000000000..0ff43dc26e52 --- /dev/null +++ b/balancer/weightedroundrobin/metrics_test.go @@ -0,0 +1,163 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package weightedroundrobin + +import ( + "testing" + "time" + + "google.golang.org/grpc/internal/grpctest" + iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/testutils/stats" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// TestWRR_Metrics_SubConnWeight tests different scenarios for the weight call +// on a weighted SubConn, and expects certain metrics for each of these +// scenarios. +func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { + tests := []struct { + name string + weightExpirationPeriod time.Duration + blackoutPeriod time.Duration + lastUpdated time.Time + nonEmpty time.Time + nowTime time.Time + endpointWeightStaleWant float64 + endpointWeightNotYetUsableWant float64 + endpointWeightWant float64 + }{ + // The weighted SubConn's lastUpdated field hasn't been set, so this + // SubConn's weight is not yet usable. Thus, should emit that endpoint + // weight is not yet usable, and 0 for weight. 
+ { + name: "no weight set", + weightExpirationPeriod: time.Second, + blackoutPeriod: time.Second, + nowTime: time.Now(), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 1, + endpointWeightWant: 0, + }, + { + name: "weight expiration", + lastUpdated: time.Now(), + weightExpirationPeriod: 2 * time.Second, + blackoutPeriod: time.Second, + nowTime: time.Now().Add(100 * time.Second), + endpointWeightStaleWant: 1, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 0, + }, + { + name: "in blackout period", + lastUpdated: time.Now(), + weightExpirationPeriod: time.Minute, + blackoutPeriod: 10 * time.Second, + nowTime: time.Now(), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 1, + endpointWeightWant: 0, + }, + { + name: "normal weight", + lastUpdated: time.Now(), + nonEmpty: time.Now(), + weightExpirationPeriod: time.Minute, + blackoutPeriod: time.Second, + nowTime: time.Now().Add(10 * time.Second), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 3, + }, + { + name: "weight expiration takes precdedence over blackout", + lastUpdated: time.Now(), + nonEmpty: time.Now(), + weightExpirationPeriod: time.Second, + blackoutPeriod: time.Minute, + nowTime: time.Now().Add(10 * time.Second), + endpointWeightStaleWant: 1, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 0, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tmr := stats.NewTestMetricsRecorder(t) + wsc := &weightedSubConn{ + metricsRecorder: tmr, + weightVal: 3, + lastUpdated: test.lastUpdated, + nonEmptySince: test.nonEmpty, + } + wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true) + + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant) + 
}) + } + +} + +// TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric +// for scheduler updates. It tests the case with one SubConn, and two SubConns +// with no weights. Both of these should emit a count metric for round robin +// fallback. +func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) { + tmr := stats.NewTestMetricsRecorder(t) + wsc := &weightedSubConn{ + metricsRecorder: tmr, + weightVal: 0, + } + + p := &picker{ + cfg: &lbConfig{ + BlackoutPeriod: iserviceconfig.Duration(10 * time.Second), + WeightExpirationPeriod: iserviceconfig.Duration(3 * time.Minute), + }, + subConns: []*weightedSubConn{wsc}, + metricsRecorder: tmr, + } + // There is only one SubConn, so no matter if the SubConn has a weight or + // not will fallback to round robin. + p.regenerateScheduler() + tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) + tmr.ClearMetrics() + + // With two SubConns, if neither of them have weights, it will also fallback + // to round robin. + wsc2 := &weightedSubConn{ + target: "target", + metricsRecorder: tmr, + weightVal: 0, + } + p.subConns = append(p.subConns, wsc2) + p.regenerateScheduler() + tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) +} diff --git a/balancer/weightedroundrobin/scheduler.go b/balancer/weightedroundrobin/scheduler.go index a33c5591ddd5..56aa15da10d2 100644 --- a/balancer/weightedroundrobin/scheduler.go +++ b/balancer/weightedroundrobin/scheduler.go @@ -31,14 +31,16 @@ type scheduler interface { // len(scWeights)-1 are zero or there is only a single subconn, otherwise it // will return an Earliest Deadline First (EDF) scheduler implementation that // selects the subchannels according to their weights. 
-func (p *picker) newScheduler() scheduler { - scWeights := p.scWeights(true) +func (p *picker) newScheduler(recordMetrics bool) scheduler { + scWeights := p.scWeights(recordMetrics) n := len(scWeights) if n == 0 { return nil } if n == 1 { - rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + if recordMetrics { + rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + } return &rrScheduler{numSCs: 1, inc: p.inc} } sum := float64(0) @@ -55,7 +57,9 @@ func (p *picker) newScheduler() scheduler { } if numZero >= n-1 { - rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + if recordMetrics { + rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + } return &rrScheduler{numSCs: uint32(n), inc: p.inc} } unscaledMean := sum / float64(n-numZero) diff --git a/internal/stats/metrics_recorder_list_test.go b/internal/stats/metrics_recorder_list_test.go index b09ad043ce65..c58266a31bf8 100644 --- a/internal/stats/metrics_recorder_list_test.go +++ b/internal/stats/metrics_recorder_list_test.go @@ -144,8 +144,8 @@ func (s) TestMetricsRecorderList(t *testing.T) { // Create two stats.Handlers which also implement MetricsRecorder, configure // one as a global dial option and one as a local dial option. 
- mr1 := stats.NewTestMetricsRecorder(t, []string{}) - mr2 := stats.NewTestMetricsRecorder(t, []string{}) + mr1 := stats.NewTestMetricsRecorder(t) + mr2 := stats.NewTestMetricsRecorder(t) defer internal.ClearGlobalDialOptions() internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(mr1)) diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index 25817be50b37..f36089d47ff5 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -21,7 +21,9 @@ package stats import ( "context" + "sync" "testing" + "time" "github.com/google/go-cmp/cmp" estats "google.golang.org/grpc/experimental/stats" @@ -41,10 +43,14 @@ type TestMetricsRecorder struct { intHistoCh *testutils.Channel floatHistoCh *testutils.Channel intGaugeCh *testutils.Channel + + // The most recent update for each metric name. + mu sync.Mutex + data map[estats.Metric]float64 } -func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder { - return &TestMetricsRecorder{ +func NewTestMetricsRecorder(t *testing.T) *TestMetricsRecorder { + tmr := &TestMetricsRecorder{ t: t, intCountCh: testutils.NewChannelWithSize(10), @@ -52,7 +58,43 @@ func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder intHistoCh: testutils.NewChannelWithSize(10), floatHistoCh: testutils.NewChannelWithSize(10), intGaugeCh: testutils.NewChannelWithSize(10), + + data: make(map[estats.Metric]float64), + } + + return tmr +} + +// AssertDataForMetric asserts data is present for metric. The zero value in the +// check is equivalent to unset. 
+func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal float64) { + r.mu.Lock() + defer r.mu.Unlock() + if r.data[estats.Metric(metricName)] != wantVal { + r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", metricName, r.data[estats.Metric(metricName)], wantVal) + } +} + +// PollForDataForMetric polls the metric data for the want. Fails if context +// provided expires before data for metric is found. +func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + r.mu.Lock() + if r.data[estats.Metric(metricName)] == wantVal { + r.mu.Unlock() + return + } + r.mu.Unlock() } + r.t.Fatalf("Timeout waiting for data %v for metric %v", wantVal, metricName) +} + +// ClearMetrics clears the metrics data stores of the test metrics recorder by +// setting all the data to 0. +func (r *TestMetricsRecorder) ClearMetrics() { + r.mu.Lock() + defer r.mu.Unlock() + r.data = make(map[estats.Metric]float64) } type MetricsData struct { @@ -85,6 +127,10 @@ func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = float64(incr) } func (r *TestMetricsRecorder) WaitForFloat64Count(ctx context.Context, metricsDataWant MetricsData) { @@ -105,6 +151,10 @@ func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHand LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = incr } func (r *TestMetricsRecorder) WaitForInt64Histo(ctx context.Context, metricsDataWant MetricsData) { @@ -125,6 +175,10 @@ func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer 
r.mu.Unlock() + r.data[handle.Name] = float64(incr) } func (r *TestMetricsRecorder) WaitForFloat64Histo(ctx context.Context, metricsDataWant MetricsData) { @@ -145,6 +199,10 @@ func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHand LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = incr } func (r *TestMetricsRecorder) WaitForInt64Gauge(ctx context.Context, metricsDataWant MetricsData) { @@ -165,6 +223,10 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = float64(incr) } // To implement a stats.Handler, which allows it to be set as a dial option: diff --git a/internal/testutils/xds/e2e/setup/setup.go b/internal/testutils/xds/e2e/setup/setup.go new file mode 100644 index 000000000000..f7b34f669629 --- /dev/null +++ b/internal/testutils/xds/e2e/setup/setup.go @@ -0,0 +1,62 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package setup implements setup helpers for xDS e2e tests. 
+package setup + +import ( + "testing" + + "github.com/google/uuid" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" + _ "google.golang.org/grpc/xds" // Register the xds_resolver. +) + +// ManagementServerAndResolver sets up an xDS management server, creates +// bootstrap configuration pointing to that server and creates an xDS resolver +// using that configuration. +// +// Registers a cleanup function on t to stop the management server. +// +// Returns the following: +// - the xDS management server +// - the node ID to use when talking to this management server +// - bootstrap configuration to use (if creating an xDS-enabled gRPC server) +// - xDS resolver builder (if creating an xDS-enabled gRPC client) +func ManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) { + // Start an xDS management server. + xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + + // Create bootstrap configuration pointing to the above management server. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address) + + // Create an xDS resolver with the above bootstrap configuration. 
+ var r resolver.Builder + var err error + if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { + r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + } + + return xdsServer, nodeID, bc, r +} diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 0e8558aae72e..e56c0fe94805 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -18,16 +18,35 @@ package opentelemetry_test import ( "context" + "fmt" + "io" "testing" "time" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3" + v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/wrapperspb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" + itestutils "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + setup "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/orca" 
"google.golang.org/grpc/stats/opentelemetry" "google.golang.org/grpc/stats/opentelemetry/internal/testutils" @@ -47,10 +66,10 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// setup creates a stub server with OpenTelemetry component configured on client +// setupStubServer creates a stub server with OpenTelemetry component configured on client // and server side. It returns a reader for metrics emitted from OpenTelemetry // component and the server. -func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { +func setupStubServer(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { reader := metric.NewManualReader() provider := metric.NewMeterProvider(metric.WithReader(reader)) ss := &stubserver.StubServer{ @@ -93,7 +112,7 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // Will allow duplex/any other type of RPC. return str != testgrpc.TestService_UnaryCall_FullMethodName } - reader, ss := setup(t, maf) + reader, ss := setupStubServer(t, maf) defer ss.Stop() // Make a Unary and Streaming RPC. The Unary RPC should be filtered by the @@ -178,7 +197,7 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // on the Client (no StaticMethodCallOption set) and Server. The method // attribute on subsequent metrics should be bucketed in "other". func (s) TestAllMetricsOneFunction(t *testing.T) { - reader, ss := setup(t, nil) + reader, ss := setupStubServer(t, nil) defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -304,3 +323,262 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { } } } + +// clusterWithLBConfiguration returns a cluster resource with the proto message +// passed Marshaled to an any and specified through the load_balancing_policy +// field. 
+func clusterWithLBConfiguration(t *testing.T, clusterName, edsServiceName string, secLevel e2e.SecurityLevel, m proto.Message) *v3clusterpb.Cluster { + cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel) + cluster.LoadBalancingPolicy = &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: itestutils.MarshalAny(t, m), + }, + }, + }, + } + return cluster +} + +func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map[string]metricdata.Metrics { + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + return gotMetrics +} + +// TestWRRMetrics tests the metrics emitted from the WRR LB Policy. It +// configures WRR as an endpoint picking policy through xDS on a ClientConn +// alongside an OpenTelemetry stats handler. It makes a few RPC's, and then +// sleeps for a bit to allow weight to expire. It then asserts OpenTelemetry +// metrics atoms are eventually present for all four WRR Metrics, alongside the +// correct target and locality label for each metric. +func (s) TestWRRMetrics(t *testing.T) { + cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder) + backend1 := stubserver.StartTestService(t, &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { + // Copy metrics from what the test set in cmr into r. 
+ sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() + r.SetApplicationUtilization(sm.AppUtilization) + r.SetQPS(sm.QPS) + r.SetEPS(sm.EPS) + } + return &testpb.Empty{}, nil + }, + }, orca.CallMetricsServerOption(nil)) + port1 := itestutils.ParsePort(t, backend1.Address) + defer backend1.Stop() + + cmr.SetQPS(10.0) + cmr.SetApplicationUtilization(1.0) + + backend2 := stubserver.StartTestService(t, &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { + // Copy metrics from what the test set in cmr into r. + sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() + r.SetApplicationUtilization(sm.AppUtilization) + r.SetQPS(sm.QPS) + r.SetEPS(sm.EPS) + } + return &testpb.Empty{}, nil + }, + }, orca.CallMetricsServerOption(nil)) + port2 := itestutils.ParsePort(t, backend2.Address) + defer backend2.Stop() + + const serviceName = "my-service-client-side-xds" + + // Start an xDS management server. + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) + + wrrConfig := &v3wrrlocalitypb.WrrLocality{ + EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: itestutils.MarshalAny(t, &v3clientsideweightedroundrobinpb.ClientSideWeightedRoundRobin{ + EnableOobLoadReport: &wrapperspb.BoolValue{ + Value: false, + }, + // BlackoutPeriod long enough to cause load report + // weight to trigger in the scope of test case. + // WeightExpirationPeriod will cause the load report + // weight for backend 1 to expire. 
+ BlackoutPeriod: durationpb.New(5 * time.Millisecond), + WeightExpirationPeriod: durationpb.New(500 * time.Millisecond), + WeightUpdatePeriod: durationpb.New(time.Second), + ErrorUtilizationPenalty: &wrapperspb.FloatValue{Value: 1}, + }), + }, + }, + }, + }, + } + + routeConfigName := "route-" + serviceName + clusterName := "cluster-" + serviceName + endpointsName := "endpoints-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{clusterWithLBConfiguration(t, clusterName, endpointsName, e2e.SecurityLevelNone, wrrConfig)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: endpointsName, + Host: "localhost", + Localities: []e2e.LocalityOptions{ + { + Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}}, + Weight: 1, + }, + }, + })}, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + + mo := opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"), + OptionalLabels: []string{"grpc.lb.locality"}, + } + + target := fmt.Sprintf("xds:///%s", serviceName) + cc, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver), opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})) + if err != nil { + t.Fatalf("Failed to dial local test server: %v", err) + } 
+ defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + + // Make RPCs in a loop until the expected metrics are received. The two backends will send back load reports per call + // giving the two SubChannels weights which will eventually expire. Two + // backends needed as for only one backend, WRR does not recompute the + // scheduler. + receivedExpectedMetrics := grpcsync.NewEvent() + go func() { + for !receivedExpectedMetrics.HasFired() { + client.EmptyCall(ctx, &testpb.Empty{}) + time.Sleep(2 * time.Millisecond) + } + }() + + targetAttr := attribute.String("grpc.target", target) + localityAttr := attribute.String("grpc.lb.locality", `{"region":"region-1","zone":"zone-1","subZone":"subzone-1"}`) + + wantMetrics := []metricdata.Metrics{ + { + Name: "grpc.lb.wrr.rr_fallback", + Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.", + Unit: "update", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + + { + Name: "grpc.lb.wrr.endpoint_weight_not_yet_usable", + Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).", + Unit: "endpoint", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.lb.wrr.endpoint_weights", + Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. 
Endpoints without usable weights will be recorded as weight 0.", + Unit: "endpoint", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } + + if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil { + t.Fatal(err) + } + receivedExpectedMetrics.Fire() + + // Poll for 5 seconds for weight expiration metric. No more RPC's are being + // made, so weight should expire on a subsequent scheduler update. + eventuallyWantMetric := metricdata.Metrics{ + Name: "grpc.lb.wrr.endpoint_weight_stale", + Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.", + Unit: "endpoint", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + } + + if err := pollForWantMetrics(ctx, t, reader, []metricdata.Metrics{eventuallyWantMetric}); err != nil { + t.Fatal(err) + } +} + +// pollForWantMetrics polls for the wantMetrics to show up on reader. Returns an +// error if metric is present but not equal to expected, or if the wantMetrics +// do not show up during the context timeout. 
+func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.ManualReader, wantMetrics []metricdata.Metrics) error {
+	// Delay between successive reads of the reader.
+	const pollInterval = 5 * time.Millisecond
+	for ctx.Err() == nil {
+		gotMetrics := metricsDataFromReader(ctx, reader)
+		containsAllMetrics := true
+		// The loop variable is named want (not metric) so that it does not
+		// shadow the metric package used in this function's signature.
+		for _, want := range wantMetrics {
+			got, ok := gotMetrics[want.Name]
+			if !ok {
+				// This metric has not shown up yet; poll again.
+				containsAllMetrics = false
+				break
+			}
+			// The metric is present: it must match the expectation, with
+			// values, timestamps and exemplars excluded from the comparison.
+			if !metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+				return fmt.Errorf("metrics data type not equal for metric: %v", want.Name)
+			}
+		}
+		if containsAllMetrics {
+			return nil
+		}
+		// Wait once per iteration, but wake up as soon as ctx is done so the
+		// timeout error below is returned without an extra sleep.
+		select {
+		case <-ctx.Done():
+		case <-time.After(pollInterval):
+		}
+	}
+
+	return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err())
+}
diff --git a/stats/opentelemetry/go.mod b/stats/opentelemetry/go.mod index d6af0becf5b4..549f296384e2 100644 --- a/stats/opentelemetry/go.mod +++ b/stats/opentelemetry/go.mod @@ -5,6 +5,7 @@ go 1.21 replace google.golang.org/grpc => ../..
require ( + github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155 github.com/google/go-cmp v0.6.0 go.opentelemetry.io/contrib/detectors/gcp v1.27.0 go.opentelemetry.io/otel v1.27.0 @@ -20,11 +21,12 @@ require ( cloud.google.com/go/compute/metadata v0.3.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect - github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect go.opentelemetry.io/otel/trace v1.27.0 // indirect golang.org/x/net v0.26.0 // indirect diff --git a/stats/opentelemetry/go.sum b/stats/opentelemetry/go.sum index 845f63cad7ee..1dfe121fa2ab 100644 --- a/stats/opentelemetry/go.sum +++ b/stats/opentelemetry/go.sum @@ -6,6 +6,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0/go.mod h1:p2puVVSKjQ84Qb1gzw2XHLs34WQyHTYFZLaVxypAFYs= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= github.com/cncf/xds/go 
v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -21,6 +23,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/test/xds/xds_client_affinity_test.go b/test/xds/xds_client_affinity_test.go index a8439e0673db..e7db416a7156 100644 --- a/test/xds/xds_client_affinity_test.go +++ b/test/xds/xds_client_affinity_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -83,7 +84,7 @@ func ringhashCluster(clusterName, edsServiceName string) *v3clusterpb.Cluster { // propagated to pick the ring_hash policy. It doesn't test the affinity // behavior in ring_hash policy. 
func (s) TestClientSideAffinitySanityCheck(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() diff --git a/test/xds/xds_client_certificate_providers_test.go b/test/xds/xds_client_certificate_providers_test.go index 49bd00feba0c..3f705210dd33 100644 --- a/test/xds/xds_client_certificate_providers_test.go +++ b/test/xds/xds_client_certificate_providers_test.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -60,7 +61,7 @@ import ( // used on the client. func (s) TestClientSideXDS_WithNoCertificateProvidersInBootstrap_Success(t *testing.T) { // Spin up an xDS management server. - mgmtServer, nodeID, _, resolverBuilder := setupManagementServerAndResolver(t) + mgmtServer, nodeID, _, resolverBuilder := setup.ManagementServerAndResolver(t) // Spin up a test backend. 
server := stubserver.StartTestService(t, nil) diff --git a/test/xds/xds_client_custom_lb_test.go b/test/xds/xds_client_custom_lb_test.go index d0a5e56f0534..8d87a89753c7 100644 --- a/test/xds/xds_client_custom_lb_test.go +++ b/test/xds/xds_client_custom_lb_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/roundrobin" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/resolver" v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" @@ -222,7 +223,7 @@ func (s) TestWrrLocality(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Start an xDS management server. - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) routeConfigName := "route-" + serviceName clusterName := "cluster-" + serviceName diff --git a/test/xds/xds_client_federation_test.go b/test/xds/xds_client_federation_test.go index 4d0346404d06..55d428d88928 100644 --- a/test/xds/xds_client_federation_test.go +++ b/test/xds/xds_client_federation_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" @@ -250,7 +251,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) { // server and actually making an RPC ensures that the xDS client is // configured properly, and when we dial with an unknown authority in the // next step, we can be sure that the error we receive is legitimate. 
- managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() @@ -298,7 +299,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) { // with an authority which is not specified in the bootstrap configuration. The // test verifies that RPCs fail with an appropriate error. func (s) TestFederation_UnknownAuthorityInReceivedResponse(t *testing.T) { - mgmtServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + mgmtServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // LDS is old style name. // RDS is new style, with an unknown authority. diff --git a/test/xds/xds_client_integration_test.go b/test/xds/xds_client_integration_test.go index 150171dc8161..f3b5d6e6ec43 100644 --- a/test/xds/xds_client_integration_test.go +++ b/test/xds/xds_client_integration_test.go @@ -24,15 +24,13 @@ import ( "testing" "time" - "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/resolver" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" @@ -51,40 +49,8 @@ const ( defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. ) -// setupManagementServerAndResolver sets up an xDS management server, creates -// bootstrap configuration pointing to that server and creates an xDS resolver -// using that configuration. -// -// Registers a cleanup function on t to stop the management server. 
-// -// Returns the following: -// - the xDS management server -// - the node ID to use when talking to this management server -// - bootstrap configuration to use (if creating an xDS-enabled gRPC server) -// - xDS resolver builder (if creating an xDS-enabled gRPC client) -func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) { - // Start an xDS management server. - xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) - - // Create bootstrap configuration pointing to the above management server. - nodeID := uuid.New().String() - bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address) - - // Create an xDS resolver with the above bootstrap configuration. - var r resolver.Builder - var err error - if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { - r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) - if err != nil { - t.Fatalf("Failed to create xDS resolver for testing: %v", err) - } - } - - return xdsServer, nodeID, bc, r -} - func (s) TestClientSideXDS(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go index eb91b87b14ca..4df142f4a7c1 100644 --- a/test/xds/xds_client_outlier_detection_test.go +++ b/test/xds/xds_client_outlier_detection_test.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/peer" @@ -50,7 
+51,7 @@ import ( // Detection balancer. This test verifies that an RPC is able to proceed // normally with this configuration. func (s) TestOutlierDetection_NoopConfig(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := &stubserver.StubServer{ EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, @@ -161,7 +162,7 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, // the unhealthy upstream is ejected, RPC's should regularly round robin across // all three upstreams. func (s) TestOutlierDetectionWithOutlier(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // Working backend 1. backend1 := stubserver.StartTestService(t, nil) @@ -242,7 +243,7 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) { // Detection present in the CDS update, but with SuccessRateEjection unset, and // asserts that Outlier Detection is turned on and ejects upstreams. func (s) TestOutlierDetectionXDSDefaultOn(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // Working backend 1. 
backend1 := stubserver.StartTestService(t, nil) diff --git a/test/xds/xds_client_retry_test.go b/test/xds/xds_client_retry_test.go index 78c1c95d462c..f4c4ed38d507 100644 --- a/test/xds/xds_client_retry_test.go +++ b/test/xds/xds_client_retry_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/wrapperspb" @@ -41,7 +42,7 @@ func (s) TestClientSideRetry(t *testing.T) { ctr := 0 errs := []codes.Code{codes.ResourceExhausted} - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { diff --git a/test/xds/xds_rls_clusterspecifier_plugin_test.go b/test/xds/xds_rls_clusterspecifier_plugin_test.go index d0eb753a6fce..98c802fcf78e 100644 --- a/test/xds/xds_rls_clusterspecifier_plugin_test.go +++ b/test/xds/xds_rls_clusterspecifier_plugin_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/protobuf/types/known/durationpb" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" @@ -105,7 +106,7 @@ func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) { // Set up all components and configuration necessary - management server, // xDS resolver, fake RLS Server, and xDS configuration which specifies an // RLS Balancer that communicates to this set up fake RLS Server. 
- managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() diff --git a/test/xds/xds_security_config_nack_test.go b/test/xds/xds_security_config_nack_test.go index 697d135f57c6..d14f9821411d 100644 --- a/test/xds/xds_security_config_nack_test.go +++ b/test/xds/xds_security_config_nack_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/resolver" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -147,7 +148,7 @@ func (s) TestUnmarshalListener_WithUpdateValidatorFunc(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() diff --git a/test/xds/xds_server_certificate_providers_test.go b/test/xds/xds_server_certificate_providers_test.go index 3359bc58d354..0932f318e1cd 100644 --- a/test/xds/xds_server_certificate_providers_test.go +++ b/test/xds/xds_server_certificate_providers_test.go @@ -34,6 +34,7 @@ import ( xdscreds "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds" "google.golang.org/protobuf/types/known/wrapperspb" @@ -57,7 +58,7 @@ import ( // credentials are getting used on the server. 
func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Success(t *testing.T) { // Spin up an xDS management server. - mgmtServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + mgmtServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) // Spin up an xDS-enabled gRPC server that uses xDS credentials with // insecure fallback, and the above bootstrap configuration. diff --git a/test/xds/xds_server_integration_test.go b/test/xds/xds_server_integration_test.go index 08aed76137c2..1525cd1a65ee 100644 --- a/test/xds/xds_server_integration_test.go +++ b/test/xds/xds_server_integration_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" "google.golang.org/grpc/xds" @@ -143,7 +144,7 @@ func hostPortFromListener(lis net.Listener) (string, uint32, error) { // the client and the server. This results in both of them using the // configured fallback credentials (which is insecure creds in this case). 
func (s) TestServerSideXDS_Fallback(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() @@ -224,7 +225,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() diff --git a/test/xds/xds_server_rbac_test.go b/test/xds/xds_server_rbac_test.go index 13cce4f2ecf5..70000e2a8a2c 100644 --- a/test/xds/xds_server_rbac_test.go +++ b/test/xds/xds_server_rbac_test.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" @@ -59,7 +60,7 @@ import ( // (NonForwardingAction), and the RPC's matching those routes should proceed as // normal. 
func (s) TestServerSideXDS_RouteConfiguration(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() @@ -626,7 +627,7 @@ func (s) TestRBACHTTPFilter(t *testing.T) { } audit.RegisterLoggerBuilder(lb) - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() @@ -802,7 +803,7 @@ func serverListenerWithBadRouteConfiguration(t *testing.T, host string, port uin } func (s) TestRBACToggledOn_WithBadRouteConfiguration(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() diff --git a/test/xds/xds_server_serving_mode_test.go b/test/xds/xds_server_serving_mode_test.go index d68436e34597..40bc1f6898c0 100644 --- a/test/xds/xds_server_serving_mode_test.go +++ b/test/xds/xds_server_serving_mode_test.go @@ -31,6 +31,7 @@ import ( xdscreds "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/xds" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" @@ -43,7 +44,7 @@ import ( // change callback is not invoked and client connections to the server are not // recycled. 
func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()}) if err != nil { @@ -165,7 +166,7 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { // xDS enabled gRPC servers. It verifies that appropriate mode changes happen in // the server, and also verifies behavior of clientConns under these modes. func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) // Configure xDS credentials to be used on the server-side. creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{ diff --git a/test/xds/xds_server_test.go b/test/xds/xds_server_test.go index 20adfea7ab6f..3ede7af3cb0e 100644 --- a/test/xds/xds_server_test.go +++ b/test/xds/xds_server_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/status" "google.golang.org/grpc/xds" @@ -55,7 +56,7 @@ var ( // dynamically, and subsequent RPC's on that connection should start failing // with status code UNAVAILABLE. 
func (s) TestServeLDSRDS(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) lis, err := testutils.LocalTCPListener() if err != nil { @@ -165,7 +166,7 @@ func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.Clie // serving, successfully Accept Connections, and fail at the L7 level with a // certain error message. func (s) TestRDSNack(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) @@ -235,7 +236,7 @@ func (s) TestRDSNack(t *testing.T) { // RPC's will match to). This configuration should eventually be represented in // the Server's state, and RPCs should proceed successfully. 
func (s) TestMultipleUpdatesImmediatelySwitch(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) diff --git a/test/xds/xds_telemetry_labels_test.go b/test/xds/xds_telemetry_labels_test.go index 9068273628ee..7a0e76227a32 100644 --- a/test/xds/xds_telemetry_labels_test.go +++ b/test/xds/xds_telemetry_labels_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/stats" @@ -53,7 +54,7 @@ const localityValue = `{"region":"region-1","zone":"zone-1","subZone":"subzone-1 // handler asserts that subsequent HandleRPC calls from the RPC lifecycle // contain telemetry labels that it can see. func (s) TestTelemetryLabels(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop()