Skip to content

Commit

Permalink
Responded to Doug's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Aug 7, 2024
1 parent 6ff2e00 commit 5e8a314
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 105 deletions.
137 changes: 84 additions & 53 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,63 +39,94 @@ func Test(t *testing.T) {
// on a weighted SubConn, and expects certain metrics for each of these
// scenarios.
func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})

wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
tests := []struct {
name string
weightExpirationPeriod time.Duration
blackoutPeriod time.Duration
lastUpdatedSet bool
nonEmptySet bool
nowTime time.Time
endpointWeightStaleWant float64
endpointWeightNotYetUsableWant float64
endpointWeightWant float64
}{
// The weighted SubConn's lastUpdated field hasn't been set, so this
// SubConn's weight is not yet usable. Thus, should emit that endpoint
// weight is not yet usable, and 0 for weight.
{
name: "no weight set",
weightExpirationPeriod: time.Second,
blackoutPeriod: time.Second,
nowTime: time.Now(),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 1,
endpointWeightWant: 0,
},
{
name: "weight expiration",
lastUpdatedSet: true,
weightExpirationPeriod: 2 * time.Second,
blackoutPeriod: time.Second,
nowTime: time.Now().Add(100 * time.Second),
endpointWeightStaleWant: 1,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 0,
},
{
name: "in blackout period",
lastUpdatedSet: true,
weightExpirationPeriod: time.Minute,
blackoutPeriod: 10 * time.Second,
nowTime: time.Now(),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 1,
endpointWeightWant: 0,
},
{
name: "normal weight",
lastUpdatedSet: true,
nonEmptySet: true,
weightExpirationPeriod: time.Minute,
blackoutPeriod: time.Second,
nowTime: time.Now().Add(10 * time.Second),
endpointWeightStaleWant: 0,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 3,
},
{
name: "weight expiration takes precdedence over blackout",
lastUpdatedSet: true,
nonEmptySet: true,
weightExpirationPeriod: time.Second,
blackoutPeriod: time.Minute,
nowTime: time.Now().Add(10 * time.Second),
endpointWeightStaleWant: 1,
endpointWeightNotYetUsableWant: 0,
endpointWeightWant: 0,
},
}

// The weighted SubConn's lastUpdated field hasn't been set, so this
// SubConn's weight is not yet usable. Thus, should emit that endpoint
// weight is not yet usable, and 0 for weight.
wsc.weight(time.Now(), time.Second, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0.
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()

// Setup a scenario where the SubConn's weight expires. Thus, should emit
// that endpoint weight is stale, and 0 for weight.
wsc.lastUpdated = time.Now()
wsc.weight(time.Now().Add(100*time.Second), 2*time.Second, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()

// Setup a scenario where the SubConn's weight is in the blackout period.
// Thus, should emit that endpoint weight is not yet usable, and 0 for
// weight.
wsc.weight(time.Now(), time.Minute, 10*time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
}
if test.lastUpdatedSet {
wsc.lastUpdated = time.Now()
}
if test.nonEmptySet {
wsc.nonEmptySince = time.Now()
}
wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)

// Setup a scenario where SubConn's weight is what is persists in weight
// field. This is triggered by last update being past blackout period and
// before weight update period. Should thus emit that endpoint weight is 3,
// and no other metrics.
wsc.nonEmptySince = time.Now()
wsc.weight(time.Now().Add(10*time.Second), time.Minute, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 3)
tmr.ClearMetrics()
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
})
}

// Setup a scenario where a SubConn's weight both expires and is within the
// blackout period. In this case, weight expiry should take precedence with
// respect to metrics emitted. Thus, should emit that endpoint weight is not
// yet usable, and 0 for weight.
wsc.weight(time.Now().Add(10*time.Second), time.Second, time.Minute, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()
}

// TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric
Expand Down
13 changes: 1 addition & 12 deletions internal/testutils/stats/test_metrics_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,25 +79,14 @@ func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal flo
}
}

// AssertEitherDataForMetric asserts either data point is present for metric.
// The zero value in the check is equivalent to unset.

func (r *TestMetricsRecorder) AssertEitherDataForMetric(metricName string, wantVal1 float64, wantVal2 float64) {
r.mu.Lock()
defer r.mu.Unlock()
if r.data[estats.Metric(metricName)] != wantVal1 && r.data[estats.Metric(metricName)] != wantVal2 {
r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v or %v", metricName, r.data[estats.Metric(metricName)], wantVal1, wantVal2)
}
}

// PollForDataForMetric polls the metric data for the want. Fails if context
// provided expires before data for metric is found.
func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
r.mu.Lock()
if r.data[estats.Metric(metricName)] == wantVal {
r.mu.Unlock()
break
return
}
r.mu.Unlock()
}
Expand Down
10 changes: 2 additions & 8 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,11 +473,9 @@ func (s) TestWRRMetrics(t *testing.T) {
// scheduler.
receivedExpectedMetrics := grpcsync.NewEvent()
go func() {
for i := 0; i < 100; i++ {
for !receivedExpectedMetrics.HasFired() {
client.EmptyCall(ctx, &testpb.Empty{})
if receivedExpectedMetrics.HasFired() {
break
}
time.Sleep(2 * time.Millisecond)
}
}()

Expand Down Expand Up @@ -554,10 +552,6 @@ func (s) TestWRRMetrics(t *testing.T) {
},
}

if ctx.Err() != nil {
t.Fatalf("Timeout waiting for metric %v", eventuallyWantMetric.Name)
}

if err := pollForWantMetrics(ctx, t, reader, []metricdata.Metrics{eventuallyWantMetric}); err != nil {
t.Fatal(err)
}
Expand Down
32 changes: 0 additions & 32 deletions test/xds/xds_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,6 @@ const (
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

// setupManagementServerAndResolver sets up an xDS management server, creates
// bootstrap configuration pointing to that server and creates an xDS resolver
// using that configuration.
//
// Registers a cleanup function on t to stop the management server.
//
// Returns the following:
// - the xDS management server
// - the node ID to use when talking to this management server
// - bootstrap configuration to use (if creating an xDS-enabled gRPC server)
// - xDS resolver builder (if creating an xDS-enabled gRPC client)
func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) {
// Start an xDS management server.
xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})

// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address)

// Create an xDS resolver with the above bootstrap configuration.
var r resolver.Builder
var err error
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}

return xdsServer, nodeID, bc, r
}

func (s) TestClientSideXDS(t *testing.T) {
managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t)

Expand Down

0 comments on commit 5e8a314

Please sign in to comment.