Skip to content

Commit

Permalink
Responded to Doug's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Aug 7, 2024
1 parent 6ff2e00 commit 761b729
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 116 deletions.
3 changes: 1 addition & 2 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,7 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
cfg := oobConfig
cfg.OOBReportingPeriod = stringp("60s")
sc := svcConfig(t, cfg)
mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
Expand Down
137 changes: 84 additions & 53 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,63 +39,94 @@ func Test(t *testing.T) {
// on a weighted SubConn, and expects certain metrics for each of these
// scenarios.
func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})

wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
tests := []struct {
name string
weightExpirationPeriod time.Duration
blackoutPeriod time.Duration
lastUpdatedSet bool
nonEmptySet bool
nowTime time.Time
endpointWeightStaleWant float64
endpointWeightNotYetUsableWant float64
endpointWeightWant float64
}{
// The weighted SubConn's lastUpdated field hasn't been set, so this
// SubConn's weight is not yet usable. Thus, should emit that endpoint
// weight is not yet usable, and 0 for weight.
{
name: "no weight set",
weightExpirationPeriod: time.Second,
blackoutPeriod: time.Second,
nowTime: time.Now(),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 1,
endpointWeightWant: 0,
},
{
name: "weight expiration",
lastUpdatedSet: true,
weightExpirationPeriod: 2 * time.Second,
blackoutPeriod: time.Second,
nowTime: time.Now().Add(100 * time.Second),
endpointWeightStaleWant: 1,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 0,
},
{
name: "in blackout period",
lastUpdatedSet: true,
weightExpirationPeriod: time.Minute,
blackoutPeriod: 10 * time.Second,
nowTime: time.Now(),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 1,
endpointWeightWant: 0,
},
{
name: "normal weight",
lastUpdatedSet: true,
nonEmptySet: true,
weightExpirationPeriod: time.Minute,
blackoutPeriod: time.Second,
nowTime: time.Now().Add(10 * time.Second),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 3,
},
{
name: "weight expiration takes precdedence over blackout",
lastUpdatedSet: true,
nonEmptySet: true,
weightExpirationPeriod: time.Second,
blackoutPeriod: time.Minute,
nowTime: time.Now().Add(10 * time.Second),
endpointWeightStaleWant: 1,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 0,
},
}

// The weighted SubConn's lastUpdated field hasn't been set, so this
// SubConn's weight is not yet usable. Thus, should emit that endpoint
// weight is not yet usable, and 0 for weight.
wsc.weight(time.Now(), time.Second, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0.
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()

// Setup a scenario where the SubConn's weight expires. Thus, should emit
// that endpoint weight is stale, and 0 for weight.
wsc.lastUpdated = time.Now()
wsc.weight(time.Now().Add(100*time.Second), 2*time.Second, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()

// Setup a scenario where the SubConn's weight is in the blackout period.
// Thus, should emit that endpoint weight is not yet usable, and 0 for
// weight.
wsc.weight(time.Now(), time.Minute, 10*time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
}
if test.lastUpdatedSet {
wsc.lastUpdated = time.Now()
}
if test.nonEmptySet {
wsc.nonEmptySince = time.Now()
}
wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)

// Setup a scenario where SubConn's weight is what is persists in weight
// field. This is triggered by last update being past blackout period and
// before weight update period. Should thus emit that endpoint weight is 3,
// and no other metrics.
wsc.nonEmptySince = time.Now()
wsc.weight(time.Now().Add(10*time.Second), time.Minute, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 3)
tmr.ClearMetrics()
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
})
}

// Setup a scenario where a SubConn's weight both expires and is within the
// blackout period. In this case, weight expiry should take precedence with
// respect to metrics emitted. Thus, should emit that endpoint weight is not
// yet usable, and 0 for weight.
wsc.weight(time.Now().Add(10*time.Second), time.Second, time.Minute, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()
}

// TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric
Expand Down
23 changes: 6 additions & 17 deletions internal/testutils/stats/test_metrics_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder
tmr := &TestMetricsRecorder{
t: t,

intCountCh: testutils.NewChannelWithSize(1000),
floatCountCh: testutils.NewChannelWithSize(1000),
intHistoCh: testutils.NewChannelWithSize(1000),
floatHistoCh: testutils.NewChannelWithSize(1000),
intGaugeCh: testutils.NewChannelWithSize(1000),
intCountCh: testutils.NewChannelWithSize(10),
floatCountCh: testutils.NewChannelWithSize(10),
intHistoCh: testutils.NewChannelWithSize(10),
floatHistoCh: testutils.NewChannelWithSize(10),
intGaugeCh: testutils.NewChannelWithSize(10),

data: make(map[estats.Metric]float64),
}
Expand All @@ -79,25 +79,14 @@ func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal flo
}
}

// AssertEitherDataForMetric asserts either data point is present for metric.
// The zero value in the check is equivalent to unset.

func (r *TestMetricsRecorder) AssertEitherDataForMetric(metricName string, wantVal1 float64, wantVal2 float64) {
r.mu.Lock()
defer r.mu.Unlock()
if r.data[estats.Metric(metricName)] != wantVal1 && r.data[estats.Metric(metricName)] != wantVal2 {
r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v or %v", metricName, r.data[estats.Metric(metricName)], wantVal1, wantVal2)
}
}

// PollForDataForMetric polls the metric data for the want. Fails if context
// provided expires before data for metric is found.
func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
r.mu.Lock()
if r.data[estats.Metric(metricName)] == wantVal {
r.mu.Unlock()
break
return

Check warning on line 89 in internal/testutils/stats/test_metrics_recorder.go

View check run for this annotation

Codecov / codecov/patch

internal/testutils/stats/test_metrics_recorder.go#L84-L89

Added lines #L84 - L89 were not covered by tests
}
r.mu.Unlock()

Check warning on line 91 in internal/testutils/stats/test_metrics_recorder.go

View check run for this annotation

Codecov / codecov/patch

internal/testutils/stats/test_metrics_recorder.go#L91

Added line #L91 was not covered by tests
}
Expand Down
10 changes: 2 additions & 8 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,11 +473,9 @@ func (s) TestWRRMetrics(t *testing.T) {
// scheduler.
receivedExpectedMetrics := grpcsync.NewEvent()
go func() {
for i := 0; i < 100; i++ {
for !receivedExpectedMetrics.HasFired() {
client.EmptyCall(ctx, &testpb.Empty{})
if receivedExpectedMetrics.HasFired() {
break
}
time.Sleep(2 * time.Millisecond)
}
}()

Expand Down Expand Up @@ -554,10 +552,6 @@ func (s) TestWRRMetrics(t *testing.T) {
},
}

if ctx.Err() != nil {
t.Fatalf("Timeout waiting for metric %v", eventuallyWantMetric.Name)
}

if err := pollForWantMetrics(ctx, t, reader, []metricdata.Metrics{eventuallyWantMetric}); err != nil {
t.Fatal(err)
}
Expand Down
35 changes: 0 additions & 35 deletions test/xds/xds_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ import (
"testing"
"time"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/testutils/xds/e2e/setup"
"google.golang.org/grpc/resolver"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
Expand All @@ -52,38 +49,6 @@ const (
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

// setupManagementServerAndResolver sets up an xDS management server, creates
// bootstrap configuration pointing to that server and creates an xDS resolver
// using that configuration.
//
// Registers a cleanup function on t to stop the management server.
//
// Returns the following:
// - the xDS management server
// - the node ID to use when talking to this management server
// - bootstrap configuration to use (if creating an xDS-enabled gRPC server)
// - xDS resolver builder (if creating an xDS-enabled gRPC client)
func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) {
// Start an xDS management server.
xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})

// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address)

// Create an xDS resolver with the above bootstrap configuration.
var r resolver.Builder
var err error
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}

return xdsServer, nodeID, bc, r
}

func (s) TestClientSideXDS(t *testing.T) {
managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t)

Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_client_outlier_detection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) {
// Detection present in the CDS update, but with SuccessRateEjection unset, and
// asserts that Outlier Detection is turned on and ejects upstreams.
func (s) TestOutlierDetectionXDSDefaultOn(t *testing.T) {
managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t)
managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t)

// Working backend 1.
backend1 := stubserver.StartTestService(t, nil)
Expand Down

0 comments on commit 761b729

Please sign in to comment.