Skip to content

Commit ff620f6

Browse files
committed
a89 wrr changes adding backend service to per call metrics
a94 subchannel metrics with labels
1 parent e816736 commit ff620f6

File tree

14 files changed

+446
-31
lines changed

14 files changed

+446
-31
lines changed

balancer/pickfirst/metrics_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,31 @@ func (s) TestPickFirstMetrics(t *testing.T) {
102102
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
103103
}
104104

105+
//Checking for subchannel metrics as well
106+
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
107+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
108+
}
109+
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
110+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0)
111+
}
112+
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
113+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
114+
}
115+
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 {
116+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1)
117+
}
118+
105119
ss.Stop()
106120
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
107121
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 1 {
108122
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 1)
109123
}
124+
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 1 {
125+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 1)
126+
}
127+
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != -1 {
128+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, -1)
129+
}
110130
}
111131

112132
// TestPickFirstMetricsFailure tests the connection attempts failed metric. It

balancer/pickfirst/pickfirst_ext_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1946,6 +1946,16 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
19461946
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
19471947
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
19481948
}
1949+
1950+
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 0 {
1951+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 0)
1952+
}
1953+
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
1954+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
1955+
}
1956+
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
1957+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
1958+
}
19491959
}
19501960

19511961
// Test verifies that pickfirst attempts to connect to the second backend once
@@ -2006,6 +2016,16 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
20062016
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
20072017
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
20082018
}
2019+
2020+
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
2021+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
2022+
}
2023+
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
2024+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
2025+
}
2026+
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 {
2027+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1)
2028+
}
20092029
}
20102030

20112031
// Test tests the pickfirst balancer by causing a SubConn to fail and then
@@ -2057,6 +2077,9 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
20572077
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
20582078
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
20592079
}
2080+
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
2081+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
2082+
}
20602083
if holds[2].IsStarted() != false {
20612084
t.Fatalf("Server %d with address %q contacted unexpectedly", 2, addrs[2])
20622085
}
@@ -2080,6 +2103,13 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
20802103
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
20812104
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
20822105
}
2106+
2107+
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
2108+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
2109+
}
2110+
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
2111+
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
2112+
}
20832113
}
20842114

20852115
func (s) TestPickFirstLeaf_InterleavingIPV4Preferred(t *testing.T) {

balancer/weightedroundrobin/balancer.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ var (
6262
Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
6363
Unit: "{update}",
6464
Labels: []string{"grpc.target"},
65-
OptionalLabels: []string{"grpc.lb.locality"},
65+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
6666
Default: false,
6767
})
6868

@@ -71,7 +71,7 @@ var (
7171
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
7272
Unit: "{endpoint}",
7373
Labels: []string{"grpc.target"},
74-
OptionalLabels: []string{"grpc.lb.locality"},
74+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
7575
Default: false,
7676
})
7777

@@ -80,15 +80,15 @@ var (
8080
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
8181
Unit: "{endpoint}",
8282
Labels: []string{"grpc.target"},
83-
OptionalLabels: []string{"grpc.lb.locality"},
83+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
8484
Default: false,
8585
})
8686
endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
8787
Name: "grpc.lb.wrr.endpoint_weights",
8888
Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
8989
Unit: "{endpoint}",
9090
Labels: []string{"grpc.target"},
91-
OptionalLabels: []string{"grpc.lb.locality"},
91+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
9292
Default: false,
9393
})
9494
)
@@ -173,6 +173,7 @@ func (b *wrrBalancer) updateEndpointsLocked(endpoints []resolver.Endpoint) {
173173
metricsRecorder: b.metricsRecorder,
174174
target: b.target,
175175
locality: b.locality,
176+
cluster: b.clusterName,
176177
}
177178
for _, addr := range endpoint.Addresses {
178179
b.addressWeights.Set(addr, ew)
@@ -211,6 +212,7 @@ type wrrBalancer struct {
211212
mu sync.Mutex
212213
cfg *lbConfig // active config
213214
locality string
215+
clusterName string
214216
stopPicker *grpcsync.Event
215217
addressWeights *resolver.AddressMapV2[*endpointWeight]
216218
endpointToWeight *resolver.EndpointMap[*endpointWeight]
@@ -231,6 +233,11 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
231233
b.mu.Lock()
232234
b.cfg = cfg
233235
b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState)
236+
if cluster, ok := resolver.GetBackendServiceFromState(ccs.ResolverState); !ok {
237+
b.logger.Infof("Backend service name not found in resolver state attributes.")
238+
} else {
239+
b.clusterName = cluster
240+
}
234241
b.updateEndpointsLocked(ccs.ResolverState.Endpoints)
235242
b.mu.Unlock()
236243

@@ -288,6 +295,7 @@ func (b *wrrBalancer) UpdateState(state balancer.State) {
288295
metricsRecorder: b.metricsRecorder,
289296
locality: b.locality,
290297
target: b.target,
298+
clusterName: b.clusterName,
291299
}
292300

293301
b.stopPicker = grpcsync.NewEvent()
@@ -420,6 +428,7 @@ type picker struct {
420428
// The following fields are immutable.
421429
target string
422430
locality string
431+
clusterName string
423432
metricsRecorder estats.MetricsRecorder
424433
}
425434

@@ -499,6 +508,7 @@ type endpointWeight struct {
499508
target string
500509
metricsRecorder estats.MetricsRecorder
501510
locality string
511+
cluster string
502512

503513
// The following fields are only accessed on calls into the LB policy, and
504514
// do not need a mutex.
@@ -602,14 +612,14 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
602612

603613
if recordMetrics {
604614
defer func() {
605-
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality)
615+
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality, w.cluster)
606616
}()
607617
}
608618

609619
// The endpoint has not received a load report (i.e. just turned READY with
610620
// no load report).
611621
if w.lastUpdated.Equal(time.Time{}) {
612-
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
622+
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster)
613623
return 0
614624
}
615625

@@ -618,7 +628,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
618628
// start getting data again in the future, and return 0.
619629
if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
620630
if recordMetrics {
621-
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
631+
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster)
622632
}
623633
w.nonEmptySince = time.Time{}
624634
return 0
@@ -627,7 +637,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
627637
// If we don't have at least blackoutPeriod worth of data, return 0.
628638
if blackoutPeriod != 0 && (w.nonEmptySince.Equal(time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
629639
if recordMetrics {
630-
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
640+
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster)
631641
}
632642
return 0
633643
}

balancer/weightedroundrobin/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {
3939
}
4040
if n == 1 {
4141
if recordMetrics {
42-
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
42+
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
4343
}
4444
return &rrScheduler{numSCs: 1, inc: p.inc}
4545
}
@@ -58,7 +58,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {
5858

5959
if numZero >= n-1 {
6060
if recordMetrics {
61-
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
61+
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
6262
}
6363
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
6464
}

clientconn.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import (
3535
"google.golang.org/grpc/balancer/pickfirst"
3636
"google.golang.org/grpc/codes"
3737
"google.golang.org/grpc/connectivity"
38+
"google.golang.org/grpc/credentials"
39+
expstats "google.golang.org/grpc/experimental/stats"
3840
"google.golang.org/grpc/internal"
3941
"google.golang.org/grpc/internal/channelz"
4042
"google.golang.org/grpc/internal/grpcsync"
@@ -98,6 +100,41 @@ var (
98100
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
99101
)
100102

103+
var (
104+
disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
105+
Name: "grpc.subchannel.disconnections",
106+
Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
107+
Unit: "{disconnection}",
108+
Labels: []string{"grpc.target"},
109+
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"},
110+
Default: false,
111+
})
112+
connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
113+
Name: "grpc.subchannel.connection_attempts_succeeded",
114+
Description: "EXPERIMENTAL. Number of successful connection attempts.",
115+
Unit: "{attempt}",
116+
Labels: []string{"grpc.target"},
117+
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
118+
Default: false,
119+
})
120+
connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
121+
Name: "grpc.subchannel.connection_attempts_failed",
122+
Description: "EXPERIMENTAL. Number of failed connection attempts.",
123+
Unit: "{attempt}",
124+
Labels: []string{"grpc.target"},
125+
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
126+
Default: false,
127+
})
128+
openConnectionsMetric = expstats.RegisterInt64UpDownCount(expstats.MetricDescriptor{
129+
Name: "grpc.subchannel.open_connections",
130+
Description: "EXPERIMENTAL. Number of open connections.",
131+
Unit: "{attempt}",
132+
Labels: []string{"grpc.target"},
133+
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.security_level", "grpc.lb.locality"},
134+
Default: false,
135+
})
136+
)
137+
101138
const (
102139
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
103140
defaultClientMaxSendMessageSize = math.MaxInt32
@@ -1223,6 +1260,11 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
12231260
if ac.state == s {
12241261
return
12251262
}
1263+
locality, backendService := fetchLabels(ac)
1264+
if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
1265+
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality, "unknown")
1266+
openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, backendService, ac.securityLevel(), locality)
1267+
}
12261268
ac.state = s
12271269
ac.channelz.ChannelMetrics.State.Store(&s)
12281270
if lastErr == nil {
@@ -1276,10 +1318,13 @@ func (ac *addrConn) resetTransportAndUnlock() {
12761318
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
12771319
connectDeadline := time.Now().Add(dialDuration)
12781320

1321+
locality, backendService := fetchLabels(ac)
1322+
12791323
ac.updateConnectivityState(connectivity.Connecting, nil)
12801324
ac.mu.Unlock()
12811325

12821326
if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
1327+
connectionAttemptsFailedMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality)
12831328
// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
12841329
// to ensure one resolution request per pass instead of per subconn failure.
12851330
ac.cc.resolveNow(resolver.ResolveNowOptions{})
@@ -1319,10 +1364,43 @@ func (ac *addrConn) resetTransportAndUnlock() {
13191364
}
13201365
// Success; reset backoff.
13211366
ac.mu.Lock()
1367+
connectionAttemptsSucceededMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality)
1368+
openConnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, ac.securityLevel(), locality)
13221369
ac.backoffIdx = 0
13231370
ac.mu.Unlock()
13241371
}
13251372

1373+
func fetchLabels(ac *addrConn) (string, string) {
1374+
var locality, backendService string
1375+
labelsFromAddress, ok := internal.AddressToTelemetryLabels.(func(resolver.Address) map[string]string)
1376+
if len(ac.addrs) > 0 && internal.AddressToTelemetryLabels != nil && ok {
1377+
if ok {
1378+
labels := labelsFromAddress(ac.addrs[0])
1379+
locality = labels["grpc.lb.locality"]
1380+
backendService = labels["grpc.lb.backend_service"]
1381+
}
1382+
}
1383+
return locality, backendService
1384+
}
1385+
1386+
type securityLevelKey struct{}
1387+
1388+
func (ac *addrConn) securityLevel() string {
1389+
var secLevel string
1390+
if ac.transport == nil {
1391+
secLevel, _ = ac.curAddr.Attributes.Value(securityLevelKey{}).(string)
1392+
return secLevel
1393+
}
1394+
authInfo := ac.transport.AuthInfo()
1395+
if ci, ok := authInfo.(interface {
1396+
GetCommonAuthInfo() credentials.CommonAuthInfo
1397+
}); ok {
1398+
secLevel = ci.GetCommonAuthInfo().SecurityLevel.String()
1399+
ac.curAddr.Attributes = ac.curAddr.Attributes.WithValue(securityLevelKey{}, secLevel)
1400+
}
1401+
return secLevel
1402+
}
1403+
13261404
// tryAllAddrs tries to create a connection to the addresses, and stop when at
13271405
// the first successful one. It returns an error if no address was successfully
13281406
// connected, or updates ac appropriately with the new transport.

internal/internal.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,10 @@ var (
244244
// When set, the function will be called before the stream enters
245245
// the blocking state.
246246
NewStreamWaitingForResolver = func() {}
247+
248+
// AddressToTelemetryLabels is an xDS-provided function to extract telemetry
249+
// labels from a resolver.Address. Callers must assert its type before calling.
250+
AddressToTelemetryLabels any // func(addr resolver.Address) map[string]string
247251
)
248252

249253
// HealthChecker defines the signature of the client-side LB channel health

0 commit comments

Comments
 (0)