Skip to content

Commit 4e672c2

Browse files
committed
Add pick first metrics
1 parent 0553bc3 commit 4e672c2

File tree

9 files changed

+370
-9
lines changed

9 files changed

+370
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package pickfirstleaf_test
20+
21+
import (
22+
"context"
23+
"fmt"
24+
"testing"
25+
"time"
26+
27+
"google.golang.org/grpc"
28+
"google.golang.org/grpc/balancer/pickfirst"
29+
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
30+
"google.golang.org/grpc/credentials/insecure"
31+
"google.golang.org/grpc/internal"
32+
"google.golang.org/grpc/internal/envconfig"
33+
"google.golang.org/grpc/internal/stubserver"
34+
"google.golang.org/grpc/internal/testutils/stats"
35+
testgrpc "google.golang.org/grpc/interop/grpc_testing"
36+
testpb "google.golang.org/grpc/interop/grpc_testing"
37+
"google.golang.org/grpc/resolver"
38+
"google.golang.org/grpc/resolver/manual"
39+
"google.golang.org/grpc/serviceconfig"
40+
"google.golang.org/grpc/stats/opentelemetry"
41+
42+
"go.opentelemetry.io/otel/attribute"
43+
"go.opentelemetry.io/otel/sdk/metric"
44+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
45+
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
46+
)
47+
48+
var pfConfig string
49+
50+
func init() {
51+
name := pickfirst.Name
52+
if !envconfig.NewPickFirstEnabled {
53+
name = pickfirstleaf.Name
54+
}
55+
pfConfig = fmt.Sprintf(`{
56+
"loadBalancingConfig": [
57+
{
58+
%q: {
59+
}
60+
}
61+
]
62+
}`, name)
63+
}
64+
65+
// TestPickFirstMetrics tests pick first metrics. It configures a pick first
66+
// balancer, causes it to connect and then disconnect, and expects the
67+
// subsequent metrics to emit from that.
68+
func (s) TestPickFirstMetrics(t *testing.T) {
69+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
70+
defer cancel()
71+
72+
ss := &stubserver.StubServer{
73+
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
74+
return &testpb.Empty{}, nil
75+
},
76+
}
77+
ss.StartServer()
78+
defer ss.Stop()
79+
80+
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
81+
82+
r := manual.NewBuilderWithScheme("whatever")
83+
r.InitialState(resolver.State{
84+
ServiceConfig: sc,
85+
Addresses: []resolver.Address{{Addr: ss.Address}}},
86+
)
87+
88+
grpcTarget := r.Scheme() + ":///"
89+
tmr := stats.NewTestMetricsRecorder()
90+
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
91+
if err != nil {
92+
t.Fatalf("NewClient() failed with error: %v", err)
93+
}
94+
defer cc.Close()
95+
96+
tsc := testgrpc.NewTestServiceClient(cc)
97+
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
98+
t.Fatalf("EmptyCall() failed: %v", err)
99+
}
100+
101+
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 {
102+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 1)
103+
}
104+
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 0 {
105+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 0)
106+
}
107+
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
108+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
109+
}
110+
111+
ss.Stop()
112+
if err = pollForDisconnectedMetrics(ctx, tmr); err != nil {
113+
t.Fatal(err)
114+
}
115+
}
116+
117+
func pollForDisconnectedMetrics(ctx context.Context, tmr *stats.TestMetricsRecorder) error {
118+
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
119+
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got == 1 {
120+
return nil
121+
}
122+
}
123+
return fmt.Errorf("error waiting for grpc.lb.pick_first.disconnections metric: %v", ctx.Err())
124+
}
125+
126+
// TestPickFirstMetricsFailure tests the connection attempts failed metric. It
127+
// configures a channel and scenario that causes a pick first connection attempt
128+
// to fail, and then expects that metric to emit.
129+
func (s) TestPickFirstMetricsFailure(t *testing.T) {
130+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
131+
defer cancel()
132+
133+
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
134+
135+
r := manual.NewBuilderWithScheme("whatever")
136+
r.InitialState(resolver.State{
137+
ServiceConfig: sc,
138+
Addresses: []resolver.Address{{Addr: "bad address"}}},
139+
)
140+
grpcTarget := r.Scheme() + ":///"
141+
tmr := stats.NewTestMetricsRecorder()
142+
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
143+
if err != nil {
144+
t.Fatalf("NewClient() failed with error: %v", err)
145+
}
146+
defer cc.Close()
147+
148+
tsc := testgrpc.NewTestServiceClient(cc)
149+
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
150+
t.Fatalf("EmptyCall() passed when expected to fail")
151+
}
152+
153+
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 0 {
154+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_succeeded", got, 0)
155+
}
156+
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
157+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
158+
}
159+
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
160+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
161+
}
162+
}
163+
164+
// TestPickFirstMetricsE2E tests the pick first metrics end to end. It
165+
// configures a channel with an OpenTelemetry plugin, induces all 3 pick first
166+
// metrics to emit, and makes sure the correct OpenTelemetry metrics atoms emit.
167+
func (s) TestPickFirstMetricsE2E(t *testing.T) {
168+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
169+
defer cancel()
170+
171+
ss := &stubserver.StubServer{
172+
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
173+
return &testpb.Empty{}, nil
174+
},
175+
}
176+
ss.StartServer()
177+
defer ss.Stop()
178+
179+
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
180+
r := manual.NewBuilderWithScheme("whatever")
181+
r.InitialState(resolver.State{
182+
ServiceConfig: sc,
183+
Addresses: []resolver.Address{{Addr: "bad address"}}},
184+
) // Will trigger connection failed.
185+
186+
grpcTarget := r.Scheme() + ":///"
187+
reader := metric.NewManualReader()
188+
provider := metric.NewMeterProvider(metric.WithReader(reader))
189+
mo := opentelemetry.MetricsOptions{
190+
MeterProvider: provider,
191+
Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.pick_first.disconnections", "grpc.lb.pick_first.connection_attempts_succeeded", "grpc.lb.pick_first.connection_attempts_failed"),
192+
}
193+
194+
cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
195+
if err != nil {
196+
t.Fatalf("NewClient() failed with error: %v", err)
197+
}
198+
defer cc.Close()
199+
200+
tsc := testgrpc.NewTestServiceClient(cc)
201+
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}); err == nil {
202+
t.Fatalf("EmptyCall() passed when expected to fail")
203+
}
204+
205+
r.UpdateState(resolver.State{
206+
ServiceConfig: sc,
207+
Addresses: []resolver.Address{{Addr: ss.Address}}}) // Will trigger successful connection metric.
208+
if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
209+
t.Fatalf("EmptyCall() failed: %v", err)
210+
}
211+
212+
// Stop the server, that should send signal to disconnect, which will
213+
// eventually emit disconnection metric.
214+
ss.Stop()
215+
wantMetrics := []metricdata.Metrics{
216+
{
217+
Name: "grpc.lb.pick_first.connection_attempts_succeeded",
218+
Description: "EXPERIMENTAL. Number of successful connection attempts.",
219+
Unit: "attempt",
220+
Data: metricdata.Sum[int64]{
221+
DataPoints: []metricdata.DataPoint[int64]{
222+
{
223+
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
224+
Value: 1,
225+
},
226+
},
227+
Temporality: metricdata.CumulativeTemporality,
228+
IsMonotonic: true,
229+
},
230+
},
231+
{
232+
Name: "grpc.lb.pick_first.connection_attempts_failed",
233+
Description: "EXPERIMENTAL. Number of failed connection attempts.",
234+
Unit: "attempt",
235+
Data: metricdata.Sum[int64]{
236+
DataPoints: []metricdata.DataPoint[int64]{
237+
{
238+
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
239+
Value: 1,
240+
},
241+
},
242+
Temporality: metricdata.CumulativeTemporality,
243+
IsMonotonic: true,
244+
},
245+
},
246+
}
247+
248+
gotMetrics := metricsDataFromReader(ctx, reader)
249+
for _, metric := range wantMetrics {
250+
val, ok := gotMetrics[metric.Name]
251+
if !ok {
252+
t.Fatalf("Metric %v not present in recorded metrics", metric.Name)
253+
}
254+
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
255+
t.Fatalf("Metrics data type not equal for metric: %v", metric.Name)
256+
}
257+
}
258+
// Disconnections metric will show up eventually, as asynchronous from
259+
// server stopping.
260+
wantMetrics = []metricdata.Metrics{
261+
{
262+
Name: "grpc.lb.pick_first.disconnections",
263+
Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
264+
Unit: "disconnection",
265+
Data: metricdata.Sum[int64]{
266+
DataPoints: []metricdata.DataPoint[int64]{
267+
{
268+
Attributes: attribute.NewSet(attribute.String("grpc.target", grpcTarget)),
269+
Value: 1,
270+
},
271+
},
272+
Temporality: metricdata.CumulativeTemporality,
273+
IsMonotonic: true,
274+
},
275+
},
276+
}
277+
if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil {
278+
t.Fatal(err)
279+
}
280+
}
281+
282+
func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map[string]metricdata.Metrics {
283+
rm := &metricdata.ResourceMetrics{}
284+
reader.Collect(ctx, rm)
285+
gotMetrics := map[string]metricdata.Metrics{}
286+
for _, sm := range rm.ScopeMetrics {
287+
for _, m := range sm.Metrics {
288+
gotMetrics[m.Name] = m
289+
}
290+
}
291+
return gotMetrics
292+
}
293+
294+
func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.ManualReader, wantMetrics []metricdata.Metrics) error {
295+
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
296+
gotMetrics := metricsDataFromReader(ctx, reader)
297+
for _, metric := range wantMetrics {
298+
val, ok := gotMetrics[metric.Name]
299+
if !ok {
300+
break
301+
}
302+
if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
303+
return fmt.Errorf("metrics data type not equal for metric: %v", metric.Name)
304+
}
305+
return nil
306+
}
307+
time.Sleep(5 * time.Millisecond)
308+
}
309+
310+
return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err())
311+
}

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

+38-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"google.golang.org/grpc/balancer"
3737
"google.golang.org/grpc/balancer/pickfirst/internal"
3838
"google.golang.org/grpc/connectivity"
39+
estats "google.golang.org/grpc/experimental/stats"
3940
"google.golang.org/grpc/grpclog"
4041
"google.golang.org/grpc/internal/envconfig"
4142
internalgrpclog "google.golang.org/grpc/internal/grpclog"
@@ -57,7 +58,28 @@ var (
5758
// Name is the name of the pick_first_leaf balancer.
5859
// It is changed to "pick_first" in init() if this balancer is to be
5960
// registered as the default pickfirst.
60-
Name = "pick_first_leaf"
61+
Name = "pick_first_leaf"
62+
pickFirstDisconnectionsMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
63+
Name: "grpc.lb.pick_first.disconnections",
64+
Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
65+
Unit: "disconnection",
66+
Labels: []string{"grpc.target"},
67+
Default: false,
68+
})
69+
pickFirstConnectionAttemptsSucceeded = estats.RegisterInt64Count(estats.MetricDescriptor{
70+
Name: "grpc.lb.pick_first.connection_attempts_succeeded",
71+
Description: "EXPERIMENTAL. Number of successful connection attempts.",
72+
Unit: "attempt",
73+
Labels: []string{"grpc.target"},
74+
Default: false,
75+
})
76+
pickFirstConnectionAttemptsFailed = estats.RegisterInt64Count(estats.MetricDescriptor{
77+
Name: "grpc.lb.pick_first.connection_attempts_failed",
78+
Description: "EXPERIMENTAL. Number of failed connection attempts.",
79+
Unit: "attempt",
80+
Labels: []string{"grpc.target"},
81+
Default: false,
82+
})
6183
)
6284

6385
const (
@@ -80,9 +102,12 @@ const (
80102

81103
type pickfirstBuilder struct{}
82104

83-
func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
105+
func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer {
84106
b := &pickfirstBalancer{
85-
cc: cc,
107+
cc: cc,
108+
target: bo.Target.String(),
109+
metricsRecorder: bo.MetricsRecorder,
110+
86111
addressList: addressList{},
87112
subConns: resolver.NewAddressMap(),
88113
state: connectivity.Connecting,
@@ -147,8 +172,10 @@ func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
147172
type pickfirstBalancer struct {
148173
// The following fields are initialized at build time and read-only after
149174
// that and therefore do not need to be guarded by a mutex.
150-
logger *internalgrpclog.PrefixLogger
151-
cc balancer.ClientConn
175+
logger *internalgrpclog.PrefixLogger
176+
cc balancer.ClientConn
177+
target string
178+
metricsRecorder estats.MetricsRecorder
152179

153180
// The mutex is used to ensure synchronization of updates triggered
154181
// from the idle picker and the already serialized resolver,
@@ -548,7 +575,12 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
548575
return
549576
}
550577

578+
if oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.TransientFailure {
579+
pickFirstConnectionAttemptsFailed.Record(b.metricsRecorder, 1, b.target)
580+
}
581+
551582
if newState.ConnectivityState == connectivity.Ready {
583+
pickFirstConnectionAttemptsSucceeded.Record(b.metricsRecorder, 1, b.target)
552584
b.shutdownRemainingLocked(sd)
553585
if !b.addressList.seekTo(sd.addr) {
554586
// This should not fail as we should have only one SubConn after
@@ -575,6 +607,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
575607
// the first address when the picker is used.
576608
b.shutdownRemainingLocked(sd)
577609
b.state = connectivity.Idle
610+
pickFirstDisconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
578611
b.addressList.reset()
579612
b.cc.UpdateState(balancer.State{
580613
ConnectivityState: connectivity.Idle,

0 commit comments

Comments
 (0)