Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer/weightedroundrobin: Add recording point for endpoint weight not yet usable and add metrics tests #7466

Merged
merged 5 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (p *picker) inc() uint32 {
}

// regenerateScheduler builds a new scheduler through p.newScheduler and
// atomically stores a pointer to it in p.scheduler.
func (p *picker) regenerateScheduler() {
s := p.newScheduler()
s := p.newScheduler(true)
atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
}

Expand Down Expand Up @@ -558,14 +558,17 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
w.SubConn.Connect()
case connectivity.Ready:
// If we transition back to READY state, reset nonEmptySince so that we
// apply the blackout period after we start receiving load data. Note
// that we cannot guarantee that we will never receive lingering
// callbacks for backend metric reports from the previous connection
// after the new connection has been established, but they should be
// masked by new backend metric reports from the new connection by the
// time the blackout period ends.
// apply the blackout period after we start receiving load data. Also
// reset lastUpdated to trigger endpoint weight not yet usable in the
// case endpoint gets asked what weight it is before receiving a new
// load report. Note that we cannot guarantee that we will never receive
// lingering callbacks for backend metric reports from the previous
// connection after the new connection has been established, but they
// should be masked by new backend metric reports from the new
// connection by the time the blackout period ends.
w.mu.Lock()
w.nonEmptySince = time.Time{}
w.lastUpdated = time.Time{}
w.mu.Unlock()
case connectivity.Shutdown:
if w.stopORCAListener != nil {
Expand All @@ -592,7 +595,7 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
// account the parameters. Returns 0 for blacked out or expired data, which
// will cause the backend weight to be treated as the mean of the weights of the
// other backends. If recordMetrics is set to true, this function will emit
// metrics through the mtrics registry.
// metrics through the metrics registry.
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration, recordMetrics bool) (weight float64) {
w.mu.Lock()
defer w.mu.Unlock()
Expand All @@ -603,6 +606,13 @@ func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackout
}()
}

// The SubConn has not received a load report (i.e. just turned READY with
// no load report).
if w.lastUpdated == (time.Time{}) {
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
return 0
}

// If the most recent update was longer ago than the expiration period,
// reset nonEmptySince so that we apply the blackout period again if we
// start getting data again in the future, and return 0.
Expand Down
46 changes: 46 additions & 0 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -81,6 +82,14 @@ var (
WeightUpdatePeriod: stringp(".050s"),
ErrorUtilizationPenalty: float64p(0),
}
// testMetricsConfig is the LB config passed to svcConfig by
// TestWRRMetricsBasic. OOB load reporting is disabled, the blackout period is
// zero, weights expire after 60s and the weight update period is 50ms.
testMetricsConfig = iwrr.LBConfig{
EnableOOBLoadReport: boolp(false),
OOBReportingPeriod: stringp("0.005s"),
BlackoutPeriod: stringp("0s"),
WeightExpirationPeriod: stringp("60s"),
WeightUpdatePeriod: stringp(".050s"),
ErrorUtilizationPenalty: float64p(0),
}
)

type testServer struct {
Expand Down Expand Up @@ -196,6 +205,43 @@ func (s) TestBalancer_OneAddress(t *testing.T) {
}
}

// TestWRRMetricsBasic tests metrics emitted from the WRR balancer. It
// configures a weighted round robin balancer as the top level balancer of a
// ClientConn, and configures a fake stats handler on the ClientConn to receive
// metrics. It verifies stats emitted from the Weighted Round Robin Balancer on
// balancer startup case which triggers the first picker and scheduler update
// before any load reports are received.
//
// Note that in this test and others, metrics emission assertions are a snapshot
// of the most recently emitted metrics. This is due to the nondeterminism of
// scheduler updates with respect to test bodies, so the assertions made are
// from the most recently synced state of the system (picker/scheduler) from the
// test body.
func (s) TestWRRMetricsBasic(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// One backend, with WRR as the top-level LB policy of the ClientConn and a
	// test metrics recorder installed as its stats handler.
	backend := startServer(t, reportCall)
	serviceCfg := svcConfig(t, testMetricsConfig)
	recorder := stats.NewTestMetricsRecorder(t)

	err := backend.StartClient(
		grpc.WithDefaultServiceConfig(serviceCfg),
		grpc.WithStatsHandler(recorder),
	)
	if err != nil {
		t.Fatalf("Error starting client: %v", err)
	}
	backend.callMetrics.SetQPS(float64(1))

	// A single RPC is enough to get a picker and scheduler built.
	if _, err := backend.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("Error from EmptyCall: %v", err)
	}

	// Expected most-recent value for each WRR metric, checked in this order.
	expectations := []struct {
		metric string
		want   float64
	}{
		// Falls back because only one SubConn.
		{metric: "grpc.lb.wrr.rr_fallback", want: 1},
		// The endpoint weight has not expired so this is 0 (never emitted).
		{metric: "grpc.lb.wrr.endpoint_weight_stale", want: 0},
		{metric: "grpc.lb.wrr.endpoint_weight_not_yet_usable", want: 1},
		// Unusable, so no endpoint weight. Due to only one SubConn, this
		// will never update the weight. Thus, this will stay 0.
		{metric: "grpc.lb.wrr.endpoint_weights", want: 0},
	}
	for _, e := range expectations {
		recorder.AssertDataForMetric(e.metric, e.want)
	}
}

// Tests two addresses with ORCA reporting disabled (should fall back to pure
// RR).
func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
Expand Down
163 changes: 163 additions & 0 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package weightedroundrobin

import (
"testing"
"time"

"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils/stats"
)

// s is the receiver type for the test methods in this package. It embeds
// grpctest.Tester and is handed to grpctest.RunSubTests by Test.
type s struct {
grpctest.Tester
}

// Test is the single top-level test function; it passes an s value to
// grpctest.RunSubTests.
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestWRR_Metrics_SubConnWeight tests different scenarios for the weight call
// on a weighted SubConn, and expects certain metrics for each of these
// scenarios.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
	// Use one reference instant for the whole table so that the offsets
	// between lastUpdated, nonEmpty and nowTime are exactly what each case
	// states, rather than depending on how long the table takes to build.
	now := time.Now()

	tests := []struct {
		name                           string
		weightExpirationPeriod         time.Duration
		blackoutPeriod                 time.Duration
		lastUpdated                    time.Time
		nonEmpty                       time.Time
		nowTime                        time.Time
		endpointWeightStaleWant        float64
		endpointWeightNotYetUsableWant float64
		endpointWeightWant             float64
	}{
		// The weighted SubConn's lastUpdated field hasn't been set, so this
		// SubConn's weight is not yet usable. Thus, should emit that endpoint
		// weight is not yet usable, and 0 for weight.
		{
			name:                           "no weight set",
			weightExpirationPeriod:         time.Second,
			blackoutPeriod:                 time.Second,
			nowTime:                        now,
			endpointWeightStaleWant:        0,
			endpointWeightNotYetUsableWant: 1,
			endpointWeightWant:             0,
		},
		// The last update is 100s old with a 2s expiration period, so the
		// weight is expected to be reported as stale.
		{
			name:                           "weight expiration",
			lastUpdated:                    now,
			weightExpirationPeriod:         2 * time.Second,
			blackoutPeriod:                 time.Second,
			nowTime:                        now.Add(100 * time.Second),
			endpointWeightStaleWant:        1,
			endpointWeightNotYetUsableWant: 0,
			endpointWeightWant:             0,
		},
		// An update has been received but nonEmpty is unset and the blackout
		// period is non-zero, so the weight is expected to be reported as not
		// yet usable.
		{
			name:                           "in blackout period",
			lastUpdated:                    now,
			weightExpirationPeriod:         time.Minute,
			blackoutPeriod:                 10 * time.Second,
			nowTime:                        now,
			endpointWeightStaleWant:        0,
			endpointWeightNotYetUsableWant: 1,
			endpointWeightWant:             0,
		},
		// 10s after the update: inside the 1m expiration period and past the
		// 1s blackout period, so the configured weight (3) is expected.
		{
			name:                           "normal weight",
			lastUpdated:                    now,
			nonEmpty:                       now,
			weightExpirationPeriod:         time.Minute,
			blackoutPeriod:                 time.Second,
			nowTime:                        now.Add(10 * time.Second),
			endpointWeightStaleWant:        0,
			endpointWeightNotYetUsableWant: 0,
			endpointWeightWant:             3,
		},
		// 10s after the update both the 1s expiration period has elapsed and
		// the 1m blackout period is still running; only staleness is expected
		// to be reported.
		{
			name:                           "weight expiration takes precedence over blackout",
			lastUpdated:                    now,
			nonEmpty:                       now,
			weightExpirationPeriod:         time.Second,
			blackoutPeriod:                 time.Minute,
			nowTime:                        now.Add(10 * time.Second),
			endpointWeightStaleWant:        1,
			endpointWeightNotYetUsableWant: 0,
			endpointWeightWant:             0,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// Each case gets its own recorder so that the assertions only
			// see what this single weight call emitted.
			tmr := stats.NewTestMetricsRecorder(t)
			wsc := &weightedSubConn{
				metricsRecorder: tmr,
				weightVal:       3,
				lastUpdated:     test.lastUpdated,
				nonEmptySince:   test.nonEmpty,
			}
			wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)

			tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
			tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
			tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
		})
	}
}

// TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric
// for scheduler updates. It tests the case with one SubConn, and two SubConns
// with no weights. Both of these should emit a count metric for round robin
// fallback.
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t)
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 0,
}

p := &picker{
cfg: &lbConfig{
BlackoutPeriod: iserviceconfig.Duration(10 * time.Second),
WeightExpirationPeriod: iserviceconfig.Duration(3 * time.Minute),
},
subConns: []*weightedSubConn{wsc},
metricsRecorder: tmr,
}
// There is only one SubConn, so no matter if the SubConn has a weight or
// not will fallback to round robin.
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
tmr.ClearMetrics()

// With two SubConns, if neither of them have weights, it will also fallback
// to round robin.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
wsc2 := &weightedSubConn{
target: "target",
metricsRecorder: tmr,
weightVal: 0,
}
p.subConns = append(p.subConns, wsc2)
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
}
12 changes: 8 additions & 4 deletions balancer/weightedroundrobin/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type scheduler interface {
// len(scWeights)-1 are zero or there is only a single subconn, otherwise it
// will return an Earliest Deadline First (EDF) scheduler implementation that
// selects the subchannels according to their weights.
func (p *picker) newScheduler() scheduler {
scWeights := p.scWeights(true)
func (p *picker) newScheduler(recordMetrics bool) scheduler {
scWeights := p.scWeights(recordMetrics)
n := len(scWeights)
if n == 0 {
return nil
}
if n == 1 {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
}
return &rrScheduler{numSCs: 1, inc: p.inc}
}
sum := float64(0)
Expand All @@ -55,7 +57,9 @@ func (p *picker) newScheduler() scheduler {
}

if numZero >= n-1 {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
}
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
}
unscaledMean := sum / float64(n-numZero)
Expand Down
4 changes: 2 additions & 2 deletions internal/stats/metrics_recorder_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (s) TestMetricsRecorderList(t *testing.T) {

// Create two stats.Handlers which also implement MetricsRecorder, configure
// one as a global dial option and one as a local dial option.
mr1 := stats.NewTestMetricsRecorder(t, []string{})
mr2 := stats.NewTestMetricsRecorder(t, []string{})
mr1 := stats.NewTestMetricsRecorder(t)
mr2 := stats.NewTestMetricsRecorder(t)

defer internal.ClearGlobalDialOptions()
internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(mr1))
Expand Down
Loading
Loading